4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <linux/libcfs/libcfs.h>
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
53 #include <lustre_ha.h>
54 #include <lprocfs_status.h>
55 #include <lustre_log.h>
56 #include <lustre_debug.h>
57 #include <lustre_param.h>
58 #include <lustre_fid.h>
59 #include "osc_internal.h"
60 #include "osc_cl_internal.h"
62 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
63 static int brw_interpret(const struct lu_env *env,
64 struct ptlrpc_request *req, void *data, int rc);
65 int osc_cleanup(struct obd_device *obd);
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69 struct lov_stripe_md *lsm)
73 lmm_size = sizeof(**lmmp);
77 if (*lmmp != NULL && lsm == NULL) {
78 OBD_FREE(*lmmp, lmm_size);
81 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86 OBD_ALLOC(*lmmp, lmm_size);
92 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97 /* Unpack OSC object metadata from disk storage (LE byte order). */
98 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
99 struct lov_mds_md *lmm, int lmm_bytes)
102 struct obd_import *imp = class_exp2cliimp(exp);
105 if (lmm_bytes < sizeof(*lmm)) {
106 CERROR("%s: lov_mds_md too small: %d, need %d\n",
107 exp->exp_obd->obd_name, lmm_bytes,
111 /* XXX LOV_MAGIC etc check? */
113 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
114 CERROR("%s: zero lmm_object_id: rc = %d\n",
115 exp->exp_obd->obd_name, -EINVAL);
120 lsm_size = lov_stripe_md_size(1);
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
132 OBD_ALLOC(*lsmp, lsm_size);
133 if (unlikely(*lsmp == NULL))
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
141 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
146 /* XXX zero *lsmp? */
147 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
150 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
151 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
153 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
158 static inline void osc_pack_capa(struct ptlrpc_request *req,
159 struct ost_body *body, void *capa)
161 struct obd_capa *oc = (struct obd_capa *)capa;
162 struct lustre_capa *c;
167 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
170 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
171 DEBUG_CAPA(D_SEC, c, "pack");
174 static inline void osc_pack_req_body(struct ptlrpc_request *req,
175 struct obd_info *oinfo)
177 struct ost_body *body;
179 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
182 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
184 osc_pack_capa(req, body, oinfo->oi_capa);
187 static inline void osc_set_capa_size(struct ptlrpc_request *req,
188 const struct req_msg_field *field,
192 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
194 /* it is already calculated as sizeof struct obd_capa */
198 static int osc_getattr_interpret(const struct lu_env *env,
199 struct ptlrpc_request *req,
200 struct osc_async_args *aa, int rc)
202 struct ost_body *body;
207 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
209 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
210 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
211 aa->aa_oi->oi_oa, &body->oa);
213 /* This should really be sent by the OST */
214 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
215 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
217 CDEBUG(D_INFO, "can't unpack ost_body\n");
219 aa->aa_oi->oi_oa->o_valid = 0;
222 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
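/*
 * Asynchronous OST_GETATTR: allocate and pack the request, point
 * rq_interpret_reply at osc_getattr_interpret() and queue it on @set.
 * The caller's oi_cb_up callback is invoked from the interpreter with the
 * unpacked attributes (o_blksize is filled in locally, see above).
 */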
226 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
227 struct ptlrpc_request_set *set)
229 struct ptlrpc_request *req;
230 struct osc_async_args *aa;
233 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
237 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
238 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
240 ptlrpc_request_free(req);
244 osc_pack_req_body(req, oinfo);
246 ptlrpc_request_set_replen(req);
247 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
249 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
250 aa = ptlrpc_req_async_args(req);
253 ptlrpc_set_add_req(set, req);
257 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
258 struct obd_info *oinfo)
260 struct ptlrpc_request *req;
261 struct ost_body *body;
264 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
268 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
269 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
271 ptlrpc_request_free(req);
275 osc_pack_req_body(req, oinfo);
277 ptlrpc_request_set_replen(req);
279 rc = ptlrpc_queue_wait(req);
283 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
285 GOTO(out, rc = -EPROTO);
287 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
288 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
291 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
292 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
296 ptlrpc_req_finished(req);
300 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
301 struct obd_info *oinfo, struct obd_trans_info *oti)
303 struct ptlrpc_request *req;
304 struct ost_body *body;
307 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
313 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
316 ptlrpc_request_free(req);
320 osc_pack_req_body(req, oinfo);
322 ptlrpc_request_set_replen(req);
324 rc = ptlrpc_queue_wait(req);
328 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
330 GOTO(out, rc = -EPROTO);
332 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
337 ptlrpc_req_finished(req);
341 static int osc_setattr_interpret(const struct lu_env *env,
342 struct ptlrpc_request *req,
343 struct osc_setattr_args *sa, int rc)
345 struct ost_body *body;
350 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
352 GOTO(out, rc = -EPROTO);
354 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
357 rc = sa->sa_upcall(sa->sa_cookie, rc);
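/*
 * Asynchronous OST_SETATTR.  When no reply is to be waited for, the request
 * goes straight to ptlrpcd; otherwise osc_setattr_interpret() runs @upcall
 * with @cookie on completion, and the request is added either to ptlrpcd
 * (rqset == PTLRPCD_SET) or to the caller's request set.
 */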
361 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
362 struct obd_trans_info *oti,
363 obd_enqueue_update_f upcall, void *cookie,
364 struct ptlrpc_request_set *rqset)
366 struct ptlrpc_request *req;
367 struct osc_setattr_args *sa;
370 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
374 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
375 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
377 ptlrpc_request_free(req);
381 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
382 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
384 osc_pack_req_body(req, oinfo);
386 ptlrpc_request_set_replen(req);
388 /* do mds to ost setattr asynchronously */
390 /* Do not wait for response. */
391 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
393 req->rq_interpret_reply =
394 (ptlrpc_interpterer_t)osc_setattr_interpret;
396 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
397 sa = ptlrpc_req_async_args(req);
398 sa->sa_oa = oinfo->oi_oa;
399 sa->sa_upcall = upcall;
400 sa->sa_cookie = cookie;
402 if (rqset == PTLRPCD_SET)
403 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
405 ptlrpc_set_add_req(rqset, req);
411 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
412 struct obd_trans_info *oti,
413 struct ptlrpc_request_set *rqset)
415 return osc_setattr_async_base(exp, oinfo, oti,
416 oinfo->oi_cb_up, oinfo, rqset);
419 int osc_real_create(struct obd_export *exp, struct obdo *oa,
420 struct lov_stripe_md **ea, struct obd_trans_info *oti)
422 struct ptlrpc_request *req;
423 struct ost_body *body;
424 struct lov_stripe_md *lsm;
432 rc = obd_alloc_memmd(exp, &lsm);
437 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
439 GOTO(out, rc = -ENOMEM);
441 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
443 ptlrpc_request_free(req);
447 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
450 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
452 ptlrpc_request_set_replen(req);
454 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
455 oa->o_flags == OBD_FL_DELORPHAN) {
457 "delorphan from OST integration");
458 /* Don't resend the delorphan req */
459 req->rq_no_resend = req->rq_no_delay = 1;
462 rc = ptlrpc_queue_wait(req);
466 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
468 GOTO(out_req, rc = -EPROTO);
470 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
471 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
473 oa->o_blksize = cli_brw_size(exp->exp_obd);
474 oa->o_valid |= OBD_MD_FLBLKSZ;
476 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
477 * have valid lsm_oinfo data structs, so don't go touching that.
478 * This needs to be fixed in a big way.
480 lsm->lsm_oi = oa->o_oi;
484 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
486 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487 if (!oti->oti_logcookies)
488 oti_alloc_cookies(oti, 1);
489 *oti->oti_logcookies = oa->o_lcookie;
493 CDEBUG(D_HA, "transno: "LPD64"\n",
494 lustre_msg_get_transno(req->rq_repmsg));
496 ptlrpc_req_finished(req);
499 obd_free_memmd(exp, &lsm);
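/*
 * Truncate/punch: build an OST_PUNCH request on the I/O portal.  The extent
 * to punch is carried in oi_oa (o_size/o_blocks hold start/end, see
 * osc_punch() below) and completion is reported through @upcall, reusing
 * osc_setattr_interpret().
 */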
503 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
504 obd_enqueue_update_f upcall, void *cookie,
505 struct ptlrpc_request_set *rqset)
507 struct ptlrpc_request *req;
508 struct osc_setattr_args *sa;
509 struct ost_body *body;
512 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
516 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
517 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
519 ptlrpc_request_free(req);
522 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
523 ptlrpc_at_set_req_timeout(req);
525 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
527 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
529 osc_pack_capa(req, body, oinfo->oi_capa);
531 ptlrpc_request_set_replen(req);
533 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
534 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
535 sa = ptlrpc_req_async_args(req);
536 sa->sa_oa = oinfo->oi_oa;
537 sa->sa_upcall = upcall;
538 sa->sa_cookie = cookie;
539 if (rqset == PTLRPCD_SET)
540 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
542 ptlrpc_set_add_req(rqset, req);
547 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
548 struct obd_info *oinfo, struct obd_trans_info *oti,
549 struct ptlrpc_request_set *rqset)
551 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
552 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
553 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
554 return osc_punch_base(exp, oinfo,
555 oinfo->oi_cb_up, oinfo, rqset);
558 static int osc_sync_interpret(const struct lu_env *env,
559 struct ptlrpc_request *req,
562 struct osc_fsync_args *fa = arg;
563 struct ost_body *body;
568 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
570 CERROR("can't unpack ost_body\n");
571 GOTO(out, rc = -EPROTO);
574 *fa->fa_oi->oi_oa = body->oa;
576 rc = fa->fa_upcall(fa->fa_cookie, rc);
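/*
 * OST_SYNC: ask the OST to flush the byte range encoded in oi_oa
 * (o_size/o_blocks are overloaded as start/end, see osc_sync() below).
 * The reply handler copies the returned obdo back into oi_oa and then
 * calls @upcall with @cookie.
 */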
580 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
581 obd_enqueue_update_f upcall, void *cookie,
582 struct ptlrpc_request_set *rqset)
584 struct ptlrpc_request *req;
585 struct ost_body *body;
586 struct osc_fsync_args *fa;
589 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
593 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
594 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
596 ptlrpc_request_free(req);
600 /* overload the size and blocks fields in the oa with start/end */
601 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
603 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
605 osc_pack_capa(req, body, oinfo->oi_capa);
607 ptlrpc_request_set_replen(req);
608 req->rq_interpret_reply = osc_sync_interpret;
610 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
611 fa = ptlrpc_req_async_args(req);
613 fa->fa_upcall = upcall;
614 fa->fa_cookie = cookie;
616 if (rqset == PTLRPCD_SET)
617 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
619 ptlrpc_set_add_req(rqset, req);
624 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
625 struct obd_info *oinfo, obd_size start, obd_size end,
626 struct ptlrpc_request_set *set)
629 CDEBUG(D_INFO, "oa NULL\n");
633 oinfo->oi_oa->o_size = start;
634 oinfo->oi_oa->o_blocks = end;
635 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
637 RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
640 /* Find and cancel local locks matched by @mode in the resource found by
641 * @objid. Found locks are added to the @cancels list. Returns the number
642 * of locks added to the @cancels list. */
643 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
644 struct list_head *cancels,
645 ldlm_mode_t mode, int lock_flags)
647 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
648 struct ldlm_res_id res_id;
649 struct ldlm_resource *res;
652 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
653 * export) but disabled through procfs (flag in NS).
655 * This is distinct from the case where ELC was never supported, in which
656 * we still want to cancel locks in advance and simply cancel them
657 * locally, without sending any RPC. */
658 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
661 ostid_build_res_name(&oa->o_oi, &res_id);
662 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
666 LDLM_RESOURCE_ADDREF(res);
667 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
668 lock_flags, 0, NULL);
669 LDLM_RESOURCE_DELREF(res);
670 ldlm_resource_putref(res);
674 static int osc_destroy_interpret(const struct lu_env *env,
675 struct ptlrpc_request *req, void *data,
678 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
680 atomic_dec(&cli->cl_destroy_in_flight);
681 wake_up(&cli->cl_destroy_waitq);
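/*
 * Throttle destroy RPCs: optimistically bump cl_destroy_in_flight and
 * succeed if we are still within cl_max_rpcs_in_flight; otherwise back the
 * counter off again and fail so that the caller waits on cl_destroy_waitq
 * (see osc_destroy() below).
 */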
685 static int osc_can_send_destroy(struct client_obd *cli)
687 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
688 cli->cl_max_rpcs_in_flight) {
689 /* The destroy request can be sent */
692 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
693 cli->cl_max_rpcs_in_flight) {
695 * The counter has been modified between the two atomic operations. */
698 wake_up(&cli->cl_destroy_waitq);
703 int osc_create(const struct lu_env *env, struct obd_export *exp,
704 struct obdo *oa, struct lov_stripe_md **ea,
705 struct obd_trans_info *oti)
711 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
713 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
714 oa->o_flags == OBD_FL_RECREATE_OBJS) {
715 RETURN(osc_real_create(exp, oa, ea, oti));
718 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
719 RETURN(osc_real_create(exp, oa, ea, oti));
721 /* we should not get here anymore */
727 /* Destroy requests can always be async on the client, and we don't even really
728 * care about the return code since the client cannot do anything at all about
729 * a failed destroy request.
730 * When the MDS is unlinking a filename, it saves the file objects into a
731 * recovery llog, and these object records are cancelled when the OST reports
732 * they were destroyed and sync'd to disk (i.e. the transaction committed).
733 * If the client dies, or the OST is down when the object should be destroyed,
734 * the records are not cancelled, and when the OST next reconnects to the MDS,
735 * it will retrieve the llog unlink logs and then send the log cancellation
736 * cookies to the MDS after committing the destroy transactions. */
737 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
738 struct obdo *oa, struct lov_stripe_md *ea,
739 struct obd_trans_info *oti, struct obd_export *md_export,
742 struct client_obd *cli = &exp->exp_obd->u.cli;
743 struct ptlrpc_request *req;
744 struct ost_body *body;
749 CDEBUG(D_INFO, "oa NULL\n");
753 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
754 LDLM_FL_DISCARD_DATA);
756 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
758 ldlm_lock_list_put(&cancels, l_bl_ast, count);
762 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
763 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
766 ptlrpc_request_free(req);
770 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
771 ptlrpc_at_set_req_timeout(req);
773 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
774 oa->o_lcookie = *oti->oti_logcookies;
775 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
777 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
779 osc_pack_capa(req, body, (struct obd_capa *)capa);
780 ptlrpc_request_set_replen(req);
782 /* If osc_destroy is for destroying an unlink orphan, it is
783 * sent from the MDT to the OST and should not be blocked here,
784 * because the process might be triggered by ptlrpcd, and
785 * it is not good to block a ptlrpcd thread (b=16006). */
786 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
787 req->rq_interpret_reply = osc_destroy_interpret;
788 if (!osc_can_send_destroy(cli)) {
789 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
793 * Wait until the number of ongoing destroy RPCs drops
794 * below cl_max_rpcs_in_flight.
796 l_wait_event_exclusive(cli->cl_destroy_waitq,
797 osc_can_send_destroy(cli), &lwi);
801 /* Do not wait for response */
802 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
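/*
 * Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd counters, under
 * cl_loi_list_lock, so that each BRW advertises the client's cache and
 * grant state to the OST.
 */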
806 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
809 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
811 LASSERT(!(oa->o_valid & bits));
814 client_obd_list_lock(&cli->cl_loi_list_lock);
815 oa->o_dirty = cli->cl_dirty;
816 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
817 cli->cl_dirty_max)) {
818 CERROR("dirty %lu - %lu > dirty_max %lu\n",
819 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
821 } else if (unlikely(atomic_read(&obd_dirty_pages) -
822 atomic_read(&obd_dirty_transit_pages) >
823 (long)(obd_max_dirty_pages + 1))) {
824 /* The atomic_read() and the atomic_inc() are
825 * not covered by a lock, thus they may safely race and trip
826 * this CERROR() unless we add in a small fudge factor (+1). */
827 CERROR("dirty %d - %d > system dirty_max %d\n",
828 atomic_read(&obd_dirty_pages),
829 atomic_read(&obd_dirty_transit_pages),
830 obd_max_dirty_pages);
832 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
833 CERROR("dirty %lu - dirty_max %lu too big???\n",
834 cli->cl_dirty, cli->cl_dirty_max);
837 long max_in_flight = (cli->cl_max_pages_per_rpc <<
839 (cli->cl_max_rpcs_in_flight + 1);
840 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
842 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
843 oa->o_dropped = cli->cl_lost_grant;
844 cli->cl_lost_grant = 0;
845 client_obd_list_unlock(&cli->cl_loi_list_lock);
846 CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
847 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
851 void osc_update_next_shrink(struct client_obd *cli)
853 cli->cl_next_shrink_grant =
854 cfs_time_shift(cli->cl_grant_shrink_interval);
855 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
856 cli->cl_next_shrink_grant);
859 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
861 client_obd_list_lock(&cli->cl_loi_list_lock);
862 cli->cl_avail_grant += grant;
863 client_obd_list_unlock(&cli->cl_loi_list_lock);
866 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
868 if (body->oa.o_valid & OBD_MD_FLGRANT) {
869 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
870 __osc_update_grant(cli, body->oa.o_grant);
874 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
875 obd_count keylen, void *key, obd_count vallen,
876 void *val, struct ptlrpc_request_set *set);
878 static int osc_shrink_grant_interpret(const struct lu_env *env,
879 struct ptlrpc_request *req,
882 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
883 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
884 struct ost_body *body;
887 __osc_update_grant(cli, oa->o_grant);
891 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
893 osc_update_grant(cli, body);
899 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
901 client_obd_list_lock(&cli->cl_loi_list_lock);
902 oa->o_grant = cli->cl_avail_grant / 4;
903 cli->cl_avail_grant -= oa->o_grant;
904 client_obd_list_unlock(&cli->cl_loi_list_lock);
905 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
906 oa->o_valid |= OBD_MD_FLFLAGS;
909 oa->o_flags |= OBD_FL_SHRINK_GRANT;
910 osc_update_next_shrink(cli);
913 /* Shrink the current grant, either from some large amount to enough for a
914 * full set of in-flight RPCs, or if we have already shrunk to that limit
915 * then to enough for a single RPC. This avoids keeping more grant than
916 * needed, and avoids shrinking the grant piecemeal. */
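/*
 * Illustrative numbers only: with cl_max_pages_per_rpc << PAGE_CACHE_SHIFT
 * = 1 MiB per RPC and cl_max_rpcs_in_flight = 8, the first shrink targets
 * (8 + 1) * 1 MiB = 9 MiB of grant; once avail_grant is at or below that,
 * the next shrink targets a single RPC's worth, 1 MiB.
 */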
917 static int osc_shrink_grant(struct client_obd *cli)
919 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
920 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
922 client_obd_list_lock(&cli->cl_loi_list_lock);
923 if (cli->cl_avail_grant <= target_bytes)
924 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
925 client_obd_list_unlock(&cli->cl_loi_list_lock);
927 return osc_shrink_grant_to_target(cli, target_bytes);
930 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
933 struct ost_body *body;
935 client_obd_list_lock(&cli->cl_loi_list_lock);
936 /* Don't shrink if we are already above or below the desired limit.
937 * We don't want to shrink below a single RPC, as that will negatively
938 * impact block allocation and long-term performance. */
939 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
940 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
942 if (target_bytes >= cli->cl_avail_grant) {
943 client_obd_list_unlock(&cli->cl_loi_list_lock);
946 client_obd_list_unlock(&cli->cl_loi_list_lock);
952 osc_announce_cached(cli, &body->oa, 0);
954 client_obd_list_lock(&cli->cl_loi_list_lock);
955 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
956 cli->cl_avail_grant = target_bytes;
957 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
959 body->oa.o_valid |= OBD_MD_FLFLAGS;
960 body->oa.o_flags = 0;
962 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
963 osc_update_next_shrink(cli);
965 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
966 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
967 sizeof(*body), body, NULL);
969 __osc_update_grant(cli, body->oa.o_grant);
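/*
 * Decide whether it is worth shrinking grant now: the server must support
 * OBD_CONNECT_GRANT_SHRINK, the shrink interval must have (nearly) elapsed,
 * the import must be FULL, and the available grant must exceed one RPC's
 * worth.
 */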
974 static int osc_should_shrink_grant(struct client_obd *client)
976 cfs_time_t time = cfs_time_current();
977 cfs_time_t next_shrink = client->cl_next_shrink_grant;
979 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
980 OBD_CONNECT_GRANT_SHRINK) == 0)
983 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
984 /* Get the current RPC size directly, instead of going via:
985 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
986 * Keep comment here so that it can be found by searching. */
987 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
989 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
990 client->cl_avail_grant > brw_size)
993 osc_update_next_shrink(client);
998 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1000 struct client_obd *client;
1002 list_for_each_entry(client, &item->ti_obd_list,
1003 cl_grant_shrink_list) {
1004 if (osc_should_shrink_grant(client))
1005 osc_shrink_grant(client);
1010 static int osc_add_shrink_grant(struct client_obd *client)
1014 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1016 osc_grant_shrink_grant_cb, NULL,
1017 &client->cl_grant_shrink_list);
1019 CERROR("add grant client %s error %d\n",
1020 client->cl_import->imp_obd->obd_name, rc);
1023 CDEBUG(D_CACHE, "add grant client %s \n",
1024 client->cl_import->imp_obd->obd_name);
1025 osc_update_next_shrink(client);
1029 static int osc_del_shrink_grant(struct client_obd *client)
1031 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1035 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1038 * ocd_grant is the total grant amount we expect to hold: if we've
1039 * been evicted, it's the new avail_grant amount, and cl_dirty will drop
1040 * to 0 as in-flight RPCs fail out; otherwise, it's avail_grant + dirty.
1042 * A race is tolerable here: if we're evicted but imp_state has already
1043 * left the EVICTED state, then cl_dirty must be 0 already.
1045 client_obd_list_lock(&cli->cl_loi_list_lock);
1046 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1047 cli->cl_avail_grant = ocd->ocd_grant;
1049 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1051 if (cli->cl_avail_grant < 0) {
1052 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1053 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1054 ocd->ocd_grant, cli->cl_dirty);
1055 /* workaround for servers which do not have the patch from
1057 cli->cl_avail_grant = ocd->ocd_grant;
1060 /* determine the appropriate chunk size used by osc_extent. */
1061 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1062 client_obd_list_unlock(&cli->cl_loi_list_lock);
1064 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1065 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1066 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1068 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1069 list_empty(&cli->cl_grant_shrink_list))
1070 osc_add_shrink_grant(cli);
1073 /* We assume that the reason this OSC got a short read is that it read
1074 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1075 * via the LOV, and it _knows_ it's reading inside the file; it's just that
1076 * this stripe never got written at or beyond this stripe offset yet. */
1077 static void handle_short_read(int nob_read, obd_count page_count,
1078 struct brw_page **pga)
1083 /* skip bytes read OK */
1084 while (nob_read > 0) {
1085 LASSERT(page_count > 0);
1087 if (pga[i]->count > nob_read) {
1088 /* EOF inside this page */
1089 ptr = kmap(pga[i]->pg) +
1090 (pga[i]->off & ~CFS_PAGE_MASK);
1091 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1098 nob_read -= pga[i]->count;
1103 /* zero remaining pages */
1104 while (page_count-- > 0) {
1105 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1106 memset(ptr, 0, pga[i]->count);
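/*
 * Validate the per-niobuf return codes in a BRW_WRITE reply: fail on a
 * missing/short RC vector, propagate the first negative rc, flag
 * unexpected non-zero rcs, and make sure the bulk actually transferred
 * the number of bytes we asked for.
 */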
1112 static int check_write_rcs(struct ptlrpc_request *req,
1113 int requested_nob, int niocount,
1114 obd_count page_count, struct brw_page **pga)
1119 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1120 sizeof(*remote_rcs) *
1122 if (remote_rcs == NULL) {
1123 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1127 /* return error if any niobuf was in error */
1128 for (i = 0; i < niocount; i++) {
1129 if ((int)remote_rcs[i] < 0)
1130 return(remote_rcs[i]);
1132 if (remote_rcs[i] != 0) {
1133 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1134 i, remote_rcs[i], req);
1139 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1140 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1141 req->rq_bulk->bd_nob_transferred, requested_nob);
1148 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1150 if (p1->flag != p2->flag) {
1151 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1152 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1154 /* warn if we try to combine flags that we don't know to be
1155 * safe to combine */
1156 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1157 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1158 "report this at http://bugs.whamcloud.com/\n",
1159 p1->flag, p2->flag);
1164 return (p1->off + p1->count == p2->off);
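/*
 * Checksum @nob bytes of bulk data spread across @pga using the cfs_crypto
 * hash selected by @cksum_type.  Includes OBD_FAIL hooks that corrupt the
 * data (reads) or the computed checksum (writes) to simulate bulk transfer
 * errors.
 */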
1167 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1168 struct brw_page **pga, int opc,
1169 cksum_type_t cksum_type)
1173 struct cfs_crypto_hash_desc *hdesc;
1174 unsigned int bufsize;
1176 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1178 LASSERT(pg_count > 0);
1180 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1181 if (IS_ERR(hdesc)) {
1182 CERROR("Unable to initialize checksum hash %s\n",
1183 cfs_crypto_hash_name(cfs_alg));
1184 return PTR_ERR(hdesc);
1187 while (nob > 0 && pg_count > 0) {
1188 int count = pga[i]->count > nob ? nob : pga[i]->count;
1190 /* corrupt the data before we compute the checksum, to
1191 * simulate an OST->client data error */
1192 if (i == 0 && opc == OST_READ &&
1193 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1194 unsigned char *ptr = kmap(pga[i]->pg);
1195 int off = pga[i]->off & ~CFS_PAGE_MASK;
1196 memcpy(ptr + off, "bad1", min(4, nob));
1199 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1200 pga[i]->off & ~CFS_PAGE_MASK,
1202 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1203 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1205 nob -= pga[i]->count;
1211 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1214 cfs_crypto_hash_final(hdesc, NULL, NULL);
1216 /* For sends we only compute a wrong checksum instead of
1217 * corrupting the data, so the data is still correct on a redo. */
1218 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
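/*
 * Build a BRW request: allocate an OST_BRW_READ/WRITE RPC (writes come from
 * the pre-allocated request pool), create the bulk descriptor, pack one
 * niobuf per run of mergeable pages, announce cached/dirty state in the
 * obdo, and optionally attach a bulk checksum.  The page array must already
 * be sorted and free of gaps in the middle.
 */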
1224 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1225 struct lov_stripe_md *lsm, obd_count page_count,
1226 struct brw_page **pga,
1227 struct ptlrpc_request **reqp,
1228 struct obd_capa *ocapa, int reserve,
1231 struct ptlrpc_request *req;
1232 struct ptlrpc_bulk_desc *desc;
1233 struct ost_body *body;
1234 struct obd_ioobj *ioobj;
1235 struct niobuf_remote *niobuf;
1236 int niocount, i, requested_nob, opc, rc;
1237 struct osc_brw_async_args *aa;
1238 struct req_capsule *pill;
1239 struct brw_page *pg_prev;
1241 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1242 RETURN(-ENOMEM); /* Recoverable */
1243 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1244 RETURN(-EINVAL); /* Fatal */
1246 if ((cmd & OBD_BRW_WRITE) != 0) {
1248 req = ptlrpc_request_alloc_pool(cli->cl_import,
1249 cli->cl_import->imp_rq_pool,
1250 &RQF_OST_BRW_WRITE);
1253 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1258 for (niocount = i = 1; i < page_count; i++) {
1259 if (!can_merge_pages(pga[i - 1], pga[i]))
1263 pill = &req->rq_pill;
1264 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1266 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1267 niocount * sizeof(*niobuf));
1268 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1270 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1272 ptlrpc_request_free(req);
1275 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1276 ptlrpc_at_set_req_timeout(req);
1277 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own retry logic */
1279 req->rq_no_retry_einprogress = 1;
1281 desc = ptlrpc_prep_bulk_imp(req, page_count,
1282 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1283 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1287 GOTO(out, rc = -ENOMEM);
1288 /* NB request now owns desc and will free it when it gets freed */
1290 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1291 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1292 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1293 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1295 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1297 obdo_to_ioobj(oa, ioobj);
1298 ioobj->ioo_bufcnt = niocount;
1299 /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1300 * bulks that might be sent for this request. The actual number is decided
1301 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1302 * "max - 1" for compatibility with old clients that send "0", and also so
1303 * that the actual maximum is a power-of-two number, not one less. LU-1431 */
1304 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1305 osc_pack_capa(req, body, ocapa);
1306 LASSERT(page_count > 0);
1308 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1309 struct brw_page *pg = pga[i];
1310 int poff = pg->off & ~CFS_PAGE_MASK;
1312 LASSERT(pg->count > 0);
1313 /* make sure there is no gap in the middle of page array */
1314 LASSERTF(page_count == 1 ||
1315 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1316 ergo(i > 0 && i < page_count - 1,
1317 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1318 ergo(i == page_count - 1, poff == 0)),
1319 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1320 i, page_count, pg, pg->off, pg->count);
1321 LASSERTF(i == 0 || pg->off > pg_prev->off,
1322 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1323 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1325 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1326 pg_prev->pg, page_private(pg_prev->pg),
1327 pg_prev->pg->index, pg_prev->off);
1328 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1329 (pg->flag & OBD_BRW_SRVLOCK));
1331 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1332 requested_nob += pg->count;
1334 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1336 niobuf->len += pg->count;
1338 niobuf->offset = pg->off;
1339 niobuf->len = pg->count;
1340 niobuf->flags = pg->flag;
1345 LASSERTF((void *)(niobuf - niocount) ==
1346 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1347 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1348 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1350 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1352 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1353 body->oa.o_valid |= OBD_MD_FLFLAGS;
1354 body->oa.o_flags = 0;
1356 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1359 if (osc_should_shrink_grant(cli))
1360 osc_shrink_grant_local(cli, &body->oa);
1362 /* size[REQ_REC_OFF] still sizeof (*body) */
1363 if (opc == OST_WRITE) {
1364 if (cli->cl_checksum &&
1365 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1366 /* store cl_cksum_type in a local variable since
1367 * it can be changed via lprocfs */
1368 cksum_type_t cksum_type = cli->cl_cksum_type;
1370 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1371 oa->o_flags &= OBD_FL_LOCAL_MASK;
1372 body->oa.o_flags = 0;
1374 body->oa.o_flags |= cksum_type_pack(cksum_type);
1375 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1376 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1380 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1382 /* save this in 'oa', too, for later checking */
1383 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1384 oa->o_flags |= cksum_type_pack(cksum_type);
1386 /* clear out the checksum flag, in case this is a
1387 * resend but cl_checksum is no longer set. b=11238 */
1388 oa->o_valid &= ~OBD_MD_FLCKSUM;
1390 oa->o_cksum = body->oa.o_cksum;
1391 /* 1 RC per niobuf */
1392 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1393 sizeof(__u32) * niocount);
1395 if (cli->cl_checksum &&
1396 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1397 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1398 body->oa.o_flags = 0;
1399 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1400 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1403 ptlrpc_request_set_replen(req);
1405 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1406 aa = ptlrpc_req_async_args(req);
1408 aa->aa_requested_nob = requested_nob;
1409 aa->aa_nio_count = niocount;
1410 aa->aa_page_count = page_count;
1414 INIT_LIST_HEAD(&aa->aa_oaps);
1415 if (ocapa && reserve)
1416 aa->aa_ocapa = capa_get(ocapa);
1422 ptlrpc_req_finished(req);
1426 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1427 __u32 client_cksum, __u32 server_cksum, int nob,
1428 obd_count page_count, struct brw_page **pga,
1429 cksum_type_t client_cksum_type)
1433 cksum_type_t cksum_type;
1435 if (server_cksum == client_cksum) {
1436 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1440 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1442 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1445 if (cksum_type != client_cksum_type)
1446 msg = "the server did not use the checksum type specified in "
1447 "the original request - likely a protocol problem";
1448 else if (new_cksum == server_cksum)
1449 msg = "changed on the client after we checksummed it - "
1450 "likely false positive due to mmap IO (bug 11742)";
1451 else if (new_cksum == client_cksum)
1452 msg = "changed in transit before arrival at OST";
1454 msg = "changed in transit AND doesn't match the original - "
1455 "likely false positive due to mmap IO (bug 11742)";
1457 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1458 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1459 msg, libcfs_nid2str(peer->nid),
1460 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1461 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1462 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1463 POSTID(&oa->o_oi), pga[0]->off,
1464 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1465 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1466 "client csum now %x\n", client_cksum, client_cksum_type,
1467 server_cksum, cksum_type, new_cksum);
1471 /* Note rc enters this function as number of bytes transferred */
1472 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1474 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1475 const lnet_process_id_t *peer =
1476 &req->rq_import->imp_connection->c_peer;
1477 struct client_obd *cli = aa->aa_cli;
1478 struct ost_body *body;
1479 __u32 client_cksum = 0;
1481 if (rc < 0 && rc != -EDQUOT) {
1482 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1486 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1487 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1489 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1493 /* set/clear over quota flag for a uid/gid */
1494 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1495 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1496 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1498 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1499 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1501 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1504 osc_update_grant(cli, body);
1509 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1510 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1512 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1514 CERROR("Unexpected +ve rc %d\n", rc);
1517 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1519 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1522 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1523 check_write_checksum(&body->oa, peer, client_cksum,
1524 body->oa.o_cksum, aa->aa_requested_nob,
1525 aa->aa_page_count, aa->aa_ppga,
1526 cksum_type_unpack(aa->aa_oa->o_flags)))
1529 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1530 aa->aa_page_count, aa->aa_ppga);
1534 /* The rest of this function executes only for OST_READs */
1536 /* if unwrap_bulk failed, return -EAGAIN to retry */
1537 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1539 GOTO(out, rc = -EAGAIN);
1541 if (rc > aa->aa_requested_nob) {
1542 CERROR("Unexpected rc %d (%d requested)\n", rc,
1543 aa->aa_requested_nob);
1547 if (rc != req->rq_bulk->bd_nob_transferred) {
1548 CERROR("Unexpected rc %d (%d transferred)\n",
1549 rc, req->rq_bulk->bd_nob_transferred);
1553 if (rc < aa->aa_requested_nob)
1554 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1556 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1557 static int cksum_counter;
1558 __u32 server_cksum = body->oa.o_cksum;
1561 cksum_type_t cksum_type;
1563 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1564 body->oa.o_flags : 0);
1565 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1566 aa->aa_ppga, OST_READ,
1569 if (peer->nid == req->rq_bulk->bd_sender) {
1573 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1576 if (server_cksum == ~0 && rc > 0) {
1577 CERROR("Protocol error: server %s set the 'checksum' "
1578 "bit, but didn't send a checksum. Not fatal, "
1579 "but please notify on http://bugs.whamcloud.com/\n",
1580 libcfs_nid2str(peer->nid));
1581 } else if (server_cksum != client_cksum) {
1582 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1583 "%s%s%s inode "DFID" object "DOSTID
1584 " extent ["LPU64"-"LPU64"]\n",
1585 req->rq_import->imp_obd->obd_name,
1586 libcfs_nid2str(peer->nid),
1588 body->oa.o_valid & OBD_MD_FLFID ?
1589 body->oa.o_parent_seq : (__u64)0,
1590 body->oa.o_valid & OBD_MD_FLFID ?
1591 body->oa.o_parent_oid : 0,
1592 body->oa.o_valid & OBD_MD_FLFID ?
1593 body->oa.o_parent_ver : 0,
1594 POSTID(&body->oa.o_oi),
1595 aa->aa_ppga[0]->off,
1596 aa->aa_ppga[aa->aa_page_count-1]->off +
1597 aa->aa_ppga[aa->aa_page_count-1]->count -
1599 CERROR("client %x, server %x, cksum_type %x\n",
1600 client_cksum, server_cksum, cksum_type);
1602 aa->aa_oa->o_cksum = client_cksum;
1606 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1609 } else if (unlikely(client_cksum)) {
1610 static int cksum_missed;
1613 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1614 CERROR("Checksum %u requested from %s but not sent\n",
1615 cksum_missed, libcfs_nid2str(peer->nid));
1621 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1622 aa->aa_oa, &body->oa);
1627 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1628 struct lov_stripe_md *lsm,
1629 obd_count page_count, struct brw_page **pga,
1630 struct obd_capa *ocapa)
1632 struct ptlrpc_request *req;
1634 wait_queue_head_t waitq;
1635 int generation, resends = 0;
1636 struct l_wait_info lwi;
1638 init_waitqueue_head(&waitq);
1639 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1642 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1643 page_count, pga, &req, ocapa, 0, resends);
1648 req->rq_generation_set = 1;
1649 req->rq_import_generation = generation;
1650 req->rq_sent = cfs_time_current_sec() + resends;
1653 rc = ptlrpc_queue_wait(req);
1655 if (rc == -ETIMEDOUT && req->rq_resend) {
1656 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1657 ptlrpc_req_finished(req);
1661 rc = osc_brw_fini_request(req, rc);
1663 ptlrpc_req_finished(req);
1664 /* When the server returns -EINPROGRESS, the client should always retry
1665 * regardless of the number of times the bulk was already resent. */
1666 if (osc_recoverable_error(rc)) {
1668 if (rc != -EINPROGRESS &&
1669 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1670 CERROR("%s: too many resend retries for object: "
1671 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1672 POSTID(&oa->o_oi), rc);
1676 exp->exp_obd->u.cli.cl_import->imp_generation) {
1677 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1678 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1679 POSTID(&oa->o_oi), rc);
1683 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1685 l_wait_event(waitq, 0, &lwi);
1690 if (rc == -EAGAIN || rc == -EINPROGRESS)
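/*
 * Rebuild and resend a BRW that failed with a recoverable error: a new
 * request is prepared from the same pages/obdo, takes over the async args,
 * oaps and extents of the old request, inherits a capped resend delay, and
 * is handed back to ptlrpcd.
 */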
1695 static int osc_brw_redo_request(struct ptlrpc_request *request,
1696 struct osc_brw_async_args *aa, int rc)
1698 struct ptlrpc_request *new_req;
1699 struct osc_brw_async_args *new_aa;
1700 struct osc_async_page *oap;
1702 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1703 "redo for recoverable error %d", rc);
1705 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1706 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1707 aa->aa_cli, aa->aa_oa,
1708 NULL /* lsm unused by osc currently */,
1709 aa->aa_page_count, aa->aa_ppga,
1710 &new_req, aa->aa_ocapa, 0, 1);
1714 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1715 if (oap->oap_request != NULL) {
1716 LASSERTF(request == oap->oap_request,
1717 "request %p != oap_request %p\n",
1718 request, oap->oap_request);
1719 if (oap->oap_interrupted) {
1720 ptlrpc_req_finished(new_req);
1725 /* New request takes over pga and oaps from old request.
1726 * Note that copying a list_head doesn't work, need to move it... */
1728 new_req->rq_interpret_reply = request->rq_interpret_reply;
1729 new_req->rq_async_args = request->rq_async_args;
1730 /* Cap the resend delay to the current request timeout; this is similar to
1731 * what ptlrpc does (see after_reply()). */
1732 if (aa->aa_resends > new_req->rq_timeout)
1733 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1735 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1736 new_req->rq_generation_set = 1;
1737 new_req->rq_import_generation = request->rq_import_generation;
1739 new_aa = ptlrpc_req_async_args(new_req);
1741 INIT_LIST_HEAD(&new_aa->aa_oaps);
1742 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1743 INIT_LIST_HEAD(&new_aa->aa_exts);
1744 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1745 new_aa->aa_resends = aa->aa_resends;
1747 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1748 if (oap->oap_request) {
1749 ptlrpc_req_finished(oap->oap_request);
1750 oap->oap_request = ptlrpc_request_addref(new_req);
1754 new_aa->aa_ocapa = aa->aa_ocapa;
1755 aa->aa_ocapa = NULL;
1757 /* XXX: This code will run into problems if we ever support adding
1758 * a series of BRW RPCs to a self-defined ptlrpc_request_set
1759 * and waiting for all of them to finish. We should inherit the request
1760 * set from the old request. */
1761 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1763 DEBUG_REQ(D_INFO, new_req, "new request");
1768 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1769 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1770 * fine for our small page arrays and doesn't require allocation. It's an
1771 * insertion sort that swaps elements that are strides apart, shrinking the
1772 * stride down until it is 1 and the array is sorted.
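 * For example, with the 3h+1 increments generated by the loop below the
 * strides are 1, 4, 13, 40, 121, ...; a 100-page array is then sorted by
 * passes at strides 40, 13, 4 and 1 (assuming the usual divide-by-three
 * step between passes).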
1774 static void sort_brw_pages(struct brw_page **array, int num)
1777 struct brw_page *tmp;
1781 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1786 for (i = stride ; i < num ; i++) {
1789 while (j >= stride && array[j - stride]->off > tmp->off) {
1790 array[j] = array[j - stride];
1795 } while (stride > 1);
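/*
 * Given a sorted page array, return how many leading pages form an
 * unfragmented run, i.e. one where no page other than the first starts,
 * and none other than the last ends, off a page boundary.  This bounds
 * how many pages can go into a single BRW without fragmenting the bulk.
 */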
1798 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1804 LASSERT(pages > 0);
1805 offset = pg[i]->off & ~CFS_PAGE_MASK;
1809 if (pages == 0) /* that's all */
1812 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1813 return count; /* doesn't end on page boundary */
1816 offset = pg[i]->off & ~CFS_PAGE_MASK;
1817 if (offset != 0) /* doesn't start on page boundary */
1824 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1826 struct brw_page **ppga;
1829 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1833 for (i = 0; i < count; i++)
1838 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1840 LASSERT(ppga != NULL);
1841 OBD_FREE(ppga, sizeof(*ppga) * count);
1844 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1845 obd_count page_count, struct brw_page *pga,
1846 struct obd_trans_info *oti)
1848 struct obdo *saved_oa = NULL;
1849 struct brw_page **ppga, **orig;
1850 struct obd_import *imp = class_exp2cliimp(exp);
1851 struct client_obd *cli;
1852 int rc, page_count_orig;
1854 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1855 cli = &imp->imp_obd->u.cli;
1857 if (cmd & OBD_BRW_CHECK) {
1858 /* The caller just wants to know if there's a chance that this
1859 * I/O can succeed */
1861 if (imp->imp_invalid)
1866 /* test_brw with a failed create can trip this, maybe others. */
1867 LASSERT(cli->cl_max_pages_per_rpc);
1871 orig = ppga = osc_build_ppga(pga, page_count);
1874 page_count_orig = page_count;
1876 sort_brw_pages(ppga, page_count);
1877 while (page_count) {
1878 obd_count pages_per_brw;
1880 if (page_count > cli->cl_max_pages_per_rpc)
1881 pages_per_brw = cli->cl_max_pages_per_rpc;
1883 pages_per_brw = page_count;
1885 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1887 if (saved_oa != NULL) {
1888 /* restore previously saved oa */
1889 *oinfo->oi_oa = *saved_oa;
1890 } else if (page_count > pages_per_brw) {
1891 /* save a copy of oa (brw will clobber it) */
1892 OBDO_ALLOC(saved_oa);
1893 if (saved_oa == NULL)
1894 GOTO(out, rc = -ENOMEM);
1895 *saved_oa = *oinfo->oi_oa;
1898 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1899 pages_per_brw, ppga, oinfo->oi_capa);
1904 page_count -= pages_per_brw;
1905 ppga += pages_per_brw;
1909 osc_release_ppga(orig, page_count_orig);
1911 if (saved_oa != NULL)
1912 OBDO_FREE(saved_oa);
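/*
 * Completion handler for BRW RPCs sent via ptlrpcd: finish the reply
 * (checksums, grant, quota), retry recoverable errors through
 * osc_brw_redo_request(), finish every extent attached to the request,
 * push the returned blocks/times into the cl_object attributes, drop the
 * read/write in-flight counters and kick the IO queue again.
 */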
1917 static int brw_interpret(const struct lu_env *env,
1918 struct ptlrpc_request *req, void *data, int rc)
1920 struct osc_brw_async_args *aa = data;
1921 struct osc_extent *ext;
1922 struct osc_extent *tmp;
1923 struct cl_object *obj = NULL;
1924 struct client_obd *cli = aa->aa_cli;
1926 rc = osc_brw_fini_request(req, rc);
1927 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1928 /* When the server returns -EINPROGRESS, the client should always retry
1929 * regardless of the number of times the bulk was already resent. */
1930 if (osc_recoverable_error(rc)) {
1931 if (req->rq_import_generation !=
1932 req->rq_import->imp_generation) {
1933 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1934 ""DOSTID", rc = %d.\n",
1935 req->rq_import->imp_obd->obd_name,
1936 POSTID(&aa->aa_oa->o_oi), rc);
1937 } else if (rc == -EINPROGRESS ||
1938 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1939 rc = osc_brw_redo_request(req, aa, rc);
1941 CERROR("%s: too many resent retries for object: "
1942 ""LPU64":"LPU64", rc = %d.\n",
1943 req->rq_import->imp_obd->obd_name,
1944 POSTID(&aa->aa_oa->o_oi), rc);
1949 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1954 capa_put(aa->aa_ocapa);
1955 aa->aa_ocapa = NULL;
1958 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1959 if (obj == NULL && rc == 0) {
1960 obj = osc2cl(ext->oe_obj);
1964 list_del_init(&ext->oe_link);
1965 osc_extent_finish(env, ext, 1, rc);
1967 LASSERT(list_empty(&aa->aa_exts));
1968 LASSERT(list_empty(&aa->aa_oaps));
1971 struct obdo *oa = aa->aa_oa;
1972 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1973 unsigned long valid = 0;
1976 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1977 attr->cat_blocks = oa->o_blocks;
1978 valid |= CAT_BLOCKS;
1980 if (oa->o_valid & OBD_MD_FLMTIME) {
1981 attr->cat_mtime = oa->o_mtime;
1984 if (oa->o_valid & OBD_MD_FLATIME) {
1985 attr->cat_atime = oa->o_atime;
1988 if (oa->o_valid & OBD_MD_FLCTIME) {
1989 attr->cat_ctime = oa->o_ctime;
1993 cl_object_attr_lock(obj);
1994 cl_object_attr_set(env, obj, attr, valid);
1995 cl_object_attr_unlock(obj);
1997 cl_object_put(env, obj);
1999 OBDO_FREE(aa->aa_oa);
2001 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2002 req->rq_bulk->bd_nob_transferred);
2003 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2004 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2006 client_obd_list_lock(&cli->cl_loi_list_lock);
2007 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2008 * is called so we know whether to go to sync BRWs or wait for more
2009 * RPCs to complete */
2010 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2011 cli->cl_w_in_flight--;
2013 cli->cl_r_in_flight--;
2014 osc_wake_cache_waiters(cli);
2015 client_obd_list_unlock(&cli->cl_loi_list_lock);
2017 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2022 * Build an RPC from the list of extents in @ext_list. The caller must ensure
2023 * that the total number of pages in this list does NOT exceed the max pages per RPC.
2024 * Extents in the list must be in OES_RPC state.
2026 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2027 struct list_head *ext_list, int cmd, pdl_policy_t pol)
2029 struct ptlrpc_request *req = NULL;
2030 struct osc_extent *ext;
2031 struct brw_page **pga = NULL;
2032 struct osc_brw_async_args *aa = NULL;
2033 struct obdo *oa = NULL;
2034 struct osc_async_page *oap;
2035 struct osc_async_page *tmp;
2036 struct cl_req *clerq = NULL;
2037 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2039 struct ldlm_lock *lock = NULL;
2040 struct cl_req_attr *crattr = NULL;
2041 obd_off starting_offset = OBD_OBJECT_EOF;
2042 obd_off ending_offset = 0;
2048 LIST_HEAD(rpc_list);
2050 LASSERT(!list_empty(ext_list));
2052 /* add pages into rpc_list to build BRW rpc */
2053 list_for_each_entry(ext, ext_list, oe_link) {
2054 LASSERT(ext->oe_state == OES_RPC);
2055 mem_tight |= ext->oe_memalloc;
2056 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2058 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2059 if (starting_offset > oap->oap_obj_off)
2060 starting_offset = oap->oap_obj_off;
2062 LASSERT(oap->oap_page_off == 0);
2063 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2064 ending_offset = oap->oap_obj_off +
2067 LASSERT(oap->oap_page_off + oap->oap_count ==
2073 mpflag = cfs_memory_pressure_get_and_set();
2075 OBD_ALLOC(crattr, sizeof(*crattr));
2077 GOTO(out, rc = -ENOMEM);
2079 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2081 GOTO(out, rc = -ENOMEM);
2085 GOTO(out, rc = -ENOMEM);
2088 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2089 struct cl_page *page = oap2cl_page(oap);
2090 if (clerq == NULL) {
2091 clerq = cl_req_alloc(env, page, crt,
2092 1 /* only 1-object rpcs for now */);
2094 GOTO(out, rc = PTR_ERR(clerq));
2095 lock = oap->oap_ldlm_lock;
2098 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2099 pga[i] = &oap->oap_brw_page;
2100 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2101 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2102 pga[i]->pg, page_index(oap->oap_page), oap,
2105 cl_req_page_add(env, clerq, page);
2108 /* always get the data for the obdo for the rpc */
2109 LASSERT(clerq != NULL);
2110 crattr->cra_oa = oa;
2111 cl_req_attr_set(env, clerq, crattr, ~0ULL);
2113 oa->o_handle = lock->l_remote_handle;
2114 oa->o_valid |= OBD_MD_FLHANDLE;
2117 rc = cl_req_prep(env, clerq);
2119 CERROR("cl_req_prep failed: %d\n", rc);
2123 sort_brw_pages(pga, page_count);
2124 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2125 pga, &req, crattr->cra_capa, 1, 0);
2127 CERROR("prep_req failed: %d\n", rc);
2131 req->rq_interpret_reply = brw_interpret;
2134 req->rq_memalloc = 1;
2136 /* Need to update the timestamps after the request is built in case
2137 * we race with setattr (locally or in the queue at the OST). If the OST
2138 * gets the later setattr before the earlier BRW (as determined by the
2139 * request xid), the OST will not use the BRW timestamps. Sadly, there is
2140 * no obvious way to do this in a single call. bug 10150 */
2141 cl_req_attr_set(env, clerq, crattr,
2142 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2144 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2146 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2147 aa = ptlrpc_req_async_args(req);
2148 INIT_LIST_HEAD(&aa->aa_oaps);
2149 list_splice_init(&rpc_list, &aa->aa_oaps);
2150 INIT_LIST_HEAD(&aa->aa_exts);
2151 list_splice_init(ext_list, &aa->aa_exts);
2152 aa->aa_clerq = clerq;
2154 /* Queued sync pages can be torn down while the pages
2155 * were between the pending list and the RPC. */
2157 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2158 /* only one oap gets a request reference */
2161 if (oap->oap_interrupted && !req->rq_intr) {
2162 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2164 ptlrpc_mark_interrupted(req);
2168 tmp->oap_request = ptlrpc_request_addref(req);
2170 client_obd_list_lock(&cli->cl_loi_list_lock);
2171 starting_offset >>= PAGE_CACHE_SHIFT;
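/* starting_offset is now a page index; the branches below record per-RPC
 * statistics (pages per RPC, RPCs in flight, starting page offset) under
 * the LOI list lock. */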
2172 if (cmd == OBD_BRW_READ) {
2173 cli->cl_r_in_flight++;
2174 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2175 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2176 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2177 starting_offset + 1);
2179 cli->cl_w_in_flight++;
2180 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2181 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2182 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2183 starting_offset + 1);
2185 client_obd_list_unlock(&cli->cl_loi_list_lock);
2187 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2188 page_count, aa, cli->cl_r_in_flight,
2189 cli->cl_w_in_flight);
2191 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2192 * see which CPU/NUMA node the majority of pages were allocated
2193 * on, and try to assign the async RPC to the CPU core
2194 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2196 * But on the other hand, we expect that multiple ptlrpcd threads and
2197 * the initial write sponsor can run in parallel, especially when data
2198 * checksumming is enabled, which is a CPU-bound operation that a single
2199 * ptlrpcd thread cannot process in time. So having more ptlrpcd
2200 * threads share the BRW load (with PDL_POLICY_ROUND) seems better.
2201 */
2203 ptlrpcd_add_req(req, pol, -1);
2209 cfs_memory_pressure_restore(mpflag);
2211 if (crattr != NULL) {
2212 capa_put(crattr->cra_capa);
2213 OBD_FREE(crattr, sizeof(*crattr));
2217 LASSERT(req == NULL);
2222 OBD_FREE(pga, sizeof(*pga) * page_count);
2223 /* This should happen rarely and is pretty bad: it makes the
2224 * pending list not follow the dirty order. */
2225 while (!list_empty(ext_list)) {
2226 ext = list_entry(ext_list->next, struct osc_extent,
2228 list_del_init(&ext->oe_link);
2229 osc_extent_finish(env, ext, 0, rc);
2231 if (clerq && !IS_ERR(clerq))
2232 cl_req_completion(env, clerq, rc);
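/* Attach einfo->ei_cbdata to the lock's l_ast_data if none is set yet, and
 * report whether the lock now refers to that data; the LASSERTs below also
 * sanity-check that the lock uses the callbacks from @einfo. */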
2237 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2238 struct ldlm_enqueue_info *einfo)
2240 void *data = einfo->ei_cbdata;
2243 LASSERT(lock != NULL);
2244 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2245 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2246 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2247 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2249 lock_res_and_lock(lock);
2250 spin_lock(&osc_ast_guard);
2252 if (lock->l_ast_data == NULL)
2253 lock->l_ast_data = data;
2254 if (lock->l_ast_data == data)
2257 spin_unlock(&osc_ast_guard);
2258 unlock_res_and_lock(lock);
2263 static int osc_set_data_with_check(struct lustre_handle *lockh,
2264 struct ldlm_enqueue_info *einfo)
2266 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2270 set = osc_set_lock_data_with_check(lock, einfo);
2271 LDLM_LOCK_PUT(lock);
2273 CERROR("lockh %p, data %p - client evicted?\n",
2274 lockh, einfo->ei_cbdata);
2278 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2279 ldlm_iterator_t replace, void *data)
2281 struct ldlm_res_id res_id;
2282 struct obd_device *obd = class_exp2obd(exp);
2284 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2285 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2289 /* Find any LDLM lock of the inode in OSC: returns 0 if none is found,
 * 1 if one is found, and a negative value on error. */
2293 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2294 ldlm_iterator_t replace, void *data)
2296 struct ldlm_res_id res_id;
2297 struct obd_device *obd = class_exp2obd(exp);
2300 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2301 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2302 if (rc == LDLM_ITER_STOP)
2304 if (rc == LDLM_ITER_CONTINUE)
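/* Finish an OST lock enqueue: for an aborted intent enqueue, fold the
 * server's lock_policy_res1 status into rc; once the LVB is usable, set
 * LDLM_FL_LVB_READY and pass the final result to the caller's upcall. */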
2309 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2310 obd_enqueue_update_f upcall, void *cookie,
2311 __u64 *flags, int agl, int rc)
2313 int intent = *flags & LDLM_FL_HAS_INTENT;
2316 /* The request was created before ldlm_cli_enqueue call. */
2317 if (rc == ELDLM_LOCK_ABORTED) {
2318 struct ldlm_reply *rep;
2319 rep = req_capsule_server_get(&req->rq_pill,
2322 LASSERT(rep != NULL);
2323 rep->lock_policy_res1 =
2324 ptlrpc_status_ntoh(rep->lock_policy_res1);
2325 if (rep->lock_policy_res1)
2326 rc = rep->lock_policy_res1;
2330 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2332 *flags |= LDLM_FL_LVB_READY;
2333 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2334 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2337 /* Call the update callback. */
2338 rc = (*upcall)(cookie, rc);
2342 static int osc_enqueue_interpret(const struct lu_env *env,
2343 struct ptlrpc_request *req,
2344 struct osc_enqueue_args *aa, int rc)
2346 struct ldlm_lock *lock;
2347 struct lustre_handle handle;
2349 struct ost_lvb *lvb;
2351 __u64 *flags = aa->oa_flags;
2353 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2354 * might be freed anytime after lock upcall has been called. */
2355 lustre_handle_copy(&handle, aa->oa_lockh);
2356 mode = aa->oa_ei->ei_mode;
2358 /* ldlm_cli_enqueue is holding a reference on the lock, so it must be valid. */
2360 lock = ldlm_handle2lock(&handle);
2362 /* Take an additional reference so that a blocking AST that
2363 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2364 * to arrive after an upcall has been executed by
2365 * osc_enqueue_fini(). */
2366 ldlm_lock_addref(&handle, mode);
2368 /* Let the CP AST grant the lock first. */
2369 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2371 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2376 lvb_len = sizeof(*aa->oa_lvb);
2379 /* Complete obtaining the lock procedure. */
2380 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2381 mode, flags, lvb, lvb_len, &handle, rc);
2382 /* Complete osc stuff. */
2383 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2384 flags, aa->oa_agl, rc);
2386 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2388 /* Release the lock for async request. */
2389 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2391 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2392 * not already released by
2393 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2395 ldlm_lock_decref(&handle, mode);
2397 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2398 aa->oa_lockh, req, aa);
2399 ldlm_lock_decref(&handle, mode);
2400 LDLM_LOCK_PUT(lock);
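/* Propagate the result of a granted (or glimpsed) extent lock into the
 * stripe: copy the LVB, extend the known minimum size (KMS) no further than
 * the end of the lock, and allow the lock to be matched; on failure, mark
 * the lock as unmatchable instead. */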
2404 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2405 struct lov_oinfo *loi, int flags,
2406 struct ost_lvb *lvb, __u32 mode, int rc)
2408 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2410 if (rc == ELDLM_OK) {
2413 LASSERT(lock != NULL);
2414 loi->loi_lvb = *lvb;
2415 tmp = loi->loi_lvb.lvb_size;
2416 /* Extend KMS up to the end of this lock and no further.
2417 * A lock on [x, y] means a KMS of up to y + 1 bytes! */
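/* For example, if the LVB reports a size of 10000 bytes but the lock only
 * covers [0, 4095], KMS is capped at 4096 here. */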
2418 if (tmp > lock->l_policy_data.l_extent.end)
2419 tmp = lock->l_policy_data.l_extent.end + 1;
2420 if (tmp >= loi->loi_kms) {
2421 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2422 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2423 loi_kms_set(loi, tmp);
2425 LDLM_DEBUG(lock, "lock acquired, setting rss="
2426 LPU64"; leaving kms="LPU64", end="LPU64,
2427 loi->loi_lvb.lvb_size, loi->loi_kms,
2428 lock->l_policy_data.l_extent.end);
2430 ldlm_lock_allow_match(lock);
2431 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2432 LASSERT(lock != NULL);
2433 loi->loi_lvb = *lvb;
2434 ldlm_lock_allow_match(lock);
2435 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2436 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2442 ldlm_lock_fail_match(lock);
2444 LDLM_LOCK_PUT(lock);
2447 EXPORT_SYMBOL(osc_update_enqueue);
2449 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2451 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2452 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2453 * other synchronous requests; however, holding some locks while trying to obtain
2454 * others may take a considerable amount of time in case of OST failure, and
2455 * when a client fails to release a lock that other sync requests are waiting for,
2456 * the client is excluded from the cluster -- such scenarios make life difficult,
2457 * so release locks just after they are obtained. */
2458 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2459 __u64 *flags, ldlm_policy_data_t *policy,
2460 struct ost_lvb *lvb, int kms_valid,
2461 obd_enqueue_update_f upcall, void *cookie,
2462 struct ldlm_enqueue_info *einfo,
2463 struct lustre_handle *lockh,
2464 struct ptlrpc_request_set *rqset, int async, int agl)
2466 struct obd_device *obd = exp->exp_obd;
2467 struct ptlrpc_request *req = NULL;
2468 int intent = *flags & LDLM_FL_HAS_INTENT;
2469 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2473 /* Filesystem lock extents are extended to page boundaries so that
2474 * dealing with the page cache is a little smoother. */
2475 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2476 policy->l_extent.end |= ~CFS_PAGE_MASK;
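/* For example, with 4096-byte pages a request for bytes [5000, 6000] is
 * widened here to the page-aligned extent [4096, 8191]. */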
2479 * kms is not valid when either object is completely fresh (so that no
2480 * locks are cached), or object was evicted. In the latter case cached
2481 * lock cannot be used, because it would prime inode state with
2482 * potentially stale LVB.
2487 /* Next, search for already existing extent locks that will cover us */
2488 /* If we're trying to read, we also search for an existing PW lock. The
2489 * VFS and page cache already protect us locally, so lots of readers/
2490 * writers can share a single PW lock.
2492 * There are problems with conversion deadlocks, so instead of
2493 * converting a read lock to a write lock, we'll just enqueue a new one.
2496 * At some point we should cancel the read lock instead of making the
2497 * server send us a blocking callback, but there are problems with
2498 * canceling locks out from other users right now, too. */
2499 mode = einfo->ei_mode;
2500 if (einfo->ei_mode == LCK_PR)
2502 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2503 einfo->ei_type, policy, mode, lockh, 0);
2505 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2507 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2508 /* For AGL, if the enqueue RPC is sent but the lock is not
2509 * granted, then skip processing this stripe.
2510 * Return -ECANCELED to tell the caller. */
2511 ldlm_lock_decref(lockh, mode);
2512 LDLM_LOCK_PUT(matched);
2514 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2515 *flags |= LDLM_FL_LVB_READY;
2516 /* Addref the lock only for non-async requests and when a PW
2517 * lock is matched whereas we asked for PR. */
2518 if (!rqset && einfo->ei_mode != mode)
2519 ldlm_lock_addref(lockh, LCK_PR);
2521 /* I would like to be able to ASSERT here that
2522 * rss <= kms, but I can't, for reasons which
2523 * are explained in lov_enqueue() */
2526 /* We already have a lock, and it's referenced.
2528 * At this point, cl_lock::cll_state is CLS_QUEUING; the
2529 * AGL upcall may change it to CLS_HELD directly. */
2530 (*upcall)(cookie, ELDLM_OK);
2532 if (einfo->ei_mode != mode)
2533 ldlm_lock_decref(lockh, LCK_PW);
2535 /* For async requests, decref the lock. */
2536 ldlm_lock_decref(lockh, einfo->ei_mode);
2537 LDLM_LOCK_PUT(matched);
2540 ldlm_lock_decref(lockh, mode);
2541 LDLM_LOCK_PUT(matched);
2548 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2549 &RQF_LDLM_ENQUEUE_LVB);
2553 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2555 ptlrpc_request_free(req);
2559 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2561 ptlrpc_request_set_replen(req);
2564 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2565 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2567 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2568 sizeof(*lvb), LVB_T_OST, lockh, async);
2571 struct osc_enqueue_args *aa;
2572 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2573 aa = ptlrpc_req_async_args(req);
2576 aa->oa_flags = flags;
2577 aa->oa_upcall = upcall;
2578 aa->oa_cookie = cookie;
2580 aa->oa_lockh = lockh;
2583 req->rq_interpret_reply =
2584 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2585 if (rqset == PTLRPCD_SET)
2586 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2588 ptlrpc_set_add_req(rqset, req);
2589 } else if (intent) {
2590 ptlrpc_req_finished(req);
2595 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2597 ptlrpc_req_finished(req);
2602 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2603 struct ldlm_enqueue_info *einfo,
2604 struct ptlrpc_request_set *rqset)
2606 struct ldlm_res_id res_id;
2609 ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2610 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2611 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2612 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2613 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2614 rqset, rqset != NULL, 0);
2618 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2619 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2620 int *flags, void *data, struct lustre_handle *lockh,
2623 struct obd_device *obd = exp->exp_obd;
2624 int lflags = *flags;
2627 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2630 /* Filesystem lock extents are extended to page boundaries so that
2631 * dealing with the page cache is a little smoother */
2632 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2633 policy->l_extent.end |= ~CFS_PAGE_MASK;
2635 /* Next, search for already existing extent locks that will cover us */
2636 /* If we're trying to read, we also search for an existing PW lock. The
2637 * VFS and page cache already protect us locally, so lots of readers/
2638 * writers can share a single PW lock. */
2642 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2643 res_id, type, policy, rc, lockh, unref);
2646 if (!osc_set_data_with_check(lockh, data)) {
2647 if (!(lflags & LDLM_FL_TEST_LOCK))
2648 ldlm_lock_decref(lockh, rc);
2652 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2653 ldlm_lock_addref(lockh, LCK_PR);
2654 ldlm_lock_decref(lockh, LCK_PW);
2661 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2663 if (unlikely(mode == LCK_GROUP))
2664 ldlm_lock_decref_and_cancel(lockh, mode);
2666 ldlm_lock_decref(lockh, mode);
2671 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2672 __u32 mode, struct lustre_handle *lockh)
2674 RETURN(osc_cancel_base(lockh, mode));
2677 static int osc_cancel_unused(struct obd_export *exp,
2678 struct lov_stripe_md *lsm,
2679 ldlm_cancel_flags_t flags,
2682 struct obd_device *obd = class_exp2obd(exp);
2683 struct ldlm_res_id res_id, *resp = NULL;
2686 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2690 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2693 static int osc_statfs_interpret(const struct lu_env *env,
2694 struct ptlrpc_request *req,
2695 struct osc_async_args *aa, int rc)
2697 struct obd_statfs *msfs;
2700 /* The request has in fact never been sent
2701 * due to issues at a higher level (LOV).
2702 * Exit immediately since the caller is
2703 * aware of the problem and takes care
2704 * of the cleanup. */
2707 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2708 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2714 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2716 GOTO(out, rc = -EPROTO);
2719 *aa->aa_oi->oi_osfs = *msfs;
2721 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2725 static int osc_statfs_async(struct obd_export *exp,
2726 struct obd_info *oinfo, __u64 max_age,
2727 struct ptlrpc_request_set *rqset)
2729 struct obd_device *obd = class_exp2obd(exp);
2730 struct ptlrpc_request *req;
2731 struct osc_async_args *aa;
2734 /* We could possibly pass max_age in the request (as an absolute
2735 * timestamp or a "seconds.usec ago") so the target can avoid doing
2736 * extra calls into the filesystem if that isn't necessary (e.g.
2737 * during mount, where it would help a bit). Having relative timestamps
2738 * is not so great if request processing is slow, while absolute
2739 * timestamps are not ideal because they need time synchronization. */
2740 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2744 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2746 ptlrpc_request_free(req);
2749 ptlrpc_request_set_replen(req);
2750 req->rq_request_portal = OST_CREATE_PORTAL;
2751 ptlrpc_at_set_req_timeout(req);
2753 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2754 /* procfs requests should not wait for the statfs reply, to avoid deadlock */
2755 req->rq_no_resend = 1;
2756 req->rq_no_delay = 1;
2759 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2760 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2761 aa = ptlrpc_req_async_args(req);
2764 ptlrpc_set_add_req(rqset, req);
2768 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2769 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2771 struct obd_device *obd = class_exp2obd(exp);
2772 struct obd_statfs *msfs;
2773 struct ptlrpc_request *req;
2774 struct obd_import *imp = NULL;
2777 /* Since the request might also come from lprocfs, we need to
2778 * sync this with client_disconnect_export() (bug 15684). */
2779 down_read(&obd->u.cli.cl_sem);
2780 if (obd->u.cli.cl_import)
2781 imp = class_import_get(obd->u.cli.cl_import);
2782 up_read(&obd->u.cli.cl_sem);
2786 /* We could possibly pass max_age in the request (as an absolute
2787 * timestamp or a "seconds.usec ago") so the target can avoid doing
2788 * extra calls into the filesystem if that isn't necessary (e.g.
2789 * during mount, where it would help a bit). Having relative timestamps
2790 * is not so great if request processing is slow, while absolute
2791 * timestamps are not ideal because they need time synchronization. */
2792 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2794 class_import_put(imp);
2799 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2801 ptlrpc_request_free(req);
2804 ptlrpc_request_set_replen(req);
2805 req->rq_request_portal = OST_CREATE_PORTAL;
2806 ptlrpc_at_set_req_timeout(req);
2808 if (flags & OBD_STATFS_NODELAY) {
2809 /* procfs requests should not wait for the statfs reply, to avoid deadlock */
2810 req->rq_no_resend = 1;
2811 req->rq_no_delay = 1;
2814 rc = ptlrpc_queue_wait(req);
2818 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2820 GOTO(out, rc = -EPROTO);
2827 ptlrpc_req_finished(req);
2831 /* Retrieve object striping information.
2833 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
2834 * the maximum number of OST indices which will fit in the user buffer.
2835 * lmm_magic must be LOV_MAGIC (we only use 1 slot here). */
2837 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2839 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2840 struct lov_user_md_v3 lum, *lumk;
2841 struct lov_user_ost_data_v1 *lmm_objects;
2842 int rc = 0, lum_size;
2847 /* we only need the header part from user space to get lmm_magic and
2848 * lmm_stripe_count (the header part is common to v1 and v3) */
2849 lum_size = sizeof(struct lov_user_md_v1);
2850 if (copy_from_user(&lum, lump, lum_size))
2853 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2854 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2857 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2858 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2859 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2860 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2862 /* we can use lov_mds_md_size() to compute lum_size
2863 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2864 if (lum.lmm_stripe_count > 0) {
2865 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2866 OBD_ALLOC(lumk, lum_size);
2870 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2872 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2874 lmm_objects = &(lumk->lmm_objects[0]);
2875 lmm_objects->l_ost_oi = lsm->lsm_oi;
2877 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2881 lumk->lmm_oi = lsm->lsm_oi;
2882 lumk->lmm_stripe_count = 1;
2884 if (copy_to_user(lump, lumk, lum_size))
2888 OBD_FREE(lumk, lum_size);
2894 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2895 void *karg, void *uarg)
2897 struct obd_device *obd = exp->exp_obd;
2898 struct obd_ioctl_data *data = karg;
2901 if (!try_module_get(THIS_MODULE)) {
2902 CERROR("Can't get module. Is it alive?");
2906 case OBD_IOC_LOV_GET_CONFIG: {
2908 struct lov_desc *desc;
2909 struct obd_uuid uuid;
2913 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2914 GOTO(out, err = -EINVAL);
2916 data = (struct obd_ioctl_data *)buf;
2918 if (sizeof(*desc) > data->ioc_inllen1) {
2919 obd_ioctl_freedata(buf, len);
2920 GOTO(out, err = -EINVAL);
2923 if (data->ioc_inllen2 < sizeof(uuid)) {
2924 obd_ioctl_freedata(buf, len);
2925 GOTO(out, err = -EINVAL);
2928 desc = (struct lov_desc *)data->ioc_inlbuf1;
2929 desc->ld_tgt_count = 1;
2930 desc->ld_active_tgt_count = 1;
2931 desc->ld_default_stripe_count = 1;
2932 desc->ld_default_stripe_size = 0;
2933 desc->ld_default_stripe_offset = 0;
2934 desc->ld_pattern = 0;
2935 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2937 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2939 err = copy_to_user((void *)uarg, buf, len);
2942 obd_ioctl_freedata(buf, len);
2945 case LL_IOC_LOV_SETSTRIPE:
2946 err = obd_alloc_memmd(exp, karg);
2950 case LL_IOC_LOV_GETSTRIPE:
2951 err = osc_getstripe(karg, uarg);
2953 case OBD_IOC_CLIENT_RECOVER:
2954 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2955 data->ioc_inlbuf1, 0);
2959 case IOC_OSC_SET_ACTIVE:
2960 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2963 case OBD_IOC_POLL_QUOTACHECK:
2964 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2966 case OBD_IOC_PING_TARGET:
2967 err = ptlrpc_obd_ping(obd);
2970 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2971 cmd, current_comm());
2972 GOTO(out, err = -ENOTTY);
2975 module_put(THIS_MODULE);
2979 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2980 obd_count keylen, void *key, __u32 *vallen, void *val,
2981 struct lov_stripe_md *lsm)
2983 if (!vallen || !val)
2986 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2987 __u32 *stripe = val;
2988 *vallen = sizeof(*stripe);
2991 } else if (KEY_IS(KEY_LAST_ID)) {
2992 struct ptlrpc_request *req;
2997 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2998 &RQF_OST_GET_INFO_LAST_ID);
3002 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3003 RCL_CLIENT, keylen);
3004 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3006 ptlrpc_request_free(req);
3010 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3011 memcpy(tmp, key, keylen);
3013 req->rq_no_delay = req->rq_no_resend = 1;
3014 ptlrpc_request_set_replen(req);
3015 rc = ptlrpc_queue_wait(req);
3019 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3021 GOTO(out, rc = -EPROTO);
3023 *((obd_id *)val) = *reply;
3025 ptlrpc_req_finished(req);
3027 } else if (KEY_IS(KEY_FIEMAP)) {
3028 struct ll_fiemap_info_key *fm_key =
3029 (struct ll_fiemap_info_key *)key;
3030 struct ldlm_res_id res_id;
3031 ldlm_policy_data_t policy;
3032 struct lustre_handle lockh;
3033 ldlm_mode_t mode = 0;
3034 struct ptlrpc_request *req;
3035 struct ll_user_fiemap *reply;
3039 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3042 policy.l_extent.start = fm_key->fiemap.fm_start &
3045 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3046 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3047 policy.l_extent.end = OBD_OBJECT_EOF;
3049 policy.l_extent.end = (fm_key->fiemap.fm_start +
3050 fm_key->fiemap.fm_length +
3051 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
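/* policy.l_extent now covers the requested FIEMAP range rounded out to page
 * boundaries, or extends to OBD_OBJECT_EOF when the end would overflow. */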
3053 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3054 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3055 LDLM_FL_BLOCK_GRANTED |
3057 &res_id, LDLM_EXTENT, &policy,
3058 LCK_PR | LCK_PW, &lockh, 0);
3059 if (mode) { /* lock is cached on client */
3060 if (mode != LCK_PR) {
3061 ldlm_lock_addref(&lockh, LCK_PR);
3062 ldlm_lock_decref(&lockh, LCK_PW);
3064 } else { /* no cached lock, need to acquire the lock on the server side */
3065 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3066 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3070 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3071 &RQF_OST_GET_INFO_FIEMAP);
3073 GOTO(drop_lock, rc = -ENOMEM);
3075 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3076 RCL_CLIENT, keylen);
3077 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3078 RCL_CLIENT, *vallen);
3079 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3080 RCL_SERVER, *vallen);
3082 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3084 ptlrpc_request_free(req);
3085 GOTO(drop_lock, rc);
3088 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3089 memcpy(tmp, key, keylen);
3090 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3091 memcpy(tmp, val, *vallen);
3093 ptlrpc_request_set_replen(req);
3094 rc = ptlrpc_queue_wait(req);
3098 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3100 GOTO(fini_req, rc = -EPROTO);
3102 memcpy(val, reply, *vallen);
3104 ptlrpc_req_finished(req);
3107 ldlm_lock_decref(&lockh, LCK_PR);
3114 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3115 obd_count keylen, void *key, obd_count vallen,
3116 void *val, struct ptlrpc_request_set *set)
3118 struct ptlrpc_request *req;
3119 struct obd_device *obd = exp->exp_obd;
3120 struct obd_import *imp = class_exp2cliimp(exp);
3124 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3126 if (KEY_IS(KEY_CHECKSUM)) {
3127 if (vallen != sizeof(int))
3129 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3133 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3134 sptlrpc_conf_client_adapt(obd);
3138 if (KEY_IS(KEY_FLUSH_CTX)) {
3139 sptlrpc_import_flush_my_ctx(imp);
3143 if (KEY_IS(KEY_CACHE_SET)) {
3144 struct client_obd *cli = &obd->u.cli;
3146 LASSERT(cli->cl_cache == NULL); /* only once */
3147 cli->cl_cache = (struct cl_client_cache *)val;
3148 atomic_inc(&cli->cl_cache->ccc_users);
3149 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3151 /* add this osc into entity list */
3152 LASSERT(list_empty(&cli->cl_lru_osc));
3153 spin_lock(&cli->cl_cache->ccc_lru_lock);
3154 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3155 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3160 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3161 struct client_obd *cli = &obd->u.cli;
3162 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3163 int target = *(int *)val;
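/* Shrink at most half of this OSC's LRU pages in one pass, and never more
 * than the caller-supplied target. */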
3165 nr = osc_lru_shrink(cli, min(nr, target));
3170 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3173 /* We pass all other commands directly to OST. Since nobody calls osc
3174 methods directly and everybody is supposed to go through LOV, we
3175 assume LOV has checked invalid values for us.
3176 The only recognised values so far are evict_by_nid and mds_conn.
3177 Even if something bad goes through, we'd get a -EINVAL from OST anyway. */
3180 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3181 &RQF_OST_SET_GRANT_INFO :
3186 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3187 RCL_CLIENT, keylen);
3188 if (!KEY_IS(KEY_GRANT_SHRINK))
3189 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3190 RCL_CLIENT, vallen);
3191 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3193 ptlrpc_request_free(req);
3197 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3198 memcpy(tmp, key, keylen);
3199 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3202 memcpy(tmp, val, vallen);
3204 if (KEY_IS(KEY_GRANT_SHRINK)) {
3205 struct osc_grant_args *aa;
3208 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3209 aa = ptlrpc_req_async_args(req);
3212 ptlrpc_req_finished(req);
3215 *oa = ((struct ost_body *)val)->oa;
3217 req->rq_interpret_reply = osc_shrink_grant_interpret;
3220 ptlrpc_request_set_replen(req);
3221 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3222 LASSERT(set != NULL);
3223 ptlrpc_set_add_req(set, req);
3224 ptlrpc_check_set(NULL, set);
3226 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3232 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3233 struct obd_device *disk_obd, int *index)
3235 /* This code is not supposed to be used with LOD/OSP;
3236 * it will be removed soon. */
3241 static int osc_llog_finish(struct obd_device *obd, int count)
3243 struct llog_ctxt *ctxt;
3245 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3247 llog_cat_close(NULL, ctxt->loc_handle);
3248 llog_cleanup(NULL, ctxt);
3251 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3253 llog_cleanup(NULL, ctxt);
3257 static int osc_reconnect(const struct lu_env *env,
3258 struct obd_export *exp, struct obd_device *obd,
3259 struct obd_uuid *cluuid,
3260 struct obd_connect_data *data,
3263 struct client_obd *cli = &obd->u.cli;
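/* On reconnect, ask the server for at least the grant we believe we still
 * hold (available + dirty), or two full BRWs worth if we hold none, and
 * note how much grant was lost across the disconnect. */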
3265 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3268 client_obd_list_lock(&cli->cl_loi_list_lock);
3269 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3270 2 * cli_brw_size(obd);
3271 lost_grant = cli->cl_lost_grant;
3272 cli->cl_lost_grant = 0;
3273 client_obd_list_unlock(&cli->cl_loi_list_lock);
3275 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3276 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3277 data->ocd_version, data->ocd_grant, lost_grant);
3283 static int osc_disconnect(struct obd_export *exp)
3285 struct obd_device *obd = class_exp2obd(exp);
3286 struct llog_ctxt *ctxt;
3289 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3291 if (obd->u.cli.cl_conn_count == 1) {
3292 /* Flush any remaining cancel messages out to the target. */
3294 llog_sync(ctxt, exp, 0);
3296 llog_ctxt_put(ctxt);
3298 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3302 rc = client_disconnect_export(exp);
3304 * Initially we put del_shrink_grant before disconnect_export, but it
3305 * causes the following problem if setup (connect) and cleanup
3306 * (disconnect) are tangled together.
3307 * connect p1 disconnect p2
3308 * ptlrpc_connect_import
3309 * ............... class_manual_cleanup
3312 * ptlrpc_connect_interrupt
3314 * add this client to shrink list
3316 Bang! The pinger triggers the shrink.
3317 So the osc should be disconnected from the shrink list after we
3318 are sure the import has been destroyed. BUG18662 */
3320 if (obd->u.cli.cl_import == NULL)
3321 osc_del_shrink_grant(&obd->u.cli);
3325 static int osc_import_event(struct obd_device *obd,
3326 struct obd_import *imp,
3327 enum obd_import_event event)
3329 struct client_obd *cli;
3332 LASSERT(imp->imp_obd == obd);
3335 case IMP_EVENT_DISCON: {
3337 client_obd_list_lock(&cli->cl_loi_list_lock);
3338 cli->cl_avail_grant = 0;
3339 cli->cl_lost_grant = 0;
3340 client_obd_list_unlock(&cli->cl_loi_list_lock);
3343 case IMP_EVENT_INACTIVE: {
3344 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3347 case IMP_EVENT_INVALIDATE: {
3348 struct ldlm_namespace *ns = obd->obd_namespace;
3352 env = cl_env_get(&refcheck);
3356 /* All pages go to failing RPCs due to the invalid import. */
3358 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3360 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3361 cl_env_put(env, &refcheck);
3366 case IMP_EVENT_ACTIVE: {
3367 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3370 case IMP_EVENT_OCD: {
3371 struct obd_connect_data *ocd = &imp->imp_connect_data;
3373 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3374 osc_init_grant(&obd->u.cli, ocd);
3377 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3378 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3380 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3383 case IMP_EVENT_DEACTIVATE: {
3384 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3387 case IMP_EVENT_ACTIVATE: {
3388 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3392 CERROR("Unknown import event %d\n", event);
3399 * Determine whether the lock can be canceled before replaying the lock
3400 * during recovery; see bug 16774 for detailed information.
3402 * \retval zero the lock can't be canceled
3403 * \retval other ok to cancel
3405 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3407 check_res_locked(lock->l_resource);
3410 * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
3412 * XXX As a future improvement, we could also cancel unused write locks
3413 * if they have no dirty data and no active mmaps.
3415 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3416 (lock->l_granted_mode == LCK_PR ||
3417 lock->l_granted_mode == LCK_CR) &&
3418 (osc_dlm_lock_pageref(lock) == 0))
3424 static int brw_queue_work(const struct lu_env *env, void *data)
3426 struct client_obd *cli = data;
3428 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3430 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3434 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3436 struct lprocfs_static_vars lvars = { 0 };
3437 struct client_obd *cli = &obd->u.cli;
3441 rc = ptlrpcd_addref();
3445 rc = client_obd_setup(obd, lcfg);
3447 GOTO(out_ptlrpcd, rc);
3449 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3450 if (IS_ERR(handler))
3451 GOTO(out_client_setup, rc = PTR_ERR(handler));
3452 cli->cl_writeback_work = handler;
3454 rc = osc_quota_setup(obd);
3456 GOTO(out_ptlrpcd_work, rc);
3458 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3459 lprocfs_osc_init_vars(&lvars);
3460 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3461 lproc_osc_attach_seqstat(obd);
3462 sptlrpc_lprocfs_cliobd_attach(obd);
3463 ptlrpc_lprocfs_register_obd(obd);
3466 /* We need to allocate a few more requests, because
3467 * brw_interpret tries to create new requests before freeing
3468 * previous ones. Ideally we want 2x max_rpcs_in_flight
3469 * reserved, but that might be too much wasted RAM in
3470 * practice, so 2 extra is just a guess and should still work. */
3471 cli->cl_import->imp_rq_pool =
3472 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3474 ptlrpc_add_rqs_to_pool);
3476 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3477 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3481 ptlrpcd_destroy_work(handler);
3483 client_obd_cleanup(obd);
3489 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3494 case OBD_CLEANUP_EARLY: {
3495 struct obd_import *imp;
3496 imp = obd->u.cli.cl_import;
3497 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3498 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3499 ptlrpc_deactivate_import(imp);
3500 spin_lock(&imp->imp_lock);
3501 imp->imp_pingable = 0;
3502 spin_unlock(&imp->imp_lock);
3505 case OBD_CLEANUP_EXPORTS: {
3506 struct client_obd *cli = &obd->u.cli;
3508 * for echo client, export may be on zombie list, wait for
3509 * zombie thread to cull it, because cli.cl_import will be
3510 * cleared in client_disconnect_export():
3511 * class_export_destroy() -> obd_cleanup() ->
3512 * echo_device_free() -> echo_client_cleanup() ->
3513 * obd_disconnect() -> osc_disconnect() ->
3514 * client_disconnect_export()
3516 obd_zombie_barrier();
3517 if (cli->cl_writeback_work) {
3518 ptlrpcd_destroy_work(cli->cl_writeback_work);
3519 cli->cl_writeback_work = NULL;
3521 obd_cleanup_client_import(obd);
3522 ptlrpc_lprocfs_unregister_obd(obd);
3523 lprocfs_obd_cleanup(obd);
3524 rc = obd_llog_finish(obd, 0);
3526 CERROR("failed to cleanup llogging subsystems\n");
3533 int osc_cleanup(struct obd_device *obd)
3535 struct client_obd *cli = &obd->u.cli;
3539 if (cli->cl_cache != NULL) {
3540 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3541 spin_lock(&cli->cl_cache->ccc_lru_lock);
3542 list_del_init(&cli->cl_lru_osc);
3543 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3544 cli->cl_lru_left = NULL;
3545 atomic_dec(&cli->cl_cache->ccc_users);
3546 cli->cl_cache = NULL;
3549 /* free memory of osc quota cache */
3550 osc_quota_cleanup(obd);
3552 rc = client_obd_cleanup(obd);
3558 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3560 struct lprocfs_static_vars lvars = { 0 };
3563 lprocfs_osc_init_vars(&lvars);
3565 switch (lcfg->lcfg_command) {
3567 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3577 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3579 return osc_process_config_base(obd, buf);
3582 struct obd_ops osc_obd_ops = {
3583 .o_owner = THIS_MODULE,
3584 .o_setup = osc_setup,
3585 .o_precleanup = osc_precleanup,
3586 .o_cleanup = osc_cleanup,
3587 .o_add_conn = client_import_add_conn,
3588 .o_del_conn = client_import_del_conn,
3589 .o_connect = client_connect_import,
3590 .o_reconnect = osc_reconnect,
3591 .o_disconnect = osc_disconnect,
3592 .o_statfs = osc_statfs,
3593 .o_statfs_async = osc_statfs_async,
3594 .o_packmd = osc_packmd,
3595 .o_unpackmd = osc_unpackmd,
3596 .o_create = osc_create,
3597 .o_destroy = osc_destroy,
3598 .o_getattr = osc_getattr,
3599 .o_getattr_async = osc_getattr_async,
3600 .o_setattr = osc_setattr,
3601 .o_setattr_async = osc_setattr_async,
3603 .o_punch = osc_punch,
3605 .o_enqueue = osc_enqueue,
3606 .o_change_cbdata = osc_change_cbdata,
3607 .o_find_cbdata = osc_find_cbdata,
3608 .o_cancel = osc_cancel,
3609 .o_cancel_unused = osc_cancel_unused,
3610 .o_iocontrol = osc_iocontrol,
3611 .o_get_info = osc_get_info,
3612 .o_set_info_async = osc_set_info_async,
3613 .o_import_event = osc_import_event,
3614 .o_llog_init = osc_llog_init,
3615 .o_llog_finish = osc_llog_finish,
3616 .o_process_config = osc_process_config,
3617 .o_quotactl = osc_quotactl,
3618 .o_quotacheck = osc_quotacheck,
3621 extern struct lu_kmem_descr osc_caches[];
3622 extern spinlock_t osc_ast_guard;
3623 extern struct lock_class_key osc_ast_guard_class;
3625 int __init osc_init(void)
3627 struct lprocfs_static_vars lvars = { 0 };
3630 /* print an address of _any_ initialized kernel symbol from this
3631 * module, to allow debugging with gdb that doesn't support data
3632 * symbols from modules.*/
3633 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3635 rc = lu_kmem_init(osc_caches);
3639 lprocfs_osc_init_vars(&lvars);
3641 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3642 LUSTRE_OSC_NAME, &osc_device_type);
3644 lu_kmem_fini(osc_caches);
3648 spin_lock_init(&osc_ast_guard);
3649 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3654 static void /*__exit*/ osc_exit(void)
3656 class_unregister_type(LUSTRE_OSC_NAME);
3657 lu_kmem_fini(osc_caches);
3660 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3661 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3662 MODULE_LICENSE("GPL");
3664 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);