]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/osc/osc_request.c
staging: lustre: remove ENTRY macro
[karo-tx-linux.git] / drivers / staging / lustre / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <linux/libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <obd_lov.h>
48
49 #ifdef  __CYGWIN__
50 # include <ctype.h>
51 #endif
52
53 #include <lustre_ha.h>
54 #include <lprocfs_status.h>
55 #include <lustre_log.h>
56 #include <lustre_debug.h>
57 #include <lustre_param.h>
58 #include <lustre_fid.h>
59 #include "osc_internal.h"
60 #include "osc_cl_internal.h"
61
/* Forward declarations for helpers defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
66
/* Pack OSC object metadata for disk storage (LE byte order).
 *
 * Multiplexed obd_packmd()-style interface:
 *   - lmmp == NULL:               size query, return sizeof(**lmmp);
 *   - *lmmp != NULL, lsm == NULL: free *lmmp, return 0;
 *   - *lmmp == NULL:              allocate *lmmp before packing;
 *   - lsm != NULL:                pack lsm's object id into *lmmp.
 * Returns the packed size on success or a negative errno. */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;

        /* sizeof() does not evaluate its operand, so this is safe even
         * when lmmp == NULL. */
        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                RETURN(lmm_size);

        if (*lmmp != NULL && lsm == NULL) {
                /* Caller wants the previously packed buffer released. */
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
                /* A zero object id is never valid. */
                RETURN(-EBADF);
        }

        if (*lmmp == NULL) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (*lmmp == NULL)
                        RETURN(-ENOMEM);
        }

        if (lsm)
                /* Object id is stored little-endian on disk/wire. */
                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

        RETURN(lmm_size);
}
96
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Multiplexed obd_unpackmd()-style interface:
 *   - lsmp == NULL:               size query, return in-memory md size;
 *   - *lsmp != NULL, lmm == NULL: free *lsmp (and its oinfo), return 0;
 *   - *lsmp == NULL:              allocate *lsmp before unpacking;
 *   - lmm != NULL:                validate lmm and unpack its object id.
 * Returns the in-memory md size on success or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);

        if (lmm != NULL) {
                /* Validate the on-disk buffer before touching it. */
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* An OSC stripe md always describes exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free request: release the oinfo, then the md itself. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        /* Unwind the partial allocation. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Max object size depends on what the server negotiated at
         * connect time; fall back to the static per-stripe limit. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
157
158 static inline void osc_pack_capa(struct ptlrpc_request *req,
159                                  struct ost_body *body, void *capa)
160 {
161         struct obd_capa *oc = (struct obd_capa *)capa;
162         struct lustre_capa *c;
163
164         if (!capa)
165                 return;
166
167         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
168         LASSERT(c);
169         capa_cpy(c, oc);
170         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
171         DEBUG_CAPA(D_SEC, c, "pack");
172 }
173
174 static inline void osc_pack_req_body(struct ptlrpc_request *req,
175                                      struct obd_info *oinfo)
176 {
177         struct ost_body *body;
178
179         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
180         LASSERT(body);
181
182         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
183                              oinfo->oi_oa);
184         osc_pack_capa(req, body, oinfo->oi_capa);
185 }
186
187 static inline void osc_set_capa_size(struct ptlrpc_request *req,
188                                      const struct req_msg_field *field,
189                                      struct obd_capa *oc)
190 {
191         if (oc == NULL)
192                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
193         else
194                 /* it is already calculated as sizeof struct obd_capa */
195                 ;
196 }
197
198 static int osc_getattr_interpret(const struct lu_env *env,
199                                  struct ptlrpc_request *req,
200                                  struct osc_async_args *aa, int rc)
201 {
202         struct ost_body *body;
203
204         if (rc != 0)
205                 GOTO(out, rc);
206
207         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
208         if (body) {
209                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
210                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
211                                      aa->aa_oi->oi_oa, &body->oa);
212
213                 /* This should really be sent by the OST */
214                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
215                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
216         } else {
217                 CDEBUG(D_INFO, "can't unpack ost_body\n");
218                 rc = -EPROTO;
219                 aa->aa_oi->oi_oa->o_valid = 0;
220         }
221 out:
222         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
223         RETURN(rc);
224 }
225
226 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
227                              struct ptlrpc_request_set *set)
228 {
229         struct ptlrpc_request *req;
230         struct osc_async_args *aa;
231         int                 rc;
232
233         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
234         if (req == NULL)
235                 RETURN(-ENOMEM);
236
237         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
238         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
239         if (rc) {
240                 ptlrpc_request_free(req);
241                 RETURN(rc);
242         }
243
244         osc_pack_req_body(req, oinfo);
245
246         ptlrpc_request_set_replen(req);
247         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
248
249         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
250         aa = ptlrpc_req_async_args(req);
251         aa->aa_oi = oinfo;
252
253         ptlrpc_set_add_req(set, req);
254         RETURN(0);
255 }
256
/* Synchronous OST_GETATTR: fetch the object's attributes from the OST
 * into oinfo->oi_oa.  Blocks until the reply arrives. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                 rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Send and block until the reply (or an error) comes back. */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* Block size is filled in client-side (cf. osc_getattr_interpret). */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
299
300 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
301                        struct obd_info *oinfo, struct obd_trans_info *oti)
302 {
303         struct ptlrpc_request *req;
304         struct ost_body       *body;
305         int                 rc;
306
307         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
310         if (req == NULL)
311                 RETURN(-ENOMEM);
312
313         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
315         if (rc) {
316                 ptlrpc_request_free(req);
317                 RETURN(rc);
318         }
319
320         osc_pack_req_body(req, oinfo);
321
322         ptlrpc_request_set_replen(req);
323
324         rc = ptlrpc_queue_wait(req);
325         if (rc)
326                 GOTO(out, rc);
327
328         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
329         if (body == NULL)
330                 GOTO(out, rc = -EPROTO);
331
332         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
333                              &body->oa);
334
335         EXIT;
336 out:
337         ptlrpc_req_finished(req);
338         RETURN(rc);
339 }
340
/* Reply callback for asynchronous OST_SETATTR: copy the returned
 * attributes into sa->sa_oa, then invoke the caller's upcall with the
 * final status (the upcall always runs and may rewrite rc). */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
360
/* Build and dispatch an asynchronous OST_SETATTR.
 *
 * @upcall/@cookie are delivered via osc_setattr_interpret() when the
 * reply arrives.  @rqset selects the dispatch mode:
 *   - NULL:        fire-and-forget via ptlrpcd, no interpret callback;
 *   - PTLRPCD_SET: queue on ptlrpcd with the interpret callback;
 *   - otherwise:   add to the caller's request set.
 * Returns 0 once the request is queued, or a negative errno. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                   rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Copy the llog cookie into the obdo before it is packed. */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* Stash the upcall context in the request's async args. */
                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
410
411 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
412                              struct obd_trans_info *oti,
413                              struct ptlrpc_request_set *rqset)
414 {
415         return osc_setattr_async_base(exp, oinfo, oti,
416                                       oinfo->oi_cb_up, oinfo, rqset);
417 }
418
/* Synchronously create an object on the OST described by @oa.
 *
 * If *ea is NULL a stripe md is allocated here; on success the new
 * object id is stored in it and it is returned through *ea.  On failure
 * a locally-allocated md is freed again.  If @oti is given, the reply
 * transno and (when OBD_MD_FLCOOKIE is set) the llog cookie are copied
 * back for the caller.  Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                 rc;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* No md supplied: allocate one to receive the object id. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Block size is filled in client-side (cf. osc_getattr). */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the md if we allocated it ourselves above
         * (*ea is still NULL in that case). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
502
/* Build and queue an asynchronous OST_PUNCH (truncate) request.
 *
 * The punch extent is carried in oinfo->oi_oa's size/blocks fields,
 * which the caller (e.g. osc_punch()) must have set.  The reply runs
 * through osc_setattr_interpret(), which calls @upcall(@cookie, rc).
 * @rqset == PTLRPCD_SET queues on ptlrpcd, otherwise on @rqset. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body  *body;
        int                   rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* Stash the upcall context for the interpret callback. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
546
547 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
548                      struct obd_info *oinfo, struct obd_trans_info *oti,
549                      struct ptlrpc_request_set *rqset)
550 {
551         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
552         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
553         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
554         return osc_punch_base(exp, oinfo,
555                               oinfo->oi_cb_up, oinfo, rqset);
556 }
557
/* Reply callback for OST_SYNC: copy the returned attributes back into
 * the caller's obdo, then deliver the final status via fa_upcall
 * (which always runs, even on error). */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
579
/* Build and queue an asynchronous OST_SYNC request.
 *
 * The sync range travels in oinfo->oi_oa's size/blocks fields (set up
 * by the caller, e.g. osc_sync()).  The reply runs through
 * osc_sync_interpret(), which calls @upcall(@cookie, rc).
 * @rqset == PTLRPCD_SET queues on ptlrpcd, otherwise on @rqset. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                 rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        /* Stash the upcall context in the request's async args. */
        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
623
624 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
625                     struct obd_info *oinfo, obd_size start, obd_size end,
626                     struct ptlrpc_request_set *set)
627 {
628         if (!oinfo->oi_oa) {
629                 CDEBUG(D_INFO, "oa NULL\n");
630                 RETURN(-EINVAL);
631         }
632
633         oinfo->oi_oa->o_size = start;
634         oinfo->oi_oa->o_blocks = end;
635         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
636
637         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
638 }
639
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* Map the object id onto its LDLM resource name. */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        /* Hold a debug reference across the local cancel scan. */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
673
674 static int osc_destroy_interpret(const struct lu_env *env,
675                                  struct ptlrpc_request *req, void *data,
676                                  int rc)
677 {
678         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
679
680         atomic_dec(&cli->cl_destroy_in_flight);
681         wake_up(&cli->cl_destroy_waitq);
682         return 0;
683 }
684
/* Throttle destroy RPCs: allow a new one only while fewer than
 * cl_max_rpcs_in_flight destroys are outstanding.  Returns 1 when the
 * caller may send (the in-flight counter stays incremented until
 * osc_destroy_interpret() drops it); returns 0 when the caller must
 * wait on cl_destroy_waitq. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: undo our increment.  If the counter dropped
         * below the limit between the two atomic ops, another sender
         * may now proceed, so wake a waiter. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
702
703 int osc_create(const struct lu_env *env, struct obd_export *exp,
704                struct obdo *oa, struct lov_stripe_md **ea,
705                struct obd_trans_info *oti)
706 {
707         int rc = 0;
708
709         LASSERT(oa);
710         LASSERT(ea);
711         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
712
713         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
714             oa->o_flags == OBD_FL_RECREATE_OBJS) {
715                 RETURN(osc_real_create(exp, oa, ea, oti));
716         }
717
718         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
719                 RETURN(osc_real_create(exp, oa, ea, oti));
720
721         /* we should not get here anymore */
722         LBUG();
723
724         RETURN(rc);
725 }
726
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel conflicting PW locks locally first, discarding dirty
         * data, and piggyback the cancels on the destroy RPC (ELC). */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Release the cancel list we built above. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* Carry the llog cookie so the OST can cancel the unlink record. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        /* NOTE(review): o_flags is tested without checking OBD_MD_FLFLAGS
         * in o_valid — confirm callers always initialize o_flags. */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
805
/* Fill @oa's dirty/undirty/grant accounting fields (under the LOI list
 * lock) so the server learns how much cache the client holds.
 * NOTE(review): @writing_bytes is not used by this function. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* The caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Announce how much more we could dirty: at least a full
                 * RPC pipeline's worth. */
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT)*
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is being reported now; reset the counter. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
850
851 void osc_update_next_shrink(struct client_obd *cli)
852 {
853         cli->cl_next_shrink_grant =
854                 cfs_time_shift(cli->cl_grant_shrink_interval);
855         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
856                cli->cl_next_shrink_grant);
857 }
858
/* Add @grant bytes back to the client's available grant, under the
 * cl_loi_list_lock that protects all grant accounting. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
865
866 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
867 {
868         if (body->oa.o_valid & OBD_MD_FLGRANT) {
869                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
870                 __osc_update_grant(cli, body->oa.o_grant);
871         }
872 }
873
874 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
875                               obd_count keylen, void *key, obd_count vallen,
876                               void *val, struct ptlrpc_request_set *set);
877
/* Completion callback for a grant-shrink set_info RPC.
 *
 * On failure the grant we tried to hand back is restored to
 * cl_avail_grant; on success any grant the server returned in the reply
 * body is absorbed.  The obdo allocated by the sender is freed on both
 * paths.  Returns the RPC result @rc unchanged. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* RPC failed: take the shrunk grant back locally */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
898
/* Give back a quarter of the currently available grant by recording it in
 * @oa, flagged with OBD_FL_SHRINK_GRANT, so it rides along with an
 * outgoing BRW request; then push the next shrink deadline forward.
 * The grant split is done under cl_loi_list_lock. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        /* make sure o_flags is valid before OR-ing in the shrink flag */
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
912
913 /* Shrink the current grant, either from some large amount to enough for a
914  * full set of in-flight RPCs, or if we have already shrunk to that limit
915  * then to enough for a single RPC.  This avoids keeping more grant than
916  * needed, and avoids shrinking the grant piecemeal. */
917 static int osc_shrink_grant(struct client_obd *cli)
918 {
919         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
920                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
921
922         client_obd_list_lock(&cli->cl_loi_list_lock);
923         if (cli->cl_avail_grant <= target_bytes)
924                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
925         client_obd_list_unlock(&cli->cl_loi_list_lock);
926
927         return osc_shrink_grant_to_target(cli, target_bytes);
928 }
929
/* Shrink available grant down to @target_bytes by sending the difference
 * back to the server via a KEY_GRANT_SHRINK set_info RPC.  Never shrinks
 * below one full RPC worth of grant; if the RPC cannot be set up the
 * grant is restored locally.  Returns 0, or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body *body;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): the lock was dropped above, so cl_avail_grant may
         * have changed since the target check; it is re-sampled here —
         * confirm the window is tolerable as with osc_init_grant's race. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* could not queue the RPC: take the grant back */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
973
974 static int osc_should_shrink_grant(struct client_obd *client)
975 {
976         cfs_time_t time = cfs_time_current();
977         cfs_time_t next_shrink = client->cl_next_shrink_grant;
978
979         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
980              OBD_CONNECT_GRANT_SHRINK) == 0)
981                 return 0;
982
983         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
984                 /* Get the current RPC size directly, instead of going via:
985                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
986                  * Keep comment here so that it can be found by searching. */
987                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
988
989                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
990                     client->cl_avail_grant > brw_size)
991                         return 1;
992                 else
993                         osc_update_next_shrink(client);
994         }
995         return 0;
996 }
997
998 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
999 {
1000         struct client_obd *client;
1001
1002         list_for_each_entry(client, &item->ti_obd_list,
1003                                 cl_grant_shrink_list) {
1004                 if (osc_should_shrink_grant(client))
1005                         osc_shrink_grant(client);
1006         }
1007         return 0;
1008 }
1009
1010 static int osc_add_shrink_grant(struct client_obd *client)
1011 {
1012         int rc;
1013
1014         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1015                                        TIMEOUT_GRANT,
1016                                        osc_grant_shrink_grant_cb, NULL,
1017                                        &client->cl_grant_shrink_list);
1018         if (rc) {
1019                 CERROR("add grant client %s error %d\n",
1020                         client->cl_import->imp_obd->obd_name, rc);
1021                 return rc;
1022         }
1023         CDEBUG(D_CACHE, "add grant client %s \n",
1024                client->cl_import->imp_obd->obd_name);
1025         osc_update_next_shrink(client);
1026         return 0;
1027 }
1028
/* Unregister this client from the TIMEOUT_GRANT shrink timer;
 * counterpart of osc_add_shrink_grant(). */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1034
/* Initialize this client's grant accounting from the server's connect
 * data @ocd, and enrol in periodic grant shrinking if supported. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* join periodic grant shrinking only once: the server must support
         * it and we must not already be on the shrink list */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1072
1073 /* We assume that the reason this OSC got a short read is because it read
1074  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1075  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1076  * this stripe never got written at or beyond this stripe offset yet. */
1077 static void handle_short_read(int nob_read, obd_count page_count,
1078                               struct brw_page **pga)
1079 {
1080         char *ptr;
1081         int i = 0;
1082
1083         /* skip bytes read OK */
1084         while (nob_read > 0) {
1085                 LASSERT (page_count > 0);
1086
1087                 if (pga[i]->count > nob_read) {
1088                         /* EOF inside this page */
1089                         ptr = kmap(pga[i]->pg) +
1090                                 (pga[i]->off & ~CFS_PAGE_MASK);
1091                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1092                         kunmap(pga[i]->pg);
1093                         page_count--;
1094                         i++;
1095                         break;
1096                 }
1097
1098                 nob_read -= pga[i]->count;
1099                 page_count--;
1100                 i++;
1101         }
1102
1103         /* zero remaining pages */
1104         while (page_count-- > 0) {
1105                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1106                 memset(ptr, 0, pga[i]->count);
1107                 kunmap(pga[i]->pg);
1108                 i++;
1109         }
1110 }
1111
1112 static int check_write_rcs(struct ptlrpc_request *req,
1113                            int requested_nob, int niocount,
1114                            obd_count page_count, struct brw_page **pga)
1115 {
1116         int     i;
1117         __u32   *remote_rcs;
1118
1119         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1120                                                   sizeof(*remote_rcs) *
1121                                                   niocount);
1122         if (remote_rcs == NULL) {
1123                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1124                 return(-EPROTO);
1125         }
1126
1127         /* return error if any niobuf was in error */
1128         for (i = 0; i < niocount; i++) {
1129                 if ((int)remote_rcs[i] < 0)
1130                         return(remote_rcs[i]);
1131
1132                 if (remote_rcs[i] != 0) {
1133                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1134                                 i, remote_rcs[i], req);
1135                         return(-EPROTO);
1136                 }
1137         }
1138
1139         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1140                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1141                        req->rq_bulk->bd_nob_transferred, requested_nob);
1142                 return(-EPROTO);
1143         }
1144
1145         return (0);
1146 }
1147
1148 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1149 {
1150         if (p1->flag != p2->flag) {
1151                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1152                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1153
1154                 /* warn if we try to combine flags that we don't know to be
1155                  * safe to combine */
1156                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1157                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1158                               "report this at http://bugs.whamcloud.com/\n",
1159                               p1->flag, p2->flag);
1160                 }
1161                 return 0;
1162         }
1163
1164         return (p1->off + p1->count == p2->off);
1165 }
1166
1167 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1168                                    struct brw_page **pga, int opc,
1169                                    cksum_type_t cksum_type)
1170 {
1171         __u32                           cksum;
1172         int                             i = 0;
1173         struct cfs_crypto_hash_desc     *hdesc;
1174         unsigned int                    bufsize;
1175         int                             err;
1176         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1177
1178         LASSERT(pg_count > 0);
1179
1180         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1181         if (IS_ERR(hdesc)) {
1182                 CERROR("Unable to initialize checksum hash %s\n",
1183                        cfs_crypto_hash_name(cfs_alg));
1184                 return PTR_ERR(hdesc);
1185         }
1186
1187         while (nob > 0 && pg_count > 0) {
1188                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1189
1190                 /* corrupt the data before we compute the checksum, to
1191                  * simulate an OST->client data error */
1192                 if (i == 0 && opc == OST_READ &&
1193                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1194                         unsigned char *ptr = kmap(pga[i]->pg);
1195                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1196                         memcpy(ptr + off, "bad1", min(4, nob));
1197                         kunmap(pga[i]->pg);
1198                 }
1199                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1200                                   pga[i]->off & ~CFS_PAGE_MASK,
1201                                   count);
1202                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1203                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1204
1205                 nob -= pga[i]->count;
1206                 pg_count--;
1207                 i++;
1208         }
1209
1210         bufsize = 4;
1211         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1212
1213         if (err)
1214                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1215
1216         /* For sending we only compute the wrong checksum instead
1217          * of corrupting the data so it is still correct on a redo */
1218         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1219                 cksum++;
1220
1221         return cksum;
1222 }
1223
/* Build (but do not send) a BRW read or write request covering the
 * @page_count pages in @pga.
 *
 * @cmd selects OST_WRITE (OBD_BRW_WRITE set) or OST_READ; adjacent pages
 * with compatible flags are merged into shared niobufs; the bulk
 * descriptor, capsule fields, grant/dirty announcement, optional
 * checksum and async-args are all prepared here.  On success the request
 * is returned via @reqp (ownership passes to the caller) and 0 is
 * returned; on failure a negative errno is returned and nothing is left
 * allocated. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body  *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        /* fault-injection points for testing allocation/parameter errors */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* writes draw from the preallocated pool so that dirty
                 * data can still be flushed under memory pressure */
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the niobufs needed: one per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        /* capsule field sizes must be set before ptlrpc_request_pack() */
        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* pin each page into the bulk descriptor and build the niobuf
         * array, merging consecutive mergeable pages into one niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* we must have written exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* opportunistically return surplus grant with this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* stash what the reply interpreters will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1425
1426 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1427                                 __u32 client_cksum, __u32 server_cksum, int nob,
1428                                 obd_count page_count, struct brw_page **pga,
1429                                 cksum_type_t client_cksum_type)
1430 {
1431         __u32 new_cksum;
1432         char *msg;
1433         cksum_type_t cksum_type;
1434
1435         if (server_cksum == client_cksum) {
1436                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1437                 return 0;
1438         }
1439
1440         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1441                                        oa->o_flags : 0);
1442         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1443                                       cksum_type);
1444
1445         if (cksum_type != client_cksum_type)
1446                 msg = "the server did not use the checksum type specified in "
1447                       "the original request - likely a protocol problem";
1448         else if (new_cksum == server_cksum)
1449                 msg = "changed on the client after we checksummed it - "
1450                       "likely false positive due to mmap IO (bug 11742)";
1451         else if (new_cksum == client_cksum)
1452                 msg = "changed in transit before arrival at OST";
1453         else
1454                 msg = "changed in transit AND doesn't match the original - "
1455                       "likely false positive due to mmap IO (bug 11742)";
1456
1457         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1458                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1459                            msg, libcfs_nid2str(peer->nid),
1460                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1461                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1462                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1463                            POSTID(&oa->o_oi), pga[0]->off,
1464                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1465         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1466                "client csum now %x\n", client_cksum, client_cksum_type,
1467                server_cksum, cksum_type, new_cksum);
1468         return 1;
1469 }
1470
1471 /* Note rc enters this function as number of bytes transferred */
1472 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1473 {
1474         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1475         const lnet_process_id_t *peer =
1476                         &req->rq_import->imp_connection->c_peer;
1477         struct client_obd *cli = aa->aa_cli;
1478         struct ost_body *body;
1479         __u32 client_cksum = 0;
1480
1481         if (rc < 0 && rc != -EDQUOT) {
1482                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1483                 RETURN(rc);
1484         }
1485
1486         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1487         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1488         if (body == NULL) {
1489                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1490                 RETURN(-EPROTO);
1491         }
1492
1493         /* set/clear over quota flag for a uid/gid */
1494         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1495             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1496                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1497
1498                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1499                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1500                        body->oa.o_flags);
1501                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1502         }
1503
1504         osc_update_grant(cli, body);
1505
1506         if (rc < 0)
1507                 RETURN(rc);
1508
1509         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1510                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1511
1512         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1513                 if (rc > 0) {
1514                         CERROR("Unexpected +ve rc %d\n", rc);
1515                         RETURN(-EPROTO);
1516                 }
1517                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1518
1519                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1520                         RETURN(-EAGAIN);
1521
1522                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1523                     check_write_checksum(&body->oa, peer, client_cksum,
1524                                          body->oa.o_cksum, aa->aa_requested_nob,
1525                                          aa->aa_page_count, aa->aa_ppga,
1526                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1527                         RETURN(-EAGAIN);
1528
1529                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1530                                      aa->aa_page_count, aa->aa_ppga);
1531                 GOTO(out, rc);
1532         }
1533
1534         /* The rest of this function executes only for OST_READs */
1535
1536         /* if unwrap_bulk failed, return -EAGAIN to retry */
1537         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1538         if (rc < 0)
1539                 GOTO(out, rc = -EAGAIN);
1540
1541         if (rc > aa->aa_requested_nob) {
1542                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1543                        aa->aa_requested_nob);
1544                 RETURN(-EPROTO);
1545         }
1546
1547         if (rc != req->rq_bulk->bd_nob_transferred) {
1548                 CERROR ("Unexpected rc %d (%d transferred)\n",
1549                         rc, req->rq_bulk->bd_nob_transferred);
1550                 return (-EPROTO);
1551         }
1552
1553         if (rc < aa->aa_requested_nob)
1554                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1555
1556         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1557                 static int cksum_counter;
1558                 __u32      server_cksum = body->oa.o_cksum;
1559                 char      *via;
1560                 char      *router;
1561                 cksum_type_t cksum_type;
1562
1563                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1564                                                body->oa.o_flags : 0);
1565                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1566                                                  aa->aa_ppga, OST_READ,
1567                                                  cksum_type);
1568
1569                 if (peer->nid == req->rq_bulk->bd_sender) {
1570                         via = router = "";
1571                 } else {
1572                         via = " via ";
1573                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1574                 }
1575
1576                 if (server_cksum == ~0 && rc > 0) {
1577                         CERROR("Protocol error: server %s set the 'checksum' "
1578                                "bit, but didn't send a checksum.  Not fatal, "
1579                                "but please notify on http://bugs.whamcloud.com/\n",
1580                                libcfs_nid2str(peer->nid));
1581                 } else if (server_cksum != client_cksum) {
1582                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1583                                            "%s%s%s inode "DFID" object "DOSTID
1584                                            " extent ["LPU64"-"LPU64"]\n",
1585                                            req->rq_import->imp_obd->obd_name,
1586                                            libcfs_nid2str(peer->nid),
1587                                            via, router,
1588                                            body->oa.o_valid & OBD_MD_FLFID ?
1589                                                 body->oa.o_parent_seq : (__u64)0,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_parent_oid : 0,
1592                                            body->oa.o_valid & OBD_MD_FLFID ?
1593                                                 body->oa.o_parent_ver : 0,
1594                                            POSTID(&body->oa.o_oi),
1595                                            aa->aa_ppga[0]->off,
1596                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1597                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1598                                                                         1);
1599                         CERROR("client %x, server %x, cksum_type %x\n",
1600                                client_cksum, server_cksum, cksum_type);
1601                         cksum_counter = 0;
1602                         aa->aa_oa->o_cksum = client_cksum;
1603                         rc = -EAGAIN;
1604                 } else {
1605                         cksum_counter++;
1606                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1607                         rc = 0;
1608                 }
1609         } else if (unlikely(client_cksum)) {
1610                 static int cksum_missed;
1611
1612                 cksum_missed++;
1613                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1614                         CERROR("Checksum %u requested from %s but not sent\n",
1615                                cksum_missed, libcfs_nid2str(peer->nid));
1616         } else {
1617                 rc = 0;
1618         }
1619 out:
1620         if (rc >= 0)
1621                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1622                                      aa->aa_oa, &body->oa);
1623
1624         RETURN(rc);
1625 }
1626
/*
 * Issue one synchronous bulk BRW (read or write) RPC and wait for it to
 * complete, transparently resending on bulk timeout and on recoverable
 * server errors (notably -EINPROGRESS, which is retried without limit).
 *
 * \param cmd         OBD_BRW_READ or OBD_BRW_WRITE (plus optional flags)
 * \param exp         export to send the request through
 * \param oa          obdo describing the object; may be updated from reply
 * \param lsm         striping metadata for the object
 * \param page_count  number of entries in \a pga
 * \param pga         pages to transfer
 * \param ocapa       capability for the operation, may be NULL
 *
 * \retval 0 on success; negative errno on failure.  -EAGAIN and
 *         -EINPROGRESS are converted to -EIO before returning.
 */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                 rc;
        wait_queue_head_t           waitq;
        int                 generation, resends = 0;
        struct l_wait_info     lwi;

        init_waitqueue_head(&waitq);
        /* remember the import generation so a resend can detect eviction */
        generation = exp->exp_obd->u.cli.cl_import->imp_generation;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0, resends);
        if (rc != 0)
                return (rc);

        if (resends) {
                /* this is a resend: keep the original import generation and
                 * delay sending by "resends" seconds */
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                /* bulk timed out but the request may be resent as-is */
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.*/
        if (osc_recoverable_error(rc)) {
                resends++;
                if (rc != -EINPROGRESS &&
                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("%s: too many resend retries for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }
                if (generation !=
                    exp->exp_obd->u.cli.cl_import->imp_generation) {
                        /* the import was evicted and reconnected meanwhile;
                         * give up, the pages may no longer be valid */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }

                /* back off for "resends" seconds before retrying */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                                       NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
out:
        if (rc == -EAGAIN || rc == -EINPROGRESS)
                rc = -EIO;
        RETURN (rc);
}
1694
/*
 * Rebuild and resubmit an async BRW request that failed with a recoverable
 * error.  A brand-new ptlrpc request is prepared from the same pages, and
 * the async args (page array, oap/extent lists, capability) are MOVED from
 * the old request to the new one, which is then handed back to ptlrpcd.
 *
 * \param request  the failed request; its async state is taken over
 * \param aa       async args of \a request
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0 on success (new request queued), negative errno otherwise
 *         (-EINTR if any of the pages was interrupted in the meantime).
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* bail out (dropping the new request) if any page was interrupted
         * while the old request was in flight */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* re-point every page's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capability ownership moves to the new request */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1766
1767 /*
1768  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1769  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1770  * fine for our small page arrays and doesn't require allocation.  its an
1771  * insertion sort that swaps elements that are strides apart, shrinking the
1772  * stride down until its '1' and the array is sorted.
1773  */
1774 static void sort_brw_pages(struct brw_page **array, int num)
1775 {
1776         int stride, i, j;
1777         struct brw_page *tmp;
1778
1779         if (num == 1)
1780                 return;
1781         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1782                 ;
1783
1784         do {
1785                 stride /= 3;
1786                 for (i = stride ; i < num ; i++) {
1787                         tmp = array[i];
1788                         j = i;
1789                         while (j >= stride && array[j - stride]->off > tmp->off) {
1790                                 array[j] = array[j - stride];
1791                                 j -= stride;
1792                         }
1793                         array[j] = tmp;
1794                 }
1795         } while (stride > 1);
1796 }
1797
1798 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1799 {
1800         int count = 1;
1801         int offset;
1802         int i = 0;
1803
1804         LASSERT (pages > 0);
1805         offset = pg[i]->off & ~CFS_PAGE_MASK;
1806
1807         for (;;) {
1808                 pages--;
1809                 if (pages == 0)  /* that's all */
1810                         return count;
1811
1812                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1813                         return count;   /* doesn't end on page boundary */
1814
1815                 i++;
1816                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1817                 if (offset != 0)        /* doesn't start on page boundary */
1818                         return count;
1819
1820                 count++;
1821         }
1822 }
1823
1824 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1825 {
1826         struct brw_page **ppga;
1827         int i;
1828
1829         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1830         if (ppga == NULL)
1831                 return NULL;
1832
1833         for (i = 0; i < count; i++)
1834                 ppga[i] = pga + i;
1835         return ppga;
1836 }
1837
/* Free a pointer array allocated by osc_build_ppga().  Only the pointer
 * array itself is freed, not the brw_page structures it points at. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1843
/*
 * Synchronous bulk I/O entry point.  Sorts the pages by offset, then
 * splits them into chunks of at most cl_max_pages_per_rpc unfragmented
 * pages and issues one blocking BRW RPC per chunk via osc_brw_internal().
 *
 * \param cmd         OBD_BRW_READ/OBD_BRW_WRITE; with OBD_BRW_CHECK only
 *                    probe whether I/O has a chance to succeed
 * \param exp         export to send the RPCs through
 * \param oinfo       carries the obdo, striping md and capability
 * \param page_count  number of pages in \a pga
 * \param pga         contiguous array of pages to transfer
 * \param oti         transaction info (unused here)
 *
 * \retval 0 on success, or the negative errno of the first failed chunk.
 */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* never let one RPC span a page-alignment discontinuity */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1916
1917 static int brw_interpret(const struct lu_env *env,
1918                          struct ptlrpc_request *req, void *data, int rc)
1919 {
1920         struct osc_brw_async_args *aa = data;
1921         struct osc_extent *ext;
1922         struct osc_extent *tmp;
1923         struct cl_object  *obj = NULL;
1924         struct client_obd *cli = aa->aa_cli;
1925
1926         rc = osc_brw_fini_request(req, rc);
1927         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1928         /* When server return -EINPROGRESS, client should always retry
1929          * regardless of the number of times the bulk was resent already. */
1930         if (osc_recoverable_error(rc)) {
1931                 if (req->rq_import_generation !=
1932                     req->rq_import->imp_generation) {
1933                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1934                                ""DOSTID", rc = %d.\n",
1935                                req->rq_import->imp_obd->obd_name,
1936                                POSTID(&aa->aa_oa->o_oi), rc);
1937                 } else if (rc == -EINPROGRESS ||
1938                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1939                         rc = osc_brw_redo_request(req, aa, rc);
1940                 } else {
1941                         CERROR("%s: too many resent retries for object: "
1942                                ""LPU64":"LPU64", rc = %d.\n",
1943                                req->rq_import->imp_obd->obd_name,
1944                                POSTID(&aa->aa_oa->o_oi), rc);
1945                 }
1946
1947                 if (rc == 0)
1948                         RETURN(0);
1949                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1950                         rc = -EIO;
1951         }
1952
1953         if (aa->aa_ocapa) {
1954                 capa_put(aa->aa_ocapa);
1955                 aa->aa_ocapa = NULL;
1956         }
1957
1958         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1959                 if (obj == NULL && rc == 0) {
1960                         obj = osc2cl(ext->oe_obj);
1961                         cl_object_get(obj);
1962                 }
1963
1964                 list_del_init(&ext->oe_link);
1965                 osc_extent_finish(env, ext, 1, rc);
1966         }
1967         LASSERT(list_empty(&aa->aa_exts));
1968         LASSERT(list_empty(&aa->aa_oaps));
1969
1970         if (obj != NULL) {
1971                 struct obdo *oa = aa->aa_oa;
1972                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1973                 unsigned long valid = 0;
1974
1975                 LASSERT(rc == 0);
1976                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1977                         attr->cat_blocks = oa->o_blocks;
1978                         valid |= CAT_BLOCKS;
1979                 }
1980                 if (oa->o_valid & OBD_MD_FLMTIME) {
1981                         attr->cat_mtime = oa->o_mtime;
1982                         valid |= CAT_MTIME;
1983                 }
1984                 if (oa->o_valid & OBD_MD_FLATIME) {
1985                         attr->cat_atime = oa->o_atime;
1986                         valid |= CAT_ATIME;
1987                 }
1988                 if (oa->o_valid & OBD_MD_FLCTIME) {
1989                         attr->cat_ctime = oa->o_ctime;
1990                         valid |= CAT_CTIME;
1991                 }
1992                 if (valid != 0) {
1993                         cl_object_attr_lock(obj);
1994                         cl_object_attr_set(env, obj, attr, valid);
1995                         cl_object_attr_unlock(obj);
1996                 }
1997                 cl_object_put(env, obj);
1998         }
1999         OBDO_FREE(aa->aa_oa);
2000
2001         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2002                           req->rq_bulk->bd_nob_transferred);
2003         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2004         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2005
2006         client_obd_list_lock(&cli->cl_loi_list_lock);
2007         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2008          * is called so we know whether to go to sync BRWs or wait for more
2009          * RPCs to complete */
2010         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2011                 cli->cl_w_in_flight--;
2012         else
2013                 cli->cl_r_in_flight--;
2014         osc_wake_cache_waiters(cli);
2015         client_obd_list_unlock(&cli->cl_loi_list_lock);
2016
2017         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2018         RETURN(rc);
2019 }
2020
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the request is handed to a ptlrpcd thread (selected by
 * policy \a pol) and completed asynchronously by brw_interpret(); on
 * failure every extent in \a ext_list is finished with the error and
 * all partially-built resources are released.
 *
 * \retval 0 on success, negative errno on failure.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct ldlm_lock                *lock = NULL;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        int                             i;
        int                             rc;
        LIST_HEAD(rpc_list);

        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the byte range covered by the RPC; only the
                         * first and last page may be partial */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the brw_page array and add each page to the cl_req */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                        lock = oap->oap_ldlm_lock;
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* account the new in-flight RPC in the lprocfs histograms */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                             oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
2236
2237 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2238                                         struct ldlm_enqueue_info *einfo)
2239 {
2240         void *data = einfo->ei_cbdata;
2241         int set = 0;
2242
2243         LASSERT(lock != NULL);
2244         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2245         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2246         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2247         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2248
2249         lock_res_and_lock(lock);
2250         spin_lock(&osc_ast_guard);
2251
2252         if (lock->l_ast_data == NULL)
2253                 lock->l_ast_data = data;
2254         if (lock->l_ast_data == data)
2255                 set = 1;
2256
2257         spin_unlock(&osc_ast_guard);
2258         unlock_res_and_lock(lock);
2259
2260         return set;
2261 }
2262
2263 static int osc_set_data_with_check(struct lustre_handle *lockh,
2264                                    struct ldlm_enqueue_info *einfo)
2265 {
2266         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2267         int set = 0;
2268
2269         if (lock != NULL) {
2270                 set = osc_set_lock_data_with_check(lock, einfo);
2271                 LDLM_LOCK_PUT(lock);
2272         } else
2273                 CERROR("lockh %p, data %p - client evicted?\n",
2274                        lockh, einfo->ei_cbdata);
2275         return set;
2276 }
2277
2278 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2279                              ldlm_iterator_t replace, void *data)
2280 {
2281         struct ldlm_res_id res_id;
2282         struct obd_device *obd = class_exp2obd(exp);
2283
2284         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2285         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2286         return 0;
2287 }
2288
2289 /* find any ldlm lock of the inode in osc
2290  * return 0    not find
2291  *      1    find one
2292  *      < 0    error */
2293 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2294                            ldlm_iterator_t replace, void *data)
2295 {
2296         struct ldlm_res_id res_id;
2297         struct obd_device *obd = class_exp2obd(exp);
2298         int rc = 0;
2299
2300         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2301         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2302         if (rc == LDLM_ITER_STOP)
2303                 return(1);
2304         if (rc == LDLM_ITER_CONTINUE)
2305                 return(0);
2306         return(rc);
2307 }
2308
/*
 * Common post-processing for an OST lock enqueue reply.
 *
 * For intent enqueues the server may abort the lock yet still carry a
 * meaningful status in the DLM reply; decode and propagate it here.  On
 * success -- or on an intent abort when AGL is not in use -- the LVB
 * received from the server is valid, so LDLM_FL_LVB_READY is set for the
 * caller.  The final status is always delivered through @upcall.
 *
 * @agl: non-zero when this enqueue was issued for asynchronous glimpse
 *       locking
 * Returns whatever the upcall returns.
 */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            __u64 *flags, int agl, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* The intent status travels in wire byte order;
                         * convert before deciding whether it overrides rc. */
                        rep->lock_policy_res1 =
                                ptlrpc_status_ntoh(rep->lock_policy_res1);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
            (rc == 0)) {
                /* LVB from the reply is usable from now on. */
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}
2341
/*
 * Interpret callback for an asynchronous lock enqueue RPC, run in
 * ptlrpcd context once the reply arrives (or the request fails).
 *
 * Completes the DLM half via ldlm_cli_enqueue_fini(), then the OSC half
 * via osc_enqueue_fini() (which fires the caller's upcall), and finally
 * unwinds the lock references taken for the async request.  The extra
 * addref/decref pair around the upcall guarantees a blocking AST for a
 * failed lock cannot run before the upcall has executed.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* An AGL enqueue that was aborted carries no LVB to unpack. */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the reference taken above, now that the upcall has run. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2403
/*
 * Update per-stripe lov_oinfo state after an enqueue has completed.
 *
 * On a granted lock (rc == ELDLM_OK) the server LVB is copied into the
 * oinfo and the known-minimum-size (kms) is extended -- but no further
 * than the end of the newly granted extent.  A glimpse-aborted intent
 * enqueue still carries a valid LVB, so record it and treat the outcome
 * as success.  Any other outcome marks the lock as failing LVB match.
 */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        /* kms already extends beyond this lock; leave it. */
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                /* Glimpse: no lock granted, but the LVB is authoritative. */
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                rc = ELDLM_OK;
        }

        if (lock != NULL) {
                if (rc != ELDLM_OK)
                        ldlm_lock_fail_match(lock);

                LDLM_LOCK_PUT(lock);
        }
}
EXPORT_SYMBOL(osc_update_enqueue);
2448
/* Sentinel rqset value: "send via ptlrpcd" rather than a real request set;
 * never dereferenced, only compared against (see osc_enqueue_base()). */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2450
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on @res_id, first trying to match an already
 * cached lock.  On a match the upcall is invoked immediately with
 * ELDLM_OK; otherwise a (possibly intent) enqueue RPC is issued, either
 * asynchronously through @rqset (interpreted by osc_enqueue_interpret())
 * or synchronously with completion handled by osc_enqueue_fini().
 *
 * @agl: non-zero for asynchronous glimpse locking; an AGL match of a
 *       not-yet-granted lock returns -ECANCELED to skip the stripe.
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL may match a lock whose LVB is not ready yet. */
        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this strpe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* l_ast_data belongs to someone else; can't reuse. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2601
2602 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2603                        struct ldlm_enqueue_info *einfo,
2604                        struct ptlrpc_request_set *rqset)
2605 {
2606         struct ldlm_res_id res_id;
2607         int rc;
2608
2609         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2610         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2611                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2612                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2613                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2614                               rqset, rqset != NULL, 0);
2615         RETURN(rc);
2616 }
2617
2618 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2619                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2620                    int *flags, void *data, struct lustre_handle *lockh,
2621                    int unref)
2622 {
2623         struct obd_device *obd = exp->exp_obd;
2624         int lflags = *flags;
2625         ldlm_mode_t rc;
2626
2627         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2628                 RETURN(-EIO);
2629
2630         /* Filesystem lock extents are extended to page boundaries so that
2631          * dealing with the page cache is a little smoother */
2632         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2633         policy->l_extent.end |= ~CFS_PAGE_MASK;
2634
2635         /* Next, search for already existing extent locks that will cover us */
2636         /* If we're trying to read, we also search for an existing PW lock.  The
2637          * VFS and page cache already protect us locally, so lots of readers/
2638          * writers can share a single PW lock. */
2639         rc = mode;
2640         if (mode == LCK_PR)
2641                 rc |= LCK_PW;
2642         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2643                              res_id, type, policy, rc, lockh, unref);
2644         if (rc) {
2645                 if (data != NULL) {
2646                         if (!osc_set_data_with_check(lockh, data)) {
2647                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2648                                         ldlm_lock_decref(lockh, rc);
2649                                 RETURN(0);
2650                         }
2651                 }
2652                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2653                         ldlm_lock_addref(lockh, LCK_PR);
2654                         ldlm_lock_decref(lockh, LCK_PW);
2655                 }
2656                 RETURN(rc);
2657         }
2658         RETURN(rc);
2659 }
2660
2661 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2662 {
2663         if (unlikely(mode == LCK_GROUP))
2664                 ldlm_lock_decref_and_cancel(lockh, mode);
2665         else
2666                 ldlm_lock_decref(lockh, mode);
2667
2668         RETURN(0);
2669 }
2670
2671 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2672                       __u32 mode, struct lustre_handle *lockh)
2673 {
2674         RETURN(osc_cancel_base(lockh, mode));
2675 }
2676
2677 static int osc_cancel_unused(struct obd_export *exp,
2678                              struct lov_stripe_md *lsm,
2679                              ldlm_cancel_flags_t flags,
2680                              void *opaque)
2681 {
2682         struct obd_device *obd = class_exp2obd(exp);
2683         struct ldlm_res_id res_id, *resp = NULL;
2684
2685         if (lsm != NULL) {
2686                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2687                 resp = &res_id;
2688         }
2689
2690         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2691 }
2692
/*
 * Interpret callback for an asynchronous OST_STATFS RPC: unpack the
 * statfs reply into the caller's buffer and deliver the final status
 * through oi_cb_up.  NODELAY requests swallow transient connection
 * errors (-ENOTCONN/-EAGAIN) and report success with no data update.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* Hand the (possibly adjusted) status to the upper layer. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2724
/*
 * Queue an asynchronous OST_STATFS RPC onto @rqset; the reply is handled
 * by osc_statfs_interpret(), which fills oinfo->oi_osfs and fires
 * oinfo->oi_cb_up.
 *
 * NOTE(review): @max_age is currently unused here -- see the comment
 * below about passing it to the target; confirm before relying on it.
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                 rc;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2767
/*
 * Synchronous OST_STATFS: send the RPC, wait for the reply, and copy the
 * server's obd_statfs into @osfs.
 *
 * NOTE(review): @max_age and @env appear unused here -- the comment below
 * explains why max_age is not sent to the target; confirm with callers.
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The allocation holds its own import reference; drop ours now. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2830
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* Allocate a reply buffer with room for the object array;
                 * only the first slot is filled (single-stripe OSC view). */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* v1 and v3 place lmm_objects[] at different offsets. */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects =
                            &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_ost_oi = lsm->lsm_oi;
        } else {
                /* Header only: reuse the on-stack copy as the reply. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_oi = lsm->lsm_oi;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2892
2893
/*
 * OSC ioctl dispatcher.  Takes a module reference for the duration of
 * the call so the module cannot be unloaded mid-ioctl; all exits funnel
 * through the out label to release it.
 *
 * @karg: kernel-space argument (struct obd_ioctl_data for most cmds)
 * @uarg: user-space argument pointer, used by cmds that copy to/from
 *        user memory
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller's buffers must be able to hold the desc and uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC is presented as a single-target, single-stripe
                 * "LOV" for this query. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                /* copy_to_user() returns the number of uncopied bytes;
                 * collapse any nonzero remainder into -EFAULT. */
                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2978
/*
 * obd_get_info() handler for the OSC layer.
 *
 * KEY_LOCK_TO_STRIPE is answered locally; KEY_LAST_ID and KEY_FIEMAP are
 * forwarded to the OST as synchronous OST_GET_INFO RPCs.  All other keys
 * are rejected with -EINVAL.
 *
 * \param env     execution environment (unused)
 * \param exp     export (connection) to the target OST
 * \param keylen  length of \a key in bytes
 * \param key     the information key being queried
 * \param vallen  in: size of the \a val buffer; out: bytes stored there
 * \param val     buffer that receives the answer
 * \param lsm     stripe metadata (unused)
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* An OSC object is a single stripe, so the lock-to-stripe
                 * mapping is trivially stripe index 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* Query the OST for its last allocated object id. */
                struct ptlrpc_request *req;
                obd_id          *reply;
                char              *tmp;
                int                 rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* Fail fast rather than stall on a recovering import. */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* Extent-map query.  With FIEMAP_FLAG_SYNC, first try to
                 * reuse a cached DLM extent lock covering the range; if no
                 * lock is cached, ask the OST to take its own (SRVLOCK). */
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Round the requested byte range out to page boundaries. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                /* Guard against wrap-around when rounding the end up;
                 * clamp to end-of-object in that case. */
                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* Trade the matched PW reference for a PR
                                 * one; read access is all we need here. */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                /* Drop the PR reference held from the match/convert above. */
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3113
/*
 * obd_set_info_async() handler for the OSC layer.
 *
 * Several keys are handled entirely on the client (checksum toggle,
 * sptlrpc configuration, security context flush, shared cache
 * registration, LRU shrinking); everything else is packed into an
 * OST_SET_INFO RPC.  KEY_GRANT_SHRINK requests are driven by the
 * ptlrpcd daemon, all other RPCs are added to \a set.
 *
 * \param env     execution environment (unused)
 * \param exp     export to the target OST
 * \param keylen  length of \a key in bytes
 * \param key     the information key being set
 * \param vallen  length of \a val in bytes
 * \param val     the new value
 * \param set     request set for the RPC; required unless the key is
 *                KEY_GRANT_SHRINK
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char              *tmp;
        int                 rc;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                /* Toggle wire checksumming for this client. */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                /* Attach the shared client page cache to this OSC and add
                 * the OSC to the cache's per-OSC LRU list. */
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                /* Shrink at most half of this OSC's LRU pages, bounded by
                 * the caller's remaining target; report progress back by
                 * decrementing *val. */
                struct client_obd *cli = &obd->u.cli;
                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;

                nr = osc_lru_shrink(cli, min(nr, target));
                *(int *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* Grant shrink requests carry an ost_body (fixed size in the
         * request format) instead of a generic value buffer. */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                /* Stash a copy of the obdo in the request's async args for
                 * osc_shrink_grant_interpret() to use (and free) when the
                 * reply arrives. */
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
3230
3231
/*
 * Stub obd_llog_init handler.  The OSC no longer hosts llog contexts when
 * used with LOD/OSP, so reaching this function is a bug.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        /* this code is not supposed to be used with LOD/OSP
         * to be removed soon */
        LBUG();
        return 0;
}
3240
3241 static int osc_llog_finish(struct obd_device *obd, int count)
3242 {
3243         struct llog_ctxt *ctxt;
3244
3245         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3246         if (ctxt) {
3247                 llog_cat_close(NULL, ctxt->loc_handle);
3248                 llog_cleanup(NULL, ctxt);
3249         }
3250
3251         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3252         if (ctxt)
3253                 llog_cleanup(NULL, ctxt);
3254         RETURN(0);
3255 }
3256
3257 static int osc_reconnect(const struct lu_env *env,
3258                          struct obd_export *exp, struct obd_device *obd,
3259                          struct obd_uuid *cluuid,
3260                          struct obd_connect_data *data,
3261                          void *localdata)
3262 {
3263         struct client_obd *cli = &obd->u.cli;
3264
3265         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3266                 long lost_grant;
3267
3268                 client_obd_list_lock(&cli->cl_loi_list_lock);
3269                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3270                                 2 * cli_brw_size(obd);
3271                 lost_grant = cli->cl_lost_grant;
3272                 cli->cl_lost_grant = 0;
3273                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3274
3275                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3276                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3277                        data->ocd_version, data->ocd_grant, lost_grant);
3278         }
3279
3280         RETURN(0);
3281 }
3282
/*
 * Disconnect this OSC from the OST: on the last connection, flush any
 * pending size-llog cancel records to the target, then tear down the
 * client export.  Grant-shrink list removal is deliberately deferred
 * until the import is gone (see the BUG18662 note below).
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                   disconnect p2
         *   ptlrpc_connect_import
         *     ...............         class_manual_cleanup
         *                                   osc_disconnect
         *                                   del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                    cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
3324
/*
 * React to state changes of the import backing this OSC: reset grant
 * accounting on disconnect, flush pages and DLM locks on invalidation,
 * re-initialize grants when connect data arrives (OCD), and relay
 * (in)activation events to the observer (typically the LOV above us).
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Connection lost: any grant we held is no longer valid. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env    *env;
                int                 refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data negotiated: pick up grant support and the
                 * OST request portal if the server offers them. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3397
3398 /**
3399  * Determine whether the lock can be canceled before replaying the lock
3400  * during recovery, see bug16774 for detailed information.
3401  *
3402  * \retval zero the lock can't be canceled
3403  * \retval other ok to cancel
3404  */
3405 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3406 {
3407         check_res_locked(lock->l_resource);
3408
3409         /*
3410          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3411          *
3412          * XXX as a future improvement, we can also cancel unused write lock
3413          * if it doesn't have dirty data and active mmaps.
3414          */
3415         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3416             (lock->l_granted_mode == LCK_PR ||
3417              lock->l_granted_mode == LCK_CR) &&
3418             (osc_dlm_lock_pageref(lock) == 0))
3419                 RETURN(1);
3420
3421         RETURN(0);
3422 }
3423
3424 static int brw_queue_work(const struct lu_env *env, void *data)
3425 {
3426         struct client_obd *cli = data;
3427
3428         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3429
3430         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3431         RETURN(0);
3432 }
3433
/*
 * Set up an OSC device: ptlrpcd reference, client connection, writeback
 * worker, quota state, procfs entries, RPC request pool and grant-shrink
 * registration.
 *
 * \param obd   the obd device being configured
 * \param lcfg  configuration record from the MGS/llog
 *
 * \retval 0 on success; negative errno on failure, with everything
 *         acquired so far released through the goto-cleanup chain below
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct lprocfs_static_vars lvars = { 0 };
        struct client_obd         *cli = &obd->u.cli;
        void                   *handler;
        int                     rc;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Dedicated ptlrpcd work item used to kick writeback. */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
        /* procfs setup failure is non-fatal; the device works without it. */
        lprocfs_osc_init_vars(&lvars);
        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        /* Let the DLM cancel unused read locks instead of replaying them
         * during recovery (see osc_cancel_for_recovery()). */
        ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
        RETURN(rc);

out_ptlrpcd_work:
        ptlrpcd_destroy_work(handler);
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3488
/*
 * Early stages of OSC device teardown.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the writeback worker and cleans up the
 * import, procfs entries and llog contexts.  Other stages are no-ops.
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3532
/*
 * Final cleanup of an OSC device: detach from the shared client page
 * cache (LRU), free quota state, clean up the client obd and drop the
 * ptlrpcd reference taken in osc_setup().
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3557
3558 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3559 {
3560         struct lprocfs_static_vars lvars = { 0 };
3561         int rc = 0;
3562
3563         lprocfs_osc_init_vars(&lvars);
3564
3565         switch (lcfg->lcfg_command) {
3566         default:
3567                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3568                                               lcfg, obd);
3569                 if (rc > 0)
3570                         rc = 0;
3571                 break;
3572         }
3573
3574         return(rc);
3575 }
3576
/* obd_ops hook: thin wrapper delegating to osc_process_config_base()
 * (\a len is unused; \a buf is the lustre_cfg record). */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3581
3582 struct obd_ops osc_obd_ops = {
3583         .o_owner                = THIS_MODULE,
3584         .o_setup                = osc_setup,
3585         .o_precleanup      = osc_precleanup,
3586         .o_cleanup            = osc_cleanup,
3587         .o_add_conn          = client_import_add_conn,
3588         .o_del_conn          = client_import_del_conn,
3589         .o_connect            = client_connect_import,
3590         .o_reconnect        = osc_reconnect,
3591         .o_disconnect      = osc_disconnect,
3592         .o_statfs              = osc_statfs,
3593         .o_statfs_async  = osc_statfs_async,
3594         .o_packmd              = osc_packmd,
3595         .o_unpackmd          = osc_unpackmd,
3596         .o_create              = osc_create,
3597         .o_destroy            = osc_destroy,
3598         .o_getattr            = osc_getattr,
3599         .o_getattr_async        = osc_getattr_async,
3600         .o_setattr            = osc_setattr,
3601         .o_setattr_async        = osc_setattr_async,
3602         .o_brw            = osc_brw,
3603         .o_punch                = osc_punch,
3604         .o_sync          = osc_sync,
3605         .o_enqueue            = osc_enqueue,
3606         .o_change_cbdata        = osc_change_cbdata,
3607         .o_find_cbdata    = osc_find_cbdata,
3608         .o_cancel              = osc_cancel,
3609         .o_cancel_unused        = osc_cancel_unused,
3610         .o_iocontrol        = osc_iocontrol,
3611         .o_get_info          = osc_get_info,
3612         .o_set_info_async       = osc_set_info_async,
3613         .o_import_event  = osc_import_event,
3614         .o_llog_init        = osc_llog_init,
3615         .o_llog_finish    = osc_llog_finish,
3616         .o_process_config       = osc_process_config,
3617         .o_quotactl          = osc_quotactl,
3618         .o_quotacheck      = osc_quotacheck,
3619 };
3620
3621 extern struct lu_kmem_descr osc_caches[];
3622 extern spinlock_t osc_ast_guard;
3623 extern struct lock_class_key osc_ast_guard_class;
3624
3625 int __init osc_init(void)
3626 {
3627         struct lprocfs_static_vars lvars = { 0 };
3628         int rc;
3629
3630         /* print an address of _any_ initialized kernel symbol from this
3631          * module, to allow debugging with gdb that doesn't support data
3632          * symbols from modules.*/
3633         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3634
3635         rc = lu_kmem_init(osc_caches);
3636         if (rc)
3637                 RETURN(rc);
3638
3639         lprocfs_osc_init_vars(&lvars);
3640
3641         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3642                                  LUSTRE_OSC_NAME, &osc_device_type);
3643         if (rc) {
3644                 lu_kmem_fini(osc_caches);
3645                 RETURN(rc);
3646         }
3647
3648         spin_lock_init(&osc_ast_guard);
3649         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3650
3651         RETURN(rc);
3652 }
3653
/* Module unload: unregister the OSC obd type, then release the caches
 * created in osc_init() (order matters: no devices may remain). */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3659
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register osc_init/osc_exit through the libcfs module wrapper. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);