]> git.karo-electronics.de Git - linux-beck.git/blob - drivers/staging/lustre/lustre/osc/osc_request.c
iio: adc: ina2xx: Fix incorrect report of data endianness to userspace.
[linux-beck.git] / drivers / staging / lustre / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include "../../include/linux/libcfs/libcfs.h"
40
41 #include "../include/lustre_dlm.h"
42 #include "../include/lustre_net.h"
43 #include "../include/lustre/lustre_user.h"
44 #include "../include/obd_cksum.h"
45
46 #include "../include/lustre_ha.h"
47 #include "../include/lprocfs_status.h"
48 #include "../include/lustre_debug.h"
49 #include "../include/lustre_param.h"
50 #include "../include/lustre_fid.h"
51 #include "../include/obd_class.h"
52 #include "../include/obd.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55
56 atomic_t osc_pool_req_count;
57 unsigned int osc_reqpool_maxreqcount;
58 struct ptlrpc_request_pool *osc_rq_pool;
59
60 /* max memory used for request pool, unit is MB */
61 static unsigned int osc_reqpool_mem_max = 5;
62 module_param(osc_reqpool_mem_max, uint, 0444);
63
64 struct osc_brw_async_args {
65         struct obdo       *aa_oa;
66         int             aa_requested_nob;
67         int             aa_nio_count;
68         u32             aa_page_count;
69         int             aa_resends;
70         struct brw_page  **aa_ppga;
71         struct client_obd *aa_cli;
72         struct list_head         aa_oaps;
73         struct list_head         aa_exts;
74         struct cl_req     *aa_clerq;
75 };
76
/*
 * Async arguments for osc_getattr_async(): remembers the caller's obd_info
 * so osc_getattr_interpret() can fill in oi_oa and call oi_cb_up.
 */
struct osc_async_args {
	struct obd_info *aa_oi;
};
80
81 struct osc_setattr_args {
82         struct obdo      *sa_oa;
83         obd_enqueue_update_f sa_upcall;
84         void            *sa_cookie;
85 };
86
87 struct osc_fsync_args {
88         struct obd_info     *fa_oi;
89         obd_enqueue_update_f fa_upcall;
90         void            *fa_cookie;
91 };
92
93 struct osc_enqueue_args {
94         struct obd_export       *oa_exp;
95         __u64               *oa_flags;
96         obd_enqueue_update_f      oa_upcall;
97         void                 *oa_cookie;
98         struct ost_lvb     *oa_lvb;
99         struct lustre_handle     *oa_lockh;
100         struct ldlm_enqueue_info *oa_ei;
101         unsigned int          oa_agl:1;
102 };
103
104 static void osc_release_ppga(struct brw_page **ppga, u32 count);
105 static int brw_interpret(const struct lu_env *env,
106                          struct ptlrpc_request *req, void *data, int rc);
107 static int osc_cleanup(struct obd_device *obd);
108
109 /* Pack OSC object metadata for disk storage (LE byte order). */
110 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
111                       struct lov_stripe_md *lsm)
112 {
113         int lmm_size;
114
115         lmm_size = sizeof(**lmmp);
116         if (lmmp == NULL)
117                 return lmm_size;
118
119         if (*lmmp != NULL && lsm == NULL) {
120                 kfree(*lmmp);
121                 *lmmp = NULL;
122                 return 0;
123         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
124                 return -EBADF;
125         }
126
127         if (*lmmp == NULL) {
128                 *lmmp = kzalloc(lmm_size, GFP_NOFS);
129                 if (!*lmmp)
130                         return -ENOMEM;
131         }
132
133         if (lsm)
134                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
135
136         return lmm_size;
137 }
138
139 /* Unpack OSC object metadata from disk storage (LE byte order). */
140 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
141                         struct lov_mds_md *lmm, int lmm_bytes)
142 {
143         int lsm_size;
144         struct obd_import *imp = class_exp2cliimp(exp);
145
146         if (lmm != NULL) {
147                 if (lmm_bytes < sizeof(*lmm)) {
148                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
149                                exp->exp_obd->obd_name, lmm_bytes,
150                                (int)sizeof(*lmm));
151                         return -EINVAL;
152                 }
153                 /* XXX LOV_MAGIC etc check? */
154
155                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
156                         CERROR("%s: zero lmm_object_id: rc = %d\n",
157                                exp->exp_obd->obd_name, -EINVAL);
158                         return -EINVAL;
159                 }
160         }
161
162         lsm_size = lov_stripe_md_size(1);
163         if (lsmp == NULL)
164                 return lsm_size;
165
166         if (*lsmp != NULL && lmm == NULL) {
167                 kfree((*lsmp)->lsm_oinfo[0]);
168                 kfree(*lsmp);
169                 *lsmp = NULL;
170                 return 0;
171         }
172
173         if (*lsmp == NULL) {
174                 *lsmp = kzalloc(lsm_size, GFP_NOFS);
175                 if (unlikely(*lsmp == NULL))
176                         return -ENOMEM;
177                 (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
178                                                 GFP_NOFS);
179                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
180                         kfree(*lsmp);
181                         return -ENOMEM;
182                 }
183                 loi_init((*lsmp)->lsm_oinfo[0]);
184         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
185                 return -EBADF;
186         }
187
188         if (lmm != NULL)
189                 /* XXX zero *lsmp? */
190                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
191
192         if (imp != NULL &&
193             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
194                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
195         else
196                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
197
198         return lsm_size;
199 }
200
201 static inline void osc_pack_req_body(struct ptlrpc_request *req,
202                                      struct obd_info *oinfo)
203 {
204         struct ost_body *body;
205
206         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
207         LASSERT(body);
208
209         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
210                              oinfo->oi_oa);
211 }
212
213 static int osc_getattr_interpret(const struct lu_env *env,
214                                  struct ptlrpc_request *req,
215                                  struct osc_async_args *aa, int rc)
216 {
217         struct ost_body *body;
218
219         if (rc != 0)
220                 goto out;
221
222         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
223         if (body) {
224                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
225                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
226                                      aa->aa_oi->oi_oa, &body->oa);
227
228                 /* This should really be sent by the OST */
229                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
230                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
231         } else {
232                 CDEBUG(D_INFO, "can't unpack ost_body\n");
233                 rc = -EPROTO;
234                 aa->aa_oi->oi_oa->o_valid = 0;
235         }
236 out:
237         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
238         return rc;
239 }
240
241 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
242                              struct ptlrpc_request_set *set)
243 {
244         struct ptlrpc_request *req;
245         struct osc_async_args *aa;
246         int rc;
247
248         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
249         if (req == NULL)
250                 return -ENOMEM;
251
252         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
253         if (rc) {
254                 ptlrpc_request_free(req);
255                 return rc;
256         }
257
258         osc_pack_req_body(req, oinfo);
259
260         ptlrpc_request_set_replen(req);
261         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
262
263         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
264         aa = ptlrpc_req_async_args(req);
265         aa->aa_oi = oinfo;
266
267         ptlrpc_set_add_req(set, req);
268         return 0;
269 }
270
271 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
272                        struct obd_info *oinfo)
273 {
274         struct ptlrpc_request *req;
275         struct ost_body *body;
276         int rc;
277
278         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
279         if (req == NULL)
280                 return -ENOMEM;
281
282         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
283         if (rc) {
284                 ptlrpc_request_free(req);
285                 return rc;
286         }
287
288         osc_pack_req_body(req, oinfo);
289
290         ptlrpc_request_set_replen(req);
291
292         rc = ptlrpc_queue_wait(req);
293         if (rc)
294                 goto out;
295
296         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
297         if (body == NULL) {
298                 rc = -EPROTO;
299                 goto out;
300         }
301
302         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
303         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
304                              &body->oa);
305
306         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
307         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
308
309  out:
310         ptlrpc_req_finished(req);
311         return rc;
312 }
313
314 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
315                        struct obd_info *oinfo, struct obd_trans_info *oti)
316 {
317         struct ptlrpc_request *req;
318         struct ost_body *body;
319         int rc;
320
321         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
322
323         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324         if (req == NULL)
325                 return -ENOMEM;
326
327         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
328         if (rc) {
329                 ptlrpc_request_free(req);
330                 return rc;
331         }
332
333         osc_pack_req_body(req, oinfo);
334
335         ptlrpc_request_set_replen(req);
336
337         rc = ptlrpc_queue_wait(req);
338         if (rc)
339                 goto out;
340
341         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
342         if (body == NULL) {
343                 rc = -EPROTO;
344                 goto out;
345         }
346
347         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
348                              &body->oa);
349
350 out:
351         ptlrpc_req_finished(req);
352         return rc;
353 }
354
355 static int osc_setattr_interpret(const struct lu_env *env,
356                                  struct ptlrpc_request *req,
357                                  struct osc_setattr_args *sa, int rc)
358 {
359         struct ost_body *body;
360
361         if (rc != 0)
362                 goto out;
363
364         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
365         if (body == NULL) {
366                 rc = -EPROTO;
367                 goto out;
368         }
369
370         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
371                              &body->oa);
372 out:
373         rc = sa->sa_upcall(sa->sa_cookie, rc);
374         return rc;
375 }
376
377 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
378                            struct obd_trans_info *oti,
379                            obd_enqueue_update_f upcall, void *cookie,
380                            struct ptlrpc_request_set *rqset)
381 {
382         struct ptlrpc_request *req;
383         struct osc_setattr_args *sa;
384         int rc;
385
386         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
387         if (req == NULL)
388                 return -ENOMEM;
389
390         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
391         if (rc) {
392                 ptlrpc_request_free(req);
393                 return rc;
394         }
395
396         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398
399         osc_pack_req_body(req, oinfo);
400
401         ptlrpc_request_set_replen(req);
402
403         /* do mds to ost setattr asynchronously */
404         if (!rqset) {
405                 /* Do not wait for response. */
406                 ptlrpcd_add_req(req);
407         } else {
408                 req->rq_interpret_reply =
409                         (ptlrpc_interpterer_t)osc_setattr_interpret;
410
411                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
412                 sa = ptlrpc_req_async_args(req);
413                 sa->sa_oa = oinfo->oi_oa;
414                 sa->sa_upcall = upcall;
415                 sa->sa_cookie = cookie;
416
417                 if (rqset == PTLRPCD_SET)
418                         ptlrpcd_add_req(req);
419                 else
420                         ptlrpc_set_add_req(rqset, req);
421         }
422
423         return 0;
424 }
425
426 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
427                              struct obd_trans_info *oti,
428                              struct ptlrpc_request_set *rqset)
429 {
430         return osc_setattr_async_base(exp, oinfo, oti,
431                                       oinfo->oi_cb_up, oinfo, rqset);
432 }
433
434 static int osc_real_create(struct obd_export *exp, struct obdo *oa,
435                            struct lov_stripe_md **ea,
436                            struct obd_trans_info *oti)
437 {
438         struct ptlrpc_request *req;
439         struct ost_body *body;
440         struct lov_stripe_md *lsm;
441         int rc;
442
443         LASSERT(oa);
444         LASSERT(ea);
445
446         lsm = *ea;
447         if (!lsm) {
448                 rc = obd_alloc_memmd(exp, &lsm);
449                 if (rc < 0)
450                         return rc;
451         }
452
453         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
454         if (req == NULL) {
455                 rc = -ENOMEM;
456                 goto out;
457         }
458
459         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
460         if (rc) {
461                 ptlrpc_request_free(req);
462                 goto out;
463         }
464
465         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
466         LASSERT(body);
467
468         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
469
470         ptlrpc_request_set_replen(req);
471
472         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
473             oa->o_flags == OBD_FL_DELORPHAN) {
474                 DEBUG_REQ(D_HA, req,
475                           "delorphan from OST integration");
476                 /* Don't resend the delorphan req */
477                 req->rq_no_resend = req->rq_no_delay = 1;
478         }
479
480         rc = ptlrpc_queue_wait(req);
481         if (rc)
482                 goto out_req;
483
484         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
485         if (body == NULL) {
486                 rc = -EPROTO;
487                 goto out_req;
488         }
489
490         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
491         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
492
493         oa->o_blksize = cli_brw_size(exp->exp_obd);
494         oa->o_valid |= OBD_MD_FLBLKSZ;
495
496         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
497          * have valid lsm_oinfo data structs, so don't go touching that.
498          * This needs to be fixed in a big way.
499          */
500         lsm->lsm_oi = oa->o_oi;
501         *ea = lsm;
502
503         if (oti != NULL) {
504                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
505
506                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
507                         if (!oti->oti_logcookies)
508                                 oti_alloc_cookies(oti, 1);
509                         *oti->oti_logcookies = oa->o_lcookie;
510                 }
511         }
512
513         CDEBUG(D_HA, "transno: %lld\n",
514                lustre_msg_get_transno(req->rq_repmsg));
515 out_req:
516         ptlrpc_req_finished(req);
517 out:
518         if (rc && !*ea)
519                 obd_free_memmd(exp, &lsm);
520         return rc;
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
524                    obd_enqueue_update_f upcall, void *cookie,
525                    struct ptlrpc_request_set *rqset)
526 {
527         struct ptlrpc_request *req;
528         struct osc_setattr_args *sa;
529         struct ost_body *body;
530         int rc;
531
532         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
533         if (req == NULL)
534                 return -ENOMEM;
535
536         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
537         if (rc) {
538                 ptlrpc_request_free(req);
539                 return rc;
540         }
541         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
542         ptlrpc_at_set_req_timeout(req);
543
544         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
545         LASSERT(body);
546         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
547                              oinfo->oi_oa);
548
549         ptlrpc_request_set_replen(req);
550
551         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
552         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
553         sa = ptlrpc_req_async_args(req);
554         sa->sa_oa = oinfo->oi_oa;
555         sa->sa_upcall = upcall;
556         sa->sa_cookie = cookie;
557         if (rqset == PTLRPCD_SET)
558                 ptlrpcd_add_req(req);
559         else
560                 ptlrpc_set_add_req(rqset, req);
561
562         return 0;
563 }
564
565 static int osc_sync_interpret(const struct lu_env *env,
566                               struct ptlrpc_request *req,
567                               void *arg, int rc)
568 {
569         struct osc_fsync_args *fa = arg;
570         struct ost_body *body;
571
572         if (rc)
573                 goto out;
574
575         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
576         if (body == NULL) {
577                 CERROR("can't unpack ost_body\n");
578                 rc = -EPROTO;
579                 goto out;
580         }
581
582         *fa->fa_oi->oi_oa = body->oa;
583 out:
584         rc = fa->fa_upcall(fa->fa_cookie, rc);
585         return rc;
586 }
587
588 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
589                   obd_enqueue_update_f upcall, void *cookie,
590                   struct ptlrpc_request_set *rqset)
591 {
592         struct ptlrpc_request *req;
593         struct ost_body *body;
594         struct osc_fsync_args *fa;
595         int rc;
596
597         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
598         if (req == NULL)
599                 return -ENOMEM;
600
601         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
602         if (rc) {
603                 ptlrpc_request_free(req);
604                 return rc;
605         }
606
607         /* overload the size and blocks fields in the oa with start/end */
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
611                              oinfo->oi_oa);
612
613         ptlrpc_request_set_replen(req);
614         req->rq_interpret_reply = osc_sync_interpret;
615
616         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
617         fa = ptlrpc_req_async_args(req);
618         fa->fa_oi = oinfo;
619         fa->fa_upcall = upcall;
620         fa->fa_cookie = cookie;
621
622         if (rqset == PTLRPCD_SET)
623                 ptlrpcd_add_req(req);
624         else
625                 ptlrpc_set_add_req(rqset, req);
626
627         return 0;
628 }
629
630 /* Find and cancel locally locks matched by @mode in the resource found by
631  * @objid. Found locks are added into @cancel list. Returns the amount of
632  * locks added to @cancels list. */
633 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
634                                    struct list_head *cancels,
635                                    ldlm_mode_t mode, __u64 lock_flags)
636 {
637         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
638         struct ldlm_res_id res_id;
639         struct ldlm_resource *res;
640         int count;
641
642         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
643          * export) but disabled through procfs (flag in NS).
644          *
645          * This distinguishes from a case when ELC is not supported originally,
646          * when we still want to cancel locks in advance and just cancel them
647          * locally, without sending any RPC. */
648         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
649                 return 0;
650
651         ostid_build_res_name(&oa->o_oi, &res_id);
652         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
653         if (res == NULL)
654                 return 0;
655
656         LDLM_RESOURCE_ADDREF(res);
657         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
658                                            lock_flags, 0, NULL);
659         LDLM_RESOURCE_DELREF(res);
660         ldlm_resource_putref(res);
661         return count;
662 }
663
664 static int osc_destroy_interpret(const struct lu_env *env,
665                                  struct ptlrpc_request *req, void *data,
666                                  int rc)
667 {
668         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
669
670         atomic_dec(&cli->cl_destroy_in_flight);
671         wake_up(&cli->cl_destroy_waitq);
672         return 0;
673 }
674
675 static int osc_can_send_destroy(struct client_obd *cli)
676 {
677         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
678             cli->cl_max_rpcs_in_flight) {
679                 /* The destroy request can be sent */
680                 return 1;
681         }
682         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
683             cli->cl_max_rpcs_in_flight) {
684                 /*
685                  * The counter has been modified between the two atomic
686                  * operations.
687                  */
688                 wake_up(&cli->cl_destroy_waitq);
689         }
690         return 0;
691 }
692
693 static int osc_create(const struct lu_env *env, struct obd_export *exp,
694                       struct obdo *oa, struct lov_stripe_md **ea,
695                       struct obd_trans_info *oti)
696 {
697         int rc = 0;
698
699         LASSERT(oa);
700         LASSERT(ea);
701         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
702
703         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
704             oa->o_flags == OBD_FL_RECREATE_OBJS) {
705                 return osc_real_create(exp, oa, ea, oti);
706         }
707
708         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
709                 return osc_real_create(exp, oa, ea, oti);
710
711         /* we should not get here anymore */
712         LBUG();
713
714         return rc;
715 }
716
717 /* Destroy requests can be async always on the client, and we don't even really
718  * care about the return code since the client cannot do anything at all about
719  * a destroy failure.
720  * When the MDS is unlinking a filename, it saves the file objects into a
721  * recovery llog, and these object records are cancelled when the OST reports
722  * they were destroyed and sync'd to disk (i.e. transaction committed).
723  * If the client dies, or the OST is down when the object should be destroyed,
724  * the records are not cancelled, and when the OST reconnects to the MDS next,
725  * it will retrieve the llog unlink logs and then sends the log cancellation
726  * cookies to the MDS after committing destroy transactions. */
727 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
728                        struct obdo *oa, struct lov_stripe_md *ea,
729                        struct obd_trans_info *oti, struct obd_export *md_export)
730 {
731         struct client_obd *cli = &exp->exp_obd->u.cli;
732         struct ptlrpc_request *req;
733         struct ost_body *body;
734         LIST_HEAD(cancels);
735         int rc, count;
736
737         if (!oa) {
738                 CDEBUG(D_INFO, "oa NULL\n");
739                 return -EINVAL;
740         }
741
742         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
743                                         LDLM_FL_DISCARD_DATA);
744
745         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
746         if (req == NULL) {
747                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
748                 return -ENOMEM;
749         }
750
751         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
752                                0, &cancels, count);
753         if (rc) {
754                 ptlrpc_request_free(req);
755                 return rc;
756         }
757
758         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
759         ptlrpc_at_set_req_timeout(req);
760
761         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
762                 oa->o_lcookie = *oti->oti_logcookies;
763         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
764         LASSERT(body);
765         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
766
767         ptlrpc_request_set_replen(req);
768
769         /* If osc_destroy is for destroying the unlink orphan,
770          * sent from MDT to OST, which should not be blocked here,
771          * because the process might be triggered by ptlrpcd, and
772          * it is not good to block ptlrpcd thread (b=16006)*/
773         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
774                 req->rq_interpret_reply = osc_destroy_interpret;
775                 if (!osc_can_send_destroy(cli)) {
776                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
777                                                           NULL);
778
779                         /*
780                          * Wait until the number of on-going destroy RPCs drops
781                          * under max_rpc_in_flight
782                          */
783                         l_wait_event_exclusive(cli->cl_destroy_waitq,
784                                                osc_can_send_destroy(cli), &lwi);
785                 }
786         }
787
788         /* Do not wait for response */
789         ptlrpcd_add_req(req);
790         return 0;
791 }
792
793 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
794                                 long writing_bytes)
795 {
796         u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
797
798         LASSERT(!(oa->o_valid & bits));
799
800         oa->o_valid |= bits;
801         client_obd_list_lock(&cli->cl_loi_list_lock);
802         oa->o_dirty = cli->cl_dirty;
803         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
804                      cli->cl_dirty_max)) {
805                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
806                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
807                 oa->o_undirty = 0;
808         } else if (unlikely(atomic_read(&obd_dirty_pages) -
809                             atomic_read(&obd_dirty_transit_pages) >
810                             (long)(obd_max_dirty_pages + 1))) {
811                 /* The atomic_read() allowing the atomic_inc() are
812                  * not covered by a lock thus they may safely race and trip
813                  * this CERROR() unless we add in a small fudge factor (+1). */
814                 CERROR("dirty %d - %d > system dirty_max %d\n",
815                        atomic_read(&obd_dirty_pages),
816                        atomic_read(&obd_dirty_transit_pages),
817                        obd_max_dirty_pages);
818                 oa->o_undirty = 0;
819         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
820                 CERROR("dirty %lu - dirty_max %lu too big???\n",
821                        cli->cl_dirty, cli->cl_dirty_max);
822                 oa->o_undirty = 0;
823         } else {
824                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
825                                       PAGE_CACHE_SHIFT)*
826                                      (cli->cl_max_rpcs_in_flight + 1);
827                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
828         }
829         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
830         oa->o_dropped = cli->cl_lost_grant;
831         cli->cl_lost_grant = 0;
832         client_obd_list_unlock(&cli->cl_loi_list_lock);
833         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
834                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
835
836 }
837
838 void osc_update_next_shrink(struct client_obd *cli)
839 {
840         cli->cl_next_shrink_grant =
841                 cfs_time_shift(cli->cl_grant_shrink_interval);
842         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
843                cli->cl_next_shrink_grant);
844 }
845
846 static void __osc_update_grant(struct client_obd *cli, u64 grant)
847 {
848         client_obd_list_lock(&cli->cl_loi_list_lock);
849         cli->cl_avail_grant += grant;
850         client_obd_list_unlock(&cli->cl_loi_list_lock);
851 }
852
853 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
854 {
855         if (body->oa.o_valid & OBD_MD_FLGRANT) {
856                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
857                 __osc_update_grant(cli, body->oa.o_grant);
858         }
859 }
860
861 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
862                               u32 keylen, void *key, u32 vallen,
863                               void *val, struct ptlrpc_request_set *set);
864
865 static int osc_shrink_grant_interpret(const struct lu_env *env,
866                                       struct ptlrpc_request *req,
867                                       void *aa, int rc)
868 {
869         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870         struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
871         struct ost_body *body;
872
873         if (rc != 0) {
874                 __osc_update_grant(cli, oa->o_grant);
875                 goto out;
876         }
877
878         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
879         LASSERT(body);
880         osc_update_grant(cli, body);
881 out:
882         kmem_cache_free(obdo_cachep, oa);
883         return rc;
884 }
885
886 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
887 {
888         client_obd_list_lock(&cli->cl_loi_list_lock);
889         oa->o_grant = cli->cl_avail_grant / 4;
890         cli->cl_avail_grant -= oa->o_grant;
891         client_obd_list_unlock(&cli->cl_loi_list_lock);
892         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
893                 oa->o_valid |= OBD_MD_FLFLAGS;
894                 oa->o_flags = 0;
895         }
896         oa->o_flags |= OBD_FL_SHRINK_GRANT;
897         osc_update_next_shrink(cli);
898 }
899
900 /* Shrink the current grant, either from some large amount to enough for a
901  * full set of in-flight RPCs, or if we have already shrunk to that limit
902  * then to enough for a single RPC.  This avoids keeping more grant than
903  * needed, and avoids shrinking the grant piecemeal. */
904 static int osc_shrink_grant(struct client_obd *cli)
905 {
906         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
907                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
908
909         client_obd_list_lock(&cli->cl_loi_list_lock);
910         if (cli->cl_avail_grant <= target_bytes)
911                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
912         client_obd_list_unlock(&cli->cl_loi_list_lock);
913
914         return osc_shrink_grant_to_target(cli, target_bytes);
915 }
916
917 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
918 {
919         int rc = 0;
920         struct ost_body *body;
921
922         client_obd_list_lock(&cli->cl_loi_list_lock);
923         /* Don't shrink if we are already above or below the desired limit
924          * We don't want to shrink below a single RPC, as that will negatively
925          * impact block allocation and long-term performance. */
926         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
927                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
928
929         if (target_bytes >= cli->cl_avail_grant) {
930                 client_obd_list_unlock(&cli->cl_loi_list_lock);
931                 return 0;
932         }
933         client_obd_list_unlock(&cli->cl_loi_list_lock);
934
935         body = kzalloc(sizeof(*body), GFP_NOFS);
936         if (!body)
937                 return -ENOMEM;
938
939         osc_announce_cached(cli, &body->oa, 0);
940
941         client_obd_list_lock(&cli->cl_loi_list_lock);
942         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
943         cli->cl_avail_grant = target_bytes;
944         client_obd_list_unlock(&cli->cl_loi_list_lock);
945         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
946                 body->oa.o_valid |= OBD_MD_FLFLAGS;
947                 body->oa.o_flags = 0;
948         }
949         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
950         osc_update_next_shrink(cli);
951
952         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
953                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
954                                 sizeof(*body), body, NULL);
955         if (rc != 0)
956                 __osc_update_grant(cli, body->oa.o_grant);
957         kfree(body);
958         return rc;
959 }
960
961 static int osc_should_shrink_grant(struct client_obd *client)
962 {
963         unsigned long time = cfs_time_current();
964         unsigned long next_shrink = client->cl_next_shrink_grant;
965
966         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
967              OBD_CONNECT_GRANT_SHRINK) == 0)
968                 return 0;
969
970         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
971                 /* Get the current RPC size directly, instead of going via:
972                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
973                  * Keep comment here so that it can be found by searching. */
974                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
975
976                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
977                     client->cl_avail_grant > brw_size)
978                         return 1;
979
980                 osc_update_next_shrink(client);
981         }
982         return 0;
983 }
984
985 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
986 {
987         struct client_obd *client;
988
989         list_for_each_entry(client, &item->ti_obd_list,
990                                 cl_grant_shrink_list) {
991                 if (osc_should_shrink_grant(client))
992                         osc_shrink_grant(client);
993         }
994         return 0;
995 }
996
997 static int osc_add_shrink_grant(struct client_obd *client)
998 {
999         int rc;
1000
1001         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1002                                        TIMEOUT_GRANT,
1003                                        osc_grant_shrink_grant_cb, NULL,
1004                                        &client->cl_grant_shrink_list);
1005         if (rc) {
1006                 CERROR("add grant client %s error %d\n",
1007                         client->cl_import->imp_obd->obd_name, rc);
1008                 return rc;
1009         }
1010         CDEBUG(D_CACHE, "add grant client %s \n",
1011                client->cl_import->imp_obd->obd_name);
1012         osc_update_next_shrink(client);
1013         return 0;
1014 }
1015
1016 static int osc_del_shrink_grant(struct client_obd *client)
1017 {
1018         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1019                                          TIMEOUT_GRANT);
1020 }
1021
1022 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1023 {
1024         /*
1025          * ocd_grant is the total grant amount we're expect to hold: if we've
1026          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1027          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1028          *
1029          * race is tolerable here: if we're evicted, but imp_state already
1030          * left EVICTED state, then cl_dirty must be 0 already.
1031          */
1032         client_obd_list_lock(&cli->cl_loi_list_lock);
1033         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1034                 cli->cl_avail_grant = ocd->ocd_grant;
1035         else
1036                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1037
1038         if (cli->cl_avail_grant < 0) {
1039                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1040                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1041                       ocd->ocd_grant, cli->cl_dirty);
1042                 /* workaround for servers which do not have the patch from
1043                  * LU-2679 */
1044                 cli->cl_avail_grant = ocd->ocd_grant;
1045         }
1046
1047         /* determine the appropriate chunk size used by osc_extent. */
1048         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1049         client_obd_list_unlock(&cli->cl_loi_list_lock);
1050
1051         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1052                cli->cl_import->imp_obd->obd_name,
1053                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1054
1055         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1056             list_empty(&cli->cl_grant_shrink_list))
1057                 osc_add_shrink_grant(cli);
1058 }
1059
1060 /* We assume that the reason this OSC got a short read is because it read
1061  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1062  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1063  * this stripe never got written at or beyond this stripe offset yet. */
1064 static void handle_short_read(int nob_read, u32 page_count,
1065                               struct brw_page **pga)
1066 {
1067         char *ptr;
1068         int i = 0;
1069
1070         /* skip bytes read OK */
1071         while (nob_read > 0) {
1072                 LASSERT(page_count > 0);
1073
1074                 if (pga[i]->count > nob_read) {
1075                         /* EOF inside this page */
1076                         ptr = kmap(pga[i]->pg) +
1077                                 (pga[i]->off & ~CFS_PAGE_MASK);
1078                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1079                         kunmap(pga[i]->pg);
1080                         page_count--;
1081                         i++;
1082                         break;
1083                 }
1084
1085                 nob_read -= pga[i]->count;
1086                 page_count--;
1087                 i++;
1088         }
1089
1090         /* zero remaining pages */
1091         while (page_count-- > 0) {
1092                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1093                 memset(ptr, 0, pga[i]->count);
1094                 kunmap(pga[i]->pg);
1095                 i++;
1096         }
1097 }
1098
1099 static int check_write_rcs(struct ptlrpc_request *req,
1100                            int requested_nob, int niocount,
1101                            u32 page_count, struct brw_page **pga)
1102 {
1103         int i;
1104         __u32 *remote_rcs;
1105
1106         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1107                                                   sizeof(*remote_rcs) *
1108                                                   niocount);
1109         if (remote_rcs == NULL) {
1110                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1111                 return -EPROTO;
1112         }
1113
1114         /* return error if any niobuf was in error */
1115         for (i = 0; i < niocount; i++) {
1116                 if ((int)remote_rcs[i] < 0)
1117                         return remote_rcs[i];
1118
1119                 if (remote_rcs[i] != 0) {
1120                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1121                                 i, remote_rcs[i], req);
1122                         return -EPROTO;
1123                 }
1124         }
1125
1126         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1127                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1128                        req->rq_bulk->bd_nob_transferred, requested_nob);
1129                 return -EPROTO;
1130         }
1131
1132         return 0;
1133 }
1134
1135 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1136 {
1137         if (p1->flag != p2->flag) {
1138                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1139                                   OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1140
1141                 /* warn if we try to combine flags that we don't know to be
1142                  * safe to combine */
1143                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1144                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1145                               p1->flag, p2->flag);
1146                 }
1147                 return 0;
1148         }
1149
1150         return (p1->off + p1->count == p2->off);
1151 }
1152
1153 static u32 osc_checksum_bulk(int nob, u32 pg_count,
1154                              struct brw_page **pga, int opc,
1155                              cksum_type_t cksum_type)
1156 {
1157         __u32 cksum;
1158         int i = 0;
1159         struct cfs_crypto_hash_desc *hdesc;
1160         unsigned int bufsize;
1161         int err;
1162         unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1163
1164         LASSERT(pg_count > 0);
1165
1166         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1167         if (IS_ERR(hdesc)) {
1168                 CERROR("Unable to initialize checksum hash %s\n",
1169                        cfs_crypto_hash_name(cfs_alg));
1170                 return PTR_ERR(hdesc);
1171         }
1172
1173         while (nob > 0 && pg_count > 0) {
1174                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1175
1176                 /* corrupt the data before we compute the checksum, to
1177                  * simulate an OST->client data error */
1178                 if (i == 0 && opc == OST_READ &&
1179                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1180                         unsigned char *ptr = kmap(pga[i]->pg);
1181                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1182
1183                         memcpy(ptr + off, "bad1", min(4, nob));
1184                         kunmap(pga[i]->pg);
1185                 }
1186                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1187                                   pga[i]->off & ~CFS_PAGE_MASK,
1188                                   count);
1189                 CDEBUG(D_PAGE,
1190                        "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1191                        pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1192                        (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1193                        page_private(pga[i]->pg),
1194                        (int)(pga[i]->off & ~CFS_PAGE_MASK));
1195
1196                 nob -= pga[i]->count;
1197                 pg_count--;
1198                 i++;
1199         }
1200
1201         bufsize = 4;
1202         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1203
1204         if (err)
1205                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1206
1207         /* For sending we only compute the wrong checksum instead
1208          * of corrupting the data so it is still correct on a redo */
1209         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1210                 cksum++;
1211
1212         return cksum;
1213 }
1214
/*
 * Allocate, pack and fill in a bulk read or write RPC for the pages in pga.
 *
 * cmd        OBD_BRW_WRITE set selects OST_WRITE, otherwise OST_READ
 * cli        client the request is issued for
 * oa         object attributes copied into the wire body; for writes its
 *            checksum-related fields (o_valid, o_flags, o_cksum) are updated
 * lsm        not referenced in this function
 * page_count number of entries in pga (asserted to be > 0)
 * pga        pages to transfer, asserted to be in strictly increasing offset
 *            order with no gap in the middle of the array
 * reqp       receives the prepared request on success
 * reserve    not referenced in this function
 * resend     non-zero marks the request with OBD_FL_RECOV_RESEND
 *
 * Returns 0 with *reqp set, or a negative errno with the request released.
 */
1215 static int osc_brw_prep_request(int cmd, struct client_obd *cli,
1216                                 struct obdo *oa,
1217                                 struct lov_stripe_md *lsm, u32 page_count,
1218                                 struct brw_page **pga,
1219                                 struct ptlrpc_request **reqp,
1220                                 int reserve,
1221                                 int resend)
1222 {
1223         struct ptlrpc_request *req;
1224         struct ptlrpc_bulk_desc *desc;
1225         struct ost_body *body;
1226         struct obd_ioobj *ioobj;
1227         struct niobuf_remote *niobuf;
1228         int niocount, i, requested_nob, opc, rc;
1229         struct osc_brw_async_args *aa;
1230         struct req_capsule *pill;
1231         struct brw_page *pg_prev;
1232
             /* OBD_FAIL_CHECK test hooks (presumably fault injection) that
              * make this function fail before anything is allocated */
1233         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1234                 return -ENOMEM; /* Recoverable */
1235         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1236                 return -EINVAL; /* Fatal */
1237
             /* writes draw their request from osc_rq_pool, reads use a
              * plain allocation */
1238         if ((cmd & OBD_BRW_WRITE) != 0) {
1239                 opc = OST_WRITE;
1240                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1241                                                 osc_rq_pool,
1242                                                 &RQF_OST_BRW_WRITE);
1243         } else {
1244                 opc = OST_READ;
1245                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1246         }
1247         if (req == NULL)
1248                 return -ENOMEM;
1249
             /* count the remote niobufs needed: a new one starts at every
              * page that cannot be merged with its predecessor */
1250         for (niocount = i = 1; i < page_count; i++) {
1251                 if (!can_merge_pages(pga[i - 1], pga[i]))
1252                         niocount++;
1253         }
1254
             /* size the variable-length request fields before packing */
1255         pill = &req->rq_pill;
1256         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1257                              sizeof(*ioobj));
1258         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1259                              niocount * sizeof(*niobuf));
1260
             /* a pack failure releases the request with
              * ptlrpc_request_free(); later failures go through the out:
              * label and ptlrpc_req_finished() instead */
1261         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1262         if (rc) {
1263                 ptlrpc_request_free(req);
1264                 return rc;
1265         }
1266         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1267         ptlrpc_at_set_req_timeout(req);
1268         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1269          * retry logic */
1270         req->rq_no_retry_einprogress = 1;
1271
             /* bulk descriptor: this side is the GET source for writes and
              * the PUT sink for reads */
1272         desc = ptlrpc_prep_bulk_imp(req, page_count,
1273                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1274                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1275                 OST_BULK_PORTAL);
1276
1277         if (desc == NULL) {
1278                 rc = -ENOMEM;
1279                 goto out;
1280         }
1281         /* NB request now owns desc and will free it when it gets freed */
1282
1283         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1284         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1285         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1286         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1287
1288         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1289
1290         obdo_to_ioobj(oa, ioobj);
1291         ioobj->ioo_bufcnt = niocount;
1292         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1293          * that might be send for this request.  The actual number is decided
1294          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1295          * "max - 1" for old client compatibility sending "0", and also so the
1296          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1297         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1298         LASSERT(page_count > 0);
1299         pg_prev = pga[0];
             /* Add every page to the bulk descriptor and build the remote
              * niobuf array.  The loop header advances niobuf once per page;
              * when a page merges into the previous niobuf the body steps
              * niobuf back first, so niobuf ends up exactly niocount entries
              * past the start (checked right after the loop). */
1300         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1301                 struct brw_page *pg = pga[i];
1302                 int poff = pg->off & ~CFS_PAGE_MASK;
1303
1304                 LASSERT(pg->count > 0);
1305                 /* make sure there is no gap in the middle of page array */
1306                 LASSERTF(page_count == 1 ||
1307                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1308                           ergo(i > 0 && i < page_count - 1,
1309                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1310                           ergo(i == page_count - 1, poff == 0)),
1311                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1312                          i, page_count, pg, pg->off, pg->count);
                     /* offsets must be strictly increasing */
1313                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1314                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1315                          i, page_count,
1316                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1317                          pg_prev->pg, page_private(pg_prev->pg),
1318                          pg_prev->pg->index, pg_prev->off);
                     /* OBD_BRW_SRVLOCK must be set on all pages or on none */
1319                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1320                         (pg->flag & OBD_BRW_SRVLOCK));
1321
1322                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1323                 requested_nob += pg->count;
1324
1325                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                             /* extend the previous niobuf */
1326                         niobuf--;
1327                         niobuf->len += pg->count;
1328                 } else {
                             /* start a new niobuf at this page */
1329                         niobuf->offset = pg->off;
1330                         niobuf->len = pg->count;
1331                         niobuf->flags = pg->flag;
1332                 }
1333                 pg_prev = pg;
1334         }
1335
             /* exactly niocount niobufs must have been filled in */
1336         LASSERTF((void *)(niobuf - niocount) ==
1337                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1338                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1339                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1340
             /* the third argument is the byte count for writes, 0 for reads */
1341         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1342         if (resend) {
1343                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1344                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1345                         body->oa.o_flags = 0;
1346                 }
1347                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1348         }
1349
             /* piggy-back a grant shrink on this request when one is due */
1350         if (osc_should_shrink_grant(cli))
1351                 osc_shrink_grant_local(cli, &body->oa);
1352
1353         /* size[REQ_REC_OFF] still sizeof (*body) */
1354         if (opc == OST_WRITE) {
1355                 if (cli->cl_checksum &&
1356                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1357                         /* store cl_cksum_type in a local variable since
1358                          * it can be changed via lprocfs */
1359                         cksum_type_t cksum_type = cli->cl_cksum_type;
1360
1361                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1362                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1363                                 body->oa.o_flags = 0;
1364                         }
1365                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1366                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                             /* writes are checksummed here, before sending */
1367                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1368                                                              page_count, pga,
1369                                                              OST_WRITE,
1370                                                              cksum_type);
1371                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1372                                body->oa.o_cksum);
1373                         /* save this in 'oa', too, for later checking */
1374                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1375                         oa->o_flags |= cksum_type_pack(cksum_type);
1376                 } else {
1377                         /* clear out the checksum flag, in case this is a
1378                          * resend but cl_checksum is no longer set. b=11238 */
1379                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1380                 }
1381                 oa->o_cksum = body->oa.o_cksum;
1382                 /* 1 RC per niobuf */
1383                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1384                                      sizeof(__u32) * niocount);
1385         } else {
                     /* reads only announce the checksum type here; no
                      * checksum is computed in this function for them */
1386                 if (cli->cl_checksum &&
1387                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1388                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1389                                 body->oa.o_flags = 0;
1390                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1391                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1392                 }
1393         }
1394         ptlrpc_request_set_replen(req);
1395
             /* record in the request's async args what was set up here */
1396         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1397         aa = ptlrpc_req_async_args(req);
1398         aa->aa_oa = oa;
1399         aa->aa_requested_nob = requested_nob;
1400         aa->aa_nio_count = niocount;
1401         aa->aa_page_count = page_count;
1402         aa->aa_resends = 0;
1403         aa->aa_ppga = pga;
1404         aa->aa_cli = cli;
1405         INIT_LIST_HEAD(&aa->aa_oaps);
1406
1407         *reqp = req;
1408         return 0;
1409
1410  out:
             /* failure after packing: drop the request */
1411         ptlrpc_req_finished(req);
1412         return rc;
1413 }
1414
1415 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1416                                 __u32 client_cksum, __u32 server_cksum, int nob,
1417                                 u32 page_count, struct brw_page **pga,
1418                                 cksum_type_t client_cksum_type)
1419 {
1420         __u32 new_cksum;
1421         char *msg;
1422         cksum_type_t cksum_type;
1423
1424         if (server_cksum == client_cksum) {
1425                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1426                 return 0;
1427         }
1428
1429         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1430                                        oa->o_flags : 0);
1431         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1432                                       cksum_type);
1433
1434         if (cksum_type != client_cksum_type)
1435                 msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
1436                         ;
1437         else if (new_cksum == server_cksum)
1438                 msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1439                         ;
1440         else if (new_cksum == client_cksum)
1441                 msg = "changed in transit before arrival at OST";
1442         else
1443                 msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1444                         ;
1445
1446         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1447                            " object "DOSTID" extent [%llu-%llu]\n",
1448                            msg, libcfs_nid2str(peer->nid),
1449                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1450                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1451                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1452                            POSTID(&oa->o_oi), pga[0]->off,
1453                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1454         CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1455                client_cksum, client_cksum_type,
1456                server_cksum, cksum_type, new_cksum);
1457         return 1;
1458 }
1459
1460 /* Note rc enters this function as number of bytes transferred */
1461 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1462 {
1463         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1464         const lnet_process_id_t *peer =
1465                         &req->rq_import->imp_connection->c_peer;
1466         struct client_obd *cli = aa->aa_cli;
1467         struct ost_body *body;
1468         __u32 client_cksum = 0;
1469
1470         if (rc < 0 && rc != -EDQUOT) {
1471                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1472                 return rc;
1473         }
1474
1475         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1476         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1477         if (body == NULL) {
1478                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1479                 return -EPROTO;
1480         }
1481
1482         /* set/clear over quota flag for a uid/gid */
1483         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1484             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1485                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1486
1487                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1488                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1489                        body->oa.o_flags);
1490                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1491         }
1492
1493         osc_update_grant(cli, body);
1494
1495         if (rc < 0)
1496                 return rc;
1497
1498         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1499                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1500
1501         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1502                 if (rc > 0) {
1503                         CERROR("Unexpected +ve rc %d\n", rc);
1504                         return -EPROTO;
1505                 }
1506                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1507
1508                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1509                         return -EAGAIN;
1510
1511                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1512                     check_write_checksum(&body->oa, peer, client_cksum,
1513                                          body->oa.o_cksum, aa->aa_requested_nob,
1514                                          aa->aa_page_count, aa->aa_ppga,
1515                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1516                         return -EAGAIN;
1517
1518                 rc = check_write_rcs(req, aa->aa_requested_nob,
1519                                      aa->aa_nio_count,
1520                                      aa->aa_page_count, aa->aa_ppga);
1521                 goto out;
1522         }
1523
1524         /* The rest of this function executes only for OST_READs */
1525
1526         /* if unwrap_bulk failed, return -EAGAIN to retry */
1527         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1528         if (rc < 0) {
1529                 rc = -EAGAIN;
1530                 goto out;
1531         }
1532
1533         if (rc > aa->aa_requested_nob) {
1534                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1535                        aa->aa_requested_nob);
1536                 return -EPROTO;
1537         }
1538
1539         if (rc != req->rq_bulk->bd_nob_transferred) {
1540                 CERROR("Unexpected rc %d (%d transferred)\n",
1541                         rc, req->rq_bulk->bd_nob_transferred);
1542                 return -EPROTO;
1543         }
1544
1545         if (rc < aa->aa_requested_nob)
1546                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1547
1548         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1549                 static int cksum_counter;
1550                 __u32 server_cksum = body->oa.o_cksum;
1551                 char *via = "";
1552                 char *router = "";
1553                 cksum_type_t cksum_type;
1554
1555                 cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
1556                                                body->oa.o_flags : 0);
1557                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1558                                                  aa->aa_ppga, OST_READ,
1559                                                  cksum_type);
1560
1561                 if (peer->nid != req->rq_bulk->bd_sender) {
1562                         via = " via ";
1563                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1564                 }
1565
1566                 if (server_cksum != client_cksum) {
1567                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1568                                            req->rq_import->imp_obd->obd_name,
1569                                            libcfs_nid2str(peer->nid),
1570                                            via, router,
1571                                            body->oa.o_valid & OBD_MD_FLFID ?
1572                                            body->oa.o_parent_seq : (__u64)0,
1573                                            body->oa.o_valid & OBD_MD_FLFID ?
1574                                            body->oa.o_parent_oid : 0,
1575                                            body->oa.o_valid & OBD_MD_FLFID ?
1576                                            body->oa.o_parent_ver : 0,
1577                                            POSTID(&body->oa.o_oi),
1578                                            aa->aa_ppga[0]->off,
1579                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1580                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1581                                            1);
1582                         CERROR("client %x, server %x, cksum_type %x\n",
1583                                client_cksum, server_cksum, cksum_type);
1584                         cksum_counter = 0;
1585                         aa->aa_oa->o_cksum = client_cksum;
1586                         rc = -EAGAIN;
1587                 } else {
1588                         cksum_counter++;
1589                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1590                         rc = 0;
1591                 }
1592         } else if (unlikely(client_cksum)) {
1593                 static int cksum_missed;
1594
1595                 cksum_missed++;
1596                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1597                         CERROR("Checksum %u requested from %s but not sent\n",
1598                                cksum_missed, libcfs_nid2str(peer->nid));
1599         } else {
1600                 rc = 0;
1601         }
1602 out:
1603         if (rc >= 0)
1604                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1605                                      aa->aa_oa, &body->oa);
1606
1607         return rc;
1608 }
1609
1610 static int osc_brw_redo_request(struct ptlrpc_request *request,
1611                                 struct osc_brw_async_args *aa, int rc)
1612 {
1613         struct ptlrpc_request *new_req;
1614         struct osc_brw_async_args *new_aa;
1615         struct osc_async_page *oap;
1616
1617         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1618                   "redo for recoverable error %d", rc);
1619
1620         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1621                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1622                                   aa->aa_cli, aa->aa_oa,
1623                                   NULL /* lsm unused by osc currently */,
1624                                   aa->aa_page_count, aa->aa_ppga,
1625                                   &new_req, 0, 1);
1626         if (rc)
1627                 return rc;
1628
1629         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1630                 if (oap->oap_request != NULL) {
1631                         LASSERTF(request == oap->oap_request,
1632                                  "request %p != oap_request %p\n",
1633                                  request, oap->oap_request);
1634                         if (oap->oap_interrupted) {
1635                                 ptlrpc_req_finished(new_req);
1636                                 return -EINTR;
1637                         }
1638                 }
1639         }
1640         /* New request takes over pga and oaps from old request.
1641          * Note that copying a list_head doesn't work, need to move it... */
1642         aa->aa_resends++;
1643         new_req->rq_interpret_reply = request->rq_interpret_reply;
1644         new_req->rq_async_args = request->rq_async_args;
1645         /* cap resend delay to the current request timeout, this is similar to
1646          * what ptlrpc does (see after_reply()) */
1647         if (aa->aa_resends > new_req->rq_timeout)
1648                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1649         else
1650                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1651         new_req->rq_generation_set = 1;
1652         new_req->rq_import_generation = request->rq_import_generation;
1653
1654         new_aa = ptlrpc_req_async_args(new_req);
1655
1656         INIT_LIST_HEAD(&new_aa->aa_oaps);
1657         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1658         INIT_LIST_HEAD(&new_aa->aa_exts);
1659         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1660         new_aa->aa_resends = aa->aa_resends;
1661
1662         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1663                 if (oap->oap_request) {
1664                         ptlrpc_req_finished(oap->oap_request);
1665                         oap->oap_request = ptlrpc_request_addref(new_req);
1666                 }
1667         }
1668
1669         /* XXX: This code will run into problem if we're going to support
1670          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1671          * and wait for all of them to be finished. We should inherit request
1672          * set from old request. */
1673         ptlrpcd_add_req(new_req);
1674
1675         DEBUG_REQ(D_INFO, new_req, "new request");
1676         return 0;
1677 }
1678
1679 /*
1680  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1681  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1682  * fine for our small page arrays and doesn't require allocation.  its an
1683  * insertion sort that swaps elements that are strides apart, shrinking the
1684  * stride down until its '1' and the array is sorted.
1685  */
1686 static void sort_brw_pages(struct brw_page **array, int num)
1687 {
1688         int stride, i, j;
1689         struct brw_page *tmp;
1690
1691         if (num == 1)
1692                 return;
1693         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1694                 ;
1695
1696         do {
1697                 stride /= 3;
1698                 for (i = stride ; i < num ; i++) {
1699                         tmp = array[i];
1700                         j = i;
1701                         while (j >= stride && array[j - stride]->off > tmp->off) {
1702                                 array[j] = array[j - stride];
1703                                 j -= stride;
1704                         }
1705                         array[j] = tmp;
1706                 }
1707         } while (stride > 1);
1708 }
1709
/*
 * Free an array of brw_page pointers.
 *
 * Only the array itself is released with kfree(); the brw_page structures
 * it points to are not touched.
 *
 * \param ppga   the array to free, must not be NULL
 * \param count  number of entries; not used by this function
 */
static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
        LASSERT(ppga != NULL);
        kfree(ppga);
}
1715
/*
 * Reply interpreter for bulk read/write RPCs; osc_build_rpc() installs it
 * as rq_interpret_reply.
 *
 * The request is first finished through osc_brw_fini_request().  If that
 * yields a recoverable error the RPC may be resent with
 * osc_brw_redo_request(); when the resend is queued this function returns
 * 0 immediately.  Otherwise every extent on aa->aa_exts is finished with
 * the final status, on success the block count and timestamps found in the
 * obdo are pushed to the cl_object, the obdo and the page array are freed,
 * the read or write in-flight counter is decremented and further I/O is
 * unplugged.
 *
 * \param env   execution environment
 * \param req   the BRW request whose reply is being interpreted
 * \param data  the struct osc_brw_async_args of \a req
 * \param rc    status handed in by the caller, passed on to
 *              osc_brw_fini_request()
 *
 * \retval 0         success, or the request was resent
 * \retval negative  failure; inside the recoverable-error branch a
 *                   remaining -EAGAIN or -EINPROGRESS is turned into -EIO
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct cl_object *obj = NULL;
        struct client_obd *cli = aa->aa_cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import generation changed since the request was
                         * sent: do not resend, only log */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 here is the result of a successful
                 * osc_brw_redo_request() (assuming osc_recoverable_error()
                 * rejects 0 -- TODO confirm); that call moved the oap and
                 * extent lists to the new request, so there is nothing
                 * left to finish for this one. */
                if (rc == 0)
                        return 0;
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                /* on success take a reference on the object of the first
                 * extent; it is used for the attribute update below */
                if (obj == NULL && rc == 0) {
                        obj = osc2cl(ext->oe_obj);
                        cl_object_get(obj);
                }

                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        if (obj != NULL) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;

                LASSERT(rc == 0);
                /* copy only the attributes the obdo marks as valid */
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }
                if (valid != 0) {
                        cl_object_attr_lock(obj);
                        cl_object_attr_set(env, obj, attr, valid);
                        cl_object_attr_unlock(obj);
                }
                /* drop the reference taken in the extent loop */
                cl_object_put(env, obj);
        }
        /* aa->aa_oa is not referenced again below */
        kmem_cache_free(obdo_cachep, aa->aa_oa);

        /* report the error, or the number of bytes transferred on success */
        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL);
        return rc;
}
1812
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * All pages of all extents are gathered into one page array, the array is
 * sorted by offset, a BRW request is prepared for it with brw_interpret()
 * as reply interpreter, and the request is queued with ptlrpcd_add_req().
 * On success the extents are moved from @ext_list to the async args of the
 * request.  On failure every extent still on @ext_list is finished with
 * the error code.
 *
 * \param env       execution environment
 * \param cli       client obd the RPC is built for
 * \param ext_list  list of extents (linked through oe_link) to send
 * \param cmd       OBD_BRW_READ or OBD_BRW_WRITE
 *
 * \retval 0        the request was queued
 * \retval -ENOMEM  one of the allocations failed
 * \retval other    error from cl_req_alloc(), cl_req_prep() or
 *                  osc_brw_prep_request()
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd)
{
        struct ptlrpc_request *req = NULL;
        struct osc_extent *ext;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa = NULL;
        struct obdo *oa = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr *crattr = NULL;
        u64 starting_offset = OBD_OBJECT_EOF;
        u64 ending_offset = 0;
        int mpflag = 0;
        int mem_tight = 0;
        int page_count = 0;
        int i;
        int rc;
        struct ost_body *body;
        LIST_HEAD(rpc_list);

        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                /* one extent flagged oe_memalloc marks the whole RPC */
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the lowest start and highest end offset;
                         * a page that does not move the start must begin
                         * at page offset 0, one that does not move the end
                         * must reach the end of its page */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        /* the saved state is restored at the 'out' label */
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
        if (!crattr) {
                rc = -ENOMEM;
                goto out;
        }

        pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
        if (pga == NULL) {
                rc = -ENOMEM;
                goto out;
        }

        oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
        if (oa == NULL) {
                rc = -ENOMEM;
                goto out;
        }

        /* fill the page array in rpc_list order; it is sorted later */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);

                /* the cl_req is allocated from the first page, and the
                 * ldlm lock of that first oap is the one put into the
                 * obdo below */
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq)) {
                                rc = PTR_ERR(clerq);
                                goto out;
                        }
                        lock = oap->oap_ldlm_lock;
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                goto out;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                goto out;
        }

        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* hand the page and extent lists over to the request */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* account the new RPC in the in-flight counters and histograms */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        ptlrpcd_add_req(req);
        rc = 0;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        /* crattr is only needed while the request is being built */
        kfree(crattr);

        if (rc != 0) {
                /* every failing path jumps here before a request exists */
                LASSERT(req == NULL);

                if (oa)
                        kmem_cache_free(obdo_cachep, oa);
                kfree(pga);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                             oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                /* clerq may be NULL or an ERR_PTR when cl_req_alloc() was
                 * not reached or failed */
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        return rc;
}
2022
2023 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2024                                         struct ldlm_enqueue_info *einfo)
2025 {
2026         void *data = einfo->ei_cbdata;
2027         int set = 0;
2028
2029         LASSERT(lock != NULL);
2030         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2031         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2032         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2033         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2034
2035         lock_res_and_lock(lock);
2036         spin_lock(&osc_ast_guard);
2037
2038         if (lock->l_ast_data == NULL)
2039                 lock->l_ast_data = data;
2040         if (lock->l_ast_data == data)
2041                 set = 1;
2042
2043         spin_unlock(&osc_ast_guard);
2044         unlock_res_and_lock(lock);
2045
2046         return set;
2047 }
2048
2049 static int osc_set_data_with_check(struct lustre_handle *lockh,
2050                                    struct ldlm_enqueue_info *einfo)
2051 {
2052         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2053         int set = 0;
2054
2055         if (lock != NULL) {
2056                 set = osc_set_lock_data_with_check(lock, einfo);
2057                 LDLM_LOCK_PUT(lock);
2058         } else
2059                 CERROR("lockh %p, data %p - client evicted?\n",
2060                        lockh, einfo->ei_cbdata);
2061         return set;
2062 }
2063
2064 /* find any ldlm lock of the inode in osc
2065  * return 0    not find
2066  *      1    find one
2067  *      < 0    error */
2068 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2069                            ldlm_iterator_t replace, void *data)
2070 {
2071         struct ldlm_res_id res_id;
2072         struct obd_device *obd = class_exp2obd(exp);
2073         int rc = 0;
2074
2075         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2076         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2077         if (rc == LDLM_ITER_STOP)
2078                 return 1;
2079         if (rc == LDLM_ITER_CONTINUE)
2080                 return 0;
2081         return rc;
2082 }
2083
2084 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2085                             obd_enqueue_update_f upcall, void *cookie,
2086                             __u64 *flags, int agl, int rc)
2087 {
2088         int intent = *flags & LDLM_FL_HAS_INTENT;
2089
2090         if (intent) {
2091                 /* The request was created before ldlm_cli_enqueue call. */
2092                 if (rc == ELDLM_LOCK_ABORTED) {
2093                         struct ldlm_reply *rep;
2094
2095                         rep = req_capsule_server_get(&req->rq_pill,
2096                                                      &RMF_DLM_REP);
2097
2098                         LASSERT(rep != NULL);
2099                         rep->lock_policy_res1 =
2100                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2101                         if (rep->lock_policy_res1)
2102                                 rc = rep->lock_policy_res1;
2103                 }
2104         }
2105
2106         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2107             (rc == 0)) {
2108                 *flags |= LDLM_FL_LVB_READY;
2109                 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2110                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2111         }
2112
2113         /* Call the update callback. */
2114         rc = (*upcall)(cookie, rc);
2115         return rc;
2116 }
2117
/*
 * Complete an enqueue for which a reply (or error) has arrived.
 * NOTE(review): presumably installed as the reply interpreter for
 * asynchronous enqueues by osc_enqueue_base() -- confirm against the caller.
 *
 * The ldlm side is completed with ldlm_cli_enqueue_fini(), then the osc
 * side, including the upcall, with osc_enqueue_fini().  An extra lock
 * reference is held around both calls and dropped at the end.
 *
 * \param env  execution environment (not used here)
 * \param req  the enqueue request
 * \param aa   enqueue arguments saved with the request
 * \param rc   status of the request so far
 *
 * \return the value returned by osc_enqueue_fini(), i.e. by the upcall
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* an aborted AGL enqueue passes no LVB buffer to the ldlm layer */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* drop the additional reference taken with ldlm_lock_addref() above,
         * then the one obtained from ldlm_handle2lock() */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2179
/*
 * Pointer with the constant value 1: it does not refer to a real
 * ptlrpc_request_set and must never be dereferenced.
 * NOTE(review): presumably a sentinel that callers pass as the request set
 * to ask for the request to be handed to ptlrpcd -- confirm against the
 * users of this symbol.
 */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2181
2182 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2183  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2184  * other synchronous requests, however keeping some locks and trying to obtain
2185  * others may take a considerable amount of time in a case of ost failure; and
2186  * when other sync requests do not get released lock from a client, the client
2187  * is excluded from the cluster -- such scenarious make the life difficult, so
2188  * release locks just after they are obtained. */
2189 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2190                      __u64 *flags, ldlm_policy_data_t *policy,
2191                      struct ost_lvb *lvb, int kms_valid,
2192                      obd_enqueue_update_f upcall, void *cookie,
2193                      struct ldlm_enqueue_info *einfo,
2194                      struct lustre_handle *lockh,
2195                      struct ptlrpc_request_set *rqset, int async, int agl)
2196 {
2197         struct obd_device *obd = exp->exp_obd;
2198         struct ptlrpc_request *req = NULL;
2199         int intent = *flags & LDLM_FL_HAS_INTENT;
2200         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2201         ldlm_mode_t mode;
2202         int rc;
2203
2204         /* Filesystem lock extents are extended to page boundaries so that
2205          * dealing with the page cache is a little smoother.  */
2206         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2207         policy->l_extent.end |= ~CFS_PAGE_MASK;
2208
2209         /*
2210          * kms is not valid when either object is completely fresh (so that no
2211          * locks are cached), or object was evicted. In the latter case cached
2212          * lock cannot be used, because it would prime inode state with
2213          * potentially stale LVB.
2214          */
2215         if (!kms_valid)
2216                 goto no_match;
2217
2218         /* Next, search for already existing extent locks that will cover us */
2219         /* If we're trying to read, we also search for an existing PW lock.  The
2220          * VFS and page cache already protect us locally, so lots of readers/
2221          * writers can share a single PW lock.
2222          *
2223          * There are problems with conversion deadlocks, so instead of
2224          * converting a read lock to a write lock, we'll just enqueue a new
2225          * one.
2226          *
2227          * At some point we should cancel the read lock instead of making them
2228          * send us a blocking callback, but there are problems with canceling
2229          * locks out from other users right now, too. */
2230         mode = einfo->ei_mode;
2231         if (einfo->ei_mode == LCK_PR)
2232                 mode |= LCK_PW;
2233         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2234                                einfo->ei_type, policy, mode, lockh, 0);
2235         if (mode) {
2236                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2237
2238                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2239                         /* For AGL, if enqueue RPC is sent but the lock is not
2240                          * granted, then skip to process this strpe.
2241                          * Return -ECANCELED to tell the caller. */
2242                         ldlm_lock_decref(lockh, mode);
2243                         LDLM_LOCK_PUT(matched);
2244                         return -ECANCELED;
2245                 }
2246
2247                 if (osc_set_lock_data_with_check(matched, einfo)) {
2248                         *flags |= LDLM_FL_LVB_READY;
2249                         /* addref the lock only if not async requests and PW
2250                          * lock is matched whereas we asked for PR. */
2251                         if (!rqset && einfo->ei_mode != mode)
2252                                 ldlm_lock_addref(lockh, LCK_PR);
2253                         if (intent) {
2254                                 /* I would like to be able to ASSERT here that
2255                                  * rss <= kms, but I can't, for reasons which
2256                                  * are explained in lov_enqueue() */
2257                         }
2258
2259                         /* We already have a lock, and it's referenced.
2260                          *
2261                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2262                          * AGL upcall may change it to CLS_HELD directly. */
2263                         (*upcall)(cookie, ELDLM_OK);
2264
2265                         if (einfo->ei_mode != mode)
2266                                 ldlm_lock_decref(lockh, LCK_PW);
2267                         else if (rqset)
2268                                 /* For async requests, decref the lock. */
2269                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2270                         LDLM_LOCK_PUT(matched);
2271                         return ELDLM_OK;
2272                 }
2273
2274                 ldlm_lock_decref(lockh, mode);
2275                 LDLM_LOCK_PUT(matched);
2276         }
2277
2278  no_match:
2279         if (intent) {
2280                 LIST_HEAD(cancels);
2281
2282                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2283                                            &RQF_LDLM_ENQUEUE_LVB);
2284                 if (req == NULL)
2285                         return -ENOMEM;
2286
2287                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2288                 if (rc) {
2289                         ptlrpc_request_free(req);
2290                         return rc;
2291                 }
2292
2293                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2294                                      sizeof(*lvb));
2295                 ptlrpc_request_set_replen(req);
2296         }
2297
2298         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2299         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2300
2301         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2302                               sizeof(*lvb), LVB_T_OST, lockh, async);
2303         if (rqset) {
2304                 if (!rc) {
2305                         struct osc_enqueue_args *aa;
2306
2307                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2308                         aa = ptlrpc_req_async_args(req);
2309                         aa->oa_ei = einfo;
2310                         aa->oa_exp = exp;
2311                         aa->oa_flags  = flags;
2312                         aa->oa_upcall = upcall;
2313                         aa->oa_cookie = cookie;
2314                         aa->oa_lvb    = lvb;
2315                         aa->oa_lockh  = lockh;
2316                         aa->oa_agl    = !!agl;
2317
2318                         req->rq_interpret_reply =
2319                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2320                         if (rqset == PTLRPCD_SET)
2321                                 ptlrpcd_add_req(req);
2322                         else
2323                                 ptlrpc_set_add_req(rqset, req);
2324                 } else if (intent) {
2325                         ptlrpc_req_finished(req);
2326                 }
2327                 return rc;
2328         }
2329
2330         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2331         if (intent)
2332                 ptlrpc_req_finished(req);
2333
2334         return rc;
2335 }
2336
2337 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2338                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2339                    __u64 *flags, void *data, struct lustre_handle *lockh,
2340                    int unref)
2341 {
2342         struct obd_device *obd = exp->exp_obd;
2343         __u64 lflags = *flags;
2344         ldlm_mode_t rc;
2345
2346         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2347                 return -EIO;
2348
2349         /* Filesystem lock extents are extended to page boundaries so that
2350          * dealing with the page cache is a little smoother */
2351         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2352         policy->l_extent.end |= ~CFS_PAGE_MASK;
2353
2354         /* Next, search for already existing extent locks that will cover us */
2355         /* If we're trying to read, we also search for an existing PW lock.  The
2356          * VFS and page cache already protect us locally, so lots of readers/
2357          * writers can share a single PW lock. */
2358         rc = mode;
2359         if (mode == LCK_PR)
2360                 rc |= LCK_PW;
2361         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2362                              res_id, type, policy, rc, lockh, unref);
2363         if (rc) {
2364                 if (data != NULL) {
2365                         if (!osc_set_data_with_check(lockh, data)) {
2366                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2367                                         ldlm_lock_decref(lockh, rc);
2368                                 return 0;
2369                         }
2370                 }
2371                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2372                         ldlm_lock_addref(lockh, LCK_PR);
2373                         ldlm_lock_decref(lockh, LCK_PW);
2374                 }
2375                 return rc;
2376         }
2377         return rc;
2378 }
2379
2380 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2381 {
2382         if (unlikely(mode == LCK_GROUP))
2383                 ldlm_lock_decref_and_cancel(lockh, mode);
2384         else
2385                 ldlm_lock_decref(lockh, mode);
2386
2387         return 0;
2388 }
2389
2390 static int osc_statfs_interpret(const struct lu_env *env,
2391                                 struct ptlrpc_request *req,
2392                                 struct osc_async_args *aa, int rc)
2393 {
2394         struct obd_statfs *msfs;
2395
2396         if (rc == -EBADR)
2397                 /* The request has in fact never been sent
2398                  * due to issues at a higher level (LOV).
2399                  * Exit immediately since the caller is
2400                  * aware of the problem and takes care
2401                  * of the clean up */
2402                  return rc;
2403
2404         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2405             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2406                 rc = 0;
2407                 goto out;
2408         }
2409
2410         if (rc != 0)
2411                 goto out;
2412
2413         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2414         if (msfs == NULL) {
2415                 rc = -EPROTO;
2416                 goto out;
2417         }
2418
2419         *aa->aa_oi->oi_osfs = *msfs;
2420 out:
2421         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2422         return rc;
2423 }
2424
2425 static int osc_statfs_async(struct obd_export *exp,
2426                             struct obd_info *oinfo, __u64 max_age,
2427                             struct ptlrpc_request_set *rqset)
2428 {
2429         struct obd_device *obd = class_exp2obd(exp);
2430         struct ptlrpc_request *req;
2431         struct osc_async_args *aa;
2432         int rc;
2433
2434         /* We could possibly pass max_age in the request (as an absolute
2435          * timestamp or a "seconds.usec ago") so the target can avoid doing
2436          * extra calls into the filesystem if that isn't necessary (e.g.
2437          * during mount that would help a bit).  Having relative timestamps
2438          * is not so great if request processing is slow, while absolute
2439          * timestamps are not ideal because they need time synchronization. */
2440         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2441         if (req == NULL)
2442                 return -ENOMEM;
2443
2444         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2445         if (rc) {
2446                 ptlrpc_request_free(req);
2447                 return rc;
2448         }
2449         ptlrpc_request_set_replen(req);
2450         req->rq_request_portal = OST_CREATE_PORTAL;
2451         ptlrpc_at_set_req_timeout(req);
2452
2453         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2454                 /* procfs requests not want stat in wait for avoid deadlock */
2455                 req->rq_no_resend = 1;
2456                 req->rq_no_delay = 1;
2457         }
2458
2459         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2460         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2461         aa = ptlrpc_req_async_args(req);
2462         aa->aa_oi = oinfo;
2463
2464         ptlrpc_set_add_req(rqset, req);
2465         return 0;
2466 }
2467
2468 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2469                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2470 {
2471         struct obd_device *obd = class_exp2obd(exp);
2472         struct obd_statfs *msfs;
2473         struct ptlrpc_request *req;
2474         struct obd_import *imp = NULL;
2475         int rc;
2476
2477         /*Since the request might also come from lprocfs, so we need
2478          *sync this with client_disconnect_export Bug15684*/
2479         down_read(&obd->u.cli.cl_sem);
2480         if (obd->u.cli.cl_import)
2481                 imp = class_import_get(obd->u.cli.cl_import);
2482         up_read(&obd->u.cli.cl_sem);
2483         if (!imp)
2484                 return -ENODEV;
2485
2486         /* We could possibly pass max_age in the request (as an absolute
2487          * timestamp or a "seconds.usec ago") so the target can avoid doing
2488          * extra calls into the filesystem if that isn't necessary (e.g.
2489          * during mount that would help a bit).  Having relative timestamps
2490          * is not so great if request processing is slow, while absolute
2491          * timestamps are not ideal because they need time synchronization. */
2492         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2493
2494         class_import_put(imp);
2495
2496         if (req == NULL)
2497                 return -ENOMEM;
2498
2499         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2500         if (rc) {
2501                 ptlrpc_request_free(req);
2502                 return rc;
2503         }
2504         ptlrpc_request_set_replen(req);
2505         req->rq_request_portal = OST_CREATE_PORTAL;
2506         ptlrpc_at_set_req_timeout(req);
2507
2508         if (flags & OBD_STATFS_NODELAY) {
2509                 /* procfs requests not want stat in wait for avoid deadlock */
2510                 req->rq_no_resend = 1;
2511                 req->rq_no_delay = 1;
2512         }
2513
2514         rc = ptlrpc_queue_wait(req);
2515         if (rc)
2516                 goto out;
2517
2518         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2519         if (msfs == NULL) {
2520                 rc = -EPROTO;
2521                 goto out;
2522         }
2523
2524         *osfs = *msfs;
2525
2526  out:
2527         ptlrpc_req_finished(req);
2528         return rc;
2529 }
2530
2531 /* Retrieve object striping information.
2532  *
2533  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2534  * the maximum number of OST indices which will fit in the user buffer.
2535  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2536  */
2537 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2538 {
2539         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2540         struct lov_user_md_v3 lum, *lumk;
2541         struct lov_user_ost_data_v1 *lmm_objects;
2542         int rc = 0, lum_size;
2543
2544         if (!lsm)
2545                 return -ENODATA;
2546
2547         /* we only need the header part from user space to get lmm_magic and
2548          * lmm_stripe_count, (the header part is common to v1 and v3) */
2549         lum_size = sizeof(struct lov_user_md_v1);
2550         if (copy_from_user(&lum, lump, lum_size))
2551                 return -EFAULT;
2552
2553         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2554             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2555                 return -EINVAL;
2556
2557         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2558         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2559         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2560         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2561
2562         /* we can use lov_mds_md_size() to compute lum_size
2563          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2564         if (lum.lmm_stripe_count > 0) {
2565                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2566                 lumk = kzalloc(lum_size, GFP_NOFS);
2567                 if (!lumk)
2568                         return -ENOMEM;
2569
2570                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2571                         lmm_objects =
2572                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2573                 else
2574                         lmm_objects = &(lumk->lmm_objects[0]);
2575                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2576         } else {
2577                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2578                 lumk = &lum;
2579         }
2580
2581         lumk->lmm_oi = lsm->lsm_oi;
2582         lumk->lmm_stripe_count = 1;
2583
2584         if (copy_to_user(lump, lumk, lum_size))
2585                 rc = -EFAULT;
2586
2587         if (lumk != &lum)
2588                 kfree(lumk);
2589
2590         return rc;
2591 }
2592
2593 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2594                          void *karg, void *uarg)
2595 {
2596         struct obd_device *obd = exp->exp_obd;
2597         struct obd_ioctl_data *data = karg;
2598         int err = 0;
2599
2600         if (!try_module_get(THIS_MODULE)) {
2601                 CERROR("Can't get module. Is it alive?");
2602                 return -EINVAL;
2603         }
2604         switch (cmd) {
2605         case OBD_IOC_LOV_GET_CONFIG: {
2606                 char *buf;
2607                 struct lov_desc *desc;
2608                 struct obd_uuid uuid;
2609
2610                 buf = NULL;
2611                 len = 0;
2612                 if (obd_ioctl_getdata(&buf, &len, uarg)) {
2613                         err = -EINVAL;
2614                         goto out;
2615                 }
2616
2617                 data = (struct obd_ioctl_data *)buf;
2618
2619                 if (sizeof(*desc) > data->ioc_inllen1) {
2620                         obd_ioctl_freedata(buf, len);
2621                         err = -EINVAL;
2622                         goto out;
2623                 }
2624
2625                 if (data->ioc_inllen2 < sizeof(uuid)) {
2626                         obd_ioctl_freedata(buf, len);
2627                         err = -EINVAL;
2628                         goto out;
2629                 }
2630
2631                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2632                 desc->ld_tgt_count = 1;
2633                 desc->ld_active_tgt_count = 1;
2634                 desc->ld_default_stripe_count = 1;
2635                 desc->ld_default_stripe_size = 0;
2636                 desc->ld_default_stripe_offset = 0;
2637                 desc->ld_pattern = 0;
2638                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2639
2640                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2641
2642                 err = copy_to_user(uarg, buf, len);
2643                 if (err)
2644                         err = -EFAULT;
2645                 obd_ioctl_freedata(buf, len);
2646                 goto out;
2647         }
2648         case LL_IOC_LOV_SETSTRIPE:
2649                 err = obd_alloc_memmd(exp, karg);
2650                 if (err > 0)
2651                         err = 0;
2652                 goto out;
2653         case LL_IOC_LOV_GETSTRIPE:
2654                 err = osc_getstripe(karg, uarg);
2655                 goto out;
2656         case OBD_IOC_CLIENT_RECOVER:
2657                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2658                                             data->ioc_inlbuf1, 0);
2659                 if (err > 0)
2660                         err = 0;
2661                 goto out;
2662         case IOC_OSC_SET_ACTIVE:
2663                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2664                                                data->ioc_offset);
2665                 goto out;
2666         case OBD_IOC_POLL_QUOTACHECK:
2667                 err = osc_quota_poll_check(exp, karg);
2668                 goto out;
2669         case OBD_IOC_PING_TARGET:
2670                 err = ptlrpc_obd_ping(obd);
2671                 goto out;
2672         default:
2673                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2674                        cmd, current_comm());
2675                 err = -ENOTTY;
2676                 goto out;
2677         }
2678 out:
2679         module_put(THIS_MODULE);
2680         return err;
2681 }
2682
2683 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2684                         u32 keylen, void *key, __u32 *vallen, void *val,
2685                         struct lov_stripe_md *lsm)
2686 {
2687         if (!vallen || !val)
2688                 return -EFAULT;
2689
2690         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2691                 __u32 *stripe = val;
2692                 *vallen = sizeof(*stripe);
2693                 *stripe = 0;
2694                 return 0;
2695         } else if (KEY_IS(KEY_LAST_ID)) {
2696                 struct ptlrpc_request *req;
2697                 u64 *reply;
2698                 char *tmp;
2699                 int rc;
2700
2701                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2702                                            &RQF_OST_GET_INFO_LAST_ID);
2703                 if (req == NULL)
2704                         return -ENOMEM;
2705
2706                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2707                                      RCL_CLIENT, keylen);
2708                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2709                 if (rc) {
2710                         ptlrpc_request_free(req);
2711                         return rc;
2712                 }
2713
2714                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2715                 memcpy(tmp, key, keylen);
2716
2717                 req->rq_no_delay = req->rq_no_resend = 1;
2718                 ptlrpc_request_set_replen(req);
2719                 rc = ptlrpc_queue_wait(req);
2720                 if (rc)
2721                         goto out;
2722
2723                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2724                 if (reply == NULL) {
2725                         rc = -EPROTO;
2726                         goto out;
2727                 }
2728
2729                 *((u64 *)val) = *reply;
2730         out:
2731                 ptlrpc_req_finished(req);
2732                 return rc;
2733         } else if (KEY_IS(KEY_FIEMAP)) {
2734                 struct ll_fiemap_info_key *fm_key = key;
2735                 struct ldlm_res_id res_id;
2736                 ldlm_policy_data_t policy;
2737                 struct lustre_handle lockh;
2738                 ldlm_mode_t mode = 0;
2739                 struct ptlrpc_request *req;
2740                 struct ll_user_fiemap *reply;
2741                 char *tmp;
2742                 int rc;
2743
2744                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2745                         goto skip_locking;
2746
2747                 policy.l_extent.start = fm_key->fiemap.fm_start &
2748                                                 CFS_PAGE_MASK;
2749
2750                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2751                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2752                         policy.l_extent.end = OBD_OBJECT_EOF;
2753                 else
2754                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2755                                 fm_key->fiemap.fm_length +
2756                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2757
2758                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2759                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2760                                        LDLM_FL_BLOCK_GRANTED |
2761                                        LDLM_FL_LVB_READY,
2762                                        &res_id, LDLM_EXTENT, &policy,
2763                                        LCK_PR | LCK_PW, &lockh, 0);
2764                 if (mode) { /* lock is cached on client */
2765                         if (mode != LCK_PR) {
2766                                 ldlm_lock_addref(&lockh, LCK_PR);
2767                                 ldlm_lock_decref(&lockh, LCK_PW);
2768                         }
2769                 } else { /* no cached lock, needs acquire lock on server side */
2770                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2771                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2772                 }
2773
2774 skip_locking:
2775                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2776                                            &RQF_OST_GET_INFO_FIEMAP);
2777                 if (req == NULL) {
2778                         rc = -ENOMEM;
2779                         goto drop_lock;
2780                 }
2781
2782                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2783                                      RCL_CLIENT, keylen);
2784                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2785                                      RCL_CLIENT, *vallen);
2786                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2787                                      RCL_SERVER, *vallen);
2788
2789                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2790                 if (rc) {
2791                         ptlrpc_request_free(req);
2792                         goto drop_lock;
2793                 }
2794
2795                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2796                 memcpy(tmp, key, keylen);
2797                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2798                 memcpy(tmp, val, *vallen);
2799
2800                 ptlrpc_request_set_replen(req);
2801                 rc = ptlrpc_queue_wait(req);
2802                 if (rc)
2803                         goto fini_req;
2804
2805                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2806                 if (reply == NULL) {
2807                         rc = -EPROTO;
2808                         goto fini_req;
2809                 }
2810
2811                 memcpy(val, reply, *vallen);
2812 fini_req:
2813                 ptlrpc_req_finished(req);
2814 drop_lock:
2815                 if (mode)
2816                         ldlm_lock_decref(&lockh, LCK_PR);
2817                 return rc;
2818         }
2819
2820         return -EINVAL;
2821 }
2822
2823 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2824                               u32 keylen, void *key, u32 vallen,
2825                               void *val, struct ptlrpc_request_set *set)
2826 {
2827         struct ptlrpc_request *req;
2828         struct obd_device *obd = exp->exp_obd;
2829         struct obd_import *imp = class_exp2cliimp(exp);
2830         char *tmp;
2831         int rc;
2832
2833         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2834
2835         if (KEY_IS(KEY_CHECKSUM)) {
2836                 if (vallen != sizeof(int))
2837                         return -EINVAL;
2838                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2839                 return 0;
2840         }
2841
2842         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2843                 sptlrpc_conf_client_adapt(obd);
2844                 return 0;
2845         }
2846
2847         if (KEY_IS(KEY_FLUSH_CTX)) {
2848                 sptlrpc_import_flush_my_ctx(imp);
2849                 return 0;
2850         }
2851
2852         if (KEY_IS(KEY_CACHE_SET)) {
2853                 struct client_obd *cli = &obd->u.cli;
2854
2855                 LASSERT(cli->cl_cache == NULL); /* only once */
2856                 cli->cl_cache = val;
2857                 atomic_inc(&cli->cl_cache->ccc_users);
2858                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2859
2860                 /* add this osc into entity list */
2861                 LASSERT(list_empty(&cli->cl_lru_osc));
2862                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2863                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2864                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2865
2866                 return 0;
2867         }
2868
2869         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2870                 struct client_obd *cli = &obd->u.cli;
2871                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2872                 int target = *(int *)val;
2873
2874                 nr = osc_lru_shrink(cli, min(nr, target));
2875                 *(int *)val -= nr;
2876                 return 0;
2877         }
2878
2879         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2880                 return -EINVAL;
2881
2882         /* We pass all other commands directly to OST. Since nobody calls osc
2883            methods directly and everybody is supposed to go through LOV, we
2884            assume lov checked invalid values for us.
2885            The only recognised values so far are evict_by_nid and mds_conn.
2886            Even if something bad goes through, we'd get a -EINVAL from OST
2887            anyway. */
2888
2889         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2890                                                 &RQF_OST_SET_GRANT_INFO :
2891                                                 &RQF_OBD_SET_INFO);
2892         if (req == NULL)
2893                 return -ENOMEM;
2894
2895         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2896                              RCL_CLIENT, keylen);
2897         if (!KEY_IS(KEY_GRANT_SHRINK))
2898                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2899                                      RCL_CLIENT, vallen);
2900         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2901         if (rc) {
2902                 ptlrpc_request_free(req);
2903                 return rc;
2904         }
2905
2906         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2907         memcpy(tmp, key, keylen);
2908         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2909                                                         &RMF_OST_BODY :
2910                                                         &RMF_SETINFO_VAL);
2911         memcpy(tmp, val, vallen);
2912
2913         if (KEY_IS(KEY_GRANT_SHRINK)) {
2914                 struct osc_brw_async_args *aa;
2915                 struct obdo *oa;
2916
2917                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2918                 aa = ptlrpc_req_async_args(req);
2919                 oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
2920                 if (!oa) {
2921                         ptlrpc_req_finished(req);
2922                         return -ENOMEM;
2923                 }
2924                 *oa = ((struct ost_body *)val)->oa;
2925                 aa->aa_oa = oa;
2926                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2927         }
2928
2929         ptlrpc_request_set_replen(req);
2930         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2931                 LASSERT(set != NULL);
2932                 ptlrpc_set_add_req(set, req);
2933                 ptlrpc_check_set(NULL, set);
2934         } else {
2935                 ptlrpcd_add_req(req);
2936         }
2937
2938         return 0;
2939 }
2940
2941 static int osc_reconnect(const struct lu_env *env,
2942                          struct obd_export *exp, struct obd_device *obd,
2943                          struct obd_uuid *cluuid,
2944                          struct obd_connect_data *data,
2945                          void *localdata)
2946 {
2947         struct client_obd *cli = &obd->u.cli;
2948
2949         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2950                 long lost_grant;
2951
2952                 client_obd_list_lock(&cli->cl_loi_list_lock);
2953                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2954                                 2 * cli_brw_size(obd);
2955                 lost_grant = cli->cl_lost_grant;
2956                 cli->cl_lost_grant = 0;
2957                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2958
2959                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2960                        data->ocd_connect_flags,
2961                        data->ocd_version, data->ocd_grant, lost_grant);
2962         }
2963
2964         return 0;
2965 }
2966
2967 static int osc_disconnect(struct obd_export *exp)
2968 {
2969         struct obd_device *obd = class_exp2obd(exp);
2970         int rc;
2971
2972         rc = client_disconnect_export(exp);
2973         /**
2974          * Initially we put del_shrink_grant before disconnect_export, but it
2975          * causes the following problem if setup (connect) and cleanup
2976          * (disconnect) are tangled together.
2977          *      connect p1                   disconnect p2
2978          *   ptlrpc_connect_import
2979          *     ...............         class_manual_cleanup
2980          *                                   osc_disconnect
2981          *                                   del_shrink_grant
2982          *   ptlrpc_connect_interrupt
2983          *     init_grant_shrink
2984          *   add this client to shrink list
2985          *                                    cleanup_osc
2986          * Bang! pinger trigger the shrink.
2987          * So the osc should be disconnected from the shrink list, after we
2988          * are sure the import has been destroyed. BUG18662
2989          */
2990         if (obd->u.cli.cl_import == NULL)
2991                 osc_del_shrink_grant(&obd->u.cli);
2992         return rc;
2993 }
2994
2995 static int osc_import_event(struct obd_device *obd,
2996                             struct obd_import *imp,
2997                             enum obd_import_event event)
2998 {
2999         struct client_obd *cli;
3000         int rc = 0;
3001
3002         LASSERT(imp->imp_obd == obd);
3003
3004         switch (event) {
3005         case IMP_EVENT_DISCON: {
3006                 cli = &obd->u.cli;
3007                 client_obd_list_lock(&cli->cl_loi_list_lock);
3008                 cli->cl_avail_grant = 0;
3009                 cli->cl_lost_grant = 0;
3010                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3011                 break;
3012         }
3013         case IMP_EVENT_INACTIVE: {
3014                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3015                 break;
3016         }
3017         case IMP_EVENT_INVALIDATE: {
3018                 struct ldlm_namespace *ns = obd->obd_namespace;
3019                 struct lu_env *env;
3020                 int refcheck;
3021
3022                 env = cl_env_get(&refcheck);
3023                 if (!IS_ERR(env)) {
3024                         /* Reset grants */
3025                         cli = &obd->u.cli;
3026                         /* all pages go to failing rpcs due to the invalid
3027                          * import */
3028                         osc_io_unplug(env, cli, NULL);
3029
3030                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3031                         cl_env_put(env, &refcheck);
3032                 } else
3033                         rc = PTR_ERR(env);
3034                 break;
3035         }
3036         case IMP_EVENT_ACTIVE: {
3037                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3038                 break;
3039         }
3040         case IMP_EVENT_OCD: {
3041                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3042
3043                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3044                         osc_init_grant(&obd->u.cli, ocd);
3045
3046                 /* See bug 7198 */
3047                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3048                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3049
3050                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3051                 break;
3052         }
3053         case IMP_EVENT_DEACTIVATE: {
3054                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3055                 break;
3056         }
3057         case IMP_EVENT_ACTIVATE: {
3058                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3059                 break;
3060         }
3061         default:
3062                 CERROR("Unknown import event %d\n", event);
3063                 LBUG();
3064         }
3065         return rc;
3066 }
3067
3068 /**
3069  * Determine whether the lock can be canceled before replaying the lock
3070  * during recovery, see bug16774 for detailed information.
3071  *
3072  * \retval zero the lock can't be canceled
3073  * \retval other ok to cancel
3074  */
3075 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3076 {
3077         check_res_locked(lock->l_resource);
3078
3079         /*
3080          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3081          *
3082          * XXX as a future improvement, we can also cancel unused write lock
3083          * if it doesn't have dirty data and active mmaps.
3084          */
3085         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3086             (lock->l_granted_mode == LCK_PR ||
3087              lock->l_granted_mode == LCK_CR) &&
3088             (osc_dlm_lock_pageref(lock) == 0))
3089                 return 1;
3090
3091         return 0;
3092 }
3093
3094 static int brw_queue_work(const struct lu_env *env, void *data)
3095 {
3096         struct client_obd *cli = data;
3097
3098         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3099
3100         osc_io_unplug(env, cli, NULL);
3101         return 0;
3102 }
3103
3104 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3105 {
3106         struct lprocfs_static_vars lvars = { NULL };
3107         struct client_obd *cli = &obd->u.cli;
3108         void *handler;
3109         int rc;
3110         int adding;
3111         int added;
3112         int req_count;
3113
3114         rc = ptlrpcd_addref();
3115         if (rc)
3116                 return rc;
3117
3118         rc = client_obd_setup(obd, lcfg);
3119         if (rc)
3120                 goto out_ptlrpcd;
3121
3122         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3123         if (IS_ERR(handler)) {
3124                 rc = PTR_ERR(handler);
3125                 goto out_client_setup;
3126         }
3127         cli->cl_writeback_work = handler;
3128
3129         rc = osc_quota_setup(obd);
3130         if (rc)
3131                 goto out_ptlrpcd_work;
3132
3133         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3134         lprocfs_osc_init_vars(&lvars);
3135         if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
3136                 lproc_osc_attach_seqstat(obd);
3137                 sptlrpc_lprocfs_cliobd_attach(obd);
3138                 ptlrpc_lprocfs_register_obd(obd);
3139         }
3140
3141         /*
3142          * We try to control the total number of requests with a upper limit
3143          * osc_reqpool_maxreqcount. There might be some race which will cause
3144          * over-limit allocation, but it is fine.
3145          */
3146         req_count = atomic_read(&osc_pool_req_count);
3147         if (req_count < osc_reqpool_maxreqcount) {
3148                 adding = cli->cl_max_rpcs_in_flight + 2;
3149                 if (req_count + adding > osc_reqpool_maxreqcount)
3150                         adding = osc_reqpool_maxreqcount - req_count;
3151
3152                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3153                 atomic_add(added, &osc_pool_req_count);
3154         }
3155
3156         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3157         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3158         return rc;
3159
3160 out_ptlrpcd_work:
3161         ptlrpcd_destroy_work(handler);
3162 out_client_setup:
3163         client_obd_cleanup(obd);
3164 out_ptlrpcd:
3165         ptlrpcd_decref();
3166         return rc;
3167 }
3168
3169 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3170 {
3171         switch (stage) {
3172         case OBD_CLEANUP_EARLY: {
3173                 struct obd_import *imp;
3174
3175                 imp = obd->u.cli.cl_import;
3176                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3177                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3178                 ptlrpc_deactivate_import(imp);
3179                 spin_lock(&imp->imp_lock);
3180                 imp->imp_pingable = 0;
3181                 spin_unlock(&imp->imp_lock);
3182                 break;
3183         }
3184         case OBD_CLEANUP_EXPORTS: {
3185                 struct client_obd *cli = &obd->u.cli;
3186                 /* LU-464
3187                  * for echo client, export may be on zombie list, wait for
3188                  * zombie thread to cull it, because cli.cl_import will be
3189                  * cleared in client_disconnect_export():
3190                  *   class_export_destroy() -> obd_cleanup() ->
3191                  *   echo_device_free() -> echo_client_cleanup() ->
3192                  *   obd_disconnect() -> osc_disconnect() ->
3193                  *   client_disconnect_export()
3194                  */
3195                 obd_zombie_barrier();
3196                 if (cli->cl_writeback_work) {
3197                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3198                         cli->cl_writeback_work = NULL;
3199                 }
3200                 obd_cleanup_client_import(obd);
3201                 ptlrpc_lprocfs_unregister_obd(obd);
3202                 lprocfs_obd_cleanup(obd);
3203                 break;
3204                 }
3205         }
3206         return 0;
3207 }
3208
3209 int osc_cleanup(struct obd_device *obd)
3210 {
3211         struct client_obd *cli = &obd->u.cli;
3212         int rc;
3213
3214         /* lru cleanup */
3215         if (cli->cl_cache != NULL) {
3216                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3217                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3218                 list_del_init(&cli->cl_lru_osc);
3219                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3220                 cli->cl_lru_left = NULL;
3221                 atomic_dec(&cli->cl_cache->ccc_users);
3222                 cli->cl_cache = NULL;
3223         }
3224
3225         /* free memory of osc quota cache */
3226         osc_quota_cleanup(obd);
3227
3228         rc = client_obd_cleanup(obd);
3229
3230         ptlrpcd_decref();
3231         return rc;
3232 }
3233
3234 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3235 {
3236         struct lprocfs_static_vars lvars = { NULL };
3237         int rc = 0;
3238
3239         lprocfs_osc_init_vars(&lvars);
3240
3241         switch (lcfg->lcfg_command) {
3242         default:
3243                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3244                                               lcfg, obd);
3245                 if (rc > 0)
3246                         rc = 0;
3247                 break;
3248         }
3249
3250         return rc;
3251 }
3252
3253 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3254 {
3255         return osc_process_config_base(obd, buf);
3256 }
3257
/*
 * Method table of the OSC obd type; osc_init() hands it to
 * class_register_type(). Connection management entries reuse the generic
 * client_* helpers, everything else is implemented by osc_* functions.
 */
struct obd_ops osc_obd_ops = {
        .owner          = THIS_MODULE,
        .setup          = osc_setup,
        .precleanup     = osc_precleanup,
        .cleanup        = osc_cleanup,
        .add_conn       = client_import_add_conn,
        .del_conn       = client_import_del_conn,
        .connect        = client_connect_import,
        .reconnect      = osc_reconnect,
        .disconnect     = osc_disconnect,
        .statfs         = osc_statfs,
        .statfs_async   = osc_statfs_async,
        .packmd         = osc_packmd,
        .unpackmd       = osc_unpackmd,
        .create         = osc_create,
        .destroy        = osc_destroy,
        .getattr        = osc_getattr,
        .getattr_async  = osc_getattr_async,
        .setattr        = osc_setattr,
        .setattr_async  = osc_setattr_async,
        .find_cbdata    = osc_find_cbdata,
        .iocontrol      = osc_iocontrol,
        .get_info       = osc_get_info,
        .set_info_async = osc_set_info_async,
        .import_event   = osc_import_event,
        .process_config = osc_process_config,
        .quotactl       = osc_quotactl,
        .quotacheck     = osc_quotacheck,
};
3287
3288 extern struct lu_kmem_descr osc_caches[];
3289 extern spinlock_t osc_ast_guard;
3290 extern struct lock_class_key osc_ast_guard_class;
3291
3292 static int __init osc_init(void)
3293 {
3294         struct lprocfs_static_vars lvars = { NULL };
3295         unsigned int reqpool_size;
3296         unsigned int reqsize;
3297         int rc;
3298
3299         /* print an address of _any_ initialized kernel symbol from this
3300          * module, to allow debugging with gdb that doesn't support data
3301          * symbols from modules.*/
3302         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3303
3304         rc = lu_kmem_init(osc_caches);
3305         if (rc)
3306                 return rc;
3307
3308         lprocfs_osc_init_vars(&lvars);
3309
3310         rc = class_register_type(&osc_obd_ops, NULL,
3311                                  LUSTRE_OSC_NAME, &osc_device_type);
3312         if (rc)
3313                 goto out_kmem;
3314
3315         spin_lock_init(&osc_ast_guard);
3316         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3317
3318         /* This is obviously too much memory, only prevent overflow here */
3319         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
3320                 rc = -EINVAL;
3321                 goto out_type;
3322         }
3323
3324         reqpool_size = osc_reqpool_mem_max << 20;
3325
3326         reqsize = 1;
3327         while (reqsize < OST_MAXREQSIZE)
3328                 reqsize = reqsize << 1;
3329
3330         /*
3331          * We don't enlarge the request count in OSC pool according to
3332          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3333          * tried after normal allocation failed. So a small OSC pool won't
3334          * cause much performance degression in most of cases.
3335          */
3336         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3337
3338         atomic_set(&osc_pool_req_count, 0);
3339         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
3340                                           ptlrpc_add_rqs_to_pool);
3341
3342         if (osc_rq_pool)
3343                 return 0;
3344
3345         rc = -ENOMEM;
3346
3347 out_type:
3348         class_unregister_type(LUSTRE_OSC_NAME);
3349 out_kmem:
3350         lu_kmem_fini(osc_caches);
3351         return rc;
3352 }
3353
/*
 * Module unload: unregister the OSC obd type, release the kmem caches
 * described by osc_caches and free the request pool that osc_init()
 * stored in osc_rq_pool.
 */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}
3360
3361 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3362 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3363 MODULE_LICENSE("GPL");
3364 MODULE_VERSION(LUSTRE_VERSION_STRING);
3365
3366 module_init(osc_init);
3367 module_exit(osc_exit);