4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <linux/libcfs/libcfs.h>
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
53 #include <lustre_ha.h>
54 #include <lprocfs_status.h>
55 #include <lustre_log.h>
56 #include <lustre_debug.h>
57 #include <lustre_param.h>
58 #include <lustre_fid.h>
59 #include "osc_internal.h"
60 #include "osc_cl_internal.h"
62 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
63 static int brw_interpret(const struct lu_env *env,
64 struct ptlrpc_request *req, void *data, int rc);
65 int osc_cleanup(struct obd_device *obd);
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69 struct lov_stripe_md *lsm)
74 lmm_size = sizeof(**lmmp);
78 if (*lmmp != NULL && lsm == NULL) {
79 OBD_FREE(*lmmp, lmm_size);
82 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
87 OBD_ALLOC(*lmmp, lmm_size);
93 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
98 /* Unpack OSC object metadata from disk storage (LE byte order). */
99 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
100 struct lov_mds_md *lmm, int lmm_bytes)
103 struct obd_import *imp = class_exp2cliimp(exp);
107 if (lmm_bytes < sizeof(*lmm)) {
108 CERROR("%s: lov_mds_md too small: %d, need %d\n",
109 exp->exp_obd->obd_name, lmm_bytes,
113 /* XXX LOV_MAGIC etc check? */
115 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
116 CERROR("%s: zero lmm_object_id: rc = %d\n",
117 exp->exp_obd->obd_name, -EINVAL);
122 lsm_size = lov_stripe_md_size(1);
126 if (*lsmp != NULL && lmm == NULL) {
127 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
128 OBD_FREE(*lsmp, lsm_size);
134 OBD_ALLOC(*lsmp, lsm_size);
135 if (unlikely(*lsmp == NULL))
137 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
139 OBD_FREE(*lsmp, lsm_size);
142 loi_init((*lsmp)->lsm_oinfo[0]);
143 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
148 /* XXX zero *lsmp? */
149 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
152 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
153 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
155 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
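/* Pack the client capability (if any) into the request buffer and flag it
 * in the obdo with OBD_MD_FLOSSCAPA. */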
160 static inline void osc_pack_capa(struct ptlrpc_request *req,
161 struct ost_body *body, void *capa)
163 struct obd_capa *oc = (struct obd_capa *)capa;
164 struct lustre_capa *c;
169 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
172 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
173 DEBUG_CAPA(D_SEC, c, "pack");
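/* Fill the request's OST body from the obd_info: copy the wire obdo and
 * pack the capability, if one was supplied. */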
176 static inline void osc_pack_req_body(struct ptlrpc_request *req,
177 struct obd_info *oinfo)
179 struct ost_body *body;
181 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
184 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
186 osc_pack_capa(req, body, oinfo->oi_capa);
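/* Shrink the capability field in the request capsule to zero size when no
 * capability is being sent. */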
189 static inline void osc_set_capa_size(struct ptlrpc_request *req,
190 const struct req_msg_field *field,
194 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
196 /* it is already calculated as sizeof struct obd_capa */
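/* Reply callback for async getattr: unpack the reply body into the caller's
 * obdo and invoke the oi_cb_up completion callback. */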
200 static int osc_getattr_interpret(const struct lu_env *env,
201 struct ptlrpc_request *req,
202 struct osc_async_args *aa, int rc)
204 struct ost_body *body;
210 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
212 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
213 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
214 aa->aa_oi->oi_oa, &body->oa);
216 /* This should really be sent by the OST */
217 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
218 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
220 CDEBUG(D_INFO, "can't unpack ost_body\n");
222 aa->aa_oi->oi_oa->o_valid = 0;
225 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
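/* Queue an asynchronous OST_GETATTR request on the given request set. */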
229 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
230 struct ptlrpc_request_set *set)
232 struct ptlrpc_request *req;
233 struct osc_async_args *aa;
237 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
242 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
244 ptlrpc_request_free(req);
248 osc_pack_req_body(req, oinfo);
250 ptlrpc_request_set_replen(req);
251 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
253 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
254 aa = ptlrpc_req_async_args(req);
257 ptlrpc_set_add_req(set, req);
261 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
262 struct obd_info *oinfo)
264 struct ptlrpc_request *req;
265 struct ost_body *body;
269 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
273 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
274 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276 ptlrpc_request_free(req);
280 osc_pack_req_body(req, oinfo);
282 ptlrpc_request_set_replen(req);
284 rc = ptlrpc_queue_wait(req);
288 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290 GOTO(out, rc = -EPROTO);
292 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
293 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
296 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
297 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
301 ptlrpc_req_finished(req);
305 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
306 struct obd_info *oinfo, struct obd_trans_info *oti)
308 struct ptlrpc_request *req;
309 struct ost_body *body;
313 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
319 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322 ptlrpc_request_free(req);
326 osc_pack_req_body(req, oinfo);
328 ptlrpc_request_set_replen(req);
330 rc = ptlrpc_queue_wait(req);
334 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336 GOTO(out, rc = -EPROTO);
338 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
343 ptlrpc_req_finished(req);
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_setattr_args *sa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
364 rc = sa->sa_upcall(sa->sa_cookie, rc);
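/* Send an OST_SETATTR asynchronously, either via ptlrpcd or the caller's
 * request set; completion is handled by osc_setattr_interpret(). */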
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369 struct obd_trans_info *oti,
370 obd_enqueue_update_f upcall, void *cookie,
371 struct ptlrpc_request_set *rqset)
373 struct ptlrpc_request *req;
374 struct osc_setattr_args *sa;
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385 ptlrpc_request_free(req);
389 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
392 osc_pack_req_body(req, oinfo);
394 ptlrpc_request_set_replen(req);
396 /* do mds to ost setattr asynchronously */
398 /* Do not wait for response. */
399 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
401 req->rq_interpret_reply =
402 (ptlrpc_interpterer_t)osc_setattr_interpret;
404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405 sa = ptlrpc_req_async_args(req);
406 sa->sa_oa = oinfo->oi_oa;
407 sa->sa_upcall = upcall;
408 sa->sa_cookie = cookie;
410 if (rqset == PTLRPCD_SET)
411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413 ptlrpc_set_add_req(rqset, req);
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420 struct obd_trans_info *oti,
421 struct ptlrpc_request_set *rqset)
423 return osc_setattr_async_base(exp, oinfo, oti,
424 oinfo->oi_cb_up, oinfo, rqset);
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428 struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 struct ptlrpc_request *req;
431 struct ost_body *body;
432 struct lov_stripe_md *lsm;
441 rc = obd_alloc_memmd(exp, &lsm);
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
448 GOTO(out, rc = -ENOMEM);
450 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
452 ptlrpc_request_free(req);
456 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
459 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
461 ptlrpc_request_set_replen(req);
463 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
464 oa->o_flags == OBD_FL_DELORPHAN) {
466 "delorphan from OST integration");
467 /* Don't resend the delorphan req */
468 req->rq_no_resend = req->rq_no_delay = 1;
471 rc = ptlrpc_queue_wait(req);
475 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
477 GOTO(out_req, rc = -EPROTO);
479 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
480 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
482 oa->o_blksize = cli_brw_size(exp->exp_obd);
483 oa->o_valid |= OBD_MD_FLBLKSZ;
485 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
486 * have valid lsm_oinfo data structs, so don't go touching that.
487 * This needs to be fixed in a big way.
489 lsm->lsm_oi = oa->o_oi;
493 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
495 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496 if (!oti->oti_logcookies)
497 oti_alloc_cookies(oti, 1);
498 *oti->oti_logcookies = oa->o_lcookie;
502 CDEBUG(D_HA, "transno: "LPD64"\n",
503 lustre_msg_get_transno(req->rq_repmsg));
505 ptlrpc_req_finished(req);
508 obd_free_memmd(exp, &lsm);
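/* Send an OST_PUNCH (truncate) request; the extent being punched is carried
 * in the size/blocks fields of the obdo. */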
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513 obd_enqueue_update_f upcall, void *cookie,
514 struct ptlrpc_request_set *rqset)
516 struct ptlrpc_request *req;
517 struct osc_setattr_args *sa;
518 struct ost_body *body;
522 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
529 ptlrpc_request_free(req);
532 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533 ptlrpc_at_set_req_timeout(req);
535 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
537 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
539 osc_pack_capa(req, body, oinfo->oi_capa);
541 ptlrpc_request_set_replen(req);
543 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
544 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
545 sa = ptlrpc_req_async_args(req);
546 sa->sa_oa = oinfo->oi_oa;
547 sa->sa_upcall = upcall;
548 sa->sa_cookie = cookie;
549 if (rqset == PTLRPCD_SET)
550 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
552 ptlrpc_set_add_req(rqset, req);
557 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
558 struct obd_info *oinfo, struct obd_trans_info *oti,
559 struct ptlrpc_request_set *rqset)
561 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
562 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
563 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
564 return osc_punch_base(exp, oinfo,
565 oinfo->oi_cb_up, oinfo, rqset);
568 static int osc_sync_interpret(const struct lu_env *env,
569 struct ptlrpc_request *req,
572 struct osc_fsync_args *fa = arg;
573 struct ost_body *body;
579 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
581 CERROR ("can't unpack ost_body\n");
582 GOTO(out, rc = -EPROTO);
585 *fa->fa_oi->oi_oa = body->oa;
587 rc = fa->fa_upcall(fa->fa_cookie, rc);
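/* Send an OST_SYNC request; the start/end of the range to sync is carried in
 * the size/blocks fields of the obdo, and the upcall runs on completion. */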
591 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
592 obd_enqueue_update_f upcall, void *cookie,
593 struct ptlrpc_request_set *rqset)
595 struct ptlrpc_request *req;
596 struct ost_body *body;
597 struct osc_fsync_args *fa;
601 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
605 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
606 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
608 ptlrpc_request_free(req);
612 /* overload the size and blocks fields in the oa with start/end */
613 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
617 osc_pack_capa(req, body, oinfo->oi_capa);
619 ptlrpc_request_set_replen(req);
620 req->rq_interpret_reply = osc_sync_interpret;
622 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
623 fa = ptlrpc_req_async_args(req);
625 fa->fa_upcall = upcall;
626 fa->fa_cookie = cookie;
628 if (rqset == PTLRPCD_SET)
629 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
631 ptlrpc_set_add_req(rqset, req);
636 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
637 struct obd_info *oinfo, obd_size start, obd_size end,
638 struct ptlrpc_request_set *set)
643 CDEBUG(D_INFO, "oa NULL\n");
647 oinfo->oi_oa->o_size = start;
648 oinfo->oi_oa->o_blocks = end;
649 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
651 RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
654 /* Find and locally cancel the locks matching @mode in the resource found by
655 * @objid. Found locks are added to the @cancels list. Returns the number of
656 * locks added to @cancels. */
657 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
658 struct list_head *cancels,
659 ldlm_mode_t mode, int lock_flags)
661 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
662 struct ldlm_res_id res_id;
663 struct ldlm_resource *res;
667 /* Return, i.e. cancel nothing, only if ELC is supported (flag in the
668 * export) but disabled through procfs (flag in the NS).
670 * This is distinct from the case where ELC is not supported at all,
671 * in which we still want to cancel locks in advance and just cancel
672 * them locally, without sending any RPC. */
673 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
676 ostid_build_res_name(&oa->o_oi, &res_id);
677 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
681 LDLM_RESOURCE_ADDREF(res);
682 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
683 lock_flags, 0, NULL);
684 LDLM_RESOURCE_DELREF(res);
685 ldlm_resource_putref(res);
689 static int osc_destroy_interpret(const struct lu_env *env,
690 struct ptlrpc_request *req, void *data,
693 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
695 atomic_dec(&cli->cl_destroy_in_flight);
696 wake_up(&cli->cl_destroy_waitq);
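/* Throttle destroy RPCs to cl_max_rpcs_in_flight; returns nonzero when
 * another destroy may be sent now. */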
700 static int osc_can_send_destroy(struct client_obd *cli)
702 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
703 cli->cl_max_rpcs_in_flight) {
704 /* The destroy request can be sent */
707 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
708 cli->cl_max_rpcs_in_flight) {
710 * The counter has been modified between the two atomic operations.
713 wake_up(&cli->cl_destroy_waitq);
718 int osc_create(const struct lu_env *env, struct obd_export *exp,
719 struct obdo *oa, struct lov_stripe_md **ea,
720 struct obd_trans_info *oti)
727 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
729 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
730 oa->o_flags == OBD_FL_RECREATE_OBJS) {
731 RETURN(osc_real_create(exp, oa, ea, oti));
734 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
735 RETURN(osc_real_create(exp, oa, ea, oti));
737 /* we should not get here anymore */
743 /* Destroy requests can always be async on the client, and we don't even really
744 * care about the return code, since the client cannot do anything about a destroy failure.
746 * When the MDS is unlinking a filename, it saves the file objects into a
747 * recovery llog, and these object records are cancelled when the OST reports
748 * they were destroyed and sync'd to disk (i.e. the transaction committed).
749 * If the client dies, or the OST is down when the object should be destroyed,
750 * the records are not cancelled, and when the OST next reconnects to the MDS,
751 * it will retrieve the llog unlink logs and then send the log cancellation
752 * cookies to the MDS after committing the destroy transactions. */
753 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
754 struct obdo *oa, struct lov_stripe_md *ea,
755 struct obd_trans_info *oti, struct obd_export *md_export,
758 struct client_obd *cli = &exp->exp_obd->u.cli;
759 struct ptlrpc_request *req;
760 struct ost_body *body;
766 CDEBUG(D_INFO, "oa NULL\n");
770 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
771 LDLM_FL_DISCARD_DATA);
773 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
775 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
780 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
783 ptlrpc_request_free(req);
787 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
788 ptlrpc_at_set_req_timeout(req);
790 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
791 oa->o_lcookie = *oti->oti_logcookies;
792 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
794 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
796 osc_pack_capa(req, body, (struct obd_capa *)capa);
797 ptlrpc_request_set_replen(req);
799 /* If osc_destroy is for destroying an unlink orphan,
800 * sent from the MDT to the OST, it should not be blocked here,
801 * because the process might be triggered by ptlrpcd, and
802 * it is not good to block a ptlrpcd thread (b=16006). */
803 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
804 req->rq_interpret_reply = osc_destroy_interpret;
805 if (!osc_can_send_destroy(cli)) {
806 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810 * Wait until the number of on-going destroy RPCs drops
811 * below max_rpcs_in_flight.
813 l_wait_event_exclusive(cli->cl_destroy_waitq,
814 osc_can_send_destroy(cli), &lwi);
818 /* Do not wait for response */
819 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
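/* Report the client's dirty page and grant accounting to the OST by filling
 * the o_dirty, o_undirty, o_grant and o_dropped fields of the obdo. */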
823 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
826 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
828 LASSERT(!(oa->o_valid & bits));
831 client_obd_list_lock(&cli->cl_loi_list_lock);
832 oa->o_dirty = cli->cl_dirty;
833 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
834 cli->cl_dirty_max)) {
835 CERROR("dirty %lu - %lu > dirty_max %lu\n",
836 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
838 } else if (unlikely(atomic_read(&obd_dirty_pages) -
839 atomic_read(&obd_dirty_transit_pages) >
840 (long)(obd_max_dirty_pages + 1))) {
841 /* The atomic_read() and the atomic_inc() are
842 * not covered by a lock, so they may safely race and trip
843 * this CERROR() unless we add in a small fudge factor (+1). */
844 CERROR("dirty %d - %d > system dirty_max %d\n",
845 atomic_read(&obd_dirty_pages),
846 atomic_read(&obd_dirty_transit_pages),
847 obd_max_dirty_pages);
849 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
850 CERROR("dirty %lu - dirty_max %lu too big???\n",
851 cli->cl_dirty, cli->cl_dirty_max);
854 long max_in_flight = (cli->cl_max_pages_per_rpc <<
856 (cli->cl_max_rpcs_in_flight + 1);
857 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
859 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
860 oa->o_dropped = cli->cl_lost_grant;
861 cli->cl_lost_grant = 0;
862 client_obd_list_unlock(&cli->cl_loi_list_lock);
863 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
864 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
868 void osc_update_next_shrink(struct client_obd *cli)
870 cli->cl_next_shrink_grant =
871 cfs_time_shift(cli->cl_grant_shrink_interval);
872 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
873 cli->cl_next_shrink_grant);
876 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
878 client_obd_list_lock(&cli->cl_loi_list_lock);
879 cli->cl_avail_grant += grant;
880 client_obd_list_unlock(&cli->cl_loi_list_lock);
883 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
885 if (body->oa.o_valid & OBD_MD_FLGRANT) {
886 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
887 __osc_update_grant(cli, body->oa.o_grant);
891 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
892 obd_count keylen, void *key, obd_count vallen,
893 void *val, struct ptlrpc_request_set *set);
895 static int osc_shrink_grant_interpret(const struct lu_env *env,
896 struct ptlrpc_request *req,
899 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
900 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
901 struct ost_body *body;
904 __osc_update_grant(cli, oa->o_grant);
908 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
910 osc_update_grant(cli, body);
916 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
918 client_obd_list_lock(&cli->cl_loi_list_lock);
919 oa->o_grant = cli->cl_avail_grant / 4;
920 cli->cl_avail_grant -= oa->o_grant;
921 client_obd_list_unlock(&cli->cl_loi_list_lock);
922 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
923 oa->o_valid |= OBD_MD_FLFLAGS;
926 oa->o_flags |= OBD_FL_SHRINK_GRANT;
927 osc_update_next_shrink(cli);
930 /* Shrink the current grant, either from some large amount to enough for a
931 * full set of in-flight RPCs, or if we have already shrunk to that limit
932 * then to enough for a single RPC. This avoids keeping more grant than
933 * needed, and avoids shrinking the grant piecemeal. */
934 static int osc_shrink_grant(struct client_obd *cli)
936 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
937 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
939 client_obd_list_lock(&cli->cl_loi_list_lock);
940 if (cli->cl_avail_grant <= target_bytes)
941 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
942 client_obd_list_unlock(&cli->cl_loi_list_lock);
944 return osc_shrink_grant_to_target(cli, target_bytes);
947 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
950 struct ost_body *body;
953 client_obd_list_lock(&cli->cl_loi_list_lock);
954 /* Don't shrink if we are already above or below the desired limit.
955 * We don't want to shrink below a single RPC, as that will negatively
956 * impact block allocation and long-term performance. */
957 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
958 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
960 if (target_bytes >= cli->cl_avail_grant) {
961 client_obd_list_unlock(&cli->cl_loi_list_lock);
964 client_obd_list_unlock(&cli->cl_loi_list_lock);
970 osc_announce_cached(cli, &body->oa, 0);
972 client_obd_list_lock(&cli->cl_loi_list_lock);
973 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
974 cli->cl_avail_grant = target_bytes;
975 client_obd_list_unlock(&cli->cl_loi_list_lock);
976 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
977 body->oa.o_valid |= OBD_MD_FLFLAGS;
978 body->oa.o_flags = 0;
980 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
981 osc_update_next_shrink(cli);
983 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
984 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
985 sizeof(*body), body, NULL);
987 __osc_update_grant(cli, body->oa.o_grant);
992 static int osc_should_shrink_grant(struct client_obd *client)
994 cfs_time_t time = cfs_time_current();
995 cfs_time_t next_shrink = client->cl_next_shrink_grant;
997 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
998 OBD_CONNECT_GRANT_SHRINK) == 0)
1001 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1002 /* Get the current RPC size directly, instead of going via:
1003 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1004 * Keep comment here so that it can be found by searching. */
1005 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1007 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1008 client->cl_avail_grant > brw_size)
1011 osc_update_next_shrink(client);
1016 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1018 struct client_obd *client;
1020 list_for_each_entry(client, &item->ti_obd_list,
1021 cl_grant_shrink_list) {
1022 if (osc_should_shrink_grant(client))
1023 osc_shrink_grant(client);
1028 static int osc_add_shrink_grant(struct client_obd *client)
1032 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1034 osc_grant_shrink_grant_cb, NULL,
1035 &client->cl_grant_shrink_list);
1037 CERROR("add grant client %s error %d\n",
1038 client->cl_import->imp_obd->obd_name, rc);
1041 CDEBUG(D_CACHE, "add grant client %s \n",
1042 client->cl_import->imp_obd->obd_name);
1043 osc_update_next_shrink(client);
1047 static int osc_del_shrink_grant(struct client_obd *client)
1049 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1053 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1056 * ocd_grant is the total grant amount we expect to hold: if we've
1057 * been evicted, it's the new avail_grant amount, and cl_dirty will drop
1058 * to 0 as in-flight RPCs fail out; otherwise, it's avail_grant + dirty.
1060 * A race is tolerable here: if we're evicted but imp_state has already
1061 * left the EVICTED state, then cl_dirty must already be 0.
1063 client_obd_list_lock(&cli->cl_loi_list_lock);
1064 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1065 cli->cl_avail_grant = ocd->ocd_grant;
1067 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1069 if (cli->cl_avail_grant < 0) {
1070 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1071 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1072 ocd->ocd_grant, cli->cl_dirty);
1073 /* workaround for servers which do not have the patch from
1075 cli->cl_avail_grant = ocd->ocd_grant;
1078 /* determine the appropriate chunk size used by osc_extent. */
1079 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1080 client_obd_list_unlock(&cli->cl_loi_list_lock);
1082 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1083 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1084 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1086 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1087 list_empty(&cli->cl_grant_shrink_list))
1088 osc_add_shrink_grant(cli);
1091 /* We assume that the reason this OSC got a short read is that it read
1092 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1093 * via the LOV, and it _knows_ it's reading inside the file; it's just that
1094 * this stripe was never written at or beyond this stripe offset yet. */
1095 static void handle_short_read(int nob_read, obd_count page_count,
1096 struct brw_page **pga)
1101 /* skip bytes read OK */
1102 while (nob_read > 0) {
1103 LASSERT (page_count > 0);
1105 if (pga[i]->count > nob_read) {
1106 /* EOF inside this page */
1107 ptr = kmap(pga[i]->pg) +
1108 (pga[i]->off & ~CFS_PAGE_MASK);
1109 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1116 nob_read -= pga[i]->count;
1121 /* zero remaining pages */
1122 while (page_count-- > 0) {
1123 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1124 memset(ptr, 0, pga[i]->count);
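/* Verify the per-niobuf return codes and the total number of bytes
 * transferred in a BRW write reply. */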
1130 static int check_write_rcs(struct ptlrpc_request *req,
1131 int requested_nob, int niocount,
1132 obd_count page_count, struct brw_page **pga)
1137 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1138 sizeof(*remote_rcs) *
1140 if (remote_rcs == NULL) {
1141 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1145 /* return error if any niobuf was in error */
1146 for (i = 0; i < niocount; i++) {
1147 if ((int)remote_rcs[i] < 0)
1148 return(remote_rcs[i]);
1150 if (remote_rcs[i] != 0) {
1151 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1152 i, remote_rcs[i], req);
1157 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1158 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1159 req->rq_bulk->bd_nob_transferred, requested_nob);
1166 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1168 if (p1->flag != p2->flag) {
1169 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1170 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1172 /* warn if we try to combine flags that we don't know to be
1173 * safe to combine */
1174 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1175 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1176 "report this at http://bugs.whamcloud.com/\n",
1177 p1->flag, p2->flag);
1182 return (p1->off + p1->count == p2->off);
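/* Compute the bulk checksum over the page array using the hash algorithm
 * selected by @cksum_type. */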
1185 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1186 struct brw_page **pga, int opc,
1187 cksum_type_t cksum_type)
1191 struct cfs_crypto_hash_desc *hdesc;
1192 unsigned int bufsize;
1194 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1196 LASSERT(pg_count > 0);
1198 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1199 if (IS_ERR(hdesc)) {
1200 CERROR("Unable to initialize checksum hash %s\n",
1201 cfs_crypto_hash_name(cfs_alg));
1202 return PTR_ERR(hdesc);
1205 while (nob > 0 && pg_count > 0) {
1206 int count = pga[i]->count > nob ? nob : pga[i]->count;
1208 /* corrupt the data before we compute the checksum, to
1209 * simulate an OST->client data error */
1210 if (i == 0 && opc == OST_READ &&
1211 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1212 unsigned char *ptr = kmap(pga[i]->pg);
1213 int off = pga[i]->off & ~CFS_PAGE_MASK;
1214 memcpy(ptr + off, "bad1", min(4, nob));
1217 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1218 pga[i]->off & ~CFS_PAGE_MASK,
1220 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1221 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1223 nob -= pga[i]->count;
1229 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1232 cfs_crypto_hash_final(hdesc, NULL, NULL);
1234 /* For sends we only compute a wrong checksum instead
1235 * of corrupting the data, so it is still correct on a redo. */
1236 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
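/* Build a BRW read or write RPC: allocate the request and bulk descriptor,
 * pack the obdo, ioobj and niobufs for the sorted page array, and checksum
 * the data when checksums are enabled. */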
1242 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1243 struct lov_stripe_md *lsm, obd_count page_count,
1244 struct brw_page **pga,
1245 struct ptlrpc_request **reqp,
1246 struct obd_capa *ocapa, int reserve,
1249 struct ptlrpc_request *req;
1250 struct ptlrpc_bulk_desc *desc;
1251 struct ost_body *body;
1252 struct obd_ioobj *ioobj;
1253 struct niobuf_remote *niobuf;
1254 int niocount, i, requested_nob, opc, rc;
1255 struct osc_brw_async_args *aa;
1256 struct req_capsule *pill;
1257 struct brw_page *pg_prev;
1260 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1261 RETURN(-ENOMEM); /* Recoverable */
1262 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1263 RETURN(-EINVAL); /* Fatal */
1265 if ((cmd & OBD_BRW_WRITE) != 0) {
1267 req = ptlrpc_request_alloc_pool(cli->cl_import,
1268 cli->cl_import->imp_rq_pool,
1269 &RQF_OST_BRW_WRITE);
1272 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1277 for (niocount = i = 1; i < page_count; i++) {
1278 if (!can_merge_pages(pga[i - 1], pga[i]))
1282 pill = &req->rq_pill;
1283 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1285 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1286 niocount * sizeof(*niobuf));
1287 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1289 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1291 ptlrpc_request_free(req);
1294 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1295 ptlrpc_at_set_req_timeout(req);
1296 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own retry logic */
1298 req->rq_no_retry_einprogress = 1;
1300 desc = ptlrpc_prep_bulk_imp(req, page_count,
1301 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1302 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1306 GOTO(out, rc = -ENOMEM);
1307 /* NB request now owns desc and will free it when it gets freed */
1309 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1310 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1311 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1312 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1314 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1316 obdo_to_ioobj(oa, ioobj);
1317 ioobj->ioo_bufcnt = niocount;
1318 /* The high bits of ioo_max_brw tell the server the _maximum_ number of bulks
1319 * that might be sent for this request. The actual number is decided
1320 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1321 * "max - 1" for compatibility with old clients that send "0", and also so
1322 * that the actual maximum is a power-of-two number, not one less. LU-1431 */
1323 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1324 osc_pack_capa(req, body, ocapa);
1325 LASSERT(page_count > 0);
1327 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1328 struct brw_page *pg = pga[i];
1329 int poff = pg->off & ~CFS_PAGE_MASK;
1331 LASSERT(pg->count > 0);
1332 /* make sure there is no gap in the middle of the page array */
1333 LASSERTF(page_count == 1 ||
1334 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1335 ergo(i > 0 && i < page_count - 1,
1336 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1337 ergo(i == page_count - 1, poff == 0)),
1338 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1339 i, page_count, pg, pg->off, pg->count);
1340 LASSERTF(i == 0 || pg->off > pg_prev->off,
1341 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1342 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1344 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1345 pg_prev->pg, page_private(pg_prev->pg),
1346 pg_prev->pg->index, pg_prev->off);
1347 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1348 (pg->flag & OBD_BRW_SRVLOCK));
1350 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1351 requested_nob += pg->count;
1353 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1355 niobuf->len += pg->count;
1357 niobuf->offset = pg->off;
1358 niobuf->len = pg->count;
1359 niobuf->flags = pg->flag;
1364 LASSERTF((void *)(niobuf - niocount) ==
1365 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1366 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1367 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1369 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1371 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1372 body->oa.o_valid |= OBD_MD_FLFLAGS;
1373 body->oa.o_flags = 0;
1375 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1378 if (osc_should_shrink_grant(cli))
1379 osc_shrink_grant_local(cli, &body->oa);
1381 /* size[REQ_REC_OFF] still sizeof (*body) */
1382 if (opc == OST_WRITE) {
1383 if (cli->cl_checksum &&
1384 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1385 /* store cl_cksum_type in a local variable since
1386 * it can be changed via lprocfs */
1387 cksum_type_t cksum_type = cli->cl_cksum_type;
1389 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1390 oa->o_flags &= OBD_FL_LOCAL_MASK;
1391 body->oa.o_flags = 0;
1393 body->oa.o_flags |= cksum_type_pack(cksum_type);
1394 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1395 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1399 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1401 /* save this in 'oa', too, for later checking */
1402 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1403 oa->o_flags |= cksum_type_pack(cksum_type);
1405 /* clear out the checksum flag, in case this is a
1406 * resend but cl_checksum is no longer set. b=11238 */
1407 oa->o_valid &= ~OBD_MD_FLCKSUM;
1409 oa->o_cksum = body->oa.o_cksum;
1410 /* 1 RC per niobuf */
1411 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1412 sizeof(__u32) * niocount);
1414 if (cli->cl_checksum &&
1415 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1416 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1417 body->oa.o_flags = 0;
1418 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1419 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1422 ptlrpc_request_set_replen(req);
1424 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1425 aa = ptlrpc_req_async_args(req);
1427 aa->aa_requested_nob = requested_nob;
1428 aa->aa_nio_count = niocount;
1429 aa->aa_page_count = page_count;
1433 INIT_LIST_HEAD(&aa->aa_oaps);
1434 if (ocapa && reserve)
1435 aa->aa_ocapa = capa_get(ocapa);
1441 ptlrpc_req_finished(req);
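/* Diagnose a BRW write checksum mismatch: recompute the checksum locally to
 * decide whether the data changed on the client, changed in transit, or the
 * server used a different checksum type, and log the result. */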
1445 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1446 __u32 client_cksum, __u32 server_cksum, int nob,
1447 obd_count page_count, struct brw_page **pga,
1448 cksum_type_t client_cksum_type)
1452 cksum_type_t cksum_type;
1454 if (server_cksum == client_cksum) {
1455 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1459 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1461 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1464 if (cksum_type != client_cksum_type)
1465 msg = "the server did not use the checksum type specified in "
1466 "the original request - likely a protocol problem";
1467 else if (new_cksum == server_cksum)
1468 msg = "changed on the client after we checksummed it - "
1469 "likely false positive due to mmap IO (bug 11742)";
1470 else if (new_cksum == client_cksum)
1471 msg = "changed in transit before arrival at OST";
1473 msg = "changed in transit AND doesn't match the original - "
1474 "likely false positive due to mmap IO (bug 11742)";
1476 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1477 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1478 msg, libcfs_nid2str(peer->nid),
1479 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1480 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1481 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1482 POSTID(&oa->o_oi), pga[0]->off,
1483 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1484 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1485 "client csum now %x\n", client_cksum, client_cksum_type,
1486 server_cksum, cksum_type, new_cksum);
1490 /* Note rc enters this function as number of bytes transferred */
1491 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1493 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1494 const lnet_process_id_t *peer =
1495 &req->rq_import->imp_connection->c_peer;
1496 struct client_obd *cli = aa->aa_cli;
1497 struct ost_body *body;
1498 __u32 client_cksum = 0;
1501 if (rc < 0 && rc != -EDQUOT) {
1502 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1506 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1509 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1513 /* set/clear over quota flag for a uid/gid */
1514 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1515 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1516 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1518 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1519 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1521 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1524 osc_update_grant(cli, body);
1529 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1530 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1532 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1534 CERROR("Unexpected +ve rc %d\n", rc);
1537 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1539 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1542 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1543 check_write_checksum(&body->oa, peer, client_cksum,
1544 body->oa.o_cksum, aa->aa_requested_nob,
1545 aa->aa_page_count, aa->aa_ppga,
1546 cksum_type_unpack(aa->aa_oa->o_flags)))
1549 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1550 aa->aa_page_count, aa->aa_ppga);
1554 /* The rest of this function executes only for OST_READs */
1556 /* if unwrap_bulk failed, return -EAGAIN to retry */
1557 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1559 GOTO(out, rc = -EAGAIN);
1561 if (rc > aa->aa_requested_nob) {
1562 CERROR("Unexpected rc %d (%d requested)\n", rc,
1563 aa->aa_requested_nob);
1567 if (rc != req->rq_bulk->bd_nob_transferred) {
1568 CERROR ("Unexpected rc %d (%d transferred)\n",
1569 rc, req->rq_bulk->bd_nob_transferred);
1573 if (rc < aa->aa_requested_nob)
1574 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1576 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1577 static int cksum_counter;
1578 __u32 server_cksum = body->oa.o_cksum;
1581 cksum_type_t cksum_type;
1583 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1584 body->oa.o_flags : 0);
1585 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1586 aa->aa_ppga, OST_READ,
1589 if (peer->nid == req->rq_bulk->bd_sender) {
1593 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1596 if (server_cksum == ~0 && rc > 0) {
1597 CERROR("Protocol error: server %s set the 'checksum' "
1598 "bit, but didn't send a checksum. Not fatal, "
1599 "but please notify on http://bugs.whamcloud.com/\n",
1600 libcfs_nid2str(peer->nid));
1601 } else if (server_cksum != client_cksum) {
1602 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1603 "%s%s%s inode "DFID" object "DOSTID
1604 " extent ["LPU64"-"LPU64"]\n",
1605 req->rq_import->imp_obd->obd_name,
1606 libcfs_nid2str(peer->nid),
1608 body->oa.o_valid & OBD_MD_FLFID ?
1609 body->oa.o_parent_seq : (__u64)0,
1610 body->oa.o_valid & OBD_MD_FLFID ?
1611 body->oa.o_parent_oid : 0,
1612 body->oa.o_valid & OBD_MD_FLFID ?
1613 body->oa.o_parent_ver : 0,
1614 POSTID(&body->oa.o_oi),
1615 aa->aa_ppga[0]->off,
1616 aa->aa_ppga[aa->aa_page_count-1]->off +
1617 aa->aa_ppga[aa->aa_page_count-1]->count -
1619 CERROR("client %x, server %x, cksum_type %x\n",
1620 client_cksum, server_cksum, cksum_type);
1622 aa->aa_oa->o_cksum = client_cksum;
1626 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1629 } else if (unlikely(client_cksum)) {
1630 static int cksum_missed;
1633 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1634 CERROR("Checksum %u requested from %s but not sent\n",
1635 cksum_missed, libcfs_nid2str(peer->nid));
1641 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1642 aa->aa_oa, &body->oa);
1647 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1648 struct lov_stripe_md *lsm,
1649 obd_count page_count, struct brw_page **pga,
1650 struct obd_capa *ocapa)
1652 struct ptlrpc_request *req;
1654 wait_queue_head_t waitq;
1655 int generation, resends = 0;
1656 struct l_wait_info lwi;
1660 init_waitqueue_head(&waitq);
1661 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1664 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1665 page_count, pga, &req, ocapa, 0, resends);
1670 req->rq_generation_set = 1;
1671 req->rq_import_generation = generation;
1672 req->rq_sent = cfs_time_current_sec() + resends;
1675 rc = ptlrpc_queue_wait(req);
1677 if (rc == -ETIMEDOUT && req->rq_resend) {
1678 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1679 ptlrpc_req_finished(req);
1683 rc = osc_brw_fini_request(req, rc);
1685 ptlrpc_req_finished(req);
1686 /* When the server returns -EINPROGRESS, the client should always retry,
1687 * regardless of the number of times the bulk was already resent. */
1688 if (osc_recoverable_error(rc)) {
1690 if (rc != -EINPROGRESS &&
1691 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1692 CERROR("%s: too many resend retries for object: "
1693 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1694 POSTID(&oa->o_oi), rc);
1698 exp->exp_obd->u.cli.cl_import->imp_generation) {
1699 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1700 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1701 POSTID(&oa->o_oi), rc);
1705 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1707 l_wait_event(waitq, 0, &lwi);
1712 if (rc == -EAGAIN || rc == -EINPROGRESS)
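/* Rebuild and resend a BRW request after a recoverable error; the new request
 * takes over the page array, extents and oaps from the old one. */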
1717 static int osc_brw_redo_request(struct ptlrpc_request *request,
1718 struct osc_brw_async_args *aa, int rc)
1720 struct ptlrpc_request *new_req;
1721 struct osc_brw_async_args *new_aa;
1722 struct osc_async_page *oap;
1725 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1726 "redo for recoverable error %d", rc);
1728 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1729 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1730 aa->aa_cli, aa->aa_oa,
1731 NULL /* lsm unused by osc currently */,
1732 aa->aa_page_count, aa->aa_ppga,
1733 &new_req, aa->aa_ocapa, 0, 1);
1737 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1738 if (oap->oap_request != NULL) {
1739 LASSERTF(request == oap->oap_request,
1740 "request %p != oap_request %p\n",
1741 request, oap->oap_request);
1742 if (oap->oap_interrupted) {
1743 ptlrpc_req_finished(new_req);
1748 /* The new request takes over pga and oaps from the old request.
1749 * Note that copying a list_head doesn't work; it needs to be moved... */
1751 new_req->rq_interpret_reply = request->rq_interpret_reply;
1752 new_req->rq_async_args = request->rq_async_args;
1753 /* cap resend delay to the current request timeout, this is similar to
1754 * what ptlrpc does (see after_reply()) */
1755 if (aa->aa_resends > new_req->rq_timeout)
1756 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1758 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1759 new_req->rq_generation_set = 1;
1760 new_req->rq_import_generation = request->rq_import_generation;
1762 new_aa = ptlrpc_req_async_args(new_req);
1764 INIT_LIST_HEAD(&new_aa->aa_oaps);
1765 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1766 INIT_LIST_HEAD(&new_aa->aa_exts);
1767 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1768 new_aa->aa_resends = aa->aa_resends;
1770 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1771 if (oap->oap_request) {
1772 ptlrpc_req_finished(oap->oap_request);
1773 oap->oap_request = ptlrpc_request_addref(new_req);
1777 new_aa->aa_ocapa = aa->aa_ocapa;
1778 aa->aa_ocapa = NULL;
1780 /* XXX: This code will run into problems if we ever support adding
1781 * a series of BRW RPCs into a self-defined ptlrpc_request_set
1782 * and waiting for all of them to finish. We should inherit the request
1783 * set from the old request. */
1784 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1786 DEBUG_REQ(D_INFO, new_req, "new request");
1791 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1792 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1793 * fine for our small page arrays and doesn't require allocation. It's an
1794 * insertion sort that swaps elements that are strides apart, shrinking the
1795 * stride down until it's 1 and the array is sorted.
1797 static void sort_brw_pages(struct brw_page **array, int num)
1800 struct brw_page *tmp;
1804 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1809 for (i = stride ; i < num ; i++) {
1812 while (j >= stride && array[j - stride]->off > tmp->off) {
1813 array[j] = array[j - stride];
1818 } while (stride > 1);
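/* Return the number of leading pages in @pg that can go into a single BRW
 * without creating a gap: the run ends at the first page that does not end
 * (or start) on a page boundary. */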
1821 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1827 LASSERT (pages > 0);
1828 offset = pg[i]->off & ~CFS_PAGE_MASK;
1832 if (pages == 0) /* that's all */
1835 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1836 return count; /* doesn't end on page boundary */
1839 offset = pg[i]->off & ~CFS_PAGE_MASK;
1840 if (offset != 0) /* doesn't start on page boundary */
1847 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1849 struct brw_page **ppga;
1852 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1856 for (i = 0; i < count; i++)
1861 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1863 LASSERT(ppga != NULL);
1864 OBD_FREE(ppga, sizeof(*ppga) * count);
1867 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1868 obd_count page_count, struct brw_page *pga,
1869 struct obd_trans_info *oti)
1871 struct obdo *saved_oa = NULL;
1872 struct brw_page **ppga, **orig;
1873 struct obd_import *imp = class_exp2cliimp(exp);
1874 struct client_obd *cli;
1875 int rc, page_count_orig;
1878 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1879 cli = &imp->imp_obd->u.cli;
1881 if (cmd & OBD_BRW_CHECK) {
1882 /* The caller just wants to know if there's a chance that this
1883 * I/O can succeed */
1885 if (imp->imp_invalid)
1890 /* test_brw with a failed create can trip this, maybe others. */
1891 LASSERT(cli->cl_max_pages_per_rpc);
1895 orig = ppga = osc_build_ppga(pga, page_count);
1898 page_count_orig = page_count;
1900 sort_brw_pages(ppga, page_count);
1901 while (page_count) {
1902 obd_count pages_per_brw;
1904 if (page_count > cli->cl_max_pages_per_rpc)
1905 pages_per_brw = cli->cl_max_pages_per_rpc;
1907 pages_per_brw = page_count;
1909 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1911 if (saved_oa != NULL) {
1912 /* restore previously saved oa */
1913 *oinfo->oi_oa = *saved_oa;
1914 } else if (page_count > pages_per_brw) {
1915 /* save a copy of oa (brw will clobber it) */
1916 OBDO_ALLOC(saved_oa);
1917 if (saved_oa == NULL)
1918 GOTO(out, rc = -ENOMEM);
1919 *saved_oa = *oinfo->oi_oa;
1922 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1923 pages_per_brw, ppga, oinfo->oi_capa);
1928 page_count -= pages_per_brw;
1929 ppga += pages_per_brw;
1933 osc_release_ppga(orig, page_count_orig);
1935 if (saved_oa != NULL)
1936 OBDO_FREE(saved_oa);
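/* Completion handler for BRW RPCs: finish the request, redo it on recoverable
 * errors, update the cached object attributes from the reply, finish the
 * extents, and wake any cache waiters. */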
1941 static int brw_interpret(const struct lu_env *env,
1942 struct ptlrpc_request *req, void *data, int rc)
1944 struct osc_brw_async_args *aa = data;
1945 struct osc_extent *ext;
1946 struct osc_extent *tmp;
1947 struct cl_object *obj = NULL;
1948 struct client_obd *cli = aa->aa_cli;
1951 rc = osc_brw_fini_request(req, rc);
1952 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1953 /* When the server returns -EINPROGRESS, the client should always retry,
1954 * regardless of the number of times the bulk was already resent. */
1955 if (osc_recoverable_error(rc)) {
1956 if (req->rq_import_generation !=
1957 req->rq_import->imp_generation) {
1958 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1959 ""DOSTID", rc = %d.\n",
1960 req->rq_import->imp_obd->obd_name,
1961 POSTID(&aa->aa_oa->o_oi), rc);
1962 } else if (rc == -EINPROGRESS ||
1963 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1964 rc = osc_brw_redo_request(req, aa, rc);
1966 CERROR("%s: too many resend retries for object: "
1967 ""LPU64":"LPU64", rc = %d.\n",
1968 req->rq_import->imp_obd->obd_name,
1969 POSTID(&aa->aa_oa->o_oi), rc);
1974 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1979 capa_put(aa->aa_ocapa);
1980 aa->aa_ocapa = NULL;
1983 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1984 if (obj == NULL && rc == 0) {
1985 obj = osc2cl(ext->oe_obj);
1989 list_del_init(&ext->oe_link);
1990 osc_extent_finish(env, ext, 1, rc);
1992 LASSERT(list_empty(&aa->aa_exts));
1993 LASSERT(list_empty(&aa->aa_oaps));
1996 struct obdo *oa = aa->aa_oa;
1997 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1998 unsigned long valid = 0;
2001 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2002 attr->cat_blocks = oa->o_blocks;
2003 valid |= CAT_BLOCKS;
2005 if (oa->o_valid & OBD_MD_FLMTIME) {
2006 attr->cat_mtime = oa->o_mtime;
2009 if (oa->o_valid & OBD_MD_FLATIME) {
2010 attr->cat_atime = oa->o_atime;
2013 if (oa->o_valid & OBD_MD_FLCTIME) {
2014 attr->cat_ctime = oa->o_ctime;
2018 cl_object_attr_lock(obj);
2019 cl_object_attr_set(env, obj, attr, valid);
2020 cl_object_attr_unlock(obj);
2022 cl_object_put(env, obj);
2024 OBDO_FREE(aa->aa_oa);
2026 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2027 req->rq_bulk->bd_nob_transferred);
2028 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2029 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2031 client_obd_list_lock(&cli->cl_loi_list_lock);
2032 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2033 * is called so we know whether to go to sync BRWs or wait for more
2034 * RPCs to complete */
2035 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2036 cli->cl_w_in_flight--;
2038 cli->cl_r_in_flight--;
2039 osc_wake_cache_waiters(cli);
2040 client_obd_list_unlock(&cli->cl_loi_list_lock);
2042 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2047 * Build an RPC from the list of extents @ext_list. The caller must ensure
2048 * that the total number of pages in this list does NOT exceed the max pages per RPC.
2049 * Extents in the list must be in OES_RPC state.
2051 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2052 struct list_head *ext_list, int cmd, pdl_policy_t pol)
2054 struct ptlrpc_request *req = NULL;
2055 struct osc_extent *ext;
2056 struct brw_page **pga = NULL;
2057 struct osc_brw_async_args *aa = NULL;
2058 struct obdo *oa = NULL;
2059 struct osc_async_page *oap;
2060 struct osc_async_page *tmp;
2061 struct cl_req *clerq = NULL;
2062 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2064 struct ldlm_lock *lock = NULL;
2065 struct cl_req_attr *crattr = NULL;
2066 obd_off starting_offset = OBD_OBJECT_EOF;
2067 obd_off ending_offset = 0;
2073 LIST_HEAD(rpc_list);
2076 LASSERT(!list_empty(ext_list));
2078 /* add pages into rpc_list to build BRW rpc */
2079 list_for_each_entry(ext, ext_list, oe_link) {
2080 LASSERT(ext->oe_state == OES_RPC);
2081 mem_tight |= ext->oe_memalloc;
2082 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2084 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2085 if (starting_offset > oap->oap_obj_off)
2086 starting_offset = oap->oap_obj_off;
2088 LASSERT(oap->oap_page_off == 0);
2089 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2090 ending_offset = oap->oap_obj_off +
2093 LASSERT(oap->oap_page_off + oap->oap_count ==
2099 mpflag = cfs_memory_pressure_get_and_set();
2101 OBD_ALLOC(crattr, sizeof(*crattr));
2103 GOTO(out, rc = -ENOMEM);
2105 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2107 GOTO(out, rc = -ENOMEM);
2111 GOTO(out, rc = -ENOMEM);
2114 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2115 struct cl_page *page = oap2cl_page(oap);
2116 if (clerq == NULL) {
2117 clerq = cl_req_alloc(env, page, crt,
2118 1 /* only 1-object rpcs for now */);
2120 GOTO(out, rc = PTR_ERR(clerq));
2121 lock = oap->oap_ldlm_lock;
2124 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2125 pga[i] = &oap->oap_brw_page;
2126 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2127 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2128 pga[i]->pg, page_index(oap->oap_page), oap,
2131 cl_req_page_add(env, clerq, page);
2134 /* always get the data for the obdo for the rpc */
2135 LASSERT(clerq != NULL);
2136 crattr->cra_oa = oa;
2137 cl_req_attr_set(env, clerq, crattr, ~0ULL);
2139 oa->o_handle = lock->l_remote_handle;
2140 oa->o_valid |= OBD_MD_FLHANDLE;
2143 rc = cl_req_prep(env, clerq);
2145 CERROR("cl_req_prep failed: %d\n", rc);
2149 sort_brw_pages(pga, page_count);
2150 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2151 pga, &req, crattr->cra_capa, 1, 0);
2153 CERROR("prep_req failed: %d\n", rc);
2157 req->rq_interpret_reply = brw_interpret;
2160 req->rq_memalloc = 1;
2162 /* Need to update the timestamps after the request is built in case
2163 * we race with setattr (locally or in the queue at the OST). If the OST gets
2164 * a later setattr before an earlier BRW (as determined by the request xid),
2165 * the OST will not use the BRW timestamps. Sadly, there is no obvious
2166 * way to do this in a single call. bug 10150 */
2167 cl_req_attr_set(env, clerq, crattr,
2168 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2170 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2172 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2173 aa = ptlrpc_req_async_args(req);
2174 INIT_LIST_HEAD(&aa->aa_oaps);
2175 list_splice_init(&rpc_list, &aa->aa_oaps);
2176 INIT_LIST_HEAD(&aa->aa_exts);
2177 list_splice_init(ext_list, &aa->aa_exts);
2178 aa->aa_clerq = clerq;
2180 /* queued sync pages can be torn down while the pages
2181 * were between the pending list and the rpc */
2183 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2184 /* only one oap gets a request reference */
2187 if (oap->oap_interrupted && !req->rq_intr) {
2188 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2190 ptlrpc_mark_interrupted(req);
2194 tmp->oap_request = ptlrpc_request_addref(req);
2196 client_obd_list_lock(&cli->cl_loi_list_lock);
2197 starting_offset >>= PAGE_CACHE_SHIFT;
2198 if (cmd == OBD_BRW_READ) {
2199 cli->cl_r_in_flight++;
2200 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2201 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2202 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2203 starting_offset + 1);
2205 cli->cl_w_in_flight++;
2206 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2207 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2208 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2209 starting_offset + 1);
2211 client_obd_list_unlock(&cli->cl_loi_list_lock);
2213 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2214 page_count, aa, cli->cl_r_in_flight,
2215 cli->cl_w_in_flight);
2217 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2218 * see which CPU/NUMA node the majority of pages were allocated
2219 * on, and try to assign the async RPC to the CPU core
2220 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2222 * On the other hand, we expect that multiple ptlrpcd threads and
2223 * the initial write sponsor can run in parallel, especially when
2224 * data checksumming is enabled, since that is a CPU-bound operation
2225 * which a single ptlrpcd thread cannot keep up with. So having
2226 * more ptlrpcd threads share the BRW load
2227 * (with PDL_POLICY_ROUND) seems better.
2229 ptlrpcd_add_req(req, pol, -1);
2235 cfs_memory_pressure_restore(mpflag);
2237 if (crattr != NULL) {
2238 capa_put(crattr->cra_capa);
2239 OBD_FREE(crattr, sizeof(*crattr));
2243 LASSERT(req == NULL);
2248 OBD_FREE(pga, sizeof(*pga) * page_count);
2249 /* this should happen rarely and is pretty bad; it makes the
2250 * pending list not follow the dirty order */
2251 while (!list_empty(ext_list)) {
2252 ext = list_entry(ext_list->next, struct osc_extent,
2254 list_del_init(&ext->oe_link);
2255 osc_extent_finish(env, ext, 0, rc);
2257 if (clerq && !IS_ERR(clerq))
2258 cl_req_completion(env, clerq, rc);
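/* Check that the lock's blocking/completion/glimpse callbacks and resource
 * type match @einfo, then set the lock's l_ast_data to einfo->ei_cbdata if
 * no AST data has been attached yet; the return value tells the caller
 * whether the lock now carries that data. */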
2263 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2264 struct ldlm_enqueue_info *einfo)
2266 void *data = einfo->ei_cbdata;
2269 LASSERT(lock != NULL);
2270 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2271 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2272 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2273 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2275 lock_res_and_lock(lock);
2276 spin_lock(&osc_ast_guard);
2278 if (lock->l_ast_data == NULL)
2279 lock->l_ast_data = data;
2280 if (lock->l_ast_data == data)
2283 spin_unlock(&osc_ast_guard);
2284 unlock_res_and_lock(lock);
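/* Look up the lock for @lockh and set its callback data with
 * osc_set_lock_data_with_check(), complaining if the lock cannot be
 * found (e.g. the client was evicted). */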
2289 static int osc_set_data_with_check(struct lustre_handle *lockh,
2290 struct ldlm_enqueue_info *einfo)
2292 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2296 set = osc_set_lock_data_with_check(lock, einfo);
2297 LDLM_LOCK_PUT(lock);
2299 CERROR("lockh %p, data %p - client evicted?\n",
2300 lockh, einfo->ei_cbdata);
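/* Run @replace on every DLM lock of the object's resource, passing @data
 * to the iterator. */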
2304 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2305 ldlm_iterator_t replace, void *data)
2307 struct ldlm_res_id res_id;
2308 struct obd_device *obd = class_exp2obd(exp);
2310 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2311 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2315 /* Find any LDLM lock of the inode in the OSC.
 * Returns 1 if a lock is found, 0 if not, or a negative error code. */
2319 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2320 ldlm_iterator_t replace, void *data)
2322 struct ldlm_res_id res_id;
2323 struct obd_device *obd = class_exp2obd(exp);
2326 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2327 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2328 if (rc == LDLM_ITER_STOP)
2330 if (rc == LDLM_ITER_CONTINUE)
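/* Finish an enqueue: for intent requests translate an ELDLM_LOCK_ABORTED
 * reply into the server's status, mark the LVB ready when appropriate, and
 * invoke the caller's update callback with the final result. */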
2335 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2336 obd_enqueue_update_f upcall, void *cookie,
2337 __u64 *flags, int agl, int rc)
2339 int intent = *flags & LDLM_FL_HAS_INTENT;
2343 /* The request was created before ldlm_cli_enqueue call. */
2344 if (rc == ELDLM_LOCK_ABORTED) {
2345 struct ldlm_reply *rep;
2346 rep = req_capsule_server_get(&req->rq_pill,
2349 LASSERT(rep != NULL);
2350 rep->lock_policy_res1 =
2351 ptlrpc_status_ntoh(rep->lock_policy_res1);
2352 if (rep->lock_policy_res1)
2353 rc = rep->lock_policy_res1;
2357 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2359 *flags |= LDLM_FL_LVB_READY;
2360 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2361 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2364 /* Call the update callback. */
2365 rc = (*upcall)(cookie, rc);
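/* Interpret callback for an asynchronous lock enqueue: complete the LDLM
 * enqueue, run osc_enqueue_fini(), and drop the extra references that were
 * taken to keep the lock valid across the upcall. */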
2369 static int osc_enqueue_interpret(const struct lu_env *env,
2370 struct ptlrpc_request *req,
2371 struct osc_enqueue_args *aa, int rc)
2373 struct ldlm_lock *lock;
2374 struct lustre_handle handle;
2376 struct ost_lvb *lvb;
2378 __u64 *flags = aa->oa_flags;
2380 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2381 * might be freed anytime after lock upcall has been called. */
2382 lustre_handle_copy(&handle, aa->oa_lockh);
2383 mode = aa->oa_ei->ei_mode;
2385 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
 * be valid. */
2387 lock = ldlm_handle2lock(&handle);
2389 /* Take an additional reference so that a blocking AST that
2390 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2391 * to arrive after an upcall has been executed by
2392 * osc_enqueue_fini(). */
2393 ldlm_lock_addref(&handle, mode);
2395 /* Let the CP AST grant the lock first. */
2396 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2398 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2403 lvb_len = sizeof(*aa->oa_lvb);
2406 /* Complete obtaining the lock procedure. */
2407 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2408 mode, flags, lvb, lvb_len, &handle, rc);
2409 /* Complete osc stuff. */
2410 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2411 flags, aa->oa_agl, rc);
2413 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2415 /* Release the lock for async request. */
2416 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2418 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2419 * not already released by
2420 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2422 ldlm_lock_decref(&handle, mode);
2424 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2425 aa->oa_lockh, req, aa);
2426 ldlm_lock_decref(&handle, mode);
2427 LDLM_LOCK_PUT(lock);
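/* Record the result of an enqueue in the stripe's lov_oinfo: cache the
 * returned LVB and extend KMS up to the end of the granted extent (a lock
 * on [x, y] allows a KMS of up to y + 1 bytes). */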
2431 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2432 struct lov_oinfo *loi, int flags,
2433 struct ost_lvb *lvb, __u32 mode, int rc)
2435 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2437 if (rc == ELDLM_OK) {
2440 LASSERT(lock != NULL);
2441 loi->loi_lvb = *lvb;
2442 tmp = loi->loi_lvb.lvb_size;
2443 /* Extend KMS up to the end of this lock and no further
2444 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2445 if (tmp > lock->l_policy_data.l_extent.end)
2446 tmp = lock->l_policy_data.l_extent.end + 1;
2447 if (tmp >= loi->loi_kms) {
2448 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2449 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2450 loi_kms_set(loi, tmp);
2452 LDLM_DEBUG(lock, "lock acquired, setting rss="
2453 LPU64"; leaving kms="LPU64", end="LPU64,
2454 loi->loi_lvb.lvb_size, loi->loi_kms,
2455 lock->l_policy_data.l_extent.end);
2457 ldlm_lock_allow_match(lock);
2458 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2459 LASSERT(lock != NULL);
2460 loi->loi_lvb = *lvb;
2461 ldlm_lock_allow_match(lock);
2462 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2463 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2469 ldlm_lock_fail_match(lock);
2471 LDLM_LOCK_PUT(lock);
2474 EXPORT_SYMBOL(osc_update_enqueue);
2476 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2478 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2479 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2480 * other synchronous requests, but holding some locks while trying to obtain
2481 * others may take a considerable amount of time in the case of OST failure; and
2482 * if a client does not release a lock that other sync requests are waiting
2483 * for, that client is excluded from the cluster -- such scenarios make life
2484 * difficult, so release locks just after they are obtained. */
2485 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2486 __u64 *flags, ldlm_policy_data_t *policy,
2487 struct ost_lvb *lvb, int kms_valid,
2488 obd_enqueue_update_f upcall, void *cookie,
2489 struct ldlm_enqueue_info *einfo,
2490 struct lustre_handle *lockh,
2491 struct ptlrpc_request_set *rqset, int async, int agl)
2493 struct obd_device *obd = exp->exp_obd;
2494 struct ptlrpc_request *req = NULL;
2495 int intent = *flags & LDLM_FL_HAS_INTENT;
2496 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2501 /* Filesystem lock extents are extended to page boundaries so that
2502 * dealing with the page cache is a little smoother. */
2503 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2504 policy->l_extent.end |= ~CFS_PAGE_MASK;
2507 * kms is not valid when either the object is completely fresh (so that no
2508 * locks are cached), or the object was evicted. In the latter case a cached
2509 * lock cannot be used, because it would prime the inode state with a
2510 * potentially stale LVB.
2515 /* Next, search for already existing extent locks that will cover us */
2516 /* If we're trying to read, we also search for an existing PW lock. The
2517 * VFS and page cache already protect us locally, so lots of readers/
2518 * writers can share a single PW lock.
2520 * There are problems with conversion deadlocks, so instead of
2521 * converting a read lock to a write lock, we'll just enqueue a new
2524 * At some point we should cancel the read lock instead of making them
2525 * send us a blocking callback, but there are problems with canceling
2526 * locks out from other users right now, too. */
2527 mode = einfo->ei_mode;
2528 if (einfo->ei_mode == LCK_PR)
2530 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2531 einfo->ei_type, policy, mode, lockh, 0);
2533 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2535 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2536 /* For AGL, if the enqueue RPC is sent but the lock is not
2537 * granted, then skip processing this stripe.
2538 * Return -ECANCELED to tell the caller. */
2539 ldlm_lock_decref(lockh, mode);
2540 LDLM_LOCK_PUT(matched);
2542 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2543 *flags |= LDLM_FL_LVB_READY;
2544 /* addref the lock only for non-async requests and when a PW
2545 * lock is matched whereas we asked for PR. */
2546 if (!rqset && einfo->ei_mode != mode)
2547 ldlm_lock_addref(lockh, LCK_PR);
2549 /* I would like to be able to ASSERT here that
2550 * rss <= kms, but I can't, for reasons which
2551 * are explained in lov_enqueue() */
2554 /* We already have a lock, and it's referenced.
2556 * At this point, cl_lock::cll_state is CLS_QUEUING; the
2557 * AGL upcall may change it to CLS_HELD directly. */
2558 (*upcall)(cookie, ELDLM_OK);
2560 if (einfo->ei_mode != mode)
2561 ldlm_lock_decref(lockh, LCK_PW);
2563 /* For async requests, decref the lock. */
2564 ldlm_lock_decref(lockh, einfo->ei_mode);
2565 LDLM_LOCK_PUT(matched);
2568 ldlm_lock_decref(lockh, mode);
2569 LDLM_LOCK_PUT(matched);
2576 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2577 &RQF_LDLM_ENQUEUE_LVB);
2581 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2583 ptlrpc_request_free(req);
2587 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2589 ptlrpc_request_set_replen(req);
2592 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2593 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2595 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2596 sizeof(*lvb), LVB_T_OST, lockh, async);
2599 struct osc_enqueue_args *aa;
2600 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2601 aa = ptlrpc_req_async_args(req);
2604 aa->oa_flags = flags;
2605 aa->oa_upcall = upcall;
2606 aa->oa_cookie = cookie;
2608 aa->oa_lockh = lockh;
2611 req->rq_interpret_reply =
2612 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2613 if (rqset == PTLRPCD_SET)
2614 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2616 ptlrpc_set_add_req(rqset, req);
2617 } else if (intent) {
2618 ptlrpc_req_finished(req);
2623 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2625 ptlrpc_req_finished(req);
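/* Enqueue an extent lock for the stripe described by @oinfo, building the
 * resource name from the stripe object id and going through
 * osc_enqueue_base(). */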
2630 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2631 struct ldlm_enqueue_info *einfo,
2632 struct ptlrpc_request_set *rqset)
2634 struct ldlm_res_id res_id;
2638 ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2639 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2640 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2641 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2642 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2643 rqset, rqset != NULL, 0);
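/* Match an already cached DLM extent lock for @res_id without sending an
 * enqueue RPC. */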
2647 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2648 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2649 int *flags, void *data, struct lustre_handle *lockh,
2652 struct obd_device *obd = exp->exp_obd;
2653 int lflags = *flags;
2657 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2660 /* Filesystem lock extents are extended to page boundaries so that
2661 * dealing with the page cache is a little smoother */
2662 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2663 policy->l_extent.end |= ~CFS_PAGE_MASK;
2665 /* Next, search for already existing extent locks that will cover us */
2666 /* If we're trying to read, we also search for an existing PW lock. The
2667 * VFS and page cache already protect us locally, so lots of readers/
2668 * writers can share a single PW lock. */
2672 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2673 res_id, type, policy, rc, lockh, unref);
2676 if (!osc_set_data_with_check(lockh, data)) {
2677 if (!(lflags & LDLM_FL_TEST_LOCK))
2678 ldlm_lock_decref(lockh, rc);
2682 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2683 ldlm_lock_addref(lockh, LCK_PR);
2684 ldlm_lock_decref(lockh, LCK_PW);
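/* Release a lock reference; group locks are cancelled outright rather than
 * merely dropping the reference. */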
2691 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2695 if (unlikely(mode == LCK_GROUP))
2696 ldlm_lock_decref_and_cancel(lockh, mode);
2698 ldlm_lock_decref(lockh, mode);
2703 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2704 __u32 mode, struct lustre_handle *lockh)
2707 RETURN(osc_cancel_base(lockh, mode));
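/* Cancel unused locks cached for this object (or for the whole namespace
 * when no stripe metadata is given). */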
2710 static int osc_cancel_unused(struct obd_export *exp,
2711 struct lov_stripe_md *lsm,
2712 ldlm_cancel_flags_t flags,
2715 struct obd_device *obd = class_exp2obd(exp);
2716 struct ldlm_res_id res_id, *resp = NULL;
2719 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2723 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
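/* Interpret callback for an asynchronous OST_STATFS request: copy the
 * reply into the caller's obd_statfs buffer and run the completion
 * callback. */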
2726 static int osc_statfs_interpret(const struct lu_env *env,
2727 struct ptlrpc_request *req,
2728 struct osc_async_args *aa, int rc)
2730 struct obd_statfs *msfs;
2734 /* The request has in fact never been sent
2735 * due to issues at a higher level (LOV).
2736 * Exit immediately since the caller is
2737 * aware of the problem and takes care
2738 * of the cleanup. */
2741 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2742 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2748 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2750 GOTO(out, rc = -EPROTO);
2753 *aa->aa_oi->oi_osfs = *msfs;
2755 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
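/* Send an OST_STATFS request asynchronously and add it to @rqset;
 * osc_statfs_interpret() delivers the result. */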
2759 static int osc_statfs_async(struct obd_export *exp,
2760 struct obd_info *oinfo, __u64 max_age,
2761 struct ptlrpc_request_set *rqset)
2763 struct obd_device *obd = class_exp2obd(exp);
2764 struct ptlrpc_request *req;
2765 struct osc_async_args *aa;
2769 /* We could possibly pass max_age in the request (as an absolute
2770 * timestamp or a "seconds.usec ago") so the target can avoid doing
2771 * extra calls into the filesystem if that isn't necessary (e.g.
2772 * during mount that would help a bit). Having relative timestamps
2773 * is not so great if request processing is slow, while absolute
2774 * timestamps are not ideal because they need time synchronization. */
2775 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2779 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2781 ptlrpc_request_free(req);
2784 ptlrpc_request_set_replen(req);
2785 req->rq_request_portal = OST_CREATE_PORTAL;
2786 ptlrpc_at_set_req_timeout(req);
2788 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2789 /* procfs requests should not block waiting for statfs, to avoid deadlock */
2790 req->rq_no_resend = 1;
2791 req->rq_no_delay = 1;
2794 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2795 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2796 aa = ptlrpc_req_async_args(req);
2799 ptlrpc_set_add_req(rqset, req);
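/* Synchronous statfs: send an OST_STATFS request, wait for the reply and
 * copy the result into @osfs. */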
2803 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2804 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2806 struct obd_device *obd = class_exp2obd(exp);
2807 struct obd_statfs *msfs;
2808 struct ptlrpc_request *req;
2809 struct obd_import *imp = NULL;
2813 /* Since the request might also come from lprocfs, we need to
2814 * sync this with client_disconnect_export() (bug 15684). */
2815 down_read(&obd->u.cli.cl_sem);
2816 if (obd->u.cli.cl_import)
2817 imp = class_import_get(obd->u.cli.cl_import);
2818 up_read(&obd->u.cli.cl_sem);
2822 /* We could possibly pass max_age in the request (as an absolute
2823 * timestamp or a "seconds.usec ago") so the target can avoid doing
2824 * extra calls into the filesystem if that isn't necessary (e.g.
2825 * during mount that would help a bit). Having relative timestamps
2826 * is not so great if request processing is slow, while absolute
2827 * timestamps are not ideal because they need time synchronization. */
2828 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2830 class_import_put(imp);
2835 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2837 ptlrpc_request_free(req);
2840 ptlrpc_request_set_replen(req);
2841 req->rq_request_portal = OST_CREATE_PORTAL;
2842 ptlrpc_at_set_req_timeout(req);
2844 if (flags & OBD_STATFS_NODELAY) {
2845 /* procfs requests should not block waiting for statfs, to avoid deadlock */
2846 req->rq_no_resend = 1;
2847 req->rq_no_delay = 1;
2850 rc = ptlrpc_queue_wait(req);
2854 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2856 GOTO(out, rc = -EPROTO);
2863 ptlrpc_req_finished(req);
2867 /* Retrieve object striping information.
2869 * @lump is a pointer to a user-space struct whose lmm_stripe_count indicates
2870 * the maximum number of OST indices which will fit in the user buffer.
2871 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2873 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2875 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2876 struct lov_user_md_v3 lum, *lumk;
2877 struct lov_user_ost_data_v1 *lmm_objects;
2878 int rc = 0, lum_size;
2884 /* we only need the header part from user space to get lmm_magic and
2885 * lmm_stripe_count (the header part is common to v1 and v3) */
2886 lum_size = sizeof(struct lov_user_md_v1);
2887 if (copy_from_user(&lum, lump, lum_size))
2890 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2891 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2894 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2895 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2896 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2897 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2899 /* we can use lov_mds_md_size() to compute lum_size
2900 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2901 if (lum.lmm_stripe_count > 0) {
2902 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2903 OBD_ALLOC(lumk, lum_size);
2907 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2909 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2911 lmm_objects = &(lumk->lmm_objects[0]);
2912 lmm_objects->l_ost_oi = lsm->lsm_oi;
2914 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2918 lumk->lmm_oi = lsm->lsm_oi;
2919 lumk->lmm_stripe_count = 1;
2921 if (copy_to_user(lump, lumk, lum_size))
2925 OBD_FREE(lumk, lum_size);
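/* Handle ioctls directed at a single OSC device. */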
2931 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2932 void *karg, void *uarg)
2934 struct obd_device *obd = exp->exp_obd;
2935 struct obd_ioctl_data *data = karg;
2939 if (!try_module_get(THIS_MODULE)) {
2940 CERROR("Can't get module. Is it alive?");
2944 case OBD_IOC_LOV_GET_CONFIG: {
2946 struct lov_desc *desc;
2947 struct obd_uuid uuid;
2951 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2952 GOTO(out, err = -EINVAL);
2954 data = (struct obd_ioctl_data *)buf;
2956 if (sizeof(*desc) > data->ioc_inllen1) {
2957 obd_ioctl_freedata(buf, len);
2958 GOTO(out, err = -EINVAL);
2961 if (data->ioc_inllen2 < sizeof(uuid)) {
2962 obd_ioctl_freedata(buf, len);
2963 GOTO(out, err = -EINVAL);
2966 desc = (struct lov_desc *)data->ioc_inlbuf1;
2967 desc->ld_tgt_count = 1;
2968 desc->ld_active_tgt_count = 1;
2969 desc->ld_default_stripe_count = 1;
2970 desc->ld_default_stripe_size = 0;
2971 desc->ld_default_stripe_offset = 0;
2972 desc->ld_pattern = 0;
2973 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2975 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2977 err = copy_to_user((void *)uarg, buf, len);
2980 obd_ioctl_freedata(buf, len);
2983 case LL_IOC_LOV_SETSTRIPE:
2984 err = obd_alloc_memmd(exp, karg);
2988 case LL_IOC_LOV_GETSTRIPE:
2989 err = osc_getstripe(karg, uarg);
2991 case OBD_IOC_CLIENT_RECOVER:
2992 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2993 data->ioc_inlbuf1, 0);
2997 case IOC_OSC_SET_ACTIVE:
2998 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3001 case OBD_IOC_POLL_QUOTACHECK:
3002 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3004 case OBD_IOC_PING_TARGET:
3005 err = ptlrpc_obd_ping(obd);
3008 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3009 cmd, current_comm());
3010 GOTO(out, err = -ENOTTY);
3013 module_put(THIS_MODULE);
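/* Handle obd_get_info() keys understood by the OSC, including the
 * lock-to-stripe mapping, KEY_LAST_ID (fetched from the OST) and
 * KEY_FIEMAP, which either uses a cached PR lock or asks the server to
 * take the lock on its side. */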
3017 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3018 obd_count keylen, void *key, __u32 *vallen, void *val,
3019 struct lov_stripe_md *lsm)
3022 if (!vallen || !val)
3025 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3026 __u32 *stripe = val;
3027 *vallen = sizeof(*stripe);
3030 } else if (KEY_IS(KEY_LAST_ID)) {
3031 struct ptlrpc_request *req;
3036 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3037 &RQF_OST_GET_INFO_LAST_ID);
3041 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3042 RCL_CLIENT, keylen);
3043 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3045 ptlrpc_request_free(req);
3049 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3050 memcpy(tmp, key, keylen);
3052 req->rq_no_delay = req->rq_no_resend = 1;
3053 ptlrpc_request_set_replen(req);
3054 rc = ptlrpc_queue_wait(req);
3058 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3060 GOTO(out, rc = -EPROTO);
3062 *((obd_id *)val) = *reply;
3064 ptlrpc_req_finished(req);
3066 } else if (KEY_IS(KEY_FIEMAP)) {
3067 struct ll_fiemap_info_key *fm_key =
3068 (struct ll_fiemap_info_key *)key;
3069 struct ldlm_res_id res_id;
3070 ldlm_policy_data_t policy;
3071 struct lustre_handle lockh;
3072 ldlm_mode_t mode = 0;
3073 struct ptlrpc_request *req;
3074 struct ll_user_fiemap *reply;
3078 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3081 policy.l_extent.start = fm_key->fiemap.fm_start &
3084 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3085 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3086 policy.l_extent.end = OBD_OBJECT_EOF;
3088 policy.l_extent.end = (fm_key->fiemap.fm_start +
3089 fm_key->fiemap.fm_length +
3090 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3092 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3093 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3094 LDLM_FL_BLOCK_GRANTED |
3096 &res_id, LDLM_EXTENT, &policy,
3097 LCK_PR | LCK_PW, &lockh, 0);
3098 if (mode) { /* lock is cached on client */
3099 if (mode != LCK_PR) {
3100 ldlm_lock_addref(&lockh, LCK_PR);
3101 ldlm_lock_decref(&lockh, LCK_PW);
3103 } else { /* no cached lock, need to acquire lock on server side */
3104 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3105 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3109 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3110 &RQF_OST_GET_INFO_FIEMAP);
3112 GOTO(drop_lock, rc = -ENOMEM);
3114 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3115 RCL_CLIENT, keylen);
3116 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3117 RCL_CLIENT, *vallen);
3118 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3119 RCL_SERVER, *vallen);
3121 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3123 ptlrpc_request_free(req);
3124 GOTO(drop_lock, rc);
3127 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3128 memcpy(tmp, key, keylen);
3129 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3130 memcpy(tmp, val, *vallen);
3132 ptlrpc_request_set_replen(req);
3133 rc = ptlrpc_queue_wait(req);
3137 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3139 GOTO(fini_req, rc = -EPROTO);
3141 memcpy(val, reply, *vallen);
3143 ptlrpc_req_finished(req);
3146 ldlm_lock_decref(&lockh, LCK_PR);
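/* Handle obd_set_info_async() keys: checksum, sptlrpc and cache settings
 * are applied locally, while the remaining keys are packed into an
 * OST_SET_INFO request and sent to the OST. */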
3153 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3154 obd_count keylen, void *key, obd_count vallen,
3155 void *val, struct ptlrpc_request_set *set)
3157 struct ptlrpc_request *req;
3158 struct obd_device *obd = exp->exp_obd;
3159 struct obd_import *imp = class_exp2cliimp(exp);
3164 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3166 if (KEY_IS(KEY_CHECKSUM)) {
3167 if (vallen != sizeof(int))
3169 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3173 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3174 sptlrpc_conf_client_adapt(obd);
3178 if (KEY_IS(KEY_FLUSH_CTX)) {
3179 sptlrpc_import_flush_my_ctx(imp);
3183 if (KEY_IS(KEY_CACHE_SET)) {
3184 struct client_obd *cli = &obd->u.cli;
3186 LASSERT(cli->cl_cache == NULL); /* only once */
3187 cli->cl_cache = (struct cl_client_cache *)val;
3188 atomic_inc(&cli->cl_cache->ccc_users);
3189 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3191 /* add this osc into entity list */
3192 LASSERT(list_empty(&cli->cl_lru_osc));
3193 spin_lock(&cli->cl_cache->ccc_lru_lock);
3194 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3195 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3200 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3201 struct client_obd *cli = &obd->u.cli;
3202 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3203 int target = *(int *)val;
3205 nr = osc_lru_shrink(cli, min(nr, target));
3210 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3213 /* We pass all other commands directly to OST. Since nobody calls osc
3214 methods directly and everybody is supposed to go through LOV, we
3215 assume lov checked invalid values for us.
3216 The only recognised values so far are evict_by_nid and mds_conn.
3217 Even if something bad goes through, we'd get a -EINVAL from OST anyway. */
3220 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3221 &RQF_OST_SET_GRANT_INFO :
3226 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3227 RCL_CLIENT, keylen);
3228 if (!KEY_IS(KEY_GRANT_SHRINK))
3229 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3230 RCL_CLIENT, vallen);
3231 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3233 ptlrpc_request_free(req);
3237 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3238 memcpy(tmp, key, keylen);
3239 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3242 memcpy(tmp, val, vallen);
3244 if (KEY_IS(KEY_GRANT_SHRINK)) {
3245 struct osc_grant_args *aa;
3248 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3249 aa = ptlrpc_req_async_args(req);
3252 ptlrpc_req_finished(req);
3255 *oa = ((struct ost_body *)val)->oa;
3257 req->rq_interpret_reply = osc_shrink_grant_interpret;
3260 ptlrpc_request_set_replen(req);
3261 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3262 LASSERT(set != NULL);
3263 ptlrpc_set_add_req(set, req);
3264 ptlrpc_check_set(NULL, set);
3266 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3272 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3273 struct obd_device *disk_obd, int *index)
3275 /* this code is not supposed to be used with LOD/OSP
3276 * to be removed soon */
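/* Tear down the MDS-OST originator and size-REPL llog contexts of this
 * obd. */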
3281 static int osc_llog_finish(struct obd_device *obd, int count)
3283 struct llog_ctxt *ctxt;
3287 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3289 llog_cat_close(NULL, ctxt->loc_handle);
3290 llog_cleanup(NULL, ctxt);
3293 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3295 llog_cleanup(NULL, ctxt);
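/* On reconnect, request a grant covering the dirty data and available
 * grant currently held, or a default of two RPCs worth of data. */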
3299 static int osc_reconnect(const struct lu_env *env,
3300 struct obd_export *exp, struct obd_device *obd,
3301 struct obd_uuid *cluuid,
3302 struct obd_connect_data *data,
3305 struct client_obd *cli = &obd->u.cli;
3307 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3310 client_obd_list_lock(&cli->cl_loi_list_lock);
3311 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3312 2 * cli_brw_size(obd);
3313 lost_grant = cli->cl_lost_grant;
3314 cli->cl_lost_grant = 0;
3315 client_obd_list_unlock(&cli->cl_loi_list_lock);
3317 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3318 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3319 data->ocd_version, data->ocd_grant, lost_grant);
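/* Disconnect from the OST: sync the size-REPL llog on the last connection
 * reference, disconnect the export, and once the import is gone remove
 * this client from the grant shrink list. */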
3325 static int osc_disconnect(struct obd_export *exp)
3327 struct obd_device *obd = class_exp2obd(exp);
3328 struct llog_ctxt *ctxt;
3331 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3333 if (obd->u.cli.cl_conn_count == 1) {
3334 /* Flush any remaining cancel messages out to the
 * target. */
3336 llog_sync(ctxt, exp, 0);
3338 llog_ctxt_put(ctxt);
3340 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3344 rc = client_disconnect_export(exp);
3346 * Initially we put del_shrink_grant before disconnect_export, but it
3347 * causes the following problem if setup (connect) and cleanup
3348 * (disconnect) are tangled together.
3349 * connect p1 disconnect p2
3350 * ptlrpc_connect_import
3351 * ............... class_manual_cleanup
3354 * ptlrpc_connect_interrupt
3356 * add this client to shrink list
3358 * Bang! the pinger triggers the shrink.
3359 * So the osc should be disconnected from the shrink list after we
3360 * are sure the import has been destroyed. BUG18662
3362 if (obd->u.cli.cl_import == NULL)
3363 osc_del_shrink_grant(&obd->u.cli);
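/* React to import state changes: drop grants on disconnect, flush cached
 * pages and locks on invalidation, and (re)initialize grant and request
 * portal settings when connect data arrives. */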
3367 static int osc_import_event(struct obd_device *obd,
3368 struct obd_import *imp,
3369 enum obd_import_event event)
3371 struct client_obd *cli;
3375 LASSERT(imp->imp_obd == obd);
3378 case IMP_EVENT_DISCON: {
3380 client_obd_list_lock(&cli->cl_loi_list_lock);
3381 cli->cl_avail_grant = 0;
3382 cli->cl_lost_grant = 0;
3383 client_obd_list_unlock(&cli->cl_loi_list_lock);
3386 case IMP_EVENT_INACTIVE: {
3387 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3390 case IMP_EVENT_INVALIDATE: {
3391 struct ldlm_namespace *ns = obd->obd_namespace;
3395 env = cl_env_get(&refcheck);
3399 /* all pages go to failing rpcs due to the invalid
 * import */
3401 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3403 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3404 cl_env_put(env, &refcheck);
3409 case IMP_EVENT_ACTIVE: {
3410 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3413 case IMP_EVENT_OCD: {
3414 struct obd_connect_data *ocd = &imp->imp_connect_data;
3416 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3417 osc_init_grant(&obd->u.cli, ocd);
3420 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3421 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3423 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3426 case IMP_EVENT_DEACTIVATE: {
3427 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3430 case IMP_EVENT_ACTIVATE: {
3431 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3435 CERROR("Unknown import event %d\n", event);
3442 * Determine whether the lock can be canceled before replaying it
3443 * during recovery; see bug 16774 for detailed information.
3445 * \retval zero the lock can't be canceled
3446 * \retval other ok to cancel
3448 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3450 check_res_locked(lock->l_resource);
3453 * Cancel all unused extent locks granted in mode LCK_PR or LCK_CR.
3455 * XXX as a future improvement, we can also cancel unused write lock
3456 * if it doesn't have dirty data and active mmaps.
3458 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3459 (lock->l_granted_mode == LCK_PR ||
3460 lock->l_granted_mode == LCK_CR) &&
3461 (osc_dlm_lock_pageref(lock) == 0))
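/* Writeback work callback: flush any pending BRW RPCs for this client. */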
3467 static int brw_queue_work(const struct lu_env *env, void *data)
3469 struct client_obd *cli = data;
3471 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3473 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
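/* Set up the OSC device: initialize the client obd, create the writeback
 * work item, set up quota and procfs state, and preallocate a small
 * request pool for BRW RPCs. */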
3477 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3479 struct lprocfs_static_vars lvars = { 0 };
3480 struct client_obd *cli = &obd->u.cli;
3485 rc = ptlrpcd_addref();
3489 rc = client_obd_setup(obd, lcfg);
3491 GOTO(out_ptlrpcd, rc);
3493 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3494 if (IS_ERR(handler))
3495 GOTO(out_client_setup, rc = PTR_ERR(handler));
3496 cli->cl_writeback_work = handler;
3498 rc = osc_quota_setup(obd);
3500 GOTO(out_ptlrpcd_work, rc);
3502 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3503 lprocfs_osc_init_vars(&lvars);
3504 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3505 lproc_osc_attach_seqstat(obd);
3506 sptlrpc_lprocfs_cliobd_attach(obd);
3507 ptlrpc_lprocfs_register_obd(obd);
3510 /* We need to allocate a few extra requests, because
3511 * brw_interpret tries to create new requests before freeing
3512 * previous ones. Ideally we want to have 2x max_rpcs_in_flight
3513 * reserved, but that might be too much wasted RAM
3514 * in practice, so 2 is just a guess and should still work. */
3515 cli->cl_import->imp_rq_pool =
3516 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3518 ptlrpc_add_rqs_to_pool);
3520 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3521 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3525 ptlrpcd_destroy_work(handler);
3527 client_obd_cleanup(obd);
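/* Pre-cleanup: deactivate the import early, then, once exports are gone,
 * tear down the writeback work item and the procfs and llog state. */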
3533 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3539 case OBD_CLEANUP_EARLY: {
3540 struct obd_import *imp;
3541 imp = obd->u.cli.cl_import;
3542 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3543 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3544 ptlrpc_deactivate_import(imp);
3545 spin_lock(&imp->imp_lock);
3546 imp->imp_pingable = 0;
3547 spin_unlock(&imp->imp_lock);
3550 case OBD_CLEANUP_EXPORTS: {
3551 struct client_obd *cli = &obd->u.cli;
3553 * for echo client, export may be on zombie list, wait for
3554 * zombie thread to cull it, because cli.cl_import will be
3555 * cleared in client_disconnect_export():
3556 * class_export_destroy() -> obd_cleanup() ->
3557 * echo_device_free() -> echo_client_cleanup() ->
3558 * obd_disconnect() -> osc_disconnect() ->
3559 * client_disconnect_export()
3561 obd_zombie_barrier();
3562 if (cli->cl_writeback_work) {
3563 ptlrpcd_destroy_work(cli->cl_writeback_work);
3564 cli->cl_writeback_work = NULL;
3566 obd_cleanup_client_import(obd);
3567 ptlrpc_lprocfs_unregister_obd(obd);
3568 lprocfs_obd_cleanup(obd);
3569 rc = obd_llog_finish(obd, 0);
3571 CERROR("failed to cleanup llogging subsystems\n");
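/* Final cleanup: detach this OSC from the shared client cache LRU and
 * release quota and client obd state. */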
3578 int osc_cleanup(struct obd_device *obd)
3580 struct client_obd *cli = &obd->u.cli;
3586 if (cli->cl_cache != NULL) {
3587 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3588 spin_lock(&cli->cl_cache->ccc_lru_lock);
3589 list_del_init(&cli->cl_lru_osc);
3590 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3591 cli->cl_lru_left = NULL;
3592 atomic_dec(&cli->cl_cache->ccc_users);
3593 cli->cl_cache = NULL;
3596 /* free memory of osc quota cache */
3597 osc_quota_cleanup(obd);
3599 rc = client_obd_cleanup(obd);
3605 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3607 struct lprocfs_static_vars lvars = { 0 };
3610 lprocfs_osc_init_vars(&lvars);
3612 switch (lcfg->lcfg_command) {
3614 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3624 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3626 return osc_process_config_base(obd, buf);
3629 struct obd_ops osc_obd_ops = {
3630 .o_owner = THIS_MODULE,
3631 .o_setup = osc_setup,
3632 .o_precleanup = osc_precleanup,
3633 .o_cleanup = osc_cleanup,
3634 .o_add_conn = client_import_add_conn,
3635 .o_del_conn = client_import_del_conn,
3636 .o_connect = client_connect_import,
3637 .o_reconnect = osc_reconnect,
3638 .o_disconnect = osc_disconnect,
3639 .o_statfs = osc_statfs,
3640 .o_statfs_async = osc_statfs_async,
3641 .o_packmd = osc_packmd,
3642 .o_unpackmd = osc_unpackmd,
3643 .o_create = osc_create,
3644 .o_destroy = osc_destroy,
3645 .o_getattr = osc_getattr,
3646 .o_getattr_async = osc_getattr_async,
3647 .o_setattr = osc_setattr,
3648 .o_setattr_async = osc_setattr_async,
3650 .o_punch = osc_punch,
3652 .o_enqueue = osc_enqueue,
3653 .o_change_cbdata = osc_change_cbdata,
3654 .o_find_cbdata = osc_find_cbdata,
3655 .o_cancel = osc_cancel,
3656 .o_cancel_unused = osc_cancel_unused,
3657 .o_iocontrol = osc_iocontrol,
3658 .o_get_info = osc_get_info,
3659 .o_set_info_async = osc_set_info_async,
3660 .o_import_event = osc_import_event,
3661 .o_llog_init = osc_llog_init,
3662 .o_llog_finish = osc_llog_finish,
3663 .o_process_config = osc_process_config,
3664 .o_quotactl = osc_quotactl,
3665 .o_quotacheck = osc_quotacheck,
3668 extern struct lu_kmem_descr osc_caches[];
3669 extern spinlock_t osc_ast_guard;
3670 extern struct lock_class_key osc_ast_guard_class;
3672 int __init osc_init(void)
3674 struct lprocfs_static_vars lvars = { 0 };
3678 /* print an address of _any_ initialized kernel symbol from this
3679 * module, to allow debugging with gdb that doesn't support data
3680 * symbols from modules.*/
3681 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3683 rc = lu_kmem_init(osc_caches);
3687 lprocfs_osc_init_vars(&lvars);
3689 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3690 LUSTRE_OSC_NAME, &osc_device_type);
3692 lu_kmem_fini(osc_caches);
3696 spin_lock_init(&osc_ast_guard);
3697 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3702 static void /*__exit*/ osc_exit(void)
3704 class_unregister_type(LUSTRE_OSC_NAME);
3705 lu_kmem_fini(osc_caches);
3708 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3709 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3710 MODULE_LICENSE("GPL");
3712 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);