]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/osc/osc_request.c
43b11c61c2f08ce26662b29cb69e7c30a6e6130d
[karo-tx-linux.git] / drivers / staging / lustre / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include "../../include/linux/libcfs/libcfs.h"
40
41 #include "../include/lustre_dlm.h"
42 #include "../include/lustre_net.h"
43 #include "../include/lustre/lustre_user.h"
44 #include "../include/obd_cksum.h"
45
46 #include "../include/lustre_ha.h"
47 #include "../include/lprocfs_status.h"
48 #include "../include/lustre_debug.h"
49 #include "../include/lustre_param.h"
50 #include "../include/lustre_fid.h"
51 #include "../include/obd_class.h"
52 #include "../include/obd.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55
56 atomic_t osc_pool_req_count;
57 unsigned int osc_reqpool_maxreqcount;
58 struct ptlrpc_request_pool *osc_rq_pool;
59
60 /* max memory used for request pool, unit is MB */
61 static unsigned int osc_reqpool_mem_max = 5;
62 module_param(osc_reqpool_mem_max, uint, 0444);
63
64 struct osc_brw_async_args {
65         struct obdo       *aa_oa;
66         int             aa_requested_nob;
67         int             aa_nio_count;
68         u32             aa_page_count;
69         int             aa_resends;
70         struct brw_page  **aa_ppga;
71         struct client_obd *aa_cli;
72         struct list_head         aa_oaps;
73         struct list_head         aa_exts;
74         struct cl_req     *aa_clerq;
75 };
76
/* Async args for OST_GETATTR: the caller's obd_info (see osc_getattr_async()). */
struct osc_async_args {
	struct obd_info *aa_oi;
};
80
81 struct osc_setattr_args {
82         struct obdo      *sa_oa;
83         obd_enqueue_update_f sa_upcall;
84         void            *sa_cookie;
85 };
86
87 struct osc_fsync_args {
88         struct obd_info     *fa_oi;
89         obd_enqueue_update_f fa_upcall;
90         void            *fa_cookie;
91 };
92
93 struct osc_enqueue_args {
94         struct obd_export       *oa_exp;
95         __u64               *oa_flags;
96         obd_enqueue_update_f      oa_upcall;
97         void                 *oa_cookie;
98         struct ost_lvb     *oa_lvb;
99         struct lustre_handle     *oa_lockh;
100         struct ldlm_enqueue_info *oa_ei;
101         unsigned int          oa_agl:1;
102 };
103
104 static void osc_release_ppga(struct brw_page **ppga, u32 count);
105 static int brw_interpret(const struct lu_env *env,
106                          struct ptlrpc_request *req, void *data, int rc);
107
108 /* Pack OSC object metadata for disk storage (LE byte order). */
109 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
110                       struct lov_stripe_md *lsm)
111 {
112         int lmm_size;
113
114         lmm_size = sizeof(**lmmp);
115         if (!lmmp)
116                 return lmm_size;
117
118         if (*lmmp && !lsm) {
119                 kfree(*lmmp);
120                 *lmmp = NULL;
121                 return 0;
122         } else if (unlikely(lsm && ostid_id(&lsm->lsm_oi) == 0)) {
123                 return -EBADF;
124         }
125
126         if (!*lmmp) {
127                 *lmmp = kzalloc(lmm_size, GFP_NOFS);
128                 if (!*lmmp)
129                         return -ENOMEM;
130         }
131
132         if (lsm)
133                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
134
135         return lmm_size;
136 }
137
138 /* Unpack OSC object metadata from disk storage (LE byte order). */
139 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
140                         struct lov_mds_md *lmm, int lmm_bytes)
141 {
142         int lsm_size;
143         struct obd_import *imp = class_exp2cliimp(exp);
144
145         if (lmm) {
146                 if (lmm_bytes < sizeof(*lmm)) {
147                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
148                                exp->exp_obd->obd_name, lmm_bytes,
149                                (int)sizeof(*lmm));
150                         return -EINVAL;
151                 }
152                 /* XXX LOV_MAGIC etc check? */
153
154                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
155                         CERROR("%s: zero lmm_object_id: rc = %d\n",
156                                exp->exp_obd->obd_name, -EINVAL);
157                         return -EINVAL;
158                 }
159         }
160
161         lsm_size = lov_stripe_md_size(1);
162         if (!lsmp)
163                 return lsm_size;
164
165         if (*lsmp && !lmm) {
166                 kfree((*lsmp)->lsm_oinfo[0]);
167                 kfree(*lsmp);
168                 *lsmp = NULL;
169                 return 0;
170         }
171
172         if (!*lsmp) {
173                 *lsmp = kzalloc(lsm_size, GFP_NOFS);
174                 if (unlikely(!*lsmp))
175                         return -ENOMEM;
176                 (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
177                                                 GFP_NOFS);
178                 if (unlikely(!(*lsmp)->lsm_oinfo[0])) {
179                         kfree(*lsmp);
180                         return -ENOMEM;
181                 }
182                 loi_init((*lsmp)->lsm_oinfo[0]);
183         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
184                 return -EBADF;
185         }
186
187         if (lmm)
188                 /* XXX zero *lsmp? */
189                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
190
191         if (imp &&
192             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
193                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
194         else
195                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
196
197         return lsm_size;
198 }
199
200 static inline void osc_pack_req_body(struct ptlrpc_request *req,
201                                      struct obd_info *oinfo)
202 {
203         struct ost_body *body;
204
205         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
206         LASSERT(body);
207
208         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
209                              oinfo->oi_oa);
210 }
211
212 static int osc_getattr_interpret(const struct lu_env *env,
213                                  struct ptlrpc_request *req,
214                                  struct osc_async_args *aa, int rc)
215 {
216         struct ost_body *body;
217
218         if (rc != 0)
219                 goto out;
220
221         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222         if (body) {
223                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
224                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
225                                      aa->aa_oi->oi_oa, &body->oa);
226
227                 /* This should really be sent by the OST */
228                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
229                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
230         } else {
231                 CDEBUG(D_INFO, "can't unpack ost_body\n");
232                 rc = -EPROTO;
233                 aa->aa_oi->oi_oa->o_valid = 0;
234         }
235 out:
236         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
237         return rc;
238 }
239
240 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
241                              struct ptlrpc_request_set *set)
242 {
243         struct ptlrpc_request *req;
244         struct osc_async_args *aa;
245         int rc;
246
247         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
248         if (!req)
249                 return -ENOMEM;
250
251         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
252         if (rc) {
253                 ptlrpc_request_free(req);
254                 return rc;
255         }
256
257         osc_pack_req_body(req, oinfo);
258
259         ptlrpc_request_set_replen(req);
260         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
261
262         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
263         aa = ptlrpc_req_async_args(req);
264         aa->aa_oi = oinfo;
265
266         ptlrpc_set_add_req(set, req);
267         return 0;
268 }
269
270 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
271                        struct obd_info *oinfo)
272 {
273         struct ptlrpc_request *req;
274         struct ost_body *body;
275         int rc;
276
277         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
278         if (!req)
279                 return -ENOMEM;
280
281         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
282         if (rc) {
283                 ptlrpc_request_free(req);
284                 return rc;
285         }
286
287         osc_pack_req_body(req, oinfo);
288
289         ptlrpc_request_set_replen(req);
290
291         rc = ptlrpc_queue_wait(req);
292         if (rc)
293                 goto out;
294
295         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
296         if (!body) {
297                 rc = -EPROTO;
298                 goto out;
299         }
300
301         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
302         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
303                              &body->oa);
304
305         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
306         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
307
308  out:
309         ptlrpc_req_finished(req);
310         return rc;
311 }
312
313 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
314                        struct obd_info *oinfo, struct obd_trans_info *oti)
315 {
316         struct ptlrpc_request *req;
317         struct ost_body *body;
318         int rc;
319
320         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
321
322         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
323         if (!req)
324                 return -ENOMEM;
325
326         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 return rc;
330         }
331
332         osc_pack_req_body(req, oinfo);
333
334         ptlrpc_request_set_replen(req);
335
336         rc = ptlrpc_queue_wait(req);
337         if (rc)
338                 goto out;
339
340         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
341         if (!body) {
342                 rc = -EPROTO;
343                 goto out;
344         }
345
346         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
347                              &body->oa);
348
349 out:
350         ptlrpc_req_finished(req);
351         return rc;
352 }
353
354 static int osc_setattr_interpret(const struct lu_env *env,
355                                  struct ptlrpc_request *req,
356                                  struct osc_setattr_args *sa, int rc)
357 {
358         struct ost_body *body;
359
360         if (rc != 0)
361                 goto out;
362
363         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364         if (!body) {
365                 rc = -EPROTO;
366                 goto out;
367         }
368
369         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
370                              &body->oa);
371 out:
372         rc = sa->sa_upcall(sa->sa_cookie, rc);
373         return rc;
374 }
375
376 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
377                            struct obd_trans_info *oti,
378                            obd_enqueue_update_f upcall, void *cookie,
379                            struct ptlrpc_request_set *rqset)
380 {
381         struct ptlrpc_request *req;
382         struct osc_setattr_args *sa;
383         int rc;
384
385         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
386         if (!req)
387                 return -ENOMEM;
388
389         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
390         if (rc) {
391                 ptlrpc_request_free(req);
392                 return rc;
393         }
394
395         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
396                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
397
398         osc_pack_req_body(req, oinfo);
399
400         ptlrpc_request_set_replen(req);
401
402         /* do mds to ost setattr asynchronously */
403         if (!rqset) {
404                 /* Do not wait for response. */
405                 ptlrpcd_add_req(req);
406         } else {
407                 req->rq_interpret_reply =
408                         (ptlrpc_interpterer_t)osc_setattr_interpret;
409
410                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
411                 sa = ptlrpc_req_async_args(req);
412                 sa->sa_oa = oinfo->oi_oa;
413                 sa->sa_upcall = upcall;
414                 sa->sa_cookie = cookie;
415
416                 if (rqset == PTLRPCD_SET)
417                         ptlrpcd_add_req(req);
418                 else
419                         ptlrpc_set_add_req(rqset, req);
420         }
421
422         return 0;
423 }
424
425 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
426                              struct obd_trans_info *oti,
427                              struct ptlrpc_request_set *rqset)
428 {
429         return osc_setattr_async_base(exp, oinfo, oti,
430                                       oinfo->oi_cb_up, oinfo, rqset);
431 }
432
433 static int osc_real_create(struct obd_export *exp, struct obdo *oa,
434                            struct lov_stripe_md **ea,
435                            struct obd_trans_info *oti)
436 {
437         struct ptlrpc_request *req;
438         struct ost_body *body;
439         struct lov_stripe_md *lsm;
440         int rc;
441
442         LASSERT(oa);
443         LASSERT(ea);
444
445         lsm = *ea;
446         if (!lsm) {
447                 rc = obd_alloc_memmd(exp, &lsm);
448                 if (rc < 0)
449                         return rc;
450         }
451
452         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
453         if (!req) {
454                 rc = -ENOMEM;
455                 goto out;
456         }
457
458         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
459         if (rc) {
460                 ptlrpc_request_free(req);
461                 goto out;
462         }
463
464         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
465         LASSERT(body);
466
467         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
468
469         ptlrpc_request_set_replen(req);
470
471         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
472             oa->o_flags == OBD_FL_DELORPHAN) {
473                 DEBUG_REQ(D_HA, req,
474                           "delorphan from OST integration");
475                 /* Don't resend the delorphan req */
476                 req->rq_no_resend = req->rq_no_delay = 1;
477         }
478
479         rc = ptlrpc_queue_wait(req);
480         if (rc)
481                 goto out_req;
482
483         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
484         if (!body) {
485                 rc = -EPROTO;
486                 goto out_req;
487         }
488
489         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
490         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
491
492         oa->o_blksize = cli_brw_size(exp->exp_obd);
493         oa->o_valid |= OBD_MD_FLBLKSZ;
494
495         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
496          * have valid lsm_oinfo data structs, so don't go touching that.
497          * This needs to be fixed in a big way.
498          */
499         lsm->lsm_oi = oa->o_oi;
500         *ea = lsm;
501
502         if (oti) {
503                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
504
505                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
506                         if (!oti->oti_logcookies)
507                                 oti_alloc_cookies(oti, 1);
508                         *oti->oti_logcookies = oa->o_lcookie;
509                 }
510         }
511
512         CDEBUG(D_HA, "transno: %lld\n",
513                lustre_msg_get_transno(req->rq_repmsg));
514 out_req:
515         ptlrpc_req_finished(req);
516 out:
517         if (rc && !*ea)
518                 obd_free_memmd(exp, &lsm);
519         return rc;
520 }
521
522 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
523                    obd_enqueue_update_f upcall, void *cookie,
524                    struct ptlrpc_request_set *rqset)
525 {
526         struct ptlrpc_request *req;
527         struct osc_setattr_args *sa;
528         struct ost_body *body;
529         int rc;
530
531         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532         if (!req)
533                 return -ENOMEM;
534
535         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536         if (rc) {
537                 ptlrpc_request_free(req);
538                 return rc;
539         }
540         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
541         ptlrpc_at_set_req_timeout(req);
542
543         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
544         LASSERT(body);
545         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
546                              oinfo->oi_oa);
547
548         ptlrpc_request_set_replen(req);
549
550         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
551         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
552         sa = ptlrpc_req_async_args(req);
553         sa->sa_oa = oinfo->oi_oa;
554         sa->sa_upcall = upcall;
555         sa->sa_cookie = cookie;
556         if (rqset == PTLRPCD_SET)
557                 ptlrpcd_add_req(req);
558         else
559                 ptlrpc_set_add_req(rqset, req);
560
561         return 0;
562 }
563
564 static int osc_sync_interpret(const struct lu_env *env,
565                               struct ptlrpc_request *req,
566                               void *arg, int rc)
567 {
568         struct osc_fsync_args *fa = arg;
569         struct ost_body *body;
570
571         if (rc)
572                 goto out;
573
574         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
575         if (!body) {
576                 CERROR("can't unpack ost_body\n");
577                 rc = -EPROTO;
578                 goto out;
579         }
580
581         *fa->fa_oi->oi_oa = body->oa;
582 out:
583         rc = fa->fa_upcall(fa->fa_cookie, rc);
584         return rc;
585 }
586
587 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
588                   obd_enqueue_update_f upcall, void *cookie,
589                   struct ptlrpc_request_set *rqset)
590 {
591         struct ptlrpc_request *req;
592         struct ost_body *body;
593         struct osc_fsync_args *fa;
594         int rc;
595
596         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
597         if (!req)
598                 return -ENOMEM;
599
600         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601         if (rc) {
602                 ptlrpc_request_free(req);
603                 return rc;
604         }
605
606         /* overload the size and blocks fields in the oa with start/end */
607         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
608         LASSERT(body);
609         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
610                              oinfo->oi_oa);
611
612         ptlrpc_request_set_replen(req);
613         req->rq_interpret_reply = osc_sync_interpret;
614
615         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
616         fa = ptlrpc_req_async_args(req);
617         fa->fa_oi = oinfo;
618         fa->fa_upcall = upcall;
619         fa->fa_cookie = cookie;
620
621         if (rqset == PTLRPCD_SET)
622                 ptlrpcd_add_req(req);
623         else
624                 ptlrpc_set_add_req(rqset, req);
625
626         return 0;
627 }
628
629 /* Find and cancel locally locks matched by @mode in the resource found by
630  * @objid. Found locks are added into @cancel list. Returns the amount of
631  * locks added to @cancels list. */
632 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
633                                    struct list_head *cancels,
634                                    ldlm_mode_t mode, __u64 lock_flags)
635 {
636         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
637         struct ldlm_res_id res_id;
638         struct ldlm_resource *res;
639         int count;
640
641         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
642          * export) but disabled through procfs (flag in NS).
643          *
644          * This distinguishes from a case when ELC is not supported originally,
645          * when we still want to cancel locks in advance and just cancel them
646          * locally, without sending any RPC. */
647         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
648                 return 0;
649
650         ostid_build_res_name(&oa->o_oi, &res_id);
651         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
652         if (!res)
653                 return 0;
654
655         LDLM_RESOURCE_ADDREF(res);
656         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
657                                            lock_flags, 0, NULL);
658         LDLM_RESOURCE_DELREF(res);
659         ldlm_resource_putref(res);
660         return count;
661 }
662
663 static int osc_destroy_interpret(const struct lu_env *env,
664                                  struct ptlrpc_request *req, void *data,
665                                  int rc)
666 {
667         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
668
669         atomic_dec(&cli->cl_destroy_in_flight);
670         wake_up(&cli->cl_destroy_waitq);
671         return 0;
672 }
673
674 static int osc_can_send_destroy(struct client_obd *cli)
675 {
676         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
677             cli->cl_max_rpcs_in_flight) {
678                 /* The destroy request can be sent */
679                 return 1;
680         }
681         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
682             cli->cl_max_rpcs_in_flight) {
683                 /*
684                  * The counter has been modified between the two atomic
685                  * operations.
686                  */
687                 wake_up(&cli->cl_destroy_waitq);
688         }
689         return 0;
690 }
691
692 static int osc_create(const struct lu_env *env, struct obd_export *exp,
693                       struct obdo *oa, struct lov_stripe_md **ea,
694                       struct obd_trans_info *oti)
695 {
696         int rc = 0;
697
698         LASSERT(oa);
699         LASSERT(ea);
700         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
701
702         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
703             oa->o_flags == OBD_FL_RECREATE_OBJS) {
704                 return osc_real_create(exp, oa, ea, oti);
705         }
706
707         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
708                 return osc_real_create(exp, oa, ea, oti);
709
710         /* we should not get here anymore */
711         LBUG();
712
713         return rc;
714 }
715
716 /* Destroy requests can be async always on the client, and we don't even really
717  * care about the return code since the client cannot do anything at all about
718  * a destroy failure.
719  * When the MDS is unlinking a filename, it saves the file objects into a
720  * recovery llog, and these object records are cancelled when the OST reports
721  * they were destroyed and sync'd to disk (i.e. transaction committed).
722  * If the client dies, or the OST is down when the object should be destroyed,
723  * the records are not cancelled, and when the OST reconnects to the MDS next,
724  * it will retrieve the llog unlink logs and then sends the log cancellation
725  * cookies to the MDS after committing destroy transactions. */
726 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
727                        struct obdo *oa, struct lov_stripe_md *ea,
728                        struct obd_trans_info *oti, struct obd_export *md_export)
729 {
730         struct client_obd *cli = &exp->exp_obd->u.cli;
731         struct ptlrpc_request *req;
732         struct ost_body *body;
733         LIST_HEAD(cancels);
734         int rc, count;
735
736         if (!oa) {
737                 CDEBUG(D_INFO, "oa NULL\n");
738                 return -EINVAL;
739         }
740
741         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
742                                         LDLM_FL_DISCARD_DATA);
743
744         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
745         if (!req) {
746                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
747                 return -ENOMEM;
748         }
749
750         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
751                                0, &cancels, count);
752         if (rc) {
753                 ptlrpc_request_free(req);
754                 return rc;
755         }
756
757         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
758         ptlrpc_at_set_req_timeout(req);
759
760         if (oti && oa->o_valid & OBD_MD_FLCOOKIE)
761                 oa->o_lcookie = *oti->oti_logcookies;
762         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
763         LASSERT(body);
764         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
765
766         ptlrpc_request_set_replen(req);
767
768         /* If osc_destroy is for destroying the unlink orphan,
769          * sent from MDT to OST, which should not be blocked here,
770          * because the process might be triggered by ptlrpcd, and
771          * it is not good to block ptlrpcd thread (b=16006)*/
772         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
773                 req->rq_interpret_reply = osc_destroy_interpret;
774                 if (!osc_can_send_destroy(cli)) {
775                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
776                                                           NULL);
777
778                         /*
779                          * Wait until the number of on-going destroy RPCs drops
780                          * under max_rpc_in_flight
781                          */
782                         l_wait_event_exclusive(cli->cl_destroy_waitq,
783                                                osc_can_send_destroy(cli), &lwi);
784                 }
785         }
786
787         /* Do not wait for response */
788         ptlrpcd_add_req(req);
789         return 0;
790 }
791
792 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
793                                 long writing_bytes)
794 {
795         u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
796
797         LASSERT(!(oa->o_valid & bits));
798
799         oa->o_valid |= bits;
800         client_obd_list_lock(&cli->cl_loi_list_lock);
801         oa->o_dirty = cli->cl_dirty;
802         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
803                      cli->cl_dirty_max)) {
804                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
805                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
806                 oa->o_undirty = 0;
807         } else if (unlikely(atomic_read(&obd_dirty_pages) -
808                             atomic_read(&obd_dirty_transit_pages) >
809                             (long)(obd_max_dirty_pages + 1))) {
810                 /* The atomic_read() allowing the atomic_inc() are
811                  * not covered by a lock thus they may safely race and trip
812                  * this CERROR() unless we add in a small fudge factor (+1). */
813                 CERROR("dirty %d - %d > system dirty_max %d\n",
814                        atomic_read(&obd_dirty_pages),
815                        atomic_read(&obd_dirty_transit_pages),
816                        obd_max_dirty_pages);
817                 oa->o_undirty = 0;
818         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
819                 CERROR("dirty %lu - dirty_max %lu too big???\n",
820                        cli->cl_dirty, cli->cl_dirty_max);
821                 oa->o_undirty = 0;
822         } else {
823                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
824                                       PAGE_CACHE_SHIFT)*
825                                      (cli->cl_max_rpcs_in_flight + 1);
826                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
827         }
828         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
829         oa->o_dropped = cli->cl_lost_grant;
830         cli->cl_lost_grant = 0;
831         client_obd_list_unlock(&cli->cl_loi_list_lock);
832         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
833                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
834
835 }
836
837 void osc_update_next_shrink(struct client_obd *cli)
838 {
839         cli->cl_next_shrink_grant =
840                 cfs_time_shift(cli->cl_grant_shrink_interval);
841         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
842                cli->cl_next_shrink_grant);
843 }
844
845 static void __osc_update_grant(struct client_obd *cli, u64 grant)
846 {
847         client_obd_list_lock(&cli->cl_loi_list_lock);
848         cli->cl_avail_grant += grant;
849         client_obd_list_unlock(&cli->cl_loi_list_lock);
850 }
851
852 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
853 {
854         if (body->oa.o_valid & OBD_MD_FLGRANT) {
855                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
856                 __osc_update_grant(cli, body->oa.o_grant);
857         }
858 }
859
860 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
861                               u32 keylen, void *key, u32 vallen,
862                               void *val, struct ptlrpc_request_set *set);
863
864 static int osc_shrink_grant_interpret(const struct lu_env *env,
865                                       struct ptlrpc_request *req,
866                                       void *aa, int rc)
867 {
868         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
869         struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
870         struct ost_body *body;
871
872         if (rc != 0) {
873                 __osc_update_grant(cli, oa->o_grant);
874                 goto out;
875         }
876
877         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
878         LASSERT(body);
879         osc_update_grant(cli, body);
880 out:
881         kmem_cache_free(obdo_cachep, oa);
882         return rc;
883 }
884
885 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
886 {
887         client_obd_list_lock(&cli->cl_loi_list_lock);
888         oa->o_grant = cli->cl_avail_grant / 4;
889         cli->cl_avail_grant -= oa->o_grant;
890         client_obd_list_unlock(&cli->cl_loi_list_lock);
891         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
892                 oa->o_valid |= OBD_MD_FLFLAGS;
893                 oa->o_flags = 0;
894         }
895         oa->o_flags |= OBD_FL_SHRINK_GRANT;
896         osc_update_next_shrink(cli);
897 }
898
899 /* Shrink the current grant, either from some large amount to enough for a
900  * full set of in-flight RPCs, or if we have already shrunk to that limit
901  * then to enough for a single RPC.  This avoids keeping more grant than
902  * needed, and avoids shrinking the grant piecemeal. */
903 static int osc_shrink_grant(struct client_obd *cli)
904 {
905         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
906                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
907
908         client_obd_list_lock(&cli->cl_loi_list_lock);
909         if (cli->cl_avail_grant <= target_bytes)
910                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
911         client_obd_list_unlock(&cli->cl_loi_list_lock);
912
913         return osc_shrink_grant_to_target(cli, target_bytes);
914 }
915
916 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
917 {
918         int rc = 0;
919         struct ost_body *body;
920
921         client_obd_list_lock(&cli->cl_loi_list_lock);
922         /* Don't shrink if we are already above or below the desired limit
923          * We don't want to shrink below a single RPC, as that will negatively
924          * impact block allocation and long-term performance. */
925         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
926                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
927
928         if (target_bytes >= cli->cl_avail_grant) {
929                 client_obd_list_unlock(&cli->cl_loi_list_lock);
930                 return 0;
931         }
932         client_obd_list_unlock(&cli->cl_loi_list_lock);
933
934         body = kzalloc(sizeof(*body), GFP_NOFS);
935         if (!body)
936                 return -ENOMEM;
937
938         osc_announce_cached(cli, &body->oa, 0);
939
940         client_obd_list_lock(&cli->cl_loi_list_lock);
941         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
942         cli->cl_avail_grant = target_bytes;
943         client_obd_list_unlock(&cli->cl_loi_list_lock);
944         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
945                 body->oa.o_valid |= OBD_MD_FLFLAGS;
946                 body->oa.o_flags = 0;
947         }
948         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
949         osc_update_next_shrink(cli);
950
951         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
952                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
953                                 sizeof(*body), body, NULL);
954         if (rc != 0)
955                 __osc_update_grant(cli, body->oa.o_grant);
956         kfree(body);
957         return rc;
958 }
959
960 static int osc_should_shrink_grant(struct client_obd *client)
961 {
962         unsigned long time = cfs_time_current();
963         unsigned long next_shrink = client->cl_next_shrink_grant;
964
965         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
966              OBD_CONNECT_GRANT_SHRINK) == 0)
967                 return 0;
968
969         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
970                 /* Get the current RPC size directly, instead of going via:
971                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
972                  * Keep comment here so that it can be found by searching. */
973                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
974
975                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
976                     client->cl_avail_grant > brw_size)
977                         return 1;
978
979                 osc_update_next_shrink(client);
980         }
981         return 0;
982 }
983
984 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
985 {
986         struct client_obd *client;
987
988         list_for_each_entry(client, &item->ti_obd_list,
989                                 cl_grant_shrink_list) {
990                 if (osc_should_shrink_grant(client))
991                         osc_shrink_grant(client);
992         }
993         return 0;
994 }
995
996 static int osc_add_shrink_grant(struct client_obd *client)
997 {
998         int rc;
999
1000         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1001                                        TIMEOUT_GRANT,
1002                                        osc_grant_shrink_grant_cb, NULL,
1003                                        &client->cl_grant_shrink_list);
1004         if (rc) {
1005                 CERROR("add grant client %s error %d\n",
1006                         client->cl_import->imp_obd->obd_name, rc);
1007                 return rc;
1008         }
1009         CDEBUG(D_CACHE, "add grant client %s \n",
1010                client->cl_import->imp_obd->obd_name);
1011         osc_update_next_shrink(client);
1012         return 0;
1013 }
1014
1015 static int osc_del_shrink_grant(struct client_obd *client)
1016 {
1017         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1018                                          TIMEOUT_GRANT);
1019 }
1020
1021 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1022 {
1023         /*
1024          * ocd_grant is the total grant amount we're expect to hold: if we've
1025          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1026          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1027          *
1028          * race is tolerable here: if we're evicted, but imp_state already
1029          * left EVICTED state, then cl_dirty must be 0 already.
1030          */
1031         client_obd_list_lock(&cli->cl_loi_list_lock);
1032         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1033                 cli->cl_avail_grant = ocd->ocd_grant;
1034         else
1035                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1036
1037         if (cli->cl_avail_grant < 0) {
1038                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1039                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1040                       ocd->ocd_grant, cli->cl_dirty);
1041                 /* workaround for servers which do not have the patch from
1042                  * LU-2679 */
1043                 cli->cl_avail_grant = ocd->ocd_grant;
1044         }
1045
1046         /* determine the appropriate chunk size used by osc_extent. */
1047         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1048         client_obd_list_unlock(&cli->cl_loi_list_lock);
1049
1050         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1051                cli->cl_import->imp_obd->obd_name,
1052                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1053
1054         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1055             list_empty(&cli->cl_grant_shrink_list))
1056                 osc_add_shrink_grant(cli);
1057 }
1058
1059 /* We assume that the reason this OSC got a short read is because it read
1060  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1061  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1062  * this stripe never got written at or beyond this stripe offset yet. */
1063 static void handle_short_read(int nob_read, u32 page_count,
1064                               struct brw_page **pga)
1065 {
1066         char *ptr;
1067         int i = 0;
1068
1069         /* skip bytes read OK */
1070         while (nob_read > 0) {
1071                 LASSERT(page_count > 0);
1072
1073                 if (pga[i]->count > nob_read) {
1074                         /* EOF inside this page */
1075                         ptr = kmap(pga[i]->pg) +
1076                                 (pga[i]->off & ~CFS_PAGE_MASK);
1077                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1078                         kunmap(pga[i]->pg);
1079                         page_count--;
1080                         i++;
1081                         break;
1082                 }
1083
1084                 nob_read -= pga[i]->count;
1085                 page_count--;
1086                 i++;
1087         }
1088
1089         /* zero remaining pages */
1090         while (page_count-- > 0) {
1091                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1092                 memset(ptr, 0, pga[i]->count);
1093                 kunmap(pga[i]->pg);
1094                 i++;
1095         }
1096 }
1097
1098 static int check_write_rcs(struct ptlrpc_request *req,
1099                            int requested_nob, int niocount,
1100                            u32 page_count, struct brw_page **pga)
1101 {
1102         int i;
1103         __u32 *remote_rcs;
1104
1105         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1106                                                   sizeof(*remote_rcs) *
1107                                                   niocount);
1108         if (!remote_rcs) {
1109                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1110                 return -EPROTO;
1111         }
1112
1113         /* return error if any niobuf was in error */
1114         for (i = 0; i < niocount; i++) {
1115                 if ((int)remote_rcs[i] < 0)
1116                         return remote_rcs[i];
1117
1118                 if (remote_rcs[i] != 0) {
1119                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1120                                 i, remote_rcs[i], req);
1121                         return -EPROTO;
1122                 }
1123         }
1124
1125         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1126                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1127                        req->rq_bulk->bd_nob_transferred, requested_nob);
1128                 return -EPROTO;
1129         }
1130
1131         return 0;
1132 }
1133
1134 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1135 {
1136         if (p1->flag != p2->flag) {
1137                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1138                                   OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1139
1140                 /* warn if we try to combine flags that we don't know to be
1141                  * safe to combine */
1142                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1143                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1144                               p1->flag, p2->flag);
1145                 }
1146                 return 0;
1147         }
1148
1149         return (p1->off + p1->count == p2->off);
1150 }
1151
1152 static u32 osc_checksum_bulk(int nob, u32 pg_count,
1153                              struct brw_page **pga, int opc,
1154                              cksum_type_t cksum_type)
1155 {
1156         __u32 cksum;
1157         int i = 0;
1158         struct cfs_crypto_hash_desc *hdesc;
1159         unsigned int bufsize;
1160         int err;
1161         unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1162
1163         LASSERT(pg_count > 0);
1164
1165         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1166         if (IS_ERR(hdesc)) {
1167                 CERROR("Unable to initialize checksum hash %s\n",
1168                        cfs_crypto_hash_name(cfs_alg));
1169                 return PTR_ERR(hdesc);
1170         }
1171
1172         while (nob > 0 && pg_count > 0) {
1173                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1174
1175                 /* corrupt the data before we compute the checksum, to
1176                  * simulate an OST->client data error */
1177                 if (i == 0 && opc == OST_READ &&
1178                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1179                         unsigned char *ptr = kmap(pga[i]->pg);
1180                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1181
1182                         memcpy(ptr + off, "bad1", min(4, nob));
1183                         kunmap(pga[i]->pg);
1184                 }
1185                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1186                                   pga[i]->off & ~CFS_PAGE_MASK,
1187                                   count);
1188                 CDEBUG(D_PAGE,
1189                        "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1190                        pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1191                        (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1192                        page_private(pga[i]->pg),
1193                        (int)(pga[i]->off & ~CFS_PAGE_MASK));
1194
1195                 nob -= pga[i]->count;
1196                 pg_count--;
1197                 i++;
1198         }
1199
1200         bufsize = 4;
1201         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1202
1203         if (err)
1204                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1205
1206         /* For sending we only compute the wrong checksum instead
1207          * of corrupting the data so it is still correct on a redo */
1208         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1209                 cksum++;
1210
1211         return cksum;
1212 }
1213
/*
 * Build a bulk read or write request for the @page_count pages in @pga.
 *
 * An OST_WRITE request is allocated from osc_rq_pool when OBD_BRW_WRITE is
 * set in @cmd, an OST_READ request otherwise.  The request gets one remote
 * niobuf per run of pages that can_merge_pages() allows to be merged, a
 * bulk descriptor holding every page, the wire copy of @oa, the cache/grant
 * announcement and, when checksums are enabled and the security flavor
 * does not already protect the bulk, the checksum flags (plus the checksum
 * itself for writes).  The async args stored in the request reference @oa,
 * @pga and @cli.
 *
 * @resend non-zero marks the request with OBD_FL_RECOV_RESEND.
 * @lsm and @reserve are not referenced by this function.
 *
 * Returns 0 and stores the new request in *@reqp, or a negative errno
 * (in which case the request has been released).
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				int reserve,
				int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	/* fault injection hooks */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (!req)
		return -ENOMEM;

	/* count the niobufs: a new one starts wherever two neighbouring
	 * pages cannot be merged */
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (!desc) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body && ioobj && niobuf);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request.  The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	/* add every page to the bulk descriptor and fill in the niobufs;
	 * niobuf is advanced once per page and stepped back again whenever
	 * the page is merged into the previous niobuf */
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		/* pages must be sorted by strictly increasing offset */
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		/* OBD_BRW_SRVLOCK must be the same on all pages */
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len = pg->count;
			niobuf->flags = pg->flag;
		}
		pg_prev = pg;
	}

	/* niobuf must now point just past the niocount entries counted
	 * before the request was packed */
	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	/* piggy-back a grant shrink on this request when one is due */
	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		/* executed on both branches above */
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		/* read: only set the checksum type and valid bits here */
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	/* the async args live inside the request itself */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	return 0;

 out:
	ptlrpc_req_finished(req);
	return rc;
}
1413
1414 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1415                                 __u32 client_cksum, __u32 server_cksum, int nob,
1416                                 u32 page_count, struct brw_page **pga,
1417                                 cksum_type_t client_cksum_type)
1418 {
1419         __u32 new_cksum;
1420         char *msg;
1421         cksum_type_t cksum_type;
1422
1423         if (server_cksum == client_cksum) {
1424                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1425                 return 0;
1426         }
1427
1428         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1429                                        oa->o_flags : 0);
1430         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1431                                       cksum_type);
1432
1433         if (cksum_type != client_cksum_type)
1434                 msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
1435                         ;
1436         else if (new_cksum == server_cksum)
1437                 msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1438                         ;
1439         else if (new_cksum == client_cksum)
1440                 msg = "changed in transit before arrival at OST";
1441         else
1442                 msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1443                         ;
1444
1445         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1446                            " object "DOSTID" extent [%llu-%llu]\n",
1447                            msg, libcfs_nid2str(peer->nid),
1448                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1449                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1450                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1451                            POSTID(&oa->o_oi), pga[0]->off,
1452                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1453         CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1454                client_cksum, client_cksum_type,
1455                server_cksum, cksum_type, new_cksum);
1456         return 1;
1457 }
1458
1459 /* Note rc enters this function as number of bytes transferred */
1460 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1461 {
1462         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1463         const lnet_process_id_t *peer =
1464                         &req->rq_import->imp_connection->c_peer;
1465         struct client_obd *cli = aa->aa_cli;
1466         struct ost_body *body;
1467         __u32 client_cksum = 0;
1468
1469         if (rc < 0 && rc != -EDQUOT) {
1470                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1471                 return rc;
1472         }
1473
1474         LASSERTF(req->rq_repmsg, "rc = %d\n", rc);
1475         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1476         if (!body) {
1477                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1478                 return -EPROTO;
1479         }
1480
1481         /* set/clear over quota flag for a uid/gid */
1482         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1483             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1484                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1485
1486                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1487                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1488                        body->oa.o_flags);
1489                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1490         }
1491
1492         osc_update_grant(cli, body);
1493
1494         if (rc < 0)
1495                 return rc;
1496
1497         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1498                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1499
1500         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1501                 if (rc > 0) {
1502                         CERROR("Unexpected +ve rc %d\n", rc);
1503                         return -EPROTO;
1504                 }
1505                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1506
1507                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1508                         return -EAGAIN;
1509
1510                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1511                     check_write_checksum(&body->oa, peer, client_cksum,
1512                                          body->oa.o_cksum, aa->aa_requested_nob,
1513                                          aa->aa_page_count, aa->aa_ppga,
1514                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1515                         return -EAGAIN;
1516
1517                 rc = check_write_rcs(req, aa->aa_requested_nob,
1518                                      aa->aa_nio_count,
1519                                      aa->aa_page_count, aa->aa_ppga);
1520                 goto out;
1521         }
1522
1523         /* The rest of this function executes only for OST_READs */
1524
1525         /* if unwrap_bulk failed, return -EAGAIN to retry */
1526         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1527         if (rc < 0) {
1528                 rc = -EAGAIN;
1529                 goto out;
1530         }
1531
1532         if (rc > aa->aa_requested_nob) {
1533                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1534                        aa->aa_requested_nob);
1535                 return -EPROTO;
1536         }
1537
1538         if (rc != req->rq_bulk->bd_nob_transferred) {
1539                 CERROR("Unexpected rc %d (%d transferred)\n",
1540                         rc, req->rq_bulk->bd_nob_transferred);
1541                 return -EPROTO;
1542         }
1543
1544         if (rc < aa->aa_requested_nob)
1545                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1546
1547         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1548                 static int cksum_counter;
1549                 __u32 server_cksum = body->oa.o_cksum;
1550                 char *via = "";
1551                 char *router = "";
1552                 cksum_type_t cksum_type;
1553
1554                 cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
1555                                                body->oa.o_flags : 0);
1556                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1557                                                  aa->aa_ppga, OST_READ,
1558                                                  cksum_type);
1559
1560                 if (peer->nid != req->rq_bulk->bd_sender) {
1561                         via = " via ";
1562                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1563                 }
1564
1565                 if (server_cksum != client_cksum) {
1566                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1567                                            req->rq_import->imp_obd->obd_name,
1568                                            libcfs_nid2str(peer->nid),
1569                                            via, router,
1570                                            body->oa.o_valid & OBD_MD_FLFID ?
1571                                            body->oa.o_parent_seq : (__u64)0,
1572                                            body->oa.o_valid & OBD_MD_FLFID ?
1573                                            body->oa.o_parent_oid : 0,
1574                                            body->oa.o_valid & OBD_MD_FLFID ?
1575                                            body->oa.o_parent_ver : 0,
1576                                            POSTID(&body->oa.o_oi),
1577                                            aa->aa_ppga[0]->off,
1578                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1579                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1580                                            1);
1581                         CERROR("client %x, server %x, cksum_type %x\n",
1582                                client_cksum, server_cksum, cksum_type);
1583                         cksum_counter = 0;
1584                         aa->aa_oa->o_cksum = client_cksum;
1585                         rc = -EAGAIN;
1586                 } else {
1587                         cksum_counter++;
1588                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1589                         rc = 0;
1590                 }
1591         } else if (unlikely(client_cksum)) {
1592                 static int cksum_missed;
1593
1594                 cksum_missed++;
1595                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1596                         CERROR("Checksum %u requested from %s but not sent\n",
1597                                cksum_missed, libcfs_nid2str(peer->nid));
1598         } else {
1599                 rc = 0;
1600         }
1601 out:
1602         if (rc >= 0)
1603                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1604                                      aa->aa_oa, &body->oa);
1605
1606         return rc;
1607 }
1608
1609 static int osc_brw_redo_request(struct ptlrpc_request *request,
1610                                 struct osc_brw_async_args *aa, int rc)
1611 {
1612         struct ptlrpc_request *new_req;
1613         struct osc_brw_async_args *new_aa;
1614         struct osc_async_page *oap;
1615
1616         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1617                   "redo for recoverable error %d", rc);
1618
1619         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1620                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1621                                   aa->aa_cli, aa->aa_oa,
1622                                   NULL /* lsm unused by osc currently */,
1623                                   aa->aa_page_count, aa->aa_ppga,
1624                                   &new_req, 0, 1);
1625         if (rc)
1626                 return rc;
1627
1628         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1629                 if (oap->oap_request) {
1630                         LASSERTF(request == oap->oap_request,
1631                                  "request %p != oap_request %p\n",
1632                                  request, oap->oap_request);
1633                         if (oap->oap_interrupted) {
1634                                 ptlrpc_req_finished(new_req);
1635                                 return -EINTR;
1636                         }
1637                 }
1638         }
1639         /* New request takes over pga and oaps from old request.
1640          * Note that copying a list_head doesn't work, need to move it... */
1641         aa->aa_resends++;
1642         new_req->rq_interpret_reply = request->rq_interpret_reply;
1643         new_req->rq_async_args = request->rq_async_args;
1644         /* cap resend delay to the current request timeout, this is similar to
1645          * what ptlrpc does (see after_reply()) */
1646         if (aa->aa_resends > new_req->rq_timeout)
1647                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1648         else
1649                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1650         new_req->rq_generation_set = 1;
1651         new_req->rq_import_generation = request->rq_import_generation;
1652
1653         new_aa = ptlrpc_req_async_args(new_req);
1654
1655         INIT_LIST_HEAD(&new_aa->aa_oaps);
1656         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1657         INIT_LIST_HEAD(&new_aa->aa_exts);
1658         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1659         new_aa->aa_resends = aa->aa_resends;
1660
1661         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1662                 if (oap->oap_request) {
1663                         ptlrpc_req_finished(oap->oap_request);
1664                         oap->oap_request = ptlrpc_request_addref(new_req);
1665                 }
1666         }
1667
1668         /* XXX: This code will run into problem if we're going to support
1669          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1670          * and wait for all of them to be finished. We should inherit request
1671          * set from old request. */
1672         ptlrpcd_add_req(new_req);
1673
1674         DEBUG_REQ(D_INFO, new_req, "new request");
1675         return 0;
1676 }
1677
1678 /*
1679  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1680  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1681  * fine for our small page arrays and doesn't require allocation.  its an
1682  * insertion sort that swaps elements that are strides apart, shrinking the
1683  * stride down until its '1' and the array is sorted.
1684  */
1685 static void sort_brw_pages(struct brw_page **array, int num)
1686 {
1687         int stride, i, j;
1688         struct brw_page *tmp;
1689
1690         if (num == 1)
1691                 return;
1692         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1693                 ;
1694
1695         do {
1696                 stride /= 3;
1697                 for (i = stride ; i < num ; i++) {
1698                         tmp = array[i];
1699                         j = i;
1700                         while (j >= stride && array[j - stride]->off > tmp->off) {
1701                                 array[j] = array[j - stride];
1702                                 j -= stride;
1703                         }
1704                         array[j] = tmp;
1705                 }
1706         } while (stride > 1);
1707 }
1708
1709 static void osc_release_ppga(struct brw_page **ppga, u32 count)
1710 {
1711         LASSERT(ppga);
1712         kfree(ppga);
1713 }
1714
1715 static int brw_interpret(const struct lu_env *env,
1716                          struct ptlrpc_request *req, void *data, int rc)
1717 {
1718         struct osc_brw_async_args *aa = data;
1719         struct osc_extent *ext;
1720         struct osc_extent *tmp;
1721         struct cl_object *obj = NULL;
1722         struct client_obd *cli = aa->aa_cli;
1723
1724         rc = osc_brw_fini_request(req, rc);
1725         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1726         /* When server return -EINPROGRESS, client should always retry
1727          * regardless of the number of times the bulk was resent already. */
1728         if (osc_recoverable_error(rc)) {
1729                 if (req->rq_import_generation !=
1730                     req->rq_import->imp_generation) {
1731                         CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1732                                req->rq_import->imp_obd->obd_name,
1733                                POSTID(&aa->aa_oa->o_oi), rc);
1734                 } else if (rc == -EINPROGRESS ||
1735                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1736                         rc = osc_brw_redo_request(req, aa, rc);
1737                 } else {
1738                         CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1739                                req->rq_import->imp_obd->obd_name,
1740                                POSTID(&aa->aa_oa->o_oi), rc);
1741                 }
1742
1743                 if (rc == 0)
1744                         return 0;
1745                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1746                         rc = -EIO;
1747         }
1748
1749         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1750                 if (!obj && rc == 0) {
1751                         obj = osc2cl(ext->oe_obj);
1752                         cl_object_get(obj);
1753                 }
1754
1755                 list_del_init(&ext->oe_link);
1756                 osc_extent_finish(env, ext, 1, rc);
1757         }
1758         LASSERT(list_empty(&aa->aa_exts));
1759         LASSERT(list_empty(&aa->aa_oaps));
1760
1761         if (obj) {
1762                 struct obdo *oa = aa->aa_oa;
1763                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1764                 unsigned long valid = 0;
1765
1766                 LASSERT(rc == 0);
1767                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1768                         attr->cat_blocks = oa->o_blocks;
1769                         valid |= CAT_BLOCKS;
1770                 }
1771                 if (oa->o_valid & OBD_MD_FLMTIME) {
1772                         attr->cat_mtime = oa->o_mtime;
1773                         valid |= CAT_MTIME;
1774                 }
1775                 if (oa->o_valid & OBD_MD_FLATIME) {
1776                         attr->cat_atime = oa->o_atime;
1777                         valid |= CAT_ATIME;
1778                 }
1779                 if (oa->o_valid & OBD_MD_FLCTIME) {
1780                         attr->cat_ctime = oa->o_ctime;
1781                         valid |= CAT_CTIME;
1782                 }
1783                 if (valid != 0) {
1784                         cl_object_attr_lock(obj);
1785                         cl_object_attr_set(env, obj, attr, valid);
1786                         cl_object_attr_unlock(obj);
1787                 }
1788                 cl_object_put(env, obj);
1789         }
1790         kmem_cache_free(obdo_cachep, aa->aa_oa);
1791
1792         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1793                           req->rq_bulk->bd_nob_transferred);
1794         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1795         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1796
1797         client_obd_list_lock(&cli->cl_loi_list_lock);
1798         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1799          * is called so we know whether to go to sync BRWs or wait for more
1800          * RPCs to complete */
1801         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1802                 cli->cl_w_in_flight--;
1803         else
1804                 cli->cl_r_in_flight--;
1805         osc_wake_cache_waiters(cli);
1806         client_obd_list_unlock(&cli->cl_loi_list_lock);
1807
1808         osc_io_unplug(env, cli, NULL);
1809         return rc;
1810 }
1811
1812 /**
1813  * Build an RPC by the list of extent @ext_list. The caller must ensure
1814  * that the total pages in this list are NOT over max pages per RPC.
1815  * Extents in the list must be in OES_RPC state.
1816  */
1817 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1818                   struct list_head *ext_list, int cmd)
1819 {
1820         struct ptlrpc_request *req = NULL;
1821         struct osc_extent *ext;
1822         struct brw_page **pga = NULL;
1823         struct osc_brw_async_args *aa = NULL;
1824         struct obdo *oa = NULL;
1825         struct osc_async_page *oap;
1826         struct osc_async_page *tmp;
1827         struct cl_req *clerq = NULL;
1828         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1829         struct ldlm_lock *lock = NULL;
1830         struct cl_req_attr *crattr = NULL;
1831         u64 starting_offset = OBD_OBJECT_EOF;
1832         u64 ending_offset = 0;
1833         int mpflag = 0;
1834         int mem_tight = 0;
1835         int page_count = 0;
1836         int i;
1837         int rc;
1838         struct ost_body *body;
1839         LIST_HEAD(rpc_list);
1840
1841         LASSERT(!list_empty(ext_list));
1842
1843         /* add pages into rpc_list to build BRW rpc */
1844         list_for_each_entry(ext, ext_list, oe_link) {
1845                 LASSERT(ext->oe_state == OES_RPC);
1846                 mem_tight |= ext->oe_memalloc;
1847                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1848                         ++page_count;
1849                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1850                         if (starting_offset > oap->oap_obj_off)
1851                                 starting_offset = oap->oap_obj_off;
1852                         else
1853                                 LASSERT(oap->oap_page_off == 0);
1854                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1855                                 ending_offset = oap->oap_obj_off +
1856                                                 oap->oap_count;
1857                         else
1858                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1859                                         PAGE_CACHE_SIZE);
1860                 }
1861         }
1862
1863         if (mem_tight)
1864                 mpflag = cfs_memory_pressure_get_and_set();
1865
1866         crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
1867         if (!crattr) {
1868                 rc = -ENOMEM;
1869                 goto out;
1870         }
1871
1872         pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
1873         if (!pga) {
1874                 rc = -ENOMEM;
1875                 goto out;
1876         }
1877
1878         oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
1879         if (!oa) {
1880                 rc = -ENOMEM;
1881                 goto out;
1882         }
1883
1884         i = 0;
1885         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1886                 struct cl_page *page = oap2cl_page(oap);
1887
1888                 if (!clerq) {
1889                         clerq = cl_req_alloc(env, page, crt,
1890                                              1 /* only 1-object rpcs for now */);
1891                         if (IS_ERR(clerq)) {
1892                                 rc = PTR_ERR(clerq);
1893                                 goto out;
1894                         }
1895                         lock = oap->oap_ldlm_lock;
1896                 }
1897                 if (mem_tight)
1898                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1899                 pga[i] = &oap->oap_brw_page;
1900                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1901                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1902                        pga[i]->pg, page_index(oap->oap_page), oap,
1903                        pga[i]->flag);
1904                 i++;
1905                 cl_req_page_add(env, clerq, page);
1906         }
1907
1908         /* always get the data for the obdo for the rpc */
1909         LASSERT(clerq);
1910         crattr->cra_oa = oa;
1911         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1912         if (lock) {
1913                 oa->o_handle = lock->l_remote_handle;
1914                 oa->o_valid |= OBD_MD_FLHANDLE;
1915         }
1916
1917         rc = cl_req_prep(env, clerq);
1918         if (rc != 0) {
1919                 CERROR("cl_req_prep failed: %d\n", rc);
1920                 goto out;
1921         }
1922
1923         sort_brw_pages(pga, page_count);
1924         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1925                         pga, &req, 1, 0);
1926         if (rc != 0) {
1927                 CERROR("prep_req failed: %d\n", rc);
1928                 goto out;
1929         }
1930
1931         req->rq_interpret_reply = brw_interpret;
1932
1933         if (mem_tight != 0)
1934                 req->rq_memalloc = 1;
1935
1936         /* Need to update the timestamps after the request is built in case
1937          * we race with setattr (locally or in queue at OST).  If OST gets
1938          * later setattr before earlier BRW (as determined by the request xid),
1939          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1940          * way to do this in a single call.  bug 10150 */
1941         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1942         crattr->cra_oa = &body->oa;
1943         cl_req_attr_set(env, clerq, crattr,
1944                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1945
1946         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1947
1948         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1949         aa = ptlrpc_req_async_args(req);
1950         INIT_LIST_HEAD(&aa->aa_oaps);
1951         list_splice_init(&rpc_list, &aa->aa_oaps);
1952         INIT_LIST_HEAD(&aa->aa_exts);
1953         list_splice_init(ext_list, &aa->aa_exts);
1954         aa->aa_clerq = clerq;
1955
1956         /* queued sync pages can be torn down while the pages
1957          * were between the pending list and the rpc */
1958         tmp = NULL;
1959         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1960                 /* only one oap gets a request reference */
1961                 if (!tmp)
1962                         tmp = oap;
1963                 if (oap->oap_interrupted && !req->rq_intr) {
1964                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1965                                         oap, req);
1966                         ptlrpc_mark_interrupted(req);
1967                 }
1968         }
1969         if (tmp)
1970                 tmp->oap_request = ptlrpc_request_addref(req);
1971
1972         client_obd_list_lock(&cli->cl_loi_list_lock);
1973         starting_offset >>= PAGE_CACHE_SHIFT;
1974         if (cmd == OBD_BRW_READ) {
1975                 cli->cl_r_in_flight++;
1976                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1977                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1978                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1979                                       starting_offset + 1);
1980         } else {
1981                 cli->cl_w_in_flight++;
1982                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1983                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1984                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1985                                       starting_offset + 1);
1986         }
1987         client_obd_list_unlock(&cli->cl_loi_list_lock);
1988
1989         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
1990                   page_count, aa, cli->cl_r_in_flight,
1991                   cli->cl_w_in_flight);
1992
1993         ptlrpcd_add_req(req);
1994         rc = 0;
1995
1996 out:
1997         if (mem_tight != 0)
1998                 cfs_memory_pressure_restore(mpflag);
1999
2000         kfree(crattr);
2001
2002         if (rc != 0) {
2003                 LASSERT(!req);
2004
2005                 if (oa)
2006                         kmem_cache_free(obdo_cachep, oa);
2007                 kfree(pga);
2008                 /* this should happen rarely and is pretty bad, it makes the
2009                  * pending list not follow the dirty order */
2010                 while (!list_empty(ext_list)) {
2011                         ext = list_entry(ext_list->next, struct osc_extent,
2012                                              oe_link);
2013                         list_del_init(&ext->oe_link);
2014                         osc_extent_finish(env, ext, 0, rc);
2015                 }
2016                 if (clerq && !IS_ERR(clerq))
2017                         cl_req_completion(env, clerq, rc);
2018         }
2019         return rc;
2020 }
2021
2022 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2023                                         struct ldlm_enqueue_info *einfo)
2024 {
2025         void *data = einfo->ei_cbdata;
2026         int set = 0;
2027
2028         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2029         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2030         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2031         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2032
2033         lock_res_and_lock(lock);
2034         spin_lock(&osc_ast_guard);
2035
2036         if (!lock->l_ast_data)
2037                 lock->l_ast_data = data;
2038         if (lock->l_ast_data == data)
2039                 set = 1;
2040
2041         spin_unlock(&osc_ast_guard);
2042         unlock_res_and_lock(lock);
2043
2044         return set;
2045 }
2046
2047 static int osc_set_data_with_check(struct lustre_handle *lockh,
2048                                    struct ldlm_enqueue_info *einfo)
2049 {
2050         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2051         int set = 0;
2052
2053         if (lock) {
2054                 set = osc_set_lock_data_with_check(lock, einfo);
2055                 LDLM_LOCK_PUT(lock);
2056         } else
2057                 CERROR("lockh %p, data %p - client evicted?\n",
2058                        lockh, einfo->ei_cbdata);
2059         return set;
2060 }
2061
2062 /* find any ldlm lock of the inode in osc
2063  * return 0    not find
2064  *      1    find one
2065  *      < 0    error */
2066 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2067                            ldlm_iterator_t replace, void *data)
2068 {
2069         struct ldlm_res_id res_id;
2070         struct obd_device *obd = class_exp2obd(exp);
2071         int rc = 0;
2072
2073         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2074         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2075         if (rc == LDLM_ITER_STOP)
2076                 return 1;
2077         if (rc == LDLM_ITER_CONTINUE)
2078                 return 0;
2079         return rc;
2080 }
2081
2082 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2083                             obd_enqueue_update_f upcall, void *cookie,
2084                             __u64 *flags, int agl, int rc)
2085 {
2086         int intent = *flags & LDLM_FL_HAS_INTENT;
2087
2088         if (intent) {
2089                 /* The request was created before ldlm_cli_enqueue call. */
2090                 if (rc == ELDLM_LOCK_ABORTED) {
2091                         struct ldlm_reply *rep;
2092
2093                         rep = req_capsule_server_get(&req->rq_pill,
2094                                                      &RMF_DLM_REP);
2095
2096                         rep->lock_policy_res1 =
2097                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2098                         if (rep->lock_policy_res1)
2099                                 rc = rep->lock_policy_res1;
2100                 }
2101         }
2102
2103         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2104             (rc == 0)) {
2105                 *flags |= LDLM_FL_LVB_READY;
2106                 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2107                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2108         }
2109
2110         /* Call the update callback. */
2111         rc = (*upcall)(cookie, rc);
2112         return rc;
2113 }
2114
2115 static int osc_enqueue_interpret(const struct lu_env *env,
2116                                  struct ptlrpc_request *req,
2117                                  struct osc_enqueue_args *aa, int rc)
2118 {
2119         struct ldlm_lock *lock;
2120         struct lustre_handle handle;
2121         __u32 mode;
2122         struct ost_lvb *lvb;
2123         __u32 lvb_len;
2124         __u64 *flags = aa->oa_flags;
2125
2126         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2127          * might be freed anytime after lock upcall has been called. */
2128         lustre_handle_copy(&handle, aa->oa_lockh);
2129         mode = aa->oa_ei->ei_mode;
2130
2131         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2132          * be valid. */
2133         lock = ldlm_handle2lock(&handle);
2134
2135         /* Take an additional reference so that a blocking AST that
2136          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2137          * to arrive after an upcall has been executed by
2138          * osc_enqueue_fini(). */
2139         ldlm_lock_addref(&handle, mode);
2140
2141         /* Let CP AST to grant the lock first. */
2142         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2143
2144         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2145                 lvb = NULL;
2146                 lvb_len = 0;
2147         } else {
2148                 lvb = aa->oa_lvb;
2149                 lvb_len = sizeof(*aa->oa_lvb);
2150         }
2151
2152         /* Complete obtaining the lock procedure. */
2153         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2154                                    mode, flags, lvb, lvb_len, &handle, rc);
2155         /* Complete osc stuff. */
2156         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2157                               flags, aa->oa_agl, rc);
2158
2159         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2160
2161         /* Release the lock for async request. */
2162         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2163                 /*
2164                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2165                  * not already released by
2166                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2167                  */
2168                 ldlm_lock_decref(&handle, mode);
2169
2170         LASSERTF(lock, "lockh %p, req %p, aa %p - client evicted?\n",
2171                  aa->oa_lockh, req, aa);
2172         ldlm_lock_decref(&handle, mode);
2173         LDLM_LOCK_PUT(lock);
2174         return rc;
2175 }
2176
/*
 * Request-set pointer initialised to the constant (void *)1, i.e. a
 * non-NULL value that does not point at a real set.
 * NOTE(review): presumably used as a marker meaning "hand the request to
 * ptlrpcd" -- confirm against the callers.
 */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2178
2179 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2180  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2181  * other synchronous requests, however keeping some locks and trying to obtain
2182  * others may take a considerable amount of time in a case of ost failure; and
2183  * when other sync requests do not get released lock from a client, the client
2184  * is excluded from the cluster -- such scenarios make life difficult, so
2185  * release locks just after they are obtained. */
2186 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2187                      __u64 *flags, ldlm_policy_data_t *policy,
2188                      struct ost_lvb *lvb, int kms_valid,
2189                      obd_enqueue_update_f upcall, void *cookie,
2190                      struct ldlm_enqueue_info *einfo,
2191                      struct lustre_handle *lockh,
2192                      struct ptlrpc_request_set *rqset, int async, int agl)
2193 {
2194         struct obd_device *obd = exp->exp_obd;
2195         struct ptlrpc_request *req = NULL;
2196         int intent = *flags & LDLM_FL_HAS_INTENT;
2197         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2198         ldlm_mode_t mode;
2199         int rc;
2200
2201         /* Filesystem lock extents are extended to page boundaries so that
2202          * dealing with the page cache is a little smoother.  */
2203         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2204         policy->l_extent.end |= ~CFS_PAGE_MASK;
2205
2206         /*
2207          * kms is not valid when either object is completely fresh (so that no
2208          * locks are cached), or object was evicted. In the latter case cached
2209          * lock cannot be used, because it would prime inode state with
2210          * potentially stale LVB.
2211          */
2212         if (!kms_valid)
2213                 goto no_match;
2214
2215         /* Next, search for already existing extent locks that will cover us */
2216         /* If we're trying to read, we also search for an existing PW lock.  The
2217          * VFS and page cache already protect us locally, so lots of readers/
2218          * writers can share a single PW lock.
2219          *
2220          * There are problems with conversion deadlocks, so instead of
2221          * converting a read lock to a write lock, we'll just enqueue a new
2222          * one.
2223          *
2224          * At some point we should cancel the read lock instead of making them
2225          * send us a blocking callback, but there are problems with canceling
2226          * locks out from other users right now, too. */
2227         mode = einfo->ei_mode;
2228         if (einfo->ei_mode == LCK_PR)
2229                 mode |= LCK_PW;
2230         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2231                                einfo->ei_type, policy, mode, lockh, 0);
2232         if (mode) {
2233                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2234
2235                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2236                         /* For AGL, if enqueue RPC is sent but the lock is not
2237                          * granted, then skip to process this strpe.
2238                          * Return -ECANCELED to tell the caller. */
2239                         ldlm_lock_decref(lockh, mode);
2240                         LDLM_LOCK_PUT(matched);
2241                         return -ECANCELED;
2242                 }
2243
2244                 if (osc_set_lock_data_with_check(matched, einfo)) {
2245                         *flags |= LDLM_FL_LVB_READY;
2246                         /* addref the lock only if not async requests and PW
2247                          * lock is matched whereas we asked for PR. */
2248                         if (!rqset && einfo->ei_mode != mode)
2249                                 ldlm_lock_addref(lockh, LCK_PR);
2250                         if (intent) {
2251                                 /* I would like to be able to ASSERT here that
2252                                  * rss <= kms, but I can't, for reasons which
2253                                  * are explained in lov_enqueue() */
2254                         }
2255
2256                         /* We already have a lock, and it's referenced.
2257                          *
2258                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2259                          * AGL upcall may change it to CLS_HELD directly. */
2260                         (*upcall)(cookie, ELDLM_OK);
2261
2262                         if (einfo->ei_mode != mode)
2263                                 ldlm_lock_decref(lockh, LCK_PW);
2264                         else if (rqset)
2265                                 /* For async requests, decref the lock. */
2266                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2267                         LDLM_LOCK_PUT(matched);
2268                         return ELDLM_OK;
2269                 }
2270
2271                 ldlm_lock_decref(lockh, mode);
2272                 LDLM_LOCK_PUT(matched);
2273         }
2274
2275  no_match:
2276         if (intent) {
2277                 LIST_HEAD(cancels);
2278
2279                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2280                                            &RQF_LDLM_ENQUEUE_LVB);
2281                 if (!req)
2282                         return -ENOMEM;
2283
2284                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2285                 if (rc) {
2286                         ptlrpc_request_free(req);
2287                         return rc;
2288                 }
2289
2290                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2291                                      sizeof(*lvb));
2292                 ptlrpc_request_set_replen(req);
2293         }
2294
2295         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2296         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2297
2298         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2299                               sizeof(*lvb), LVB_T_OST, lockh, async);
2300         if (rqset) {
2301                 if (!rc) {
2302                         struct osc_enqueue_args *aa;
2303
2304                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2305                         aa = ptlrpc_req_async_args(req);
2306                         aa->oa_ei = einfo;
2307                         aa->oa_exp = exp;
2308                         aa->oa_flags  = flags;
2309                         aa->oa_upcall = upcall;
2310                         aa->oa_cookie = cookie;
2311                         aa->oa_lvb    = lvb;
2312                         aa->oa_lockh  = lockh;
2313                         aa->oa_agl    = !!agl;
2314
2315                         req->rq_interpret_reply =
2316                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2317                         if (rqset == PTLRPCD_SET)
2318                                 ptlrpcd_add_req(req);
2319                         else
2320                                 ptlrpc_set_add_req(rqset, req);
2321                 } else if (intent) {
2322                         ptlrpc_req_finished(req);
2323                 }
2324                 return rc;
2325         }
2326
2327         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2328         if (intent)
2329                 ptlrpc_req_finished(req);
2330
2331         return rc;
2332 }
2333
2334 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2335                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2336                    __u64 *flags, void *data, struct lustre_handle *lockh,
2337                    int unref)
2338 {
2339         struct obd_device *obd = exp->exp_obd;
2340         __u64 lflags = *flags;
2341         ldlm_mode_t rc;
2342
2343         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2344                 return -EIO;
2345
2346         /* Filesystem lock extents are extended to page boundaries so that
2347          * dealing with the page cache is a little smoother */
2348         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2349         policy->l_extent.end |= ~CFS_PAGE_MASK;
2350
2351         /* Next, search for already existing extent locks that will cover us */
2352         /* If we're trying to read, we also search for an existing PW lock.  The
2353          * VFS and page cache already protect us locally, so lots of readers/
2354          * writers can share a single PW lock. */
2355         rc = mode;
2356         if (mode == LCK_PR)
2357                 rc |= LCK_PW;
2358         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2359                              res_id, type, policy, rc, lockh, unref);
2360         if (rc) {
2361                 if (data) {
2362                         if (!osc_set_data_with_check(lockh, data)) {
2363                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2364                                         ldlm_lock_decref(lockh, rc);
2365                                 return 0;
2366                         }
2367                 }
2368                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2369                         ldlm_lock_addref(lockh, LCK_PR);
2370                         ldlm_lock_decref(lockh, LCK_PW);
2371                 }
2372                 return rc;
2373         }
2374         return rc;
2375 }
2376
2377 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2378 {
2379         if (unlikely(mode == LCK_GROUP))
2380                 ldlm_lock_decref_and_cancel(lockh, mode);
2381         else
2382                 ldlm_lock_decref(lockh, mode);
2383
2384         return 0;
2385 }
2386
2387 static int osc_statfs_interpret(const struct lu_env *env,
2388                                 struct ptlrpc_request *req,
2389                                 struct osc_async_args *aa, int rc)
2390 {
2391         struct obd_statfs *msfs;
2392
2393         if (rc == -EBADR)
2394                 /* The request has in fact never been sent
2395                  * due to issues at a higher level (LOV).
2396                  * Exit immediately since the caller is
2397                  * aware of the problem and takes care
2398                  * of the clean up */
2399                  return rc;
2400
2401         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2402             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2403                 rc = 0;
2404                 goto out;
2405         }
2406
2407         if (rc != 0)
2408                 goto out;
2409
2410         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2411         if (!msfs) {
2412                 rc = -EPROTO;
2413                 goto out;
2414         }
2415
2416         *aa->aa_oi->oi_osfs = *msfs;
2417 out:
2418         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2419         return rc;
2420 }
2421
2422 static int osc_statfs_async(struct obd_export *exp,
2423                             struct obd_info *oinfo, __u64 max_age,
2424                             struct ptlrpc_request_set *rqset)
2425 {
2426         struct obd_device *obd = class_exp2obd(exp);
2427         struct ptlrpc_request *req;
2428         struct osc_async_args *aa;
2429         int rc;
2430
2431         /* We could possibly pass max_age in the request (as an absolute
2432          * timestamp or a "seconds.usec ago") so the target can avoid doing
2433          * extra calls into the filesystem if that isn't necessary (e.g.
2434          * during mount that would help a bit).  Having relative timestamps
2435          * is not so great if request processing is slow, while absolute
2436          * timestamps are not ideal because they need time synchronization. */
2437         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2438         if (!req)
2439                 return -ENOMEM;
2440
2441         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2442         if (rc) {
2443                 ptlrpc_request_free(req);
2444                 return rc;
2445         }
2446         ptlrpc_request_set_replen(req);
2447         req->rq_request_portal = OST_CREATE_PORTAL;
2448         ptlrpc_at_set_req_timeout(req);
2449
2450         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2451                 /* procfs requests not want stat in wait for avoid deadlock */
2452                 req->rq_no_resend = 1;
2453                 req->rq_no_delay = 1;
2454         }
2455
2456         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2457         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2458         aa = ptlrpc_req_async_args(req);
2459         aa->aa_oi = oinfo;
2460
2461         ptlrpc_set_add_req(rqset, req);
2462         return 0;
2463 }
2464
2465 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2466                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2467 {
2468         struct obd_device *obd = class_exp2obd(exp);
2469         struct obd_statfs *msfs;
2470         struct ptlrpc_request *req;
2471         struct obd_import *imp = NULL;
2472         int rc;
2473
2474         /*Since the request might also come from lprocfs, so we need
2475          *sync this with client_disconnect_export Bug15684*/
2476         down_read(&obd->u.cli.cl_sem);
2477         if (obd->u.cli.cl_import)
2478                 imp = class_import_get(obd->u.cli.cl_import);
2479         up_read(&obd->u.cli.cl_sem);
2480         if (!imp)
2481                 return -ENODEV;
2482
2483         /* We could possibly pass max_age in the request (as an absolute
2484          * timestamp or a "seconds.usec ago") so the target can avoid doing
2485          * extra calls into the filesystem if that isn't necessary (e.g.
2486          * during mount that would help a bit).  Having relative timestamps
2487          * is not so great if request processing is slow, while absolute
2488          * timestamps are not ideal because they need time synchronization. */
2489         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2490
2491         class_import_put(imp);
2492
2493         if (!req)
2494                 return -ENOMEM;
2495
2496         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2497         if (rc) {
2498                 ptlrpc_request_free(req);
2499                 return rc;
2500         }
2501         ptlrpc_request_set_replen(req);
2502         req->rq_request_portal = OST_CREATE_PORTAL;
2503         ptlrpc_at_set_req_timeout(req);
2504
2505         if (flags & OBD_STATFS_NODELAY) {
2506                 /* procfs requests not want stat in wait for avoid deadlock */
2507                 req->rq_no_resend = 1;
2508                 req->rq_no_delay = 1;
2509         }
2510
2511         rc = ptlrpc_queue_wait(req);
2512         if (rc)
2513                 goto out;
2514
2515         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2516         if (!msfs) {
2517                 rc = -EPROTO;
2518                 goto out;
2519         }
2520
2521         *osfs = *msfs;
2522
2523  out:
2524         ptlrpc_req_finished(req);
2525         return rc;
2526 }
2527
2528 /* Retrieve object striping information.
2529  *
2530  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2531  * the maximum number of OST indices which will fit in the user buffer.
2532  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2533  */
2534 static int osc_getstripe(struct lov_stripe_md *lsm,
2535                          struct lov_user_md __user *lump)
2536 {
2537         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2538         struct lov_user_md_v3 lum, *lumk;
2539         struct lov_user_ost_data_v1 *lmm_objects;
2540         int rc = 0, lum_size;
2541
2542         if (!lsm)
2543                 return -ENODATA;
2544
2545         /* we only need the header part from user space to get lmm_magic and
2546          * lmm_stripe_count, (the header part is common to v1 and v3) */
2547         lum_size = sizeof(struct lov_user_md_v1);
2548         if (copy_from_user(&lum, lump, lum_size))
2549                 return -EFAULT;
2550
2551         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2552             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2553                 return -EINVAL;
2554
2555         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2556         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2557         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2558         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2559
2560         /* we can use lov_mds_md_size() to compute lum_size
2561          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2562         if (lum.lmm_stripe_count > 0) {
2563                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2564                 lumk = kzalloc(lum_size, GFP_NOFS);
2565                 if (!lumk)
2566                         return -ENOMEM;
2567
2568                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2569                         lmm_objects =
2570                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2571                 else
2572                         lmm_objects = &(lumk->lmm_objects[0]);
2573                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2574         } else {
2575                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2576                 lumk = &lum;
2577         }
2578
2579         lumk->lmm_oi = lsm->lsm_oi;
2580         lumk->lmm_stripe_count = 1;
2581
2582         if (copy_to_user(lump, lumk, lum_size))
2583                 rc = -EFAULT;
2584
2585         if (lumk != &lum)
2586                 kfree(lumk);
2587
2588         return rc;
2589 }
2590
2591 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2592                          void *karg, void __user *uarg)
2593 {
2594         struct obd_device *obd = exp->exp_obd;
2595         struct obd_ioctl_data *data = karg;
2596         int err = 0;
2597
2598         if (!try_module_get(THIS_MODULE)) {
2599                 CERROR("Can't get module. Is it alive?");
2600                 return -EINVAL;
2601         }
2602         switch (cmd) {
2603         case OBD_IOC_LOV_GET_CONFIG: {
2604                 char *buf;
2605                 struct lov_desc *desc;
2606                 struct obd_uuid uuid;
2607
2608                 buf = NULL;
2609                 len = 0;
2610                 if (obd_ioctl_getdata(&buf, &len, uarg)) {
2611                         err = -EINVAL;
2612                         goto out;
2613                 }
2614
2615                 data = (struct obd_ioctl_data *)buf;
2616
2617                 if (sizeof(*desc) > data->ioc_inllen1) {
2618                         obd_ioctl_freedata(buf, len);
2619                         err = -EINVAL;
2620                         goto out;
2621                 }
2622
2623                 if (data->ioc_inllen2 < sizeof(uuid)) {
2624                         obd_ioctl_freedata(buf, len);
2625                         err = -EINVAL;
2626                         goto out;
2627                 }
2628
2629                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2630                 desc->ld_tgt_count = 1;
2631                 desc->ld_active_tgt_count = 1;
2632                 desc->ld_default_stripe_count = 1;
2633                 desc->ld_default_stripe_size = 0;
2634                 desc->ld_default_stripe_offset = 0;
2635                 desc->ld_pattern = 0;
2636                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2637
2638                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2639
2640                 err = copy_to_user(uarg, buf, len);
2641                 if (err)
2642                         err = -EFAULT;
2643                 obd_ioctl_freedata(buf, len);
2644                 goto out;
2645         }
2646         case LL_IOC_LOV_SETSTRIPE:
2647                 err = obd_alloc_memmd(exp, karg);
2648                 if (err > 0)
2649                         err = 0;
2650                 goto out;
2651         case LL_IOC_LOV_GETSTRIPE:
2652                 err = osc_getstripe(karg, uarg);
2653                 goto out;
2654         case OBD_IOC_CLIENT_RECOVER:
2655                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2656                                             data->ioc_inlbuf1, 0);
2657                 if (err > 0)
2658                         err = 0;
2659                 goto out;
2660         case IOC_OSC_SET_ACTIVE:
2661                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2662                                                data->ioc_offset);
2663                 goto out;
2664         case OBD_IOC_POLL_QUOTACHECK:
2665                 err = osc_quota_poll_check(exp, karg);
2666                 goto out;
2667         case OBD_IOC_PING_TARGET:
2668                 err = ptlrpc_obd_ping(obd);
2669                 goto out;
2670         default:
2671                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2672                        cmd, current_comm());
2673                 err = -ENOTTY;
2674                 goto out;
2675         }
2676 out:
2677         module_put(THIS_MODULE);
2678         return err;
2679 }
2680
2681 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2682                         u32 keylen, void *key, __u32 *vallen, void *val,
2683                         struct lov_stripe_md *lsm)
2684 {
2685         if (!vallen || !val)
2686                 return -EFAULT;
2687
2688         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2689                 __u32 *stripe = val;
2690                 *vallen = sizeof(*stripe);
2691                 *stripe = 0;
2692                 return 0;
2693         } else if (KEY_IS(KEY_LAST_ID)) {
2694                 struct ptlrpc_request *req;
2695                 u64 *reply;
2696                 char *tmp;
2697                 int rc;
2698
2699                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2700                                            &RQF_OST_GET_INFO_LAST_ID);
2701                 if (!req)
2702                         return -ENOMEM;
2703
2704                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2705                                      RCL_CLIENT, keylen);
2706                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2707                 if (rc) {
2708                         ptlrpc_request_free(req);
2709                         return rc;
2710                 }
2711
2712                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2713                 memcpy(tmp, key, keylen);
2714
2715                 req->rq_no_delay = req->rq_no_resend = 1;
2716                 ptlrpc_request_set_replen(req);
2717                 rc = ptlrpc_queue_wait(req);
2718                 if (rc)
2719                         goto out;
2720
2721                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2722                 if (!reply) {
2723                         rc = -EPROTO;
2724                         goto out;
2725                 }
2726
2727                 *((u64 *)val) = *reply;
2728 out:
2729                 ptlrpc_req_finished(req);
2730                 return rc;
2731         } else if (KEY_IS(KEY_FIEMAP)) {
2732                 struct ll_fiemap_info_key *fm_key = key;
2733                 struct ldlm_res_id res_id;
2734                 ldlm_policy_data_t policy;
2735                 struct lustre_handle lockh;
2736                 ldlm_mode_t mode = 0;
2737                 struct ptlrpc_request *req;
2738                 struct ll_user_fiemap *reply;
2739                 char *tmp;
2740                 int rc;
2741
2742                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2743                         goto skip_locking;
2744
2745                 policy.l_extent.start = fm_key->fiemap.fm_start &
2746                                                 CFS_PAGE_MASK;
2747
2748                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2749                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2750                         policy.l_extent.end = OBD_OBJECT_EOF;
2751                 else
2752                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2753                                 fm_key->fiemap.fm_length +
2754                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2755
2756                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2757                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2758                                        LDLM_FL_BLOCK_GRANTED |
2759                                        LDLM_FL_LVB_READY,
2760                                        &res_id, LDLM_EXTENT, &policy,
2761                                        LCK_PR | LCK_PW, &lockh, 0);
2762                 if (mode) { /* lock is cached on client */
2763                         if (mode != LCK_PR) {
2764                                 ldlm_lock_addref(&lockh, LCK_PR);
2765                                 ldlm_lock_decref(&lockh, LCK_PW);
2766                         }
2767                 } else { /* no cached lock, needs acquire lock on server side */
2768                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2769                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2770                 }
2771
2772 skip_locking:
2773                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2774                                            &RQF_OST_GET_INFO_FIEMAP);
2775                 if (!req) {
2776                         rc = -ENOMEM;
2777                         goto drop_lock;
2778                 }
2779
2780                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2781                                      RCL_CLIENT, keylen);
2782                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2783                                      RCL_CLIENT, *vallen);
2784                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2785                                      RCL_SERVER, *vallen);
2786
2787                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2788                 if (rc) {
2789                         ptlrpc_request_free(req);
2790                         goto drop_lock;
2791                 }
2792
2793                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2794                 memcpy(tmp, key, keylen);
2795                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2796                 memcpy(tmp, val, *vallen);
2797
2798                 ptlrpc_request_set_replen(req);
2799                 rc = ptlrpc_queue_wait(req);
2800                 if (rc)
2801                         goto fini_req;
2802
2803                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2804                 if (!reply) {
2805                         rc = -EPROTO;
2806                         goto fini_req;
2807                 }
2808
2809                 memcpy(val, reply, *vallen);
2810 fini_req:
2811                 ptlrpc_req_finished(req);
2812 drop_lock:
2813                 if (mode)
2814                         ldlm_lock_decref(&lockh, LCK_PR);
2815                 return rc;
2816         }
2817
2818         return -EINVAL;
2819 }
2820
2821 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2822                               u32 keylen, void *key, u32 vallen,
2823                               void *val, struct ptlrpc_request_set *set)
2824 {
2825         struct ptlrpc_request *req;
2826         struct obd_device *obd = exp->exp_obd;
2827         struct obd_import *imp = class_exp2cliimp(exp);
2828         char *tmp;
2829         int rc;
2830
2831         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2832
2833         if (KEY_IS(KEY_CHECKSUM)) {
2834                 if (vallen != sizeof(int))
2835                         return -EINVAL;
2836                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2837                 return 0;
2838         }
2839
2840         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2841                 sptlrpc_conf_client_adapt(obd);
2842                 return 0;
2843         }
2844
2845         if (KEY_IS(KEY_FLUSH_CTX)) {
2846                 sptlrpc_import_flush_my_ctx(imp);
2847                 return 0;
2848         }
2849
2850         if (KEY_IS(KEY_CACHE_SET)) {
2851                 struct client_obd *cli = &obd->u.cli;
2852
2853                 LASSERT(!cli->cl_cache); /* only once */
2854                 cli->cl_cache = val;
2855                 atomic_inc(&cli->cl_cache->ccc_users);
2856                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2857
2858                 /* add this osc into entity list */
2859                 LASSERT(list_empty(&cli->cl_lru_osc));
2860                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2861                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2862                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2863
2864                 return 0;
2865         }
2866
2867         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2868                 struct client_obd *cli = &obd->u.cli;
2869                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2870                 int target = *(int *)val;
2871
2872                 nr = osc_lru_shrink(cli, min(nr, target));
2873                 *(int *)val -= nr;
2874                 return 0;
2875         }
2876
2877         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2878                 return -EINVAL;
2879
2880         /* We pass all other commands directly to OST. Since nobody calls osc
2881            methods directly and everybody is supposed to go through LOV, we
2882            assume lov checked invalid values for us.
2883            The only recognised values so far are evict_by_nid and mds_conn.
2884            Even if something bad goes through, we'd get a -EINVAL from OST
2885            anyway. */
2886
2887         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2888                                                 &RQF_OST_SET_GRANT_INFO :
2889                                                 &RQF_OBD_SET_INFO);
2890         if (!req)
2891                 return -ENOMEM;
2892
2893         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2894                              RCL_CLIENT, keylen);
2895         if (!KEY_IS(KEY_GRANT_SHRINK))
2896                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2897                                      RCL_CLIENT, vallen);
2898         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2899         if (rc) {
2900                 ptlrpc_request_free(req);
2901                 return rc;
2902         }
2903
2904         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2905         memcpy(tmp, key, keylen);
2906         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2907                                                         &RMF_OST_BODY :
2908                                                         &RMF_SETINFO_VAL);
2909         memcpy(tmp, val, vallen);
2910
2911         if (KEY_IS(KEY_GRANT_SHRINK)) {
2912                 struct osc_brw_async_args *aa;
2913                 struct obdo *oa;
2914
2915                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2916                 aa = ptlrpc_req_async_args(req);
2917                 oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
2918                 if (!oa) {
2919                         ptlrpc_req_finished(req);
2920                         return -ENOMEM;
2921                 }
2922                 *oa = ((struct ost_body *)val)->oa;
2923                 aa->aa_oa = oa;
2924                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2925         }
2926
2927         ptlrpc_request_set_replen(req);
2928         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2929                 LASSERT(set);
2930                 ptlrpc_set_add_req(set, req);
2931                 ptlrpc_check_set(NULL, set);
2932         } else {
2933                 ptlrpcd_add_req(req);
2934         }
2935
2936         return 0;
2937 }
2938
2939 static int osc_reconnect(const struct lu_env *env,
2940                          struct obd_export *exp, struct obd_device *obd,
2941                          struct obd_uuid *cluuid,
2942                          struct obd_connect_data *data,
2943                          void *localdata)
2944 {
2945         struct client_obd *cli = &obd->u.cli;
2946
2947         if (data && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2948                 long lost_grant;
2949
2950                 client_obd_list_lock(&cli->cl_loi_list_lock);
2951                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2952                                 2 * cli_brw_size(obd);
2953                 lost_grant = cli->cl_lost_grant;
2954                 cli->cl_lost_grant = 0;
2955                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2956
2957                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2958                        data->ocd_connect_flags,
2959                        data->ocd_version, data->ocd_grant, lost_grant);
2960         }
2961
2962         return 0;
2963 }
2964
2965 static int osc_disconnect(struct obd_export *exp)
2966 {
2967         struct obd_device *obd = class_exp2obd(exp);
2968         int rc;
2969
2970         rc = client_disconnect_export(exp);
2971         /**
2972          * Initially we put del_shrink_grant before disconnect_export, but it
2973          * causes the following problem if setup (connect) and cleanup
2974          * (disconnect) are tangled together.
2975          *      connect p1                   disconnect p2
2976          *   ptlrpc_connect_import
2977          *     ...............         class_manual_cleanup
2978          *                                   osc_disconnect
2979          *                                   del_shrink_grant
2980          *   ptlrpc_connect_interrupt
2981          *     init_grant_shrink
2982          *   add this client to shrink list
2983          *                                    cleanup_osc
2984          * Bang! pinger trigger the shrink.
2985          * So the osc should be disconnected from the shrink list, after we
2986          * are sure the import has been destroyed. BUG18662
2987          */
2988         if (!obd->u.cli.cl_import)
2989                 osc_del_shrink_grant(&obd->u.cli);
2990         return rc;
2991 }
2992
2993 static int osc_import_event(struct obd_device *obd,
2994                             struct obd_import *imp,
2995                             enum obd_import_event event)
2996 {
2997         struct client_obd *cli;
2998         int rc = 0;
2999
3000         LASSERT(imp->imp_obd == obd);
3001
3002         switch (event) {
3003         case IMP_EVENT_DISCON: {
3004                 cli = &obd->u.cli;
3005                 client_obd_list_lock(&cli->cl_loi_list_lock);
3006                 cli->cl_avail_grant = 0;
3007                 cli->cl_lost_grant = 0;
3008                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3009                 break;
3010         }
3011         case IMP_EVENT_INACTIVE: {
3012                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3013                 break;
3014         }
3015         case IMP_EVENT_INVALIDATE: {
3016                 struct ldlm_namespace *ns = obd->obd_namespace;
3017                 struct lu_env *env;
3018                 int refcheck;
3019
3020                 env = cl_env_get(&refcheck);
3021                 if (!IS_ERR(env)) {
3022                         /* Reset grants */
3023                         cli = &obd->u.cli;
3024                         /* all pages go to failing rpcs due to the invalid
3025                          * import */
3026                         osc_io_unplug(env, cli, NULL);
3027
3028                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3029                         cl_env_put(env, &refcheck);
3030                 } else
3031                         rc = PTR_ERR(env);
3032                 break;
3033         }
3034         case IMP_EVENT_ACTIVE: {
3035                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3036                 break;
3037         }
3038         case IMP_EVENT_OCD: {
3039                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3040
3041                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3042                         osc_init_grant(&obd->u.cli, ocd);
3043
3044                 /* See bug 7198 */
3045                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3046                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3047
3048                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3049                 break;
3050         }
3051         case IMP_EVENT_DEACTIVATE: {
3052                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3053                 break;
3054         }
3055         case IMP_EVENT_ACTIVATE: {
3056                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3057                 break;
3058         }
3059         default:
3060                 CERROR("Unknown import event %d\n", event);
3061                 LBUG();
3062         }
3063         return rc;
3064 }
3065
3066 /**
3067  * Determine whether the lock can be canceled before replaying the lock
3068  * during recovery, see bug16774 for detailed information.
3069  *
3070  * \retval zero the lock can't be canceled
3071  * \retval other ok to cancel
3072  */
3073 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3074 {
3075         check_res_locked(lock->l_resource);
3076
3077         /*
3078          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3079          *
3080          * XXX as a future improvement, we can also cancel unused write lock
3081          * if it doesn't have dirty data and active mmaps.
3082          */
3083         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3084             (lock->l_granted_mode == LCK_PR ||
3085              lock->l_granted_mode == LCK_CR) &&
3086             (osc_dlm_lock_pageref(lock) == 0))
3087                 return 1;
3088
3089         return 0;
3090 }
3091
3092 static int brw_queue_work(const struct lu_env *env, void *data)
3093 {
3094         struct client_obd *cli = data;
3095
3096         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3097
3098         osc_io_unplug(env, cli, NULL);
3099         return 0;
3100 }
3101
3102 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3103 {
3104         struct lprocfs_static_vars lvars = { NULL };
3105         struct client_obd *cli = &obd->u.cli;
3106         void *handler;
3107         int rc;
3108         int adding;
3109         int added;
3110         int req_count;
3111
3112         rc = ptlrpcd_addref();
3113         if (rc)
3114                 return rc;
3115
3116         rc = client_obd_setup(obd, lcfg);
3117         if (rc)
3118                 goto out_ptlrpcd;
3119
3120         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3121         if (IS_ERR(handler)) {
3122                 rc = PTR_ERR(handler);
3123                 goto out_client_setup;
3124         }
3125         cli->cl_writeback_work = handler;
3126
3127         rc = osc_quota_setup(obd);
3128         if (rc)
3129                 goto out_ptlrpcd_work;
3130
3131         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3132         lprocfs_osc_init_vars(&lvars);
3133         if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
3134                 lproc_osc_attach_seqstat(obd);
3135                 sptlrpc_lprocfs_cliobd_attach(obd);
3136                 ptlrpc_lprocfs_register_obd(obd);
3137         }
3138
3139         /*
3140          * We try to control the total number of requests with a upper limit
3141          * osc_reqpool_maxreqcount. There might be some race which will cause
3142          * over-limit allocation, but it is fine.
3143          */
3144         req_count = atomic_read(&osc_pool_req_count);
3145         if (req_count < osc_reqpool_maxreqcount) {
3146                 adding = cli->cl_max_rpcs_in_flight + 2;
3147                 if (req_count + adding > osc_reqpool_maxreqcount)
3148                         adding = osc_reqpool_maxreqcount - req_count;
3149
3150                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3151                 atomic_add(added, &osc_pool_req_count);
3152         }
3153
3154         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3155         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3156         return rc;
3157
3158 out_ptlrpcd_work:
3159         ptlrpcd_destroy_work(handler);
3160 out_client_setup:
3161         client_obd_cleanup(obd);
3162 out_ptlrpcd:
3163         ptlrpcd_decref();
3164         return rc;
3165 }
3166
3167 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3168 {
3169         switch (stage) {
3170         case OBD_CLEANUP_EARLY: {
3171                 struct obd_import *imp;
3172
3173                 imp = obd->u.cli.cl_import;
3174                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3175                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3176                 ptlrpc_deactivate_import(imp);
3177                 spin_lock(&imp->imp_lock);
3178                 imp->imp_pingable = 0;
3179                 spin_unlock(&imp->imp_lock);
3180                 break;
3181         }
3182         case OBD_CLEANUP_EXPORTS: {
3183                 struct client_obd *cli = &obd->u.cli;
3184                 /* LU-464
3185                  * for echo client, export may be on zombie list, wait for
3186                  * zombie thread to cull it, because cli.cl_import will be
3187                  * cleared in client_disconnect_export():
3188                  *   class_export_destroy() -> obd_cleanup() ->
3189                  *   echo_device_free() -> echo_client_cleanup() ->
3190                  *   obd_disconnect() -> osc_disconnect() ->
3191                  *   client_disconnect_export()
3192                  */
3193                 obd_zombie_barrier();
3194                 if (cli->cl_writeback_work) {
3195                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3196                         cli->cl_writeback_work = NULL;
3197                 }
3198                 obd_cleanup_client_import(obd);
3199                 ptlrpc_lprocfs_unregister_obd(obd);
3200                 lprocfs_obd_cleanup(obd);
3201                 break;
3202                 }
3203         }
3204         return 0;
3205 }
3206
3207 static int osc_cleanup(struct obd_device *obd)
3208 {
3209         struct client_obd *cli = &obd->u.cli;
3210         int rc;
3211
3212         /* lru cleanup */
3213         if (cli->cl_cache) {
3214                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3215                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3216                 list_del_init(&cli->cl_lru_osc);
3217                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3218                 cli->cl_lru_left = NULL;
3219                 atomic_dec(&cli->cl_cache->ccc_users);
3220                 cli->cl_cache = NULL;
3221         }
3222
3223         /* free memory of osc quota cache */
3224         osc_quota_cleanup(obd);
3225
3226         rc = client_obd_cleanup(obd);
3227
3228         ptlrpcd_decref();
3229         return rc;
3230 }
3231
3232 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3233 {
3234         struct lprocfs_static_vars lvars = { NULL };
3235         int rc = 0;
3236
3237         lprocfs_osc_init_vars(&lvars);
3238
3239         switch (lcfg->lcfg_command) {
3240         default:
3241                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3242                                               lcfg, obd);
3243                 if (rc > 0)
3244                         rc = 0;
3245                 break;
3246         }
3247
3248         return rc;
3249 }
3250
3251 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3252 {
3253         return osc_process_config_base(obd, buf);
3254 }
3255
3256 static struct obd_ops osc_obd_ops = {
3257         .owner          = THIS_MODULE,
3258         .setup          = osc_setup,
3259         .precleanup     = osc_precleanup,
3260         .cleanup        = osc_cleanup,
3261         .add_conn       = client_import_add_conn,
3262         .del_conn       = client_import_del_conn,
3263         .connect        = client_connect_import,
3264         .reconnect      = osc_reconnect,
3265         .disconnect     = osc_disconnect,
3266         .statfs         = osc_statfs,
3267         .statfs_async   = osc_statfs_async,
3268         .packmd         = osc_packmd,
3269         .unpackmd       = osc_unpackmd,
3270         .create         = osc_create,
3271         .destroy        = osc_destroy,
3272         .getattr        = osc_getattr,
3273         .getattr_async  = osc_getattr_async,
3274         .setattr        = osc_setattr,
3275         .setattr_async  = osc_setattr_async,
3276         .find_cbdata    = osc_find_cbdata,
3277         .iocontrol      = osc_iocontrol,
3278         .get_info       = osc_get_info,
3279         .set_info_async = osc_set_info_async,
3280         .import_event   = osc_import_event,
3281         .process_config = osc_process_config,
3282         .quotactl       = osc_quotactl,
3283         .quotacheck     = osc_quotacheck,
3284 };
3285
3286 extern struct lu_kmem_descr osc_caches[]; /* slab descriptors set up/torn down via lu_kmem_init()/lu_kmem_fini() in osc_init()/osc_exit() */
3287 extern spinlock_t osc_ast_guard; /* defined elsewhere; initialised in osc_init() */
3288 extern struct lock_class_key osc_ast_guard_class; /* lockdep class attached to osc_ast_guard in osc_init() */
3289
3290 static int __init osc_init(void)
3291 {
3292         struct lprocfs_static_vars lvars = { NULL };
3293         unsigned int reqpool_size;
3294         unsigned int reqsize;
3295         int rc;
3296
3297         /* print an address of _any_ initialized kernel symbol from this
3298          * module, to allow debugging with gdb that doesn't support data
3299          * symbols from modules.*/
3300         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3301
3302         rc = lu_kmem_init(osc_caches);
3303         if (rc)
3304                 return rc;
3305
3306         lprocfs_osc_init_vars(&lvars);
3307
3308         rc = class_register_type(&osc_obd_ops, NULL,
3309                                  LUSTRE_OSC_NAME, &osc_device_type);
3310         if (rc)
3311                 goto out_kmem;
3312
3313         spin_lock_init(&osc_ast_guard);
3314         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3315
3316         /* This is obviously too much memory, only prevent overflow here */
3317         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
3318                 rc = -EINVAL;
3319                 goto out_type;
3320         }
3321
3322         reqpool_size = osc_reqpool_mem_max << 20;
3323
3324         reqsize = 1;
3325         while (reqsize < OST_MAXREQSIZE)
3326                 reqsize = reqsize << 1;
3327
3328         /*
3329          * We don't enlarge the request count in OSC pool according to
3330          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3331          * tried after normal allocation failed. So a small OSC pool won't
3332          * cause much performance degression in most of cases.
3333          */
3334         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3335
3336         atomic_set(&osc_pool_req_count, 0);
3337         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
3338                                           ptlrpc_add_rqs_to_pool);
3339
3340         if (osc_rq_pool)
3341                 return 0;
3342
3343         rc = -ENOMEM;
3344
3345 out_type:
3346         class_unregister_type(LUSTRE_OSC_NAME);
3347 out_kmem:
3348         lu_kmem_fini(osc_caches);
3349         return rc;
3350 }
3351
3352 static void /*__exit*/ osc_exit(void)
3353 {
3354         class_unregister_type(LUSTRE_OSC_NAME);
3355         lu_kmem_fini(osc_caches);
3356         ptlrpc_free_rq_pool(osc_rq_pool);
3357 }
3358
3359 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>"); /* module metadata */
3360 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3361 MODULE_LICENSE("GPL");
3362 MODULE_VERSION(LUSTRE_VERSION_STRING);
3363
3364 module_init(osc_init); /* osc_init() is the module's initialisation entry point */
3365 module_exit(osc_exit); /* osc_exit() is the module's teardown entry point */