]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/osc/osc_request.c
Staging: lustre: osc: Remove null check before kfree
[karo-tx-linux.git] / drivers / staging / lustre / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include "../../include/linux/libcfs/libcfs.h"
40
41
42 #include "../include/lustre_dlm.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre/lustre_user.h"
45 #include "../include/obd_cksum.h"
46
47 #include "../include/lustre_ha.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_debug.h"
50 #include "../include/lustre_param.h"
51 #include "../include/lustre_fid.h"
52 #include "../include/obd_class.h"
53 #include "../include/obd.h"
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
56
57 atomic_t osc_pool_req_count;
58 unsigned int osc_reqpool_maxreqcount;
59 struct ptlrpc_request_pool *osc_rq_pool;
60
61 /* max memory used for request pool, unit is MB */
62 static unsigned int osc_reqpool_mem_max = 5;
63 module_param(osc_reqpool_mem_max, uint, 0444);
64
65 struct osc_brw_async_args {
66         struct obdo       *aa_oa;
67         int             aa_requested_nob;
68         int             aa_nio_count;
69         u32             aa_page_count;
70         int             aa_resends;
71         struct brw_page  **aa_ppga;
72         struct client_obd *aa_cli;
73         struct list_head         aa_oaps;
74         struct list_head         aa_exts;
75         struct cl_req     *aa_clerq;
76 };
77
/*
 * Async argument block for getattr requests: filled in by
 * osc_getattr_async() and read back by osc_getattr_interpret().
 */
struct osc_async_args {
        struct obd_info *aa_oi;
};
81
82 struct osc_setattr_args {
83         struct obdo      *sa_oa;
84         obd_enqueue_update_f sa_upcall;
85         void            *sa_cookie;
86 };
87
88 struct osc_fsync_args {
89         struct obd_info     *fa_oi;
90         obd_enqueue_update_f fa_upcall;
91         void            *fa_cookie;
92 };
93
94 struct osc_enqueue_args {
95         struct obd_export       *oa_exp;
96         __u64               *oa_flags;
97         obd_enqueue_update_f      oa_upcall;
98         void                 *oa_cookie;
99         struct ost_lvb     *oa_lvb;
100         struct lustre_handle     *oa_lockh;
101         struct ldlm_enqueue_info *oa_ei;
102         unsigned int          oa_agl:1;
103 };
104
105 static void osc_release_ppga(struct brw_page **ppga, u32 count);
106 static int brw_interpret(const struct lu_env *env,
107                          struct ptlrpc_request *req, void *data, int rc);
108 int osc_cleanup(struct obd_device *obd);
109
110 /* Pack OSC object metadata for disk storage (LE byte order). */
111 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
112                       struct lov_stripe_md *lsm)
113 {
114         int lmm_size;
115
116         lmm_size = sizeof(**lmmp);
117         if (lmmp == NULL)
118                 return lmm_size;
119
120         if (*lmmp != NULL && lsm == NULL) {
121                 kfree(*lmmp);
122                 *lmmp = NULL;
123                 return 0;
124         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
125                 return -EBADF;
126         }
127
128         if (*lmmp == NULL) {
129                 *lmmp = kzalloc(lmm_size, GFP_NOFS);
130                 if (!*lmmp)
131                         return -ENOMEM;
132         }
133
134         if (lsm)
135                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
136
137         return lmm_size;
138 }
139
140 /* Unpack OSC object metadata from disk storage (LE byte order). */
141 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
142                         struct lov_mds_md *lmm, int lmm_bytes)
143 {
144         int lsm_size;
145         struct obd_import *imp = class_exp2cliimp(exp);
146
147         if (lmm != NULL) {
148                 if (lmm_bytes < sizeof(*lmm)) {
149                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
150                                exp->exp_obd->obd_name, lmm_bytes,
151                                (int)sizeof(*lmm));
152                         return -EINVAL;
153                 }
154                 /* XXX LOV_MAGIC etc check? */
155
156                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
157                         CERROR("%s: zero lmm_object_id: rc = %d\n",
158                                exp->exp_obd->obd_name, -EINVAL);
159                         return -EINVAL;
160                 }
161         }
162
163         lsm_size = lov_stripe_md_size(1);
164         if (lsmp == NULL)
165                 return lsm_size;
166
167         if (*lsmp != NULL && lmm == NULL) {
168                 kfree((*lsmp)->lsm_oinfo[0]);
169                 kfree(*lsmp);
170                 *lsmp = NULL;
171                 return 0;
172         }
173
174         if (*lsmp == NULL) {
175                 *lsmp = kzalloc(lsm_size, GFP_NOFS);
176                 if (unlikely(*lsmp == NULL))
177                         return -ENOMEM;
178                 (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
179                                                 GFP_NOFS);
180                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
181                         kfree(*lsmp);
182                         return -ENOMEM;
183                 }
184                 loi_init((*lsmp)->lsm_oinfo[0]);
185         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
186                 return -EBADF;
187         }
188
189         if (lmm != NULL)
190                 /* XXX zero *lsmp? */
191                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
192
193         if (imp != NULL &&
194             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
195                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
196         else
197                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
198
199         return lsm_size;
200 }
201
202 static inline void osc_pack_req_body(struct ptlrpc_request *req,
203                                      struct obd_info *oinfo)
204 {
205         struct ost_body *body;
206
207         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
208         LASSERT(body);
209
210         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
211                              oinfo->oi_oa);
212 }
213
214 static int osc_getattr_interpret(const struct lu_env *env,
215                                  struct ptlrpc_request *req,
216                                  struct osc_async_args *aa, int rc)
217 {
218         struct ost_body *body;
219
220         if (rc != 0)
221                 goto out;
222
223         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
224         if (body) {
225                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
226                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
227                                      aa->aa_oi->oi_oa, &body->oa);
228
229                 /* This should really be sent by the OST */
230                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
231                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
232         } else {
233                 CDEBUG(D_INFO, "can't unpack ost_body\n");
234                 rc = -EPROTO;
235                 aa->aa_oi->oi_oa->o_valid = 0;
236         }
237 out:
238         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
239         return rc;
240 }
241
242 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
243                              struct ptlrpc_request_set *set)
244 {
245         struct ptlrpc_request *req;
246         struct osc_async_args *aa;
247         int rc;
248
249         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
250         if (req == NULL)
251                 return -ENOMEM;
252
253         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
254         if (rc) {
255                 ptlrpc_request_free(req);
256                 return rc;
257         }
258
259         osc_pack_req_body(req, oinfo);
260
261         ptlrpc_request_set_replen(req);
262         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
263
264         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
265         aa = ptlrpc_req_async_args(req);
266         aa->aa_oi = oinfo;
267
268         ptlrpc_set_add_req(set, req);
269         return 0;
270 }
271
272 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
273                        struct obd_info *oinfo)
274 {
275         struct ptlrpc_request *req;
276         struct ost_body *body;
277         int rc;
278
279         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
280         if (req == NULL)
281                 return -ENOMEM;
282
283         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
284         if (rc) {
285                 ptlrpc_request_free(req);
286                 return rc;
287         }
288
289         osc_pack_req_body(req, oinfo);
290
291         ptlrpc_request_set_replen(req);
292
293         rc = ptlrpc_queue_wait(req);
294         if (rc)
295                 goto out;
296
297         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
298         if (body == NULL) {
299                 rc = -EPROTO;
300                 goto out;
301         }
302
303         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
304         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
305                              &body->oa);
306
307         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
308         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
309
310  out:
311         ptlrpc_req_finished(req);
312         return rc;
313 }
314
315 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
316                        struct obd_info *oinfo, struct obd_trans_info *oti)
317 {
318         struct ptlrpc_request *req;
319         struct ost_body *body;
320         int rc;
321
322         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
323
324         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
325         if (req == NULL)
326                 return -ENOMEM;
327
328         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
329         if (rc) {
330                 ptlrpc_request_free(req);
331                 return rc;
332         }
333
334         osc_pack_req_body(req, oinfo);
335
336         ptlrpc_request_set_replen(req);
337
338         rc = ptlrpc_queue_wait(req);
339         if (rc)
340                 goto out;
341
342         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
343         if (body == NULL) {
344                 rc = -EPROTO;
345                 goto out;
346         }
347
348         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
349                              &body->oa);
350
351 out:
352         ptlrpc_req_finished(req);
353         return rc;
354 }
355
356 static int osc_setattr_interpret(const struct lu_env *env,
357                                  struct ptlrpc_request *req,
358                                  struct osc_setattr_args *sa, int rc)
359 {
360         struct ost_body *body;
361
362         if (rc != 0)
363                 goto out;
364
365         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366         if (body == NULL) {
367                 rc = -EPROTO;
368                 goto out;
369         }
370
371         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
372                              &body->oa);
373 out:
374         rc = sa->sa_upcall(sa->sa_cookie, rc);
375         return rc;
376 }
377
378 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
379                            struct obd_trans_info *oti,
380                            obd_enqueue_update_f upcall, void *cookie,
381                            struct ptlrpc_request_set *rqset)
382 {
383         struct ptlrpc_request *req;
384         struct osc_setattr_args *sa;
385         int rc;
386
387         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
388         if (req == NULL)
389                 return -ENOMEM;
390
391         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
392         if (rc) {
393                 ptlrpc_request_free(req);
394                 return rc;
395         }
396
397         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
398                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
399
400         osc_pack_req_body(req, oinfo);
401
402         ptlrpc_request_set_replen(req);
403
404         /* do mds to ost setattr asynchronously */
405         if (!rqset) {
406                 /* Do not wait for response. */
407                 ptlrpcd_add_req(req);
408         } else {
409                 req->rq_interpret_reply =
410                         (ptlrpc_interpterer_t)osc_setattr_interpret;
411
412                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
413                 sa = ptlrpc_req_async_args(req);
414                 sa->sa_oa = oinfo->oi_oa;
415                 sa->sa_upcall = upcall;
416                 sa->sa_cookie = cookie;
417
418                 if (rqset == PTLRPCD_SET)
419                         ptlrpcd_add_req(req);
420                 else
421                         ptlrpc_set_add_req(rqset, req);
422         }
423
424         return 0;
425 }
426
427 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
428                              struct obd_trans_info *oti,
429                              struct ptlrpc_request_set *rqset)
430 {
431         return osc_setattr_async_base(exp, oinfo, oti,
432                                       oinfo->oi_cb_up, oinfo, rqset);
433 }
434
435 int osc_real_create(struct obd_export *exp, struct obdo *oa,
436                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
437 {
438         struct ptlrpc_request *req;
439         struct ost_body *body;
440         struct lov_stripe_md *lsm;
441         int rc;
442
443         LASSERT(oa);
444         LASSERT(ea);
445
446         lsm = *ea;
447         if (!lsm) {
448                 rc = obd_alloc_memmd(exp, &lsm);
449                 if (rc < 0)
450                         return rc;
451         }
452
453         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
454         if (req == NULL) {
455                 rc = -ENOMEM;
456                 goto out;
457         }
458
459         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
460         if (rc) {
461                 ptlrpc_request_free(req);
462                 goto out;
463         }
464
465         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
466         LASSERT(body);
467
468         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
469
470         ptlrpc_request_set_replen(req);
471
472         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
473             oa->o_flags == OBD_FL_DELORPHAN) {
474                 DEBUG_REQ(D_HA, req,
475                           "delorphan from OST integration");
476                 /* Don't resend the delorphan req */
477                 req->rq_no_resend = req->rq_no_delay = 1;
478         }
479
480         rc = ptlrpc_queue_wait(req);
481         if (rc)
482                 goto out_req;
483
484         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
485         if (body == NULL) {
486                 rc = -EPROTO;
487                 goto out_req;
488         }
489
490         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
491         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
492
493         oa->o_blksize = cli_brw_size(exp->exp_obd);
494         oa->o_valid |= OBD_MD_FLBLKSZ;
495
496         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
497          * have valid lsm_oinfo data structs, so don't go touching that.
498          * This needs to be fixed in a big way.
499          */
500         lsm->lsm_oi = oa->o_oi;
501         *ea = lsm;
502
503         if (oti != NULL) {
504                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
505
506                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
507                         if (!oti->oti_logcookies)
508                                 oti_alloc_cookies(oti, 1);
509                         *oti->oti_logcookies = oa->o_lcookie;
510                 }
511         }
512
513         CDEBUG(D_HA, "transno: %lld\n",
514                lustre_msg_get_transno(req->rq_repmsg));
515 out_req:
516         ptlrpc_req_finished(req);
517 out:
518         if (rc && !*ea)
519                 obd_free_memmd(exp, &lsm);
520         return rc;
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
524                    obd_enqueue_update_f upcall, void *cookie,
525                    struct ptlrpc_request_set *rqset)
526 {
527         struct ptlrpc_request *req;
528         struct osc_setattr_args *sa;
529         struct ost_body *body;
530         int rc;
531
532         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
533         if (req == NULL)
534                 return -ENOMEM;
535
536         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
537         if (rc) {
538                 ptlrpc_request_free(req);
539                 return rc;
540         }
541         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
542         ptlrpc_at_set_req_timeout(req);
543
544         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
545         LASSERT(body);
546         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
547                              oinfo->oi_oa);
548
549         ptlrpc_request_set_replen(req);
550
551         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
552         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
553         sa = ptlrpc_req_async_args(req);
554         sa->sa_oa = oinfo->oi_oa;
555         sa->sa_upcall = upcall;
556         sa->sa_cookie = cookie;
557         if (rqset == PTLRPCD_SET)
558                 ptlrpcd_add_req(req);
559         else
560                 ptlrpc_set_add_req(rqset, req);
561
562         return 0;
563 }
564
565 static int osc_sync_interpret(const struct lu_env *env,
566                               struct ptlrpc_request *req,
567                               void *arg, int rc)
568 {
569         struct osc_fsync_args *fa = arg;
570         struct ost_body *body;
571
572         if (rc)
573                 goto out;
574
575         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
576         if (body == NULL) {
577                 CERROR("can't unpack ost_body\n");
578                 rc = -EPROTO;
579                 goto out;
580         }
581
582         *fa->fa_oi->oi_oa = body->oa;
583 out:
584         rc = fa->fa_upcall(fa->fa_cookie, rc);
585         return rc;
586 }
587
588 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
589                   obd_enqueue_update_f upcall, void *cookie,
590                   struct ptlrpc_request_set *rqset)
591 {
592         struct ptlrpc_request *req;
593         struct ost_body *body;
594         struct osc_fsync_args *fa;
595         int rc;
596
597         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598         if (req == NULL)
599                 return -ENOMEM;
600
601         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
602         if (rc) {
603                 ptlrpc_request_free(req);
604                 return rc;
605         }
606
607         /* overload the size and blocks fields in the oa with start/end */
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
611                              oinfo->oi_oa);
612
613         ptlrpc_request_set_replen(req);
614         req->rq_interpret_reply = osc_sync_interpret;
615
616         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
617         fa = ptlrpc_req_async_args(req);
618         fa->fa_oi = oinfo;
619         fa->fa_upcall = upcall;
620         fa->fa_cookie = cookie;
621
622         if (rqset == PTLRPCD_SET)
623                 ptlrpcd_add_req(req);
624         else
625                 ptlrpc_set_add_req(rqset, req);
626
627         return 0;
628 }
629
630 /* Find and cancel locally locks matched by @mode in the resource found by
631  * @objid. Found locks are added into @cancel list. Returns the amount of
632  * locks added to @cancels list. */
633 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
634                                    struct list_head *cancels,
635                                    ldlm_mode_t mode, __u64 lock_flags)
636 {
637         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
638         struct ldlm_res_id res_id;
639         struct ldlm_resource *res;
640         int count;
641
642         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
643          * export) but disabled through procfs (flag in NS).
644          *
645          * This distinguishes from a case when ELC is not supported originally,
646          * when we still want to cancel locks in advance and just cancel them
647          * locally, without sending any RPC. */
648         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
649                 return 0;
650
651         ostid_build_res_name(&oa->o_oi, &res_id);
652         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
653         if (res == NULL)
654                 return 0;
655
656         LDLM_RESOURCE_ADDREF(res);
657         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
658                                            lock_flags, 0, NULL);
659         LDLM_RESOURCE_DELREF(res);
660         ldlm_resource_putref(res);
661         return count;
662 }
663
664 static int osc_destroy_interpret(const struct lu_env *env,
665                                  struct ptlrpc_request *req, void *data,
666                                  int rc)
667 {
668         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
669
670         atomic_dec(&cli->cl_destroy_in_flight);
671         wake_up(&cli->cl_destroy_waitq);
672         return 0;
673 }
674
675 static int osc_can_send_destroy(struct client_obd *cli)
676 {
677         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
678             cli->cl_max_rpcs_in_flight) {
679                 /* The destroy request can be sent */
680                 return 1;
681         }
682         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
683             cli->cl_max_rpcs_in_flight) {
684                 /*
685                  * The counter has been modified between the two atomic
686                  * operations.
687                  */
688                 wake_up(&cli->cl_destroy_waitq);
689         }
690         return 0;
691 }
692
693 int osc_create(const struct lu_env *env, struct obd_export *exp,
694                struct obdo *oa, struct lov_stripe_md **ea,
695                struct obd_trans_info *oti)
696 {
697         int rc = 0;
698
699         LASSERT(oa);
700         LASSERT(ea);
701         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
702
703         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
704             oa->o_flags == OBD_FL_RECREATE_OBJS) {
705                 return osc_real_create(exp, oa, ea, oti);
706         }
707
708         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
709                 return osc_real_create(exp, oa, ea, oti);
710
711         /* we should not get here anymore */
712         LBUG();
713
714         return rc;
715 }
716
717 /* Destroy requests can be async always on the client, and we don't even really
718  * care about the return code since the client cannot do anything at all about
719  * a destroy failure.
720  * When the MDS is unlinking a filename, it saves the file objects into a
721  * recovery llog, and these object records are cancelled when the OST reports
722  * they were destroyed and sync'd to disk (i.e. transaction committed).
723  * If the client dies, or the OST is down when the object should be destroyed,
724  * the records are not cancelled, and when the OST reconnects to the MDS next,
725  * it will retrieve the llog unlink logs and then sends the log cancellation
726  * cookies to the MDS after committing destroy transactions. */
727 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
728                        struct obdo *oa, struct lov_stripe_md *ea,
729                        struct obd_trans_info *oti, struct obd_export *md_export)
730 {
731         struct client_obd *cli = &exp->exp_obd->u.cli;
732         struct ptlrpc_request *req;
733         struct ost_body *body;
734         LIST_HEAD(cancels);
735         int rc, count;
736
737         if (!oa) {
738                 CDEBUG(D_INFO, "oa NULL\n");
739                 return -EINVAL;
740         }
741
742         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
743                                         LDLM_FL_DISCARD_DATA);
744
745         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
746         if (req == NULL) {
747                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
748                 return -ENOMEM;
749         }
750
751         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
752                                0, &cancels, count);
753         if (rc) {
754                 ptlrpc_request_free(req);
755                 return rc;
756         }
757
758         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
759         ptlrpc_at_set_req_timeout(req);
760
761         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
762                 oa->o_lcookie = *oti->oti_logcookies;
763         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
764         LASSERT(body);
765         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
766
767         ptlrpc_request_set_replen(req);
768
769         /* If osc_destroy is for destroying the unlink orphan,
770          * sent from MDT to OST, which should not be blocked here,
771          * because the process might be triggered by ptlrpcd, and
772          * it is not good to block ptlrpcd thread (b=16006)*/
773         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
774                 req->rq_interpret_reply = osc_destroy_interpret;
775                 if (!osc_can_send_destroy(cli)) {
776                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
777                                                           NULL);
778
779                         /*
780                          * Wait until the number of on-going destroy RPCs drops
781                          * under max_rpc_in_flight
782                          */
783                         l_wait_event_exclusive(cli->cl_destroy_waitq,
784                                                osc_can_send_destroy(cli), &lwi);
785                 }
786         }
787
788         /* Do not wait for response */
789         ptlrpcd_add_req(req);
790         return 0;
791 }
792
793 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
794                                 long writing_bytes)
795 {
796         u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
797
798         LASSERT(!(oa->o_valid & bits));
799
800         oa->o_valid |= bits;
801         client_obd_list_lock(&cli->cl_loi_list_lock);
802         oa->o_dirty = cli->cl_dirty;
803         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
804                      cli->cl_dirty_max)) {
805                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
806                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
807                 oa->o_undirty = 0;
808         } else if (unlikely(atomic_read(&obd_dirty_pages) -
809                             atomic_read(&obd_dirty_transit_pages) >
810                             (long)(obd_max_dirty_pages + 1))) {
811                 /* The atomic_read() allowing the atomic_inc() are
812                  * not covered by a lock thus they may safely race and trip
813                  * this CERROR() unless we add in a small fudge factor (+1). */
814                 CERROR("dirty %d - %d > system dirty_max %d\n",
815                        atomic_read(&obd_dirty_pages),
816                        atomic_read(&obd_dirty_transit_pages),
817                        obd_max_dirty_pages);
818                 oa->o_undirty = 0;
819         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
820                 CERROR("dirty %lu - dirty_max %lu too big???\n",
821                        cli->cl_dirty, cli->cl_dirty_max);
822                 oa->o_undirty = 0;
823         } else {
824                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
825                                       PAGE_CACHE_SHIFT)*
826                                      (cli->cl_max_rpcs_in_flight + 1);
827                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
828         }
829         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
830         oa->o_dropped = cli->cl_lost_grant;
831         cli->cl_lost_grant = 0;
832         client_obd_list_unlock(&cli->cl_loi_list_lock);
833         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
834                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
835
836 }
837
838 void osc_update_next_shrink(struct client_obd *cli)
839 {
840         cli->cl_next_shrink_grant =
841                 cfs_time_shift(cli->cl_grant_shrink_interval);
842         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
843                cli->cl_next_shrink_grant);
844 }
845
846 static void __osc_update_grant(struct client_obd *cli, u64 grant)
847 {
848         client_obd_list_lock(&cli->cl_loi_list_lock);
849         cli->cl_avail_grant += grant;
850         client_obd_list_unlock(&cli->cl_loi_list_lock);
851 }
852
853 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
854 {
855         if (body->oa.o_valid & OBD_MD_FLGRANT) {
856                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
857                 __osc_update_grant(cli, body->oa.o_grant);
858         }
859 }
860
861 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
862                               u32 keylen, void *key, u32 vallen,
863                               void *val, struct ptlrpc_request_set *set);
864
865 static int osc_shrink_grant_interpret(const struct lu_env *env,
866                                       struct ptlrpc_request *req,
867                                       void *aa, int rc)
868 {
869         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870         struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
871         struct ost_body *body;
872
873         if (rc != 0) {
874                 __osc_update_grant(cli, oa->o_grant);
875                 goto out;
876         }
877
878         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
879         LASSERT(body);
880         osc_update_grant(cli, body);
881 out:
882         OBDO_FREE(oa);
883         return rc;
884 }
885
886 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
887 {
888         client_obd_list_lock(&cli->cl_loi_list_lock);
889         oa->o_grant = cli->cl_avail_grant / 4;
890         cli->cl_avail_grant -= oa->o_grant;
891         client_obd_list_unlock(&cli->cl_loi_list_lock);
892         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
893                 oa->o_valid |= OBD_MD_FLFLAGS;
894                 oa->o_flags = 0;
895         }
896         oa->o_flags |= OBD_FL_SHRINK_GRANT;
897         osc_update_next_shrink(cli);
898 }
899
900 /* Shrink the current grant, either from some large amount to enough for a
901  * full set of in-flight RPCs, or if we have already shrunk to that limit
902  * then to enough for a single RPC.  This avoids keeping more grant than
903  * needed, and avoids shrinking the grant piecemeal. */
904 static int osc_shrink_grant(struct client_obd *cli)
905 {
906         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
907                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
908
909         client_obd_list_lock(&cli->cl_loi_list_lock);
910         if (cli->cl_avail_grant <= target_bytes)
911                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
912         client_obd_list_unlock(&cli->cl_loi_list_lock);
913
914         return osc_shrink_grant_to_target(cli, target_bytes);
915 }
916
917 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
918 {
919         int rc = 0;
920         struct ost_body *body;
921
922         client_obd_list_lock(&cli->cl_loi_list_lock);
923         /* Don't shrink if we are already above or below the desired limit
924          * We don't want to shrink below a single RPC, as that will negatively
925          * impact block allocation and long-term performance. */
926         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
927                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
928
929         if (target_bytes >= cli->cl_avail_grant) {
930                 client_obd_list_unlock(&cli->cl_loi_list_lock);
931                 return 0;
932         }
933         client_obd_list_unlock(&cli->cl_loi_list_lock);
934
935         body = kzalloc(sizeof(*body), GFP_NOFS);
936         if (!body)
937                 return -ENOMEM;
938
939         osc_announce_cached(cli, &body->oa, 0);
940
941         client_obd_list_lock(&cli->cl_loi_list_lock);
942         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
943         cli->cl_avail_grant = target_bytes;
944         client_obd_list_unlock(&cli->cl_loi_list_lock);
945         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
946                 body->oa.o_valid |= OBD_MD_FLFLAGS;
947                 body->oa.o_flags = 0;
948         }
949         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
950         osc_update_next_shrink(cli);
951
952         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
953                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
954                                 sizeof(*body), body, NULL);
955         if (rc != 0)
956                 __osc_update_grant(cli, body->oa.o_grant);
957         kfree(body);
958         return rc;
959 }
960
961 static int osc_should_shrink_grant(struct client_obd *client)
962 {
963         unsigned long time = cfs_time_current();
964         unsigned long next_shrink = client->cl_next_shrink_grant;
965
966         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
967              OBD_CONNECT_GRANT_SHRINK) == 0)
968                 return 0;
969
970         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
971                 /* Get the current RPC size directly, instead of going via:
972                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
973                  * Keep comment here so that it can be found by searching. */
974                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
975
976                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
977                     client->cl_avail_grant > brw_size)
978                         return 1;
979
980                 osc_update_next_shrink(client);
981         }
982         return 0;
983 }
984
985 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
986 {
987         struct client_obd *client;
988
989         list_for_each_entry(client, &item->ti_obd_list,
990                                 cl_grant_shrink_list) {
991                 if (osc_should_shrink_grant(client))
992                         osc_shrink_grant(client);
993         }
994         return 0;
995 }
996
997 static int osc_add_shrink_grant(struct client_obd *client)
998 {
999         int rc;
1000
1001         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1002                                        TIMEOUT_GRANT,
1003                                        osc_grant_shrink_grant_cb, NULL,
1004                                        &client->cl_grant_shrink_list);
1005         if (rc) {
1006                 CERROR("add grant client %s error %d\n",
1007                         client->cl_import->imp_obd->obd_name, rc);
1008                 return rc;
1009         }
1010         CDEBUG(D_CACHE, "add grant client %s \n",
1011                client->cl_import->imp_obd->obd_name);
1012         osc_update_next_shrink(client);
1013         return 0;
1014 }
1015
1016 static int osc_del_shrink_grant(struct client_obd *client)
1017 {
1018         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1019                                          TIMEOUT_GRANT);
1020 }
1021
1022 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1023 {
1024         /*
1025          * ocd_grant is the total grant amount we're expect to hold: if we've
1026          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1027          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1028          *
1029          * race is tolerable here: if we're evicted, but imp_state already
1030          * left EVICTED state, then cl_dirty must be 0 already.
1031          */
1032         client_obd_list_lock(&cli->cl_loi_list_lock);
1033         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1034                 cli->cl_avail_grant = ocd->ocd_grant;
1035         else
1036                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1037
1038         if (cli->cl_avail_grant < 0) {
1039                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1040                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1041                       ocd->ocd_grant, cli->cl_dirty);
1042                 /* workaround for servers which do not have the patch from
1043                  * LU-2679 */
1044                 cli->cl_avail_grant = ocd->ocd_grant;
1045         }
1046
1047         /* determine the appropriate chunk size used by osc_extent. */
1048         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1049         client_obd_list_unlock(&cli->cl_loi_list_lock);
1050
1051         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1052                cli->cl_import->imp_obd->obd_name,
1053                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1054
1055         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1056             list_empty(&cli->cl_grant_shrink_list))
1057                 osc_add_shrink_grant(cli);
1058 }
1059
1060 /* We assume that the reason this OSC got a short read is because it read
1061  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1062  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1063  * this stripe never got written at or beyond this stripe offset yet. */
1064 static void handle_short_read(int nob_read, u32 page_count,
1065                               struct brw_page **pga)
1066 {
1067         char *ptr;
1068         int i = 0;
1069
1070         /* skip bytes read OK */
1071         while (nob_read > 0) {
1072                 LASSERT(page_count > 0);
1073
1074                 if (pga[i]->count > nob_read) {
1075                         /* EOF inside this page */
1076                         ptr = kmap(pga[i]->pg) +
1077                                 (pga[i]->off & ~CFS_PAGE_MASK);
1078                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1079                         kunmap(pga[i]->pg);
1080                         page_count--;
1081                         i++;
1082                         break;
1083                 }
1084
1085                 nob_read -= pga[i]->count;
1086                 page_count--;
1087                 i++;
1088         }
1089
1090         /* zero remaining pages */
1091         while (page_count-- > 0) {
1092                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1093                 memset(ptr, 0, pga[i]->count);
1094                 kunmap(pga[i]->pg);
1095                 i++;
1096         }
1097 }
1098
1099 static int check_write_rcs(struct ptlrpc_request *req,
1100                            int requested_nob, int niocount,
1101                            u32 page_count, struct brw_page **pga)
1102 {
1103         int i;
1104         __u32 *remote_rcs;
1105
1106         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1107                                                   sizeof(*remote_rcs) *
1108                                                   niocount);
1109         if (remote_rcs == NULL) {
1110                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1111                 return -EPROTO;
1112         }
1113
1114         /* return error if any niobuf was in error */
1115         for (i = 0; i < niocount; i++) {
1116                 if ((int)remote_rcs[i] < 0)
1117                         return remote_rcs[i];
1118
1119                 if (remote_rcs[i] != 0) {
1120                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1121                                 i, remote_rcs[i], req);
1122                         return -EPROTO;
1123                 }
1124         }
1125
1126         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1127                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1128                        req->rq_bulk->bd_nob_transferred, requested_nob);
1129                 return -EPROTO;
1130         }
1131
1132         return 0;
1133 }
1134
1135 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1136 {
1137         if (p1->flag != p2->flag) {
1138                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1139                                   OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1140
1141                 /* warn if we try to combine flags that we don't know to be
1142                  * safe to combine */
1143                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1144                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1145                               p1->flag, p2->flag);
1146                 }
1147                 return 0;
1148         }
1149
1150         return (p1->off + p1->count == p2->off);
1151 }
1152
1153 static u32 osc_checksum_bulk(int nob, u32 pg_count,
1154                              struct brw_page **pga, int opc,
1155                              cksum_type_t cksum_type)
1156 {
1157         __u32 cksum;
1158         int i = 0;
1159         struct cfs_crypto_hash_desc *hdesc;
1160         unsigned int bufsize;
1161         int err;
1162         unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1163
1164         LASSERT(pg_count > 0);
1165
1166         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1167         if (IS_ERR(hdesc)) {
1168                 CERROR("Unable to initialize checksum hash %s\n",
1169                        cfs_crypto_hash_name(cfs_alg));
1170                 return PTR_ERR(hdesc);
1171         }
1172
1173         while (nob > 0 && pg_count > 0) {
1174                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1175
1176                 /* corrupt the data before we compute the checksum, to
1177                  * simulate an OST->client data error */
1178                 if (i == 0 && opc == OST_READ &&
1179                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1180                         unsigned char *ptr = kmap(pga[i]->pg);
1181                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1182                         memcpy(ptr + off, "bad1", min(4, nob));
1183                         kunmap(pga[i]->pg);
1184                 }
1185                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1186                                   pga[i]->off & ~CFS_PAGE_MASK,
1187                                   count);
1188                 CDEBUG(D_PAGE,
1189                        "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1190                        pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1191                        (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1192                        page_private(pga[i]->pg),
1193                        (int)(pga[i]->off & ~CFS_PAGE_MASK));
1194
1195                 nob -= pga[i]->count;
1196                 pg_count--;
1197                 i++;
1198         }
1199
1200         bufsize = 4;
1201         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1202
1203         if (err)
1204                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1205
1206         /* For sending we only compute the wrong checksum instead
1207          * of corrupting the data so it is still correct on a redo */
1208         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1209                 cksum++;
1210
1211         return cksum;
1212 }
1213
/*
 * Build (but do not send) a bulk read or write request for @page_count
 * pages in @pga.
 *
 * @cmd:    OBD_BRW_WRITE selects OST_WRITE, anything else OST_READ
 * @cli:    client the request is issued on
 * @oa:     object attributes copied into the wire body; also updated with
 *          checksum state for writes and remembered in the async args
 * @lsm:    not referenced in this function
 * @pga:    page array; offsets must be strictly increasing and, for more
 *          than one page, leave no gap (both are asserted below)
 * @reqp:   receives the prepared request on success
 * @reserve: not referenced in this function
 * @resend: non-zero marks the request with OBD_FL_RECOV_RESEND
 *
 * Returns 0 on success.  Returns -ENOMEM or -EINVAL from the fault
 * injection points, -ENOMEM when the request or bulk descriptor cannot be
 * allocated, or the error from ptlrpc_request_pack().
 *
 * The async args keep the @oa and @pga pointers, so presumably both must
 * outlive the request - confirm against callers.
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                int reserve,
                                int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                return -ENOMEM; /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                return -EINVAL; /* Fatal */

        /* writes allocate from osc_rq_pool, reads use the plain allocator */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                return -ENOMEM;

        /* count the remote niobufs: one per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL) {
                rc = -ENOMEM;
                goto out;
        }
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* add every page to the bulk descriptor and fill the niobuf array;
         * niobuf advances once per page and is stepped back when a page is
         * merged into the previous entry */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                /* all pages must agree on OBD_BRW_SRVLOCK */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
                pg_prev = pg;
        }

        /* niobuf must have advanced exactly niocount entries, matching the
         * count computed before the request was packed */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a grant shrink on this request when one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads only advertise the checksum type in the request;
                 * no checksum is computed here */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* stash what the completion path needs in the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        return 0;

 out:
        ptlrpc_req_finished(req);
        return rc;
}
1413
1414 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1415                                 __u32 client_cksum, __u32 server_cksum, int nob,
1416                                 u32 page_count, struct brw_page **pga,
1417                                 cksum_type_t client_cksum_type)
1418 {
1419         __u32 new_cksum;
1420         char *msg;
1421         cksum_type_t cksum_type;
1422
1423         if (server_cksum == client_cksum) {
1424                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1425                 return 0;
1426         }
1427
1428         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1429                                        oa->o_flags : 0);
1430         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1431                                       cksum_type);
1432
1433         if (cksum_type != client_cksum_type)
1434                 msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
1435                         ;
1436         else if (new_cksum == server_cksum)
1437                 msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1438                         ;
1439         else if (new_cksum == client_cksum)
1440                 msg = "changed in transit before arrival at OST";
1441         else
1442                 msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1443                         ;
1444
1445         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1446                            " object "DOSTID" extent [%llu-%llu]\n",
1447                            msg, libcfs_nid2str(peer->nid),
1448                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1449                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1450                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1451                            POSTID(&oa->o_oi), pga[0]->off,
1452                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1453         CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1454                client_cksum, client_cksum_type,
1455                server_cksum, cksum_type, new_cksum);
1456         return 1;
1457 }
1458
1459 /* Note rc enters this function as number of bytes transferred */
1460 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1461 {
1462         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1463         const lnet_process_id_t *peer =
1464                         &req->rq_import->imp_connection->c_peer;
1465         struct client_obd *cli = aa->aa_cli;
1466         struct ost_body *body;
1467         __u32 client_cksum = 0;
1468
1469         if (rc < 0 && rc != -EDQUOT) {
1470                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1471                 return rc;
1472         }
1473
1474         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1475         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1476         if (body == NULL) {
1477                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1478                 return -EPROTO;
1479         }
1480
1481         /* set/clear over quota flag for a uid/gid */
1482         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1483             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1484                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1485
1486                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1487                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1488                        body->oa.o_flags);
1489                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1490         }
1491
1492         osc_update_grant(cli, body);
1493
1494         if (rc < 0)
1495                 return rc;
1496
1497         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1498                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1499
1500         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1501                 if (rc > 0) {
1502                         CERROR("Unexpected +ve rc %d\n", rc);
1503                         return -EPROTO;
1504                 }
1505                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1506
1507                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1508                         return -EAGAIN;
1509
1510                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1511                     check_write_checksum(&body->oa, peer, client_cksum,
1512                                          body->oa.o_cksum, aa->aa_requested_nob,
1513                                          aa->aa_page_count, aa->aa_ppga,
1514                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1515                         return -EAGAIN;
1516
1517                 rc = check_write_rcs(req, aa->aa_requested_nob,
1518                                      aa->aa_nio_count,
1519                                      aa->aa_page_count, aa->aa_ppga);
1520                 goto out;
1521         }
1522
1523         /* The rest of this function executes only for OST_READs */
1524
1525         /* if unwrap_bulk failed, return -EAGAIN to retry */
1526         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1527         if (rc < 0) {
1528                 rc = -EAGAIN;
1529                 goto out;
1530         }
1531
1532         if (rc > aa->aa_requested_nob) {
1533                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1534                        aa->aa_requested_nob);
1535                 return -EPROTO;
1536         }
1537
1538         if (rc != req->rq_bulk->bd_nob_transferred) {
1539                 CERROR("Unexpected rc %d (%d transferred)\n",
1540                         rc, req->rq_bulk->bd_nob_transferred);
1541                 return -EPROTO;
1542         }
1543
1544         if (rc < aa->aa_requested_nob)
1545                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1546
1547         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1548                 static int cksum_counter;
1549                 __u32 server_cksum = body->oa.o_cksum;
1550                 char *via;
1551                 char *router;
1552                 cksum_type_t cksum_type;
1553
1554                 cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
1555                                                body->oa.o_flags : 0);
1556                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1557                                                  aa->aa_ppga, OST_READ,
1558                                                  cksum_type);
1559
1560                 if (peer->nid == req->rq_bulk->bd_sender) {
1561                         via = router = "";
1562                 } else {
1563                         via = " via ";
1564                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1565                 }
1566
1567                 if (server_cksum != client_cksum) {
1568                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1569                                            req->rq_import->imp_obd->obd_name,
1570                                            libcfs_nid2str(peer->nid),
1571                                            via, router,
1572                                            body->oa.o_valid & OBD_MD_FLFID ?
1573                                            body->oa.o_parent_seq : (__u64)0,
1574                                            body->oa.o_valid & OBD_MD_FLFID ?
1575                                            body->oa.o_parent_oid : 0,
1576                                            body->oa.o_valid & OBD_MD_FLFID ?
1577                                            body->oa.o_parent_ver : 0,
1578                                            POSTID(&body->oa.o_oi),
1579                                            aa->aa_ppga[0]->off,
1580                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1581                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1582                                            1);
1583                         CERROR("client %x, server %x, cksum_type %x\n",
1584                                client_cksum, server_cksum, cksum_type);
1585                         cksum_counter = 0;
1586                         aa->aa_oa->o_cksum = client_cksum;
1587                         rc = -EAGAIN;
1588                 } else {
1589                         cksum_counter++;
1590                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1591                         rc = 0;
1592                 }
1593         } else if (unlikely(client_cksum)) {
1594                 static int cksum_missed;
1595
1596                 cksum_missed++;
1597                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1598                         CERROR("Checksum %u requested from %s but not sent\n",
1599                                cksum_missed, libcfs_nid2str(peer->nid));
1600         } else {
1601                 rc = 0;
1602         }
1603 out:
1604         if (rc >= 0)
1605                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1606                                      aa->aa_oa, &body->oa);
1607
1608         return rc;
1609 }
1610
1611 static int osc_brw_redo_request(struct ptlrpc_request *request,
1612                                 struct osc_brw_async_args *aa, int rc)
1613 {
1614         struct ptlrpc_request *new_req;
1615         struct osc_brw_async_args *new_aa;
1616         struct osc_async_page *oap;
1617
1618         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1619                   "redo for recoverable error %d", rc);
1620
1621         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1622                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1623                                   aa->aa_cli, aa->aa_oa,
1624                                   NULL /* lsm unused by osc currently */,
1625                                   aa->aa_page_count, aa->aa_ppga,
1626                                   &new_req, 0, 1);
1627         if (rc)
1628                 return rc;
1629
1630         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1631                 if (oap->oap_request != NULL) {
1632                         LASSERTF(request == oap->oap_request,
1633                                  "request %p != oap_request %p\n",
1634                                  request, oap->oap_request);
1635                         if (oap->oap_interrupted) {
1636                                 ptlrpc_req_finished(new_req);
1637                                 return -EINTR;
1638                         }
1639                 }
1640         }
1641         /* New request takes over pga and oaps from old request.
1642          * Note that copying a list_head doesn't work, need to move it... */
1643         aa->aa_resends++;
1644         new_req->rq_interpret_reply = request->rq_interpret_reply;
1645         new_req->rq_async_args = request->rq_async_args;
1646         /* cap resend delay to the current request timeout, this is similar to
1647          * what ptlrpc does (see after_reply()) */
1648         if (aa->aa_resends > new_req->rq_timeout)
1649                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1650         else
1651                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1652         new_req->rq_generation_set = 1;
1653         new_req->rq_import_generation = request->rq_import_generation;
1654
1655         new_aa = ptlrpc_req_async_args(new_req);
1656
1657         INIT_LIST_HEAD(&new_aa->aa_oaps);
1658         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1659         INIT_LIST_HEAD(&new_aa->aa_exts);
1660         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1661         new_aa->aa_resends = aa->aa_resends;
1662
1663         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1664                 if (oap->oap_request) {
1665                         ptlrpc_req_finished(oap->oap_request);
1666                         oap->oap_request = ptlrpc_request_addref(new_req);
1667                 }
1668         }
1669
1670         /* XXX: This code will run into problem if we're going to support
1671          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1672          * and wait for all of them to be finished. We should inherit request
1673          * set from old request. */
1674         ptlrpcd_add_req(new_req);
1675
1676         DEBUG_REQ(D_INFO, new_req, "new request");
1677         return 0;
1678 }
1679
1680 /*
1681  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1682  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1683  * fine for our small page arrays and doesn't require allocation.  its an
1684  * insertion sort that swaps elements that are strides apart, shrinking the
1685  * stride down until its '1' and the array is sorted.
1686  */
1687 static void sort_brw_pages(struct brw_page **array, int num)
1688 {
1689         int stride, i, j;
1690         struct brw_page *tmp;
1691
1692         if (num == 1)
1693                 return;
1694         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1695                 ;
1696
1697         do {
1698                 stride /= 3;
1699                 for (i = stride ; i < num ; i++) {
1700                         tmp = array[i];
1701                         j = i;
1702                         while (j >= stride && array[j - stride]->off > tmp->off) {
1703                                 array[j] = array[j - stride];
1704                                 j -= stride;
1705                         }
1706                         array[j] = tmp;
1707                 }
1708         } while (stride > 1);
1709 }
1710
/*
 * Free the brw_page pointer array of a bulk RPC.  Only the array itself is
 * handed to kfree(); @count is not used.  @ppga must not be NULL (asserted).
 */
1711 static void osc_release_ppga(struct brw_page **ppga, u32 count)
1712 {
1713         LASSERT(ppga != NULL);
1714         kfree(ppga);
1715 }
1716
/*
 * Reply interpreter for bulk read/write RPCs; osc_build_rpc() installs it
 * as rq_interpret_reply.  @data is the request's osc_brw_async_args.
 *
 * The reply is first post-processed by osc_brw_fini_request().  For a
 * recoverable error the I/O may be requeued through osc_brw_redo_request();
 * if that succeeds this function returns 0 immediately.  Otherwise every
 * extent on aa->aa_exts is finished with the final rc, the block count and
 * timestamps flagged valid in aa->aa_oa are copied to the cl_object (only
 * when rc == 0), the per-RPC resources (obdo, cl_req, page array) are
 * released and the in-flight RPC counter is decremented.
 *
 * Returns 0 if the request was requeued, otherwise the final rc; -EAGAIN
 * and -EINPROGRESS that were not retried are reported as -EIO.
 */
1717 static int brw_interpret(const struct lu_env *env,
1718                          struct ptlrpc_request *req, void *data, int rc)
1719 {
1720         struct osc_brw_async_args *aa = data;
1721         struct osc_extent *ext;
1722         struct osc_extent *tmp;
1723         struct cl_object *obj = NULL;
1724         struct client_obd *cli = aa->aa_cli;
1725
1726         rc = osc_brw_fini_request(req, rc);
1727         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1728         /* When server return -EINPROGRESS, client should always retry
1729          * regardless of the number of times the bulk was resent already. */
1730         if (osc_recoverable_error(rc)) {
1731                 if (req->rq_import_generation !=
1732                     req->rq_import->imp_generation) {
                             /* import generation changed since the request
                              * was sent: do not resend, only log it */
1733                         CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1734                                req->rq_import->imp_obd->obd_name,
1735                                POSTID(&aa->aa_oa->o_oi), rc);
1736                 } else if (rc == -EINPROGRESS ||
1737                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1738                         rc = osc_brw_redo_request(req, aa, rc);
1739                 } else {
1740                         CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1741                                req->rq_import->imp_obd->obd_name,
1742                                POSTID(&aa->aa_oa->o_oi), rc);
1743                 }
1744
                     /* rc == 0 here means a replacement request was queued;
                      * it has taken over aa's page and extent lists, so
                      * there is nothing left to complete for this one */
1745                 if (rc == 0)
1746                         return 0;
1747                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1748                         rc = -EIO;
1749         }
1750
1751         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                     /* on success, pin the object of the first extent so its
                      * attributes can be updated below */
1752                 if (obj == NULL && rc == 0) {
1753                         obj = osc2cl(ext->oe_obj);
1754                         cl_object_get(obj);
1755                 }
1756
1757                 list_del_init(&ext->oe_link);
1758                 osc_extent_finish(env, ext, 1, rc);
1759         }
1760         LASSERT(list_empty(&aa->aa_exts));
1761         LASSERT(list_empty(&aa->aa_oaps));
1762
1763         if (obj != NULL) {
1764                 struct obdo *oa = aa->aa_oa;
1765                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1766                 unsigned long valid = 0;
1767
1768                 LASSERT(rc == 0);
                     /* copy only the fields the obdo marks as valid */
1769                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1770                         attr->cat_blocks = oa->o_blocks;
1771                         valid |= CAT_BLOCKS;
1772                 }
1773                 if (oa->o_valid & OBD_MD_FLMTIME) {
1774                         attr->cat_mtime = oa->o_mtime;
1775                         valid |= CAT_MTIME;
1776                 }
1777                 if (oa->o_valid & OBD_MD_FLATIME) {
1778                         attr->cat_atime = oa->o_atime;
1779                         valid |= CAT_ATIME;
1780                 }
1781                 if (oa->o_valid & OBD_MD_FLCTIME) {
1782                         attr->cat_ctime = oa->o_ctime;
1783                         valid |= CAT_CTIME;
1784                 }
1785                 if (valid != 0) {
1786                         cl_object_attr_lock(obj);
1787                         cl_object_attr_set(env, obj, attr, valid);
1788                         cl_object_attr_unlock(obj);
1789                 }
1790                 cl_object_put(env, obj);
1791         }
1792         OBDO_FREE(aa->aa_oa);
1793
             /* report the error, or the number of bytes moved on success */
1794         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1795                           req->rq_bulk->bd_nob_transferred);
1796         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1797         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1798
1799         client_obd_list_lock(&cli->cl_loi_list_lock);
1800         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1801          * is called so we know whether to go to sync BRWs or wait for more
1802          * RPCs to complete */
1803         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1804                 cli->cl_w_in_flight--;
1805         else
1806                 cli->cl_r_in_flight--;
1807         osc_wake_cache_waiters(cli);
1808         client_obd_list_unlock(&cli->cl_loi_list_lock);
1809
1810         osc_io_unplug(env, cli, NULL);
1811         return rc;
1812 }
1813
1814 /**
1815  * Build an RPC by the list of extent @ext_list. The caller must ensure
1816  * that the total pages in this list are NOT over max pages per RPC.
1817  * Extents in the list must be in OES_RPC state.
      *
      * Every page of every extent is collected into a brw_page array, which is
      * sorted by offset and packed into one request by osc_brw_prep_request().
      * The request gets brw_interpret() as its reply interpreter, takes over
      * the page list and the extents from @ext_list, and is handed to ptlrpcd.
      *
      * \param env      execution environment passed to the cl_req and extent
      *                 helpers
      * \param cli      client obd whose in-flight counters and histograms are
      *                 updated
      * \param ext_list list of osc_extent linked through oe_link; must not be
      *                 empty
      * \param cmd      OBD_BRW_WRITE bit selects a write, otherwise a read
      *
      * \retval 0       the request was queued
      * \retval -ENOMEM an allocation failed
      * \retval other   error from cl_req_alloc(), cl_req_prep() or
      *                 osc_brw_prep_request()
      *
      * On failure every extent left on @ext_list is finished with the error.
1818  */
1819 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1820                   struct list_head *ext_list, int cmd)
1821 {
1822         struct ptlrpc_request *req = NULL;
1823         struct osc_extent *ext;
1824         struct brw_page **pga = NULL;
1825         struct osc_brw_async_args *aa = NULL;
1826         struct obdo *oa = NULL;
1827         struct osc_async_page *oap;
1828         struct osc_async_page *tmp;
1829         struct cl_req *clerq = NULL;
1830         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1831         struct ldlm_lock *lock = NULL;
1832         struct cl_req_attr *crattr = NULL;
1833         u64 starting_offset = OBD_OBJECT_EOF;
1834         u64 ending_offset = 0;
1835         int mpflag = 0;
1836         int mem_tight = 0;
1837         int page_count = 0;
1838         int i;
1839         int rc;
1840         struct ost_body *body;
1841         LIST_HEAD(rpc_list);
1842
1843         LASSERT(!list_empty(ext_list));
1844
1845         /* add pages into rpc_list to build BRW rpc */
1846         list_for_each_entry(ext, ext_list, oe_link) {
1847                 LASSERT(ext->oe_state == OES_RPC);
1848                 mem_tight |= ext->oe_memalloc;
1849                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1850                         ++page_count;
1851                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
                             /* track the lowest start and highest end offset;
                              * a page that does not move a bound must reach
                              * that edge of its page */
1852                         if (starting_offset > oap->oap_obj_off)
1853                                 starting_offset = oap->oap_obj_off;
1854                         else
1855                                 LASSERT(oap->oap_page_off == 0);
1856                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1857                                 ending_offset = oap->oap_obj_off +
1858                                                 oap->oap_count;
1859                         else
1860                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1861                                         PAGE_CACHE_SIZE);
1862                 }
1863         }
1864
             /* the value saved in mpflag is restored at the out: label */
1865         if (mem_tight)
1866                 mpflag = cfs_memory_pressure_get_and_set();
1867
1868         crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
1869         if (!crattr) {
1870                 rc = -ENOMEM;
1871                 goto out;
1872         }
1873
1874         pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
1875         if (pga == NULL) {
1876                 rc = -ENOMEM;
1877                 goto out;
1878         }
1879
1880         OBDO_ALLOC(oa);
1881         if (oa == NULL) {
1882                 rc = -ENOMEM;
1883                 goto out;
1884         }
1885
             /* fill pga[] with one brw_page per collected oap; the cl_req is
              * allocated from the first page, and the first oap's ldlm lock
              * is remembered for the obdo handle below */
1886         i = 0;
1887         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1888                 struct cl_page *page = oap2cl_page(oap);
1889                 if (clerq == NULL) {
1890                         clerq = cl_req_alloc(env, page, crt,
1891                                              1 /* only 1-object rpcs for now */);
1892                         if (IS_ERR(clerq)) {
1893                                 rc = PTR_ERR(clerq);
1894                                 goto out;
1895                         }
1896                         lock = oap->oap_ldlm_lock;
1897                 }
1898                 if (mem_tight)
1899                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1900                 pga[i] = &oap->oap_brw_page;
1901                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1902                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1903                        pga[i]->pg, page_index(oap->oap_page), oap,
1904                        pga[i]->flag);
1905                 i++;
1906                 cl_req_page_add(env, clerq, page);
1907         }
1908
1909         /* always get the data for the obdo for the rpc */
1910         LASSERT(clerq != NULL);
1911         crattr->cra_oa = oa;
1912         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1913         if (lock) {
1914                 oa->o_handle = lock->l_remote_handle;
1915                 oa->o_valid |= OBD_MD_FLHANDLE;
1916         }
1917
1918         rc = cl_req_prep(env, clerq);
1919         if (rc != 0) {
1920                 CERROR("cl_req_prep failed: %d\n", rc);
1921                 goto out;
1922         }
1923
1924         sort_brw_pages(pga, page_count);
1925         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1926                         pga, &req, 1, 0);
1927         if (rc != 0) {
1928                 CERROR("prep_req failed: %d\n", rc);
1929                 goto out;
1930         }
1931
1932         req->rq_interpret_reply = brw_interpret;
1933
1934         if (mem_tight != 0)
1935                 req->rq_memalloc = 1;
1936
1937         /* Need to update the timestamps after the request is built in case
1938          * we race with setattr (locally or in queue at OST).  If OST gets
1939          * later setattr before earlier BRW (as determined by the request xid),
1940          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1941          * way to do this in a single call.  bug 10150 */
1942         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1943         crattr->cra_oa = &body->oa;
1944         cl_req_attr_set(env, clerq, crattr,
1945                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1946
1947         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1948
             /* move the page list and the extents into the request's async
              * args, where brw_interpret() will find them */
1949         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1950         aa = ptlrpc_req_async_args(req);
1951         INIT_LIST_HEAD(&aa->aa_oaps);
1952         list_splice_init(&rpc_list, &aa->aa_oaps);
1953         INIT_LIST_HEAD(&aa->aa_exts);
1954         list_splice_init(ext_list, &aa->aa_exts);
1955         aa->aa_clerq = clerq;
1956
1957         /* queued sync pages can be torn down while the pages
1958          * were between the pending list and the rpc */
1959         tmp = NULL;
1960         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1961                 /* only one oap gets a request reference */
1962                 if (tmp == NULL)
1963                         tmp = oap;
1964                 if (oap->oap_interrupted && !req->rq_intr) {
1965                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1966                                         oap, req);
1967                         ptlrpc_mark_interrupted(req);
1968                 }
1969         }
1970         if (tmp != NULL)
1971                 tmp->oap_request = ptlrpc_request_addref(req);
1972
             /* account the new RPC in the in-flight counters and histograms;
              * starting_offset is converted from bytes to a page index */
1973         client_obd_list_lock(&cli->cl_loi_list_lock);
1974         starting_offset >>= PAGE_CACHE_SHIFT;
1975         if (cmd == OBD_BRW_READ) {
1976                 cli->cl_r_in_flight++;
1977                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1978                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1979                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1980                                       starting_offset + 1);
1981         } else {
1982                 cli->cl_w_in_flight++;
1983                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1984                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1985                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1986                                       starting_offset + 1);
1987         }
1988         client_obd_list_unlock(&cli->cl_loi_list_lock);
1989
1990         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
1991                   page_count, aa, cli->cl_r_in_flight,
1992                   cli->cl_w_in_flight);
1993
1994         ptlrpcd_add_req(req);
1995         rc = 0;
1996
1997 out:
1998         if (mem_tight != 0)
1999                 cfs_memory_pressure_restore(mpflag);
2000
             /* crattr is only needed while building; free it on every path */
2001         kfree(crattr);
2002
2003         if (rc != 0) {
                     /* every failing path jumps here before a request exists */
2004                 LASSERT(req == NULL);
2005
2006                 if (oa)
2007                         OBDO_FREE(oa);
2008                 kfree(pga);
2009                 /* this should happen rarely and is pretty bad, it makes the
2010                  * pending list not follow the dirty order */
2011                 while (!list_empty(ext_list)) {
2012                         ext = list_entry(ext_list->next, struct osc_extent,
2013                                              oe_link);
2014                         list_del_init(&ext->oe_link);
2015                         osc_extent_finish(env, ext, 0, rc);
2016                 }
                     /* clerq may be NULL or an ERR_PTR from cl_req_alloc() */
2017                 if (clerq && !IS_ERR(clerq))
2018                         cl_req_completion(env, clerq, rc);
2019         }
2020         return rc;
2021 }
2022
2023 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2024                                         struct ldlm_enqueue_info *einfo)
2025 {
2026         void *data = einfo->ei_cbdata;
2027         int set = 0;
2028
2029         LASSERT(lock != NULL);
2030         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2031         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2032         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2033         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2034
2035         lock_res_and_lock(lock);
2036         spin_lock(&osc_ast_guard);
2037
2038         if (lock->l_ast_data == NULL)
2039                 lock->l_ast_data = data;
2040         if (lock->l_ast_data == data)
2041                 set = 1;
2042
2043         spin_unlock(&osc_ast_guard);
2044         unlock_res_and_lock(lock);
2045
2046         return set;
2047 }
2048
2049 static int osc_set_data_with_check(struct lustre_handle *lockh,
2050                                    struct ldlm_enqueue_info *einfo)
2051 {
2052         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2053         int set = 0;
2054
2055         if (lock != NULL) {
2056                 set = osc_set_lock_data_with_check(lock, einfo);
2057                 LDLM_LOCK_PUT(lock);
2058         } else
2059                 CERROR("lockh %p, data %p - client evicted?\n",
2060                        lockh, einfo->ei_cbdata);
2061         return set;
2062 }
2063
2064 /* find any ldlm lock of the inode in osc
2065  * return 0    not find
2066  *      1    find one
2067  *      < 0    error */
2068 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2069                            ldlm_iterator_t replace, void *data)
2070 {
2071         struct ldlm_res_id res_id;
2072         struct obd_device *obd = class_exp2obd(exp);
2073         int rc = 0;
2074
2075         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2076         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2077         if (rc == LDLM_ITER_STOP)
2078                 return 1;
2079         if (rc == LDLM_ITER_CONTINUE)
2080                 return 0;
2081         return rc;
2082 }
2083
2084 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2085                             obd_enqueue_update_f upcall, void *cookie,
2086                             __u64 *flags, int agl, int rc)
2087 {
2088         int intent = *flags & LDLM_FL_HAS_INTENT;
2089
2090         if (intent) {
2091                 /* The request was created before ldlm_cli_enqueue call. */
2092                 if (rc == ELDLM_LOCK_ABORTED) {
2093                         struct ldlm_reply *rep;
2094                         rep = req_capsule_server_get(&req->rq_pill,
2095                                                      &RMF_DLM_REP);
2096
2097                         LASSERT(rep != NULL);
2098                         rep->lock_policy_res1 =
2099                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2100                         if (rep->lock_policy_res1)
2101                                 rc = rep->lock_policy_res1;
2102                 }
2103         }
2104
2105         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2106             (rc == 0)) {
2107                 *flags |= LDLM_FL_LVB_READY;
2108                 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2109                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2110         }
2111
2112         /* Call the update callback. */
2113         rc = (*upcall)(cookie, rc);
2114         return rc;
2115 }
2116
/*
 * Reply interpreter for an asynchronous lock enqueue.
 *
 * Completes the enqueue with ldlm_cli_enqueue_fini(), then runs the OSC
 * upcall through osc_enqueue_fini().  An extra lock reference is held across
 * both calls and dropped at the end, together with the handle2lock
 * reference.  Returns the rc produced by osc_enqueue_fini().
 *
 * NOTE(review): "lock" is only checked for NULL by the LASSERTF near the end,
 * after &handle has already been passed to ldlm_lock_addref() and
 * ldlm_cli_enqueue_fini().
 */
2117 static int osc_enqueue_interpret(const struct lu_env *env,
2118                                  struct ptlrpc_request *req,
2119                                  struct osc_enqueue_args *aa, int rc)
2120 {
2121         struct ldlm_lock *lock;
2122         struct lustre_handle handle;
2123         __u32 mode;
2124         struct ost_lvb *lvb;
2125         __u32 lvb_len;
2126         __u64 *flags = aa->oa_flags;
2127
2128         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2129          * might be freed anytime after lock upcall has been called. */
2130         lustre_handle_copy(&handle, aa->oa_lockh);
2131         mode = aa->oa_ei->ei_mode;
2132
2133         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2134          * be valid. */
2135         lock = ldlm_handle2lock(&handle);
2136
2137         /* Take an additional reference so that a blocking AST that
2138          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2139          * to arrive after an upcall has been executed by
2140          * osc_enqueue_fini(). */
2141         ldlm_lock_addref(&handle, mode);
2142
2143         /* Let CP AST to grant the lock first. */
2144         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2145
             /* an aborted AGL enqueue passes no LVB buffer on to
              * ldlm_cli_enqueue_fini() */
2146         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2147                 lvb = NULL;
2148                 lvb_len = 0;
2149         } else {
2150                 lvb = aa->oa_lvb;
2151                 lvb_len = sizeof(*aa->oa_lvb);
2152         }
2153
2154         /* Complete obtaining the lock procedure. */
2155         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2156                                    mode, flags, lvb, lvb_len, &handle, rc);
2157         /* Complete osc stuff. */
2158         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2159                               flags, aa->oa_agl, rc);
2160
2161         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2162
2163         /* Release the lock for async request. */
2164         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2165                 /*
2166                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2167                  * not already released by
2168                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2169                  */
2170                 ldlm_lock_decref(&handle, mode);
2171
2172         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2173                  aa->oa_lockh, req, aa);
             /* drop the additional reference taken above, then the
              * ldlm_handle2lock() reference */
2174         ldlm_lock_decref(&handle, mode);
2175         LDLM_LOCK_PUT(lock);
2176         return rc;
2177 }
2178
/*
 * NOTE(review): the pointer value 1 is not a real request set; presumably a
 * sentinel that callers pass as a request-set argument to select ptlrpcd --
 * confirm against the users of PTLRPCD_SET.
 */
2179 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2180
2181 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2182  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2183  * other synchronous requests, however keeping some locks and trying to obtain
2184  * others may take a considerable amount of time in a case of ost failure; and
2185  * when other sync requests do not get released lock from a client, the client
2186  * is excluded from the cluster -- such scenarios make life difficult, so
2187  * release locks just after they are obtained. */
2188 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2189                      __u64 *flags, ldlm_policy_data_t *policy,
2190                      struct ost_lvb *lvb, int kms_valid,
2191                      obd_enqueue_update_f upcall, void *cookie,
2192                      struct ldlm_enqueue_info *einfo,
2193                      struct lustre_handle *lockh,
2194                      struct ptlrpc_request_set *rqset, int async, int agl)
2195 {
2196         struct obd_device *obd = exp->exp_obd;
2197         struct ptlrpc_request *req = NULL;
2198         int intent = *flags & LDLM_FL_HAS_INTENT;
2199         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2200         ldlm_mode_t mode;
2201         int rc;
2202
2203         /* Filesystem lock extents are extended to page boundaries so that
2204          * dealing with the page cache is a little smoother.  */
2205         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2206         policy->l_extent.end |= ~CFS_PAGE_MASK;
2207
2208         /*
2209          * kms is not valid when either object is completely fresh (so that no
2210          * locks are cached), or object was evicted. In the latter case cached
2211          * lock cannot be used, because it would prime inode state with
2212          * potentially stale LVB.
2213          */
2214         if (!kms_valid)
2215                 goto no_match;
2216
2217         /* Next, search for already existing extent locks that will cover us */
2218         /* If we're trying to read, we also search for an existing PW lock.  The
2219          * VFS and page cache already protect us locally, so lots of readers/
2220          * writers can share a single PW lock.
2221          *
2222          * There are problems with conversion deadlocks, so instead of
2223          * converting a read lock to a write lock, we'll just enqueue a new
2224          * one.
2225          *
2226          * At some point we should cancel the read lock instead of making them
2227          * send us a blocking callback, but there are problems with canceling
2228          * locks out from other users right now, too. */
2229         mode = einfo->ei_mode;
2230         if (einfo->ei_mode == LCK_PR)
2231                 mode |= LCK_PW;
2232         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2233                                einfo->ei_type, policy, mode, lockh, 0);
2234         if (mode) {
2235                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2236
2237                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2238                         /* For AGL, if enqueue RPC is sent but the lock is not
2239                          * granted, then skip to process this strpe.
2240                          * Return -ECANCELED to tell the caller. */
2241                         ldlm_lock_decref(lockh, mode);
2242                         LDLM_LOCK_PUT(matched);
2243                         return -ECANCELED;
2244                 }
2245
2246                 if (osc_set_lock_data_with_check(matched, einfo)) {
2247                         *flags |= LDLM_FL_LVB_READY;
2248                         /* addref the lock only if not async requests and PW
2249                          * lock is matched whereas we asked for PR. */
2250                         if (!rqset && einfo->ei_mode != mode)
2251                                 ldlm_lock_addref(lockh, LCK_PR);
2252                         if (intent) {
2253                                 /* I would like to be able to ASSERT here that
2254                                  * rss <= kms, but I can't, for reasons which
2255                                  * are explained in lov_enqueue() */
2256                         }
2257
2258                         /* We already have a lock, and it's referenced.
2259                          *
2260                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2261                          * AGL upcall may change it to CLS_HELD directly. */
2262                         (*upcall)(cookie, ELDLM_OK);
2263
2264                         if (einfo->ei_mode != mode)
2265                                 ldlm_lock_decref(lockh, LCK_PW);
2266                         else if (rqset)
2267                                 /* For async requests, decref the lock. */
2268                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2269                         LDLM_LOCK_PUT(matched);
2270                         return ELDLM_OK;
2271                 }
2272
2273                 ldlm_lock_decref(lockh, mode);
2274                 LDLM_LOCK_PUT(matched);
2275         }
2276
2277  no_match:
2278         if (intent) {
2279                 LIST_HEAD(cancels);
2280                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2281                                            &RQF_LDLM_ENQUEUE_LVB);
2282                 if (req == NULL)
2283                         return -ENOMEM;
2284
2285                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2286                 if (rc) {
2287                         ptlrpc_request_free(req);
2288                         return rc;
2289                 }
2290
2291                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2292                                      sizeof(*lvb));
2293                 ptlrpc_request_set_replen(req);
2294         }
2295
2296         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2297         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2298
2299         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2300                               sizeof(*lvb), LVB_T_OST, lockh, async);
2301         if (rqset) {
2302                 if (!rc) {
2303                         struct osc_enqueue_args *aa;
2304                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2305                         aa = ptlrpc_req_async_args(req);
2306                         aa->oa_ei = einfo;
2307                         aa->oa_exp = exp;
2308                         aa->oa_flags  = flags;
2309                         aa->oa_upcall = upcall;
2310                         aa->oa_cookie = cookie;
2311                         aa->oa_lvb    = lvb;
2312                         aa->oa_lockh  = lockh;
2313                         aa->oa_agl    = !!agl;
2314
2315                         req->rq_interpret_reply =
2316                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2317                         if (rqset == PTLRPCD_SET)
2318                                 ptlrpcd_add_req(req);
2319                         else
2320                                 ptlrpc_set_add_req(rqset, req);
2321                 } else if (intent) {
2322                         ptlrpc_req_finished(req);
2323                 }
2324                 return rc;
2325         }
2326
2327         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2328         if (intent)
2329                 ptlrpc_req_finished(req);
2330
2331         return rc;
2332 }
2333
2334 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2335                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2336                    __u64 *flags, void *data, struct lustre_handle *lockh,
2337                    int unref)
2338 {
2339         struct obd_device *obd = exp->exp_obd;
2340         __u64 lflags = *flags;
2341         ldlm_mode_t rc;
2342
2343         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2344                 return -EIO;
2345
2346         /* Filesystem lock extents are extended to page boundaries so that
2347          * dealing with the page cache is a little smoother */
2348         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2349         policy->l_extent.end |= ~CFS_PAGE_MASK;
2350
2351         /* Next, search for already existing extent locks that will cover us */
2352         /* If we're trying to read, we also search for an existing PW lock.  The
2353          * VFS and page cache already protect us locally, so lots of readers/
2354          * writers can share a single PW lock. */
2355         rc = mode;
2356         if (mode == LCK_PR)
2357                 rc |= LCK_PW;
2358         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2359                              res_id, type, policy, rc, lockh, unref);
2360         if (rc) {
2361                 if (data != NULL) {
2362                         if (!osc_set_data_with_check(lockh, data)) {
2363                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2364                                         ldlm_lock_decref(lockh, rc);
2365                                 return 0;
2366                         }
2367                 }
2368                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2369                         ldlm_lock_addref(lockh, LCK_PR);
2370                         ldlm_lock_decref(lockh, LCK_PW);
2371                 }
2372                 return rc;
2373         }
2374         return rc;
2375 }
2376
2377 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2378 {
2379         if (unlikely(mode == LCK_GROUP))
2380                 ldlm_lock_decref_and_cancel(lockh, mode);
2381         else
2382                 ldlm_lock_decref(lockh, mode);
2383
2384         return 0;
2385 }
2386
2387 static int osc_statfs_interpret(const struct lu_env *env,
2388                                 struct ptlrpc_request *req,
2389                                 struct osc_async_args *aa, int rc)
2390 {
2391         struct obd_statfs *msfs;
2392
2393         if (rc == -EBADR)
2394                 /* The request has in fact never been sent
2395                  * due to issues at a higher level (LOV).
2396                  * Exit immediately since the caller is
2397                  * aware of the problem and takes care
2398                  * of the clean up */
2399                  return rc;
2400
2401         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2402             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2403                 rc = 0;
2404                 goto out;
2405         }
2406
2407         if (rc != 0)
2408                 goto out;
2409
2410         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2411         if (msfs == NULL) {
2412                 rc = -EPROTO;
2413                 goto out;
2414         }
2415
2416         *aa->aa_oi->oi_osfs = *msfs;
2417 out:
2418         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2419         return rc;
2420 }
2421
2422 static int osc_statfs_async(struct obd_export *exp,
2423                             struct obd_info *oinfo, __u64 max_age,
2424                             struct ptlrpc_request_set *rqset)
2425 {
2426         struct obd_device *obd = class_exp2obd(exp);
2427         struct ptlrpc_request *req;
2428         struct osc_async_args *aa;
2429         int rc;
2430
2431         /* We could possibly pass max_age in the request (as an absolute
2432          * timestamp or a "seconds.usec ago") so the target can avoid doing
2433          * extra calls into the filesystem if that isn't necessary (e.g.
2434          * during mount that would help a bit).  Having relative timestamps
2435          * is not so great if request processing is slow, while absolute
2436          * timestamps are not ideal because they need time synchronization. */
2437         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2438         if (req == NULL)
2439                 return -ENOMEM;
2440
2441         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2442         if (rc) {
2443                 ptlrpc_request_free(req);
2444                 return rc;
2445         }
2446         ptlrpc_request_set_replen(req);
2447         req->rq_request_portal = OST_CREATE_PORTAL;
2448         ptlrpc_at_set_req_timeout(req);
2449
2450         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2451                 /* procfs requests not want stat in wait for avoid deadlock */
2452                 req->rq_no_resend = 1;
2453                 req->rq_no_delay = 1;
2454         }
2455
2456         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2457         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2458         aa = ptlrpc_req_async_args(req);
2459         aa->aa_oi = oinfo;
2460
2461         ptlrpc_set_add_req(rqset, req);
2462         return 0;
2463 }
2464
2465 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2466                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2467 {
2468         struct obd_device *obd = class_exp2obd(exp);
2469         struct obd_statfs *msfs;
2470         struct ptlrpc_request *req;
2471         struct obd_import *imp = NULL;
2472         int rc;
2473
2474         /*Since the request might also come from lprocfs, so we need
2475          *sync this with client_disconnect_export Bug15684*/
2476         down_read(&obd->u.cli.cl_sem);
2477         if (obd->u.cli.cl_import)
2478                 imp = class_import_get(obd->u.cli.cl_import);
2479         up_read(&obd->u.cli.cl_sem);
2480         if (!imp)
2481                 return -ENODEV;
2482
2483         /* We could possibly pass max_age in the request (as an absolute
2484          * timestamp or a "seconds.usec ago") so the target can avoid doing
2485          * extra calls into the filesystem if that isn't necessary (e.g.
2486          * during mount that would help a bit).  Having relative timestamps
2487          * is not so great if request processing is slow, while absolute
2488          * timestamps are not ideal because they need time synchronization. */
2489         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2490
2491         class_import_put(imp);
2492
2493         if (req == NULL)
2494                 return -ENOMEM;
2495
2496         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2497         if (rc) {
2498                 ptlrpc_request_free(req);
2499                 return rc;
2500         }
2501         ptlrpc_request_set_replen(req);
2502         req->rq_request_portal = OST_CREATE_PORTAL;
2503         ptlrpc_at_set_req_timeout(req);
2504
2505         if (flags & OBD_STATFS_NODELAY) {
2506                 /* procfs requests not want stat in wait for avoid deadlock */
2507                 req->rq_no_resend = 1;
2508                 req->rq_no_delay = 1;
2509         }
2510
2511         rc = ptlrpc_queue_wait(req);
2512         if (rc)
2513                 goto out;
2514
2515         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2516         if (msfs == NULL) {
2517                 rc = -EPROTO;
2518                 goto out;
2519         }
2520
2521         *osfs = *msfs;
2522
2523  out:
2524         ptlrpc_req_finished(req);
2525         return rc;
2526 }
2527
2528 /* Retrieve object striping information.
2529  *
2530  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2531  * the maximum number of OST indices which will fit in the user buffer.
2532  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2533  */
2534 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2535 {
2536         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2537         struct lov_user_md_v3 lum, *lumk;
2538         struct lov_user_ost_data_v1 *lmm_objects;
2539         int rc = 0, lum_size;
2540
2541         if (!lsm)
2542                 return -ENODATA;
2543
2544         /* we only need the header part from user space to get lmm_magic and
2545          * lmm_stripe_count, (the header part is common to v1 and v3) */
2546         lum_size = sizeof(struct lov_user_md_v1);
2547         if (copy_from_user(&lum, lump, lum_size))
2548                 return -EFAULT;
2549
2550         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2551             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2552                 return -EINVAL;
2553
2554         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2555         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2556         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2557         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2558
2559         /* we can use lov_mds_md_size() to compute lum_size
2560          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2561         if (lum.lmm_stripe_count > 0) {
2562                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2563                 lumk = kzalloc(lum_size, GFP_NOFS);
2564                 if (!lumk)
2565                         return -ENOMEM;
2566
2567                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2568                         lmm_objects =
2569                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2570                 else
2571                         lmm_objects = &(lumk->lmm_objects[0]);
2572                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2573         } else {
2574                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2575                 lumk = &lum;
2576         }
2577
2578         lumk->lmm_oi = lsm->lsm_oi;
2579         lumk->lmm_stripe_count = 1;
2580
2581         if (copy_to_user(lump, lumk, lum_size))
2582                 rc = -EFAULT;
2583
2584         if (lumk != &lum)
2585                 kfree(lumk);
2586
2587         return rc;
2588 }
2589
2590
2591 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2592                          void *karg, void *uarg)
2593 {
2594         struct obd_device *obd = exp->exp_obd;
2595         struct obd_ioctl_data *data = karg;
2596         int err = 0;
2597
2598         if (!try_module_get(THIS_MODULE)) {
2599                 CERROR("Can't get module. Is it alive?");
2600                 return -EINVAL;
2601         }
2602         switch (cmd) {
2603         case OBD_IOC_LOV_GET_CONFIG: {
2604                 char *buf;
2605                 struct lov_desc *desc;
2606                 struct obd_uuid uuid;
2607
2608                 buf = NULL;
2609                 len = 0;
2610                 if (obd_ioctl_getdata(&buf, &len, uarg)) {
2611                         err = -EINVAL;
2612                         goto out;
2613                 }
2614
2615                 data = (struct obd_ioctl_data *)buf;
2616
2617                 if (sizeof(*desc) > data->ioc_inllen1) {
2618                         obd_ioctl_freedata(buf, len);
2619                         err = -EINVAL;
2620                         goto out;
2621                 }
2622
2623                 if (data->ioc_inllen2 < sizeof(uuid)) {
2624                         obd_ioctl_freedata(buf, len);
2625                         err = -EINVAL;
2626                         goto out;
2627                 }
2628
2629                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2630                 desc->ld_tgt_count = 1;
2631                 desc->ld_active_tgt_count = 1;
2632                 desc->ld_default_stripe_count = 1;
2633                 desc->ld_default_stripe_size = 0;
2634                 desc->ld_default_stripe_offset = 0;
2635                 desc->ld_pattern = 0;
2636                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2637
2638                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2639
2640                 err = copy_to_user(uarg, buf, len);
2641                 if (err)
2642                         err = -EFAULT;
2643                 obd_ioctl_freedata(buf, len);
2644                 goto out;
2645         }
2646         case LL_IOC_LOV_SETSTRIPE:
2647                 err = obd_alloc_memmd(exp, karg);
2648                 if (err > 0)
2649                         err = 0;
2650                 goto out;
2651         case LL_IOC_LOV_GETSTRIPE:
2652                 err = osc_getstripe(karg, uarg);
2653                 goto out;
2654         case OBD_IOC_CLIENT_RECOVER:
2655                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2656                                             data->ioc_inlbuf1, 0);
2657                 if (err > 0)
2658                         err = 0;
2659                 goto out;
2660         case IOC_OSC_SET_ACTIVE:
2661                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2662                                                data->ioc_offset);
2663                 goto out;
2664         case OBD_IOC_POLL_QUOTACHECK:
2665                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2666                 goto out;
2667         case OBD_IOC_PING_TARGET:
2668                 err = ptlrpc_obd_ping(obd);
2669                 goto out;
2670         default:
2671                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2672                        cmd, current_comm());
2673                 err = -ENOTTY;
2674                 goto out;
2675         }
2676 out:
2677         module_put(THIS_MODULE);
2678         return err;
2679 }
2680
2681 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2682                         u32 keylen, void *key, __u32 *vallen, void *val,
2683                         struct lov_stripe_md *lsm)
2684 {
2685         if (!vallen || !val)
2686                 return -EFAULT;
2687
2688         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2689                 __u32 *stripe = val;
2690                 *vallen = sizeof(*stripe);
2691                 *stripe = 0;
2692                 return 0;
2693         } else if (KEY_IS(KEY_LAST_ID)) {
2694                 struct ptlrpc_request *req;
2695                 u64 *reply;
2696                 char *tmp;
2697                 int rc;
2698
2699                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2700                                            &RQF_OST_GET_INFO_LAST_ID);
2701                 if (req == NULL)
2702                         return -ENOMEM;
2703
2704                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2705                                      RCL_CLIENT, keylen);
2706                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2707                 if (rc) {
2708                         ptlrpc_request_free(req);
2709                         return rc;
2710                 }
2711
2712                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2713                 memcpy(tmp, key, keylen);
2714
2715                 req->rq_no_delay = req->rq_no_resend = 1;
2716                 ptlrpc_request_set_replen(req);
2717                 rc = ptlrpc_queue_wait(req);
2718                 if (rc)
2719                         goto out;
2720
2721                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2722                 if (reply == NULL) {
2723                         rc = -EPROTO;
2724                         goto out;
2725                 }
2726
2727                 *((u64 *)val) = *reply;
2728         out:
2729                 ptlrpc_req_finished(req);
2730                 return rc;
2731         } else if (KEY_IS(KEY_FIEMAP)) {
2732                 struct ll_fiemap_info_key *fm_key =
2733                                 (struct ll_fiemap_info_key *)key;
2734                 struct ldlm_res_id res_id;
2735                 ldlm_policy_data_t policy;
2736                 struct lustre_handle lockh;
2737                 ldlm_mode_t mode = 0;
2738                 struct ptlrpc_request *req;
2739                 struct ll_user_fiemap *reply;
2740                 char *tmp;
2741                 int rc;
2742
2743                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2744                         goto skip_locking;
2745
2746                 policy.l_extent.start = fm_key->fiemap.fm_start &
2747                                                 CFS_PAGE_MASK;
2748
2749                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2750                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2751                         policy.l_extent.end = OBD_OBJECT_EOF;
2752                 else
2753                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2754                                 fm_key->fiemap.fm_length +
2755                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2756
2757                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2758                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2759                                        LDLM_FL_BLOCK_GRANTED |
2760                                        LDLM_FL_LVB_READY,
2761                                        &res_id, LDLM_EXTENT, &policy,
2762                                        LCK_PR | LCK_PW, &lockh, 0);
2763                 if (mode) { /* lock is cached on client */
2764                         if (mode != LCK_PR) {
2765                                 ldlm_lock_addref(&lockh, LCK_PR);
2766                                 ldlm_lock_decref(&lockh, LCK_PW);
2767                         }
2768                 } else { /* no cached lock, needs acquire lock on server side */
2769                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2770                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2771                 }
2772
2773 skip_locking:
2774                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2775                                            &RQF_OST_GET_INFO_FIEMAP);
2776                 if (req == NULL) {
2777                         rc = -ENOMEM;
2778                         goto drop_lock;
2779                 }
2780
2781                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2782                                      RCL_CLIENT, keylen);
2783                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2784                                      RCL_CLIENT, *vallen);
2785                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2786                                      RCL_SERVER, *vallen);
2787
2788                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2789                 if (rc) {
2790                         ptlrpc_request_free(req);
2791                         goto drop_lock;
2792                 }
2793
2794                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2795                 memcpy(tmp, key, keylen);
2796                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2797                 memcpy(tmp, val, *vallen);
2798
2799                 ptlrpc_request_set_replen(req);
2800                 rc = ptlrpc_queue_wait(req);
2801                 if (rc)
2802                         goto fini_req;
2803
2804                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2805                 if (reply == NULL) {
2806                         rc = -EPROTO;
2807                         goto fini_req;
2808                 }
2809
2810                 memcpy(val, reply, *vallen);
2811 fini_req:
2812                 ptlrpc_req_finished(req);
2813 drop_lock:
2814                 if (mode)
2815                         ldlm_lock_decref(&lockh, LCK_PR);
2816                 return rc;
2817         }
2818
2819         return -EINVAL;
2820 }
2821
2822 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2823                               u32 keylen, void *key, u32 vallen,
2824                               void *val, struct ptlrpc_request_set *set)
2825 {
2826         struct ptlrpc_request *req;
2827         struct obd_device *obd = exp->exp_obd;
2828         struct obd_import *imp = class_exp2cliimp(exp);
2829         char *tmp;
2830         int rc;
2831
2832         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2833
2834         if (KEY_IS(KEY_CHECKSUM)) {
2835                 if (vallen != sizeof(int))
2836                         return -EINVAL;
2837                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2838                 return 0;
2839         }
2840
2841         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2842                 sptlrpc_conf_client_adapt(obd);
2843                 return 0;
2844         }
2845
2846         if (KEY_IS(KEY_FLUSH_CTX)) {
2847                 sptlrpc_import_flush_my_ctx(imp);
2848                 return 0;
2849         }
2850
2851         if (KEY_IS(KEY_CACHE_SET)) {
2852                 struct client_obd *cli = &obd->u.cli;
2853
2854                 LASSERT(cli->cl_cache == NULL); /* only once */
2855                 cli->cl_cache = (struct cl_client_cache *)val;
2856                 atomic_inc(&cli->cl_cache->ccc_users);
2857                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2858
2859                 /* add this osc into entity list */
2860                 LASSERT(list_empty(&cli->cl_lru_osc));
2861                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2862                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2863                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2864
2865                 return 0;
2866         }
2867
2868         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2869                 struct client_obd *cli = &obd->u.cli;
2870                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2871                 int target = *(int *)val;
2872
2873                 nr = osc_lru_shrink(cli, min(nr, target));
2874                 *(int *)val -= nr;
2875                 return 0;
2876         }
2877
2878         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2879                 return -EINVAL;
2880
2881         /* We pass all other commands directly to OST. Since nobody calls osc
2882            methods directly and everybody is supposed to go through LOV, we
2883            assume lov checked invalid values for us.
2884            The only recognised values so far are evict_by_nid and mds_conn.
2885            Even if something bad goes through, we'd get a -EINVAL from OST
2886            anyway. */
2887
2888         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2889                                                 &RQF_OST_SET_GRANT_INFO :
2890                                                 &RQF_OBD_SET_INFO);
2891         if (req == NULL)
2892                 return -ENOMEM;
2893
2894         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2895                              RCL_CLIENT, keylen);
2896         if (!KEY_IS(KEY_GRANT_SHRINK))
2897                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2898                                      RCL_CLIENT, vallen);
2899         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2900         if (rc) {
2901                 ptlrpc_request_free(req);
2902                 return rc;
2903         }
2904
2905         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2906         memcpy(tmp, key, keylen);
2907         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2908                                                         &RMF_OST_BODY :
2909                                                         &RMF_SETINFO_VAL);
2910         memcpy(tmp, val, vallen);
2911
2912         if (KEY_IS(KEY_GRANT_SHRINK)) {
2913                 struct osc_brw_async_args *aa;
2914                 struct obdo *oa;
2915
2916                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2917                 aa = ptlrpc_req_async_args(req);
2918                 OBDO_ALLOC(oa);
2919                 if (!oa) {
2920                         ptlrpc_req_finished(req);
2921                         return -ENOMEM;
2922                 }
2923                 *oa = ((struct ost_body *)val)->oa;
2924                 aa->aa_oa = oa;
2925                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2926         }
2927
2928         ptlrpc_request_set_replen(req);
2929         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2930                 LASSERT(set != NULL);
2931                 ptlrpc_set_add_req(set, req);
2932                 ptlrpc_check_set(NULL, set);
2933         } else {
2934                 ptlrpcd_add_req(req);
2935         }
2936
2937         return 0;
2938 }
2939
2940 static int osc_reconnect(const struct lu_env *env,
2941                          struct obd_export *exp, struct obd_device *obd,
2942                          struct obd_uuid *cluuid,
2943                          struct obd_connect_data *data,
2944                          void *localdata)
2945 {
2946         struct client_obd *cli = &obd->u.cli;
2947
2948         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2949                 long lost_grant;
2950
2951                 client_obd_list_lock(&cli->cl_loi_list_lock);
2952                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2953                                 2 * cli_brw_size(obd);
2954                 lost_grant = cli->cl_lost_grant;
2955                 cli->cl_lost_grant = 0;
2956                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2957
2958                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2959                        data->ocd_connect_flags,
2960                        data->ocd_version, data->ocd_grant, lost_grant);
2961         }
2962
2963         return 0;
2964 }
2965
2966 static int osc_disconnect(struct obd_export *exp)
2967 {
2968         struct obd_device *obd = class_exp2obd(exp);
2969         int rc;
2970
2971         rc = client_disconnect_export(exp);
2972         /**
2973          * Initially we put del_shrink_grant before disconnect_export, but it
2974          * causes the following problem if setup (connect) and cleanup
2975          * (disconnect) are tangled together.
2976          *      connect p1                   disconnect p2
2977          *   ptlrpc_connect_import
2978          *     ...............         class_manual_cleanup
2979          *                                   osc_disconnect
2980          *                                   del_shrink_grant
2981          *   ptlrpc_connect_interrupt
2982          *     init_grant_shrink
2983          *   add this client to shrink list
2984          *                                    cleanup_osc
2985          * Bang! pinger trigger the shrink.
2986          * So the osc should be disconnected from the shrink list, after we
2987          * are sure the import has been destroyed. BUG18662
2988          */
2989         if (obd->u.cli.cl_import == NULL)
2990                 osc_del_shrink_grant(&obd->u.cli);
2991         return rc;
2992 }
2993
2994 static int osc_import_event(struct obd_device *obd,
2995                             struct obd_import *imp,
2996                             enum obd_import_event event)
2997 {
2998         struct client_obd *cli;
2999         int rc = 0;
3000
3001         LASSERT(imp->imp_obd == obd);
3002
3003         switch (event) {
3004         case IMP_EVENT_DISCON: {
3005                 cli = &obd->u.cli;
3006                 client_obd_list_lock(&cli->cl_loi_list_lock);
3007                 cli->cl_avail_grant = 0;
3008                 cli->cl_lost_grant = 0;
3009                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3010                 break;
3011         }
3012         case IMP_EVENT_INACTIVE: {
3013                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3014                 break;
3015         }
3016         case IMP_EVENT_INVALIDATE: {
3017                 struct ldlm_namespace *ns = obd->obd_namespace;
3018                 struct lu_env *env;
3019                 int refcheck;
3020
3021                 env = cl_env_get(&refcheck);
3022                 if (!IS_ERR(env)) {
3023                         /* Reset grants */
3024                         cli = &obd->u.cli;
3025                         /* all pages go to failing rpcs due to the invalid
3026                          * import */
3027                         osc_io_unplug(env, cli, NULL);
3028
3029                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3030                         cl_env_put(env, &refcheck);
3031                 } else
3032                         rc = PTR_ERR(env);
3033                 break;
3034         }
3035         case IMP_EVENT_ACTIVE: {
3036                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3037                 break;
3038         }
3039         case IMP_EVENT_OCD: {
3040                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3041
3042                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3043                         osc_init_grant(&obd->u.cli, ocd);
3044
3045                 /* See bug 7198 */
3046                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3047                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3048
3049                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3050                 break;
3051         }
3052         case IMP_EVENT_DEACTIVATE: {
3053                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3054                 break;
3055         }
3056         case IMP_EVENT_ACTIVATE: {
3057                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3058                 break;
3059         }
3060         default:
3061                 CERROR("Unknown import event %d\n", event);
3062                 LBUG();
3063         }
3064         return rc;
3065 }
3066
3067 /**
3068  * Determine whether the lock can be canceled before replaying the lock
3069  * during recovery, see bug16774 for detailed information.
3070  *
3071  * \retval zero the lock can't be canceled
3072  * \retval other ok to cancel
3073  */
3074 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3075 {
3076         check_res_locked(lock->l_resource);
3077
3078         /*
3079          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3080          *
3081          * XXX as a future improvement, we can also cancel unused write lock
3082          * if it doesn't have dirty data and active mmaps.
3083          */
3084         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3085             (lock->l_granted_mode == LCK_PR ||
3086              lock->l_granted_mode == LCK_CR) &&
3087             (osc_dlm_lock_pageref(lock) == 0))
3088                 return 1;
3089
3090         return 0;
3091 }
3092
3093 static int brw_queue_work(const struct lu_env *env, void *data)
3094 {
3095         struct client_obd *cli = data;
3096
3097         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3098
3099         osc_io_unplug(env, cli, NULL);
3100         return 0;
3101 }
3102
3103 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3104 {
3105         struct lprocfs_static_vars lvars = { NULL };
3106         struct client_obd *cli = &obd->u.cli;
3107         void *handler;
3108         int rc;
3109         int adding;
3110         int added;
3111         int req_count;
3112
3113         rc = ptlrpcd_addref();
3114         if (rc)
3115                 return rc;
3116
3117         rc = client_obd_setup(obd, lcfg);
3118         if (rc)
3119                 goto out_ptlrpcd;
3120
3121         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3122         if (IS_ERR(handler)) {
3123                 rc = PTR_ERR(handler);
3124                 goto out_client_setup;
3125         }
3126         cli->cl_writeback_work = handler;
3127
3128         rc = osc_quota_setup(obd);
3129         if (rc)
3130                 goto out_ptlrpcd_work;
3131
3132         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3133         lprocfs_osc_init_vars(&lvars);
3134         if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
3135                 lproc_osc_attach_seqstat(obd);
3136                 sptlrpc_lprocfs_cliobd_attach(obd);
3137                 ptlrpc_lprocfs_register_obd(obd);
3138         }
3139
3140         /*
3141          * We try to control the total number of requests with a upper limit
3142          * osc_reqpool_maxreqcount. There might be some race which will cause
3143          * over-limit allocation, but it is fine.
3144          */
3145         req_count = atomic_read(&osc_pool_req_count);
3146         if (req_count < osc_reqpool_maxreqcount) {
3147                 adding = cli->cl_max_rpcs_in_flight + 2;
3148                 if (req_count + adding > osc_reqpool_maxreqcount)
3149                         adding = osc_reqpool_maxreqcount - req_count;
3150
3151                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3152                 atomic_add(added, &osc_pool_req_count);
3153         }
3154
3155         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3156         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3157         return rc;
3158
3159 out_ptlrpcd_work:
3160         ptlrpcd_destroy_work(handler);
3161 out_client_setup:
3162         client_obd_cleanup(obd);
3163 out_ptlrpcd:
3164         ptlrpcd_decref();
3165         return rc;
3166 }
3167
3168 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3169 {
3170         switch (stage) {
3171         case OBD_CLEANUP_EARLY: {
3172                 struct obd_import *imp;
3173                 imp = obd->u.cli.cl_import;
3174                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3175                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3176                 ptlrpc_deactivate_import(imp);
3177                 spin_lock(&imp->imp_lock);
3178                 imp->imp_pingable = 0;
3179                 spin_unlock(&imp->imp_lock);
3180                 break;
3181         }
3182         case OBD_CLEANUP_EXPORTS: {
3183                 struct client_obd *cli = &obd->u.cli;
3184                 /* LU-464
3185                  * for echo client, export may be on zombie list, wait for
3186                  * zombie thread to cull it, because cli.cl_import will be
3187                  * cleared in client_disconnect_export():
3188                  *   class_export_destroy() -> obd_cleanup() ->
3189                  *   echo_device_free() -> echo_client_cleanup() ->
3190                  *   obd_disconnect() -> osc_disconnect() ->
3191                  *   client_disconnect_export()
3192                  */
3193                 obd_zombie_barrier();
3194                 if (cli->cl_writeback_work) {
3195                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3196                         cli->cl_writeback_work = NULL;
3197                 }
3198                 obd_cleanup_client_import(obd);
3199                 ptlrpc_lprocfs_unregister_obd(obd);
3200                 lprocfs_obd_cleanup(obd);
3201                 break;
3202                 }
3203         }
3204         return 0;
3205 }
3206
3207 int osc_cleanup(struct obd_device *obd)
3208 {
3209         struct client_obd *cli = &obd->u.cli;
3210         int rc;
3211
3212         /* lru cleanup */
3213         if (cli->cl_cache != NULL) {
3214                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3215                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3216                 list_del_init(&cli->cl_lru_osc);
3217                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3218                 cli->cl_lru_left = NULL;
3219                 atomic_dec(&cli->cl_cache->ccc_users);
3220                 cli->cl_cache = NULL;
3221         }
3222
3223         /* free memory of osc quota cache */
3224         osc_quota_cleanup(obd);
3225
3226         rc = client_obd_cleanup(obd);
3227
3228         ptlrpcd_decref();
3229         return rc;
3230 }
3231
3232 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3233 {
3234         struct lprocfs_static_vars lvars = { NULL };
3235         int rc = 0;
3236
3237         lprocfs_osc_init_vars(&lvars);
3238
3239         switch (lcfg->lcfg_command) {
3240         default:
3241                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3242                                               lcfg, obd);
3243                 if (rc > 0)
3244                         rc = 0;
3245                 break;
3246         }
3247
3248         return rc;
3249 }
3250
3251 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3252 {
3253         return osc_process_config_base(obd, buf);
3254 }
3255
3256 struct obd_ops osc_obd_ops = {
3257         .o_owner                = THIS_MODULE,
3258         .o_setup                = osc_setup,
3259         .o_precleanup      = osc_precleanup,
3260         .o_cleanup            = osc_cleanup,
3261         .o_add_conn          = client_import_add_conn,
3262         .o_del_conn          = client_import_del_conn,
3263         .o_connect            = client_connect_import,
3264         .o_reconnect        = osc_reconnect,
3265         .o_disconnect      = osc_disconnect,
3266         .o_statfs              = osc_statfs,
3267         .o_statfs_async  = osc_statfs_async,
3268         .o_packmd              = osc_packmd,
3269         .o_unpackmd          = osc_unpackmd,
3270         .o_create              = osc_create,
3271         .o_destroy            = osc_destroy,
3272         .o_getattr            = osc_getattr,
3273         .o_getattr_async        = osc_getattr_async,
3274         .o_setattr            = osc_setattr,
3275         .o_setattr_async        = osc_setattr_async,
3276         .o_find_cbdata    = osc_find_cbdata,
3277         .o_iocontrol        = osc_iocontrol,
3278         .o_get_info          = osc_get_info,
3279         .o_set_info_async       = osc_set_info_async,
3280         .o_import_event  = osc_import_event,
3281         .o_process_config       = osc_process_config,
3282         .o_quotactl          = osc_quotactl,
3283         .o_quotacheck      = osc_quotacheck,
3284 };
3285
3286 extern struct lu_kmem_descr osc_caches[];
3287 extern spinlock_t osc_ast_guard;
3288 extern struct lock_class_key osc_ast_guard_class;
3289
3290 static int __init osc_init(void)
3291 {
3292         struct lprocfs_static_vars lvars = { NULL };
3293         unsigned int reqpool_size;
3294         unsigned int reqsize;
3295         int rc;
3296
3297         /* print an address of _any_ initialized kernel symbol from this
3298          * module, to allow debugging with gdb that doesn't support data
3299          * symbols from modules.*/
3300         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3301
3302         rc = lu_kmem_init(osc_caches);
3303         if (rc)
3304                 return rc;
3305
3306         lprocfs_osc_init_vars(&lvars);
3307
3308         rc = class_register_type(&osc_obd_ops, NULL,
3309                                  LUSTRE_OSC_NAME, &osc_device_type);
3310         if (rc)
3311                 goto out_kmem;
3312
3313         spin_lock_init(&osc_ast_guard);
3314         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3315
3316         /* This is obviously too much memory, only prevent overflow here */
3317         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
3318                 rc = -EINVAL;
3319                 goto out_type;
3320         }
3321
3322         reqpool_size = osc_reqpool_mem_max << 20;
3323
3324         reqsize = 1;
3325         while (reqsize < OST_MAXREQSIZE)
3326                 reqsize = reqsize << 1;
3327
3328         /*
3329          * We don't enlarge the request count in OSC pool according to
3330          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3331          * tried after normal allocation failed. So a small OSC pool won't
3332          * cause much performance degression in most of cases.
3333          */
3334         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3335
3336         atomic_set(&osc_pool_req_count, 0);
3337         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
3338                                           ptlrpc_add_rqs_to_pool);
3339
3340         if (osc_rq_pool)
3341                 return 0;
3342
3343         rc = -ENOMEM;
3344
3345 out_type:
3346         class_unregister_type(LUSTRE_OSC_NAME);
3347 out_kmem:
3348         lu_kmem_fini(osc_caches);
3349         return rc;
3350 }
3351
3352 static void /*__exit*/ osc_exit(void)
3353 {
3354         class_unregister_type(LUSTRE_OSC_NAME);
3355         lu_kmem_fini(osc_caches);
3356         ptlrpc_free_rq_pool(osc_rq_pool);
3357 }
3358
3359 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3360 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3361 MODULE_LICENSE("GPL");
3362 MODULE_VERSION(LUSTRE_VERSION_STRING);
3363
3364 module_init(osc_init);
3365 module_exit(osc_exit);