/* drivers/staging/lustre/lustre/osc/osc_request.c
 * (staging/lustre/ptlrpc: Translate between host and network errnos)
 */
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <linux/libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <obd_lov.h>
48
49 #ifdef  __CYGWIN__
50 # include <ctype.h>
51 #endif
52
53 #include <lustre_ha.h>
54 #include <lprocfs_status.h>
55 #include <lustre_log.h>
56 #include <lustre_debug.h>
57 #include <lustre_param.h>
58 #include <lustre_fid.h>
59 #include "osc_internal.h"
60 #include "osc_cl_internal.h"
61
62 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
63 static int brw_interpret(const struct lu_env *env,
64                          struct ptlrpc_request *req, void *data, int rc);
65 int osc_cleanup(struct obd_device *obd);
66
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69                       struct lov_stripe_md *lsm)
70 {
71         int lmm_size;
72         ENTRY;
73
74         lmm_size = sizeof(**lmmp);
75         if (lmmp == NULL)
76                 RETURN(lmm_size);
77
78         if (*lmmp != NULL && lsm == NULL) {
79                 OBD_FREE(*lmmp, lmm_size);
80                 *lmmp = NULL;
81                 RETURN(0);
82         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
83                 RETURN(-EBADF);
84         }
85
86         if (*lmmp == NULL) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (*lmmp == NULL)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm)
93                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
94
95         RETURN(lmm_size);
96 }
97
98 /* Unpack OSC object metadata from disk storage (LE byte order). */
99 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
100                         struct lov_mds_md *lmm, int lmm_bytes)
101 {
102         int lsm_size;
103         struct obd_import *imp = class_exp2cliimp(exp);
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof(*lmm)) {
108                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
109                                exp->exp_obd->obd_name, lmm_bytes,
110                                (int)sizeof(*lmm));
111                         RETURN(-EINVAL);
112                 }
113                 /* XXX LOV_MAGIC etc check? */
114
115                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
116                         CERROR("%s: zero lmm_object_id: rc = %d\n",
117                                exp->exp_obd->obd_name, -EINVAL);
118                         RETURN(-EINVAL);
119                 }
120         }
121
122         lsm_size = lov_stripe_md_size(1);
123         if (lsmp == NULL)
124                 RETURN(lsm_size);
125
126         if (*lsmp != NULL && lmm == NULL) {
127                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
128                 OBD_FREE(*lsmp, lsm_size);
129                 *lsmp = NULL;
130                 RETURN(0);
131         }
132
133         if (*lsmp == NULL) {
134                 OBD_ALLOC(*lsmp, lsm_size);
135                 if (unlikely(*lsmp == NULL))
136                         RETURN(-ENOMEM);
137                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
139                         OBD_FREE(*lsmp, lsm_size);
140                         RETURN(-ENOMEM);
141                 }
142                 loi_init((*lsmp)->lsm_oinfo[0]);
143         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
144                 RETURN(-EBADF);
145         }
146
147         if (lmm != NULL)
148                 /* XXX zero *lsmp? */
149                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
150
151         if (imp != NULL &&
152             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
153                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
154         else
155                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
156
157         RETURN(lsm_size);
158 }
159
160 static inline void osc_pack_capa(struct ptlrpc_request *req,
161                                  struct ost_body *body, void *capa)
162 {
163         struct obd_capa *oc = (struct obd_capa *)capa;
164         struct lustre_capa *c;
165
166         if (!capa)
167                 return;
168
169         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
170         LASSERT(c);
171         capa_cpy(c, oc);
172         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
173         DEBUG_CAPA(D_SEC, c, "pack");
174 }
175
176 static inline void osc_pack_req_body(struct ptlrpc_request *req,
177                                      struct obd_info *oinfo)
178 {
179         struct ost_body *body;
180
181         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
182         LASSERT(body);
183
184         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
185                              oinfo->oi_oa);
186         osc_pack_capa(req, body, oinfo->oi_capa);
187 }
188
189 static inline void osc_set_capa_size(struct ptlrpc_request *req,
190                                      const struct req_msg_field *field,
191                                      struct obd_capa *oc)
192 {
193         if (oc == NULL)
194                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
195         else
196                 /* it is already calculated as sizeof struct obd_capa */
197                 ;
198 }
199
200 static int osc_getattr_interpret(const struct lu_env *env,
201                                  struct ptlrpc_request *req,
202                                  struct osc_async_args *aa, int rc)
203 {
204         struct ost_body *body;
205         ENTRY;
206
207         if (rc != 0)
208                 GOTO(out, rc);
209
210         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
211         if (body) {
212                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
213                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
214                                      aa->aa_oi->oi_oa, &body->oa);
215
216                 /* This should really be sent by the OST */
217                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
218                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
219         } else {
220                 CDEBUG(D_INFO, "can't unpack ost_body\n");
221                 rc = -EPROTO;
222                 aa->aa_oi->oi_oa->o_valid = 0;
223         }
224 out:
225         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
226         RETURN(rc);
227 }
228
229 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
230                              struct ptlrpc_request_set *set)
231 {
232         struct ptlrpc_request *req;
233         struct osc_async_args *aa;
234         int                 rc;
235         ENTRY;
236
237         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
238         if (req == NULL)
239                 RETURN(-ENOMEM);
240
241         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
242         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
243         if (rc) {
244                 ptlrpc_request_free(req);
245                 RETURN(rc);
246         }
247
248         osc_pack_req_body(req, oinfo);
249
250         ptlrpc_request_set_replen(req);
251         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
252
253         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
254         aa = ptlrpc_req_async_args(req);
255         aa->aa_oi = oinfo;
256
257         ptlrpc_set_add_req(set, req);
258         RETURN(0);
259 }
260
261 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
262                        struct obd_info *oinfo)
263 {
264         struct ptlrpc_request *req;
265         struct ost_body       *body;
266         int                 rc;
267         ENTRY;
268
269         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
270         if (req == NULL)
271                 RETURN(-ENOMEM);
272
273         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
274         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
275         if (rc) {
276                 ptlrpc_request_free(req);
277                 RETURN(rc);
278         }
279
280         osc_pack_req_body(req, oinfo);
281
282         ptlrpc_request_set_replen(req);
283
284         rc = ptlrpc_queue_wait(req);
285         if (rc)
286                 GOTO(out, rc);
287
288         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
289         if (body == NULL)
290                 GOTO(out, rc = -EPROTO);
291
292         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
293         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
294                              &body->oa);
295
296         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
297         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
298
299         EXIT;
300  out:
301         ptlrpc_req_finished(req);
302         return rc;
303 }
304
305 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
306                        struct obd_info *oinfo, struct obd_trans_info *oti)
307 {
308         struct ptlrpc_request *req;
309         struct ost_body       *body;
310         int                 rc;
311         ENTRY;
312
313         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
314
315         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
316         if (req == NULL)
317                 RETURN(-ENOMEM);
318
319         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
320         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
321         if (rc) {
322                 ptlrpc_request_free(req);
323                 RETURN(rc);
324         }
325
326         osc_pack_req_body(req, oinfo);
327
328         ptlrpc_request_set_replen(req);
329
330         rc = ptlrpc_queue_wait(req);
331         if (rc)
332                 GOTO(out, rc);
333
334         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
335         if (body == NULL)
336                 GOTO(out, rc = -EPROTO);
337
338         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
339                              &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
347 static int osc_setattr_interpret(const struct lu_env *env,
348                                  struct ptlrpc_request *req,
349                                  struct osc_setattr_args *sa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
362                              &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                   rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                 rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458
459         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
460
461         ptlrpc_request_set_replen(req);
462
463         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
464             oa->o_flags == OBD_FL_DELORPHAN) {
465                 DEBUG_REQ(D_HA, req,
466                           "delorphan from OST integration");
467                 /* Don't resend the delorphan req */
468                 req->rq_no_resend = req->rq_no_delay = 1;
469         }
470
471         rc = ptlrpc_queue_wait(req);
472         if (rc)
473                 GOTO(out_req, rc);
474
475         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
476         if (body == NULL)
477                 GOTO(out_req, rc = -EPROTO);
478
479         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
480         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
481
482         oa->o_blksize = cli_brw_size(exp->exp_obd);
483         oa->o_valid |= OBD_MD_FLBLKSZ;
484
485         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
486          * have valid lsm_oinfo data structs, so don't go touching that.
487          * This needs to be fixed in a big way.
488          */
489         lsm->lsm_oi = oa->o_oi;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body  *body;
519         int                   rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
538                              oinfo->oi_oa);
539         osc_pack_capa(req, body, oinfo->oi_capa);
540
541         ptlrpc_request_set_replen(req);
542
543         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
544         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
545         sa = ptlrpc_req_async_args(req);
546         sa->sa_oa     = oinfo->oi_oa;
547         sa->sa_upcall = upcall;
548         sa->sa_cookie = cookie;
549         if (rqset == PTLRPCD_SET)
550                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
551         else
552                 ptlrpc_set_add_req(rqset, req);
553
554         RETURN(0);
555 }
556
557 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
558                      struct obd_info *oinfo, struct obd_trans_info *oti,
559                      struct ptlrpc_request_set *rqset)
560 {
561         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
562         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
563         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
564         return osc_punch_base(exp, oinfo,
565                               oinfo->oi_cb_up, oinfo, rqset);
566 }
567
568 static int osc_sync_interpret(const struct lu_env *env,
569                               struct ptlrpc_request *req,
570                               void *arg, int rc)
571 {
572         struct osc_fsync_args *fa = arg;
573         struct ost_body *body;
574         ENTRY;
575
576         if (rc)
577                 GOTO(out, rc);
578
579         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580         if (body == NULL) {
581                 CERROR ("can't unpack ost_body\n");
582                 GOTO(out, rc = -EPROTO);
583         }
584
585         *fa->fa_oi->oi_oa = body->oa;
586 out:
587         rc = fa->fa_upcall(fa->fa_cookie, rc);
588         RETURN(rc);
589 }
590
591 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
592                   obd_enqueue_update_f upcall, void *cookie,
593                   struct ptlrpc_request_set *rqset)
594 {
595         struct ptlrpc_request *req;
596         struct ost_body       *body;
597         struct osc_fsync_args *fa;
598         int                 rc;
599         ENTRY;
600
601         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
602         if (req == NULL)
603                 RETURN(-ENOMEM);
604
605         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
606         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
607         if (rc) {
608                 ptlrpc_request_free(req);
609                 RETURN(rc);
610         }
611
612         /* overload the size and blocks fields in the oa with start/end */
613         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614         LASSERT(body);
615         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
616                              oinfo->oi_oa);
617         osc_pack_capa(req, body, oinfo->oi_capa);
618
619         ptlrpc_request_set_replen(req);
620         req->rq_interpret_reply = osc_sync_interpret;
621
622         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
623         fa = ptlrpc_req_async_args(req);
624         fa->fa_oi = oinfo;
625         fa->fa_upcall = upcall;
626         fa->fa_cookie = cookie;
627
628         if (rqset == PTLRPCD_SET)
629                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
630         else
631                 ptlrpc_set_add_req(rqset, req);
632
633         RETURN (0);
634 }
635
636 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
637                     struct obd_info *oinfo, obd_size start, obd_size end,
638                     struct ptlrpc_request_set *set)
639 {
640         ENTRY;
641
642         if (!oinfo->oi_oa) {
643                 CDEBUG(D_INFO, "oa NULL\n");
644                 RETURN(-EINVAL);
645         }
646
647         oinfo->oi_oa->o_size = start;
648         oinfo->oi_oa->o_blocks = end;
649         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
650
651         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
652 }
653
654 /* Find and cancel locally locks matched by @mode in the resource found by
655  * @objid. Found locks are added into @cancel list. Returns the amount of
656  * locks added to @cancels list. */
657 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
658                                    struct list_head *cancels,
659                                    ldlm_mode_t mode, int lock_flags)
660 {
661         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
662         struct ldlm_res_id res_id;
663         struct ldlm_resource *res;
664         int count;
665         ENTRY;
666
667         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
668          * export) but disabled through procfs (flag in NS).
669          *
670          * This distinguishes from a case when ELC is not supported originally,
671          * when we still want to cancel locks in advance and just cancel them
672          * locally, without sending any RPC. */
673         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
674                 RETURN(0);
675
676         ostid_build_res_name(&oa->o_oi, &res_id);
677         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
678         if (res == NULL)
679                 RETURN(0);
680
681         LDLM_RESOURCE_ADDREF(res);
682         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
683                                            lock_flags, 0, NULL);
684         LDLM_RESOURCE_DELREF(res);
685         ldlm_resource_putref(res);
686         RETURN(count);
687 }
688
689 static int osc_destroy_interpret(const struct lu_env *env,
690                                  struct ptlrpc_request *req, void *data,
691                                  int rc)
692 {
693         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
694
695         atomic_dec(&cli->cl_destroy_in_flight);
696         wake_up(&cli->cl_destroy_waitq);
697         return 0;
698 }
699
700 static int osc_can_send_destroy(struct client_obd *cli)
701 {
702         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
703             cli->cl_max_rpcs_in_flight) {
704                 /* The destroy request can be sent */
705                 return 1;
706         }
707         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
708             cli->cl_max_rpcs_in_flight) {
709                 /*
710                  * The counter has been modified between the two atomic
711                  * operations.
712                  */
713                 wake_up(&cli->cl_destroy_waitq);
714         }
715         return 0;
716 }
717
718 int osc_create(const struct lu_env *env, struct obd_export *exp,
719                struct obdo *oa, struct lov_stripe_md **ea,
720                struct obd_trans_info *oti)
721 {
722         int rc = 0;
723         ENTRY;
724
725         LASSERT(oa);
726         LASSERT(ea);
727         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
728
729         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
730             oa->o_flags == OBD_FL_RECREATE_OBJS) {
731                 RETURN(osc_real_create(exp, oa, ea, oti));
732         }
733
734         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
735                 RETURN(osc_real_create(exp, oa, ea, oti));
736
737         /* we should not get here anymore */
738         LBUG();
739
740         RETURN(rc);
741 }
742
743 /* Destroy requests can be async always on the client, and we don't even really
744  * care about the return code since the client cannot do anything at all about
745  * a destroy failure.
746  * When the MDS is unlinking a filename, it saves the file objects into a
747  * recovery llog, and these object records are cancelled when the OST reports
748  * they were destroyed and sync'd to disk (i.e. transaction committed).
749  * If the client dies, or the OST is down when the object should be destroyed,
750  * the records are not cancelled, and when the OST reconnects to the MDS next,
751  * it will retrieve the llog unlink logs and then sends the log cancellation
752  * cookies to the MDS after committing destroy transactions. */
753 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
754                        struct obdo *oa, struct lov_stripe_md *ea,
755                        struct obd_trans_info *oti, struct obd_export *md_export,
756                        void *capa)
757 {
758         struct client_obd     *cli = &exp->exp_obd->u.cli;
759         struct ptlrpc_request *req;
760         struct ost_body       *body;
761         LIST_HEAD(cancels);
762         int rc, count;
763         ENTRY;
764
765         if (!oa) {
766                 CDEBUG(D_INFO, "oa NULL\n");
767                 RETURN(-EINVAL);
768         }
769
770         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
771                                         LDLM_FL_DISCARD_DATA);
772
773         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
774         if (req == NULL) {
775                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
776                 RETURN(-ENOMEM);
777         }
778
779         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
780         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
781                                0, &cancels, count);
782         if (rc) {
783                 ptlrpc_request_free(req);
784                 RETURN(rc);
785         }
786
787         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
788         ptlrpc_at_set_req_timeout(req);
789
790         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
791                 oa->o_lcookie = *oti->oti_logcookies;
792         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
793         LASSERT(body);
794         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
795
796         osc_pack_capa(req, body, (struct obd_capa *)capa);
797         ptlrpc_request_set_replen(req);
798
799         /* If osc_destory is for destroying the unlink orphan,
800          * sent from MDT to OST, which should not be blocked here,
801          * because the process might be triggered by ptlrpcd, and
802          * it is not good to block ptlrpcd thread (b=16006)*/
803         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
804                 req->rq_interpret_reply = osc_destroy_interpret;
805                 if (!osc_can_send_destroy(cli)) {
806                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
807                                                           NULL);
808
809                         /*
810                          * Wait until the number of on-going destroy RPCs drops
811                          * under max_rpc_in_flight
812                          */
813                         l_wait_event_exclusive(cli->cl_destroy_waitq,
814                                                osc_can_send_destroy(cli), &lwi);
815                 }
816         }
817
818         /* Do not wait for response */
819         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
820         RETURN(0);
821 }
822
/* Fill in the dirty-cache and grant accounting fields of @oa that
 * piggy-back on every BRW RPC so the server can track this client's
 * cache state.  All values are sampled under cl_loi_list_lock.
 * NOTE(review): @writing_bytes is not referenced in this body. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

	/* the caller must not have filled the fields this function owns */
	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		/* per-client dirty limit exceeded: advertise no headroom */
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		/* the difference would not fit in the 32-bit o_undirty */
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		/* headroom for a full pipeline of max-size RPCs plus one */
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT)*
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	/* lost grant is reported to the server exactly once, then reset */
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
867
868 void osc_update_next_shrink(struct client_obd *cli)
869 {
870         cli->cl_next_shrink_grant =
871                 cfs_time_shift(cli->cl_grant_shrink_interval);
872         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
873                cli->cl_next_shrink_grant);
874 }
875
876 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
877 {
878         client_obd_list_lock(&cli->cl_loi_list_lock);
879         cli->cl_avail_grant += grant;
880         client_obd_list_unlock(&cli->cl_loi_list_lock);
881 }
882
883 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
884 {
885         if (body->oa.o_valid & OBD_MD_FLGRANT) {
886                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
887                 __osc_update_grant(cli, body->oa.o_grant);
888         }
889 }
890
891 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
892                               obd_count keylen, void *key, obd_count vallen,
893                               void *val, struct ptlrpc_request_set *set);
894
895 static int osc_shrink_grant_interpret(const struct lu_env *env,
896                                       struct ptlrpc_request *req,
897                                       void *aa, int rc)
898 {
899         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
900         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
901         struct ost_body *body;
902
903         if (rc != 0) {
904                 __osc_update_grant(cli, oa->o_grant);
905                 GOTO(out, rc);
906         }
907
908         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
909         LASSERT(body);
910         osc_update_grant(cli, body);
911 out:
912         OBDO_FREE(oa);
913         return rc;
914 }
915
916 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
917 {
918         client_obd_list_lock(&cli->cl_loi_list_lock);
919         oa->o_grant = cli->cl_avail_grant / 4;
920         cli->cl_avail_grant -= oa->o_grant;
921         client_obd_list_unlock(&cli->cl_loi_list_lock);
922         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
923                 oa->o_valid |= OBD_MD_FLFLAGS;
924                 oa->o_flags = 0;
925         }
926         oa->o_flags |= OBD_FL_SHRINK_GRANT;
927         osc_update_next_shrink(cli);
928 }
929
930 /* Shrink the current grant, either from some large amount to enough for a
931  * full set of in-flight RPCs, or if we have already shrunk to that limit
932  * then to enough for a single RPC.  This avoids keeping more grant than
933  * needed, and avoids shrinking the grant piecemeal. */
934 static int osc_shrink_grant(struct client_obd *cli)
935 {
936         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
937                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
938
939         client_obd_list_lock(&cli->cl_loi_list_lock);
940         if (cli->cl_avail_grant <= target_bytes)
941                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
942         client_obd_list_unlock(&cli->cl_loi_list_lock);
943
944         return osc_shrink_grant_to_target(cli, target_bytes);
945 }
946
/* Shrink this client's available grant down to @target_bytes by handing
 * the excess back to the server with a KEY_GRANT_SHRINK set_info RPC.
 * Returns 0 if no shrink is needed or on success, negative errno on
 * failure (the grant is restored locally if the RPC cannot be queued). */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body *body;
	ENTRY;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	/* report current dirty/grant accounting alongside the shrink */
	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* give back everything above the target and stop spending it now,
	 * before the RPC completes */
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		/* RPC was never queued: restore the grant locally */
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	RETURN(rc);
}
991
992 static int osc_should_shrink_grant(struct client_obd *client)
993 {
994         cfs_time_t time = cfs_time_current();
995         cfs_time_t next_shrink = client->cl_next_shrink_grant;
996
997         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
998              OBD_CONNECT_GRANT_SHRINK) == 0)
999                 return 0;
1000
1001         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1002                 /* Get the current RPC size directly, instead of going via:
1003                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1004                  * Keep comment here so that it can be found by searching. */
1005                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1006
1007                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1008                     client->cl_avail_grant > brw_size)
1009                         return 1;
1010                 else
1011                         osc_update_next_shrink(client);
1012         }
1013         return 0;
1014 }
1015
1016 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1017 {
1018         struct client_obd *client;
1019
1020         list_for_each_entry(client, &item->ti_obd_list,
1021                                 cl_grant_shrink_list) {
1022                 if (osc_should_shrink_grant(client))
1023                         osc_shrink_grant(client);
1024         }
1025         return 0;
1026 }
1027
1028 static int osc_add_shrink_grant(struct client_obd *client)
1029 {
1030         int rc;
1031
1032         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1033                                        TIMEOUT_GRANT,
1034                                        osc_grant_shrink_grant_cb, NULL,
1035                                        &client->cl_grant_shrink_list);
1036         if (rc) {
1037                 CERROR("add grant client %s error %d\n",
1038                         client->cl_import->imp_obd->obd_name, rc);
1039                 return rc;
1040         }
1041         CDEBUG(D_CACHE, "add grant client %s \n",
1042                client->cl_import->imp_obd->obd_name);
1043         osc_update_next_shrink(client);
1044         return 0;
1045 }
1046
1047 static int osc_del_shrink_grant(struct client_obd *client)
1048 {
1049         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1050                                          TIMEOUT_GRANT);
1051 }
1052
/* Initialize cl_avail_grant from the grant amount the server reported at
 * (re)connect time, pick the osc_extent chunk size from the server's
 * blocksize, and arm periodic grant shrinking if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty will drop
	 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		/* server reported less grant than we have dirty data; warn
		 * rather than run with a negative grant */
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
		"chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
		cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	/* list_empty() keeps this from double-registering on reconnect */
	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}
1090
1091 /* We assume that the reason this OSC got a short read is because it read
1092  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1093  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1094  * this stripe never got written at or beyond this stripe offset yet. */
1095 static void handle_short_read(int nob_read, obd_count page_count,
1096                               struct brw_page **pga)
1097 {
1098         char *ptr;
1099         int i = 0;
1100
1101         /* skip bytes read OK */
1102         while (nob_read > 0) {
1103                 LASSERT (page_count > 0);
1104
1105                 if (pga[i]->count > nob_read) {
1106                         /* EOF inside this page */
1107                         ptr = kmap(pga[i]->pg) +
1108                                 (pga[i]->off & ~CFS_PAGE_MASK);
1109                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1110                         kunmap(pga[i]->pg);
1111                         page_count--;
1112                         i++;
1113                         break;
1114                 }
1115
1116                 nob_read -= pga[i]->count;
1117                 page_count--;
1118                 i++;
1119         }
1120
1121         /* zero remaining pages */
1122         while (page_count-- > 0) {
1123                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1124                 memset(ptr, 0, pga[i]->count);
1125                 kunmap(pga[i]->pg);
1126                 i++;
1127         }
1128 }
1129
1130 static int check_write_rcs(struct ptlrpc_request *req,
1131                            int requested_nob, int niocount,
1132                            obd_count page_count, struct brw_page **pga)
1133 {
1134         int     i;
1135         __u32   *remote_rcs;
1136
1137         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1138                                                   sizeof(*remote_rcs) *
1139                                                   niocount);
1140         if (remote_rcs == NULL) {
1141                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1142                 return(-EPROTO);
1143         }
1144
1145         /* return error if any niobuf was in error */
1146         for (i = 0; i < niocount; i++) {
1147                 if ((int)remote_rcs[i] < 0)
1148                         return(remote_rcs[i]);
1149
1150                 if (remote_rcs[i] != 0) {
1151                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1152                                 i, remote_rcs[i], req);
1153                         return(-EPROTO);
1154                 }
1155         }
1156
1157         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1158                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1159                        req->rq_bulk->bd_nob_transferred, requested_nob);
1160                 return(-EPROTO);
1161         }
1162
1163         return (0);
1164 }
1165
1166 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1167 {
1168         if (p1->flag != p2->flag) {
1169                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1170                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1171
1172                 /* warn if we try to combine flags that we don't know to be
1173                  * safe to combine */
1174                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1175                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1176                               "report this at http://bugs.whamcloud.com/\n",
1177                               p1->flag, p2->flag);
1178                 }
1179                 return 0;
1180         }
1181
1182         return (p1->off + p1->count == p2->off);
1183 }
1184
1185 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1186                                    struct brw_page **pga, int opc,
1187                                    cksum_type_t cksum_type)
1188 {
1189         __u32                           cksum;
1190         int                             i = 0;
1191         struct cfs_crypto_hash_desc     *hdesc;
1192         unsigned int                    bufsize;
1193         int                             err;
1194         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1195
1196         LASSERT(pg_count > 0);
1197
1198         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1199         if (IS_ERR(hdesc)) {
1200                 CERROR("Unable to initialize checksum hash %s\n",
1201                        cfs_crypto_hash_name(cfs_alg));
1202                 return PTR_ERR(hdesc);
1203         }
1204
1205         while (nob > 0 && pg_count > 0) {
1206                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1207
1208                 /* corrupt the data before we compute the checksum, to
1209                  * simulate an OST->client data error */
1210                 if (i == 0 && opc == OST_READ &&
1211                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1212                         unsigned char *ptr = kmap(pga[i]->pg);
1213                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1214                         memcpy(ptr + off, "bad1", min(4, nob));
1215                         kunmap(pga[i]->pg);
1216                 }
1217                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1218                                   pga[i]->off & ~CFS_PAGE_MASK,
1219                                   count);
1220                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1221                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1222
1223                 nob -= pga[i]->count;
1224                 pg_count--;
1225                 i++;
1226         }
1227
1228         bufsize = 4;
1229         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1230
1231         if (err)
1232                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1233
1234         /* For sending we only compute the wrong checksum instead
1235          * of corrupting the data so it is still correct on a redo */
1236         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1237                 cksum++;
1238
1239         return cksum;
1240 }
1241
/* Build (but do not send) an OST_READ/OST_WRITE BRW request covering
 * @page_count pages in @pga.  Adjacent pages with compatible flags are
 * merged into single remote niobufs.  On success *reqp holds the prepared
 * request with its bulk descriptor attached and 0 is returned; on failure
 * a negative errno is returned and the request is torn down.
 * @reserve: also stash a capa reference in the async args.
 * @resend:  mark the RPC with OBD_FL_RECOV_RESEND. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
				struct lov_stripe_md *lsm, obd_count page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request	*req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body	 *body;
	struct obd_ioobj	*ioobj;
	struct niobuf_remote	*niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule	*pill;
	struct brw_page *pg_prev;

	ENTRY;
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		/* writes draw from the preallocated request pool so that
		 * dirty writeback cannot deadlock on memory allocation */
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	/* one remote niobuf per run of mergeable pages */
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request.  The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so the
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
			 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			/* extend the previous niobuf rather than start one */
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len    = pg->count;
			niobuf->flags  = pg->flag;
		}
		pg_prev = pg;
	}

	/* sanity: we advanced niobuf exactly niocount entries */
	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				/* keep only the client-local flags in the
				 * saved copy */
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	RETURN(0);

 out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}
1444
/* Compare the server-computed write checksum with the client's.  Returns
 * 0 if they match.  Otherwise re-checksums the local pages to diagnose
 * where the corruption occurred, logs a console error, and returns 1 so
 * the caller can resend.  NB some mismatches are benign mmap-IO false
 * positives (the page changed after we checksummed it, bug 11742). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				obd_count page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	/* re-checksum with the algorithm the *server* used, to tell the
	 * failure modes apart */
	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), "
	       "client csum now %x\n", client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}
1489
1490 /* Note rc enters this function as number of bytes transferred */
1491 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1492 {
1493         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1494         const lnet_process_id_t *peer =
1495                         &req->rq_import->imp_connection->c_peer;
1496         struct client_obd *cli = aa->aa_cli;
1497         struct ost_body *body;
1498         __u32 client_cksum = 0;
1499         ENTRY;
1500
1501         if (rc < 0 && rc != -EDQUOT) {
1502                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1503                 RETURN(rc);
1504         }
1505
1506         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1507         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1508         if (body == NULL) {
1509                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1510                 RETURN(-EPROTO);
1511         }
1512
1513         /* set/clear over quota flag for a uid/gid */
1514         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1515             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1516                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1517
1518                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1519                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1520                        body->oa.o_flags);
1521                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1522         }
1523
1524         osc_update_grant(cli, body);
1525
1526         if (rc < 0)
1527                 RETURN(rc);
1528
1529         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1530                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1531
1532         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1533                 if (rc > 0) {
1534                         CERROR("Unexpected +ve rc %d\n", rc);
1535                         RETURN(-EPROTO);
1536                 }
1537                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1538
1539                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1540                         RETURN(-EAGAIN);
1541
1542                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1543                     check_write_checksum(&body->oa, peer, client_cksum,
1544                                          body->oa.o_cksum, aa->aa_requested_nob,
1545                                          aa->aa_page_count, aa->aa_ppga,
1546                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1547                         RETURN(-EAGAIN);
1548
1549                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1550                                      aa->aa_page_count, aa->aa_ppga);
1551                 GOTO(out, rc);
1552         }
1553
1554         /* The rest of this function executes only for OST_READs */
1555
1556         /* if unwrap_bulk failed, return -EAGAIN to retry */
1557         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1558         if (rc < 0)
1559                 GOTO(out, rc = -EAGAIN);
1560
1561         if (rc > aa->aa_requested_nob) {
1562                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1563                        aa->aa_requested_nob);
1564                 RETURN(-EPROTO);
1565         }
1566
1567         if (rc != req->rq_bulk->bd_nob_transferred) {
1568                 CERROR ("Unexpected rc %d (%d transferred)\n",
1569                         rc, req->rq_bulk->bd_nob_transferred);
1570                 return (-EPROTO);
1571         }
1572
1573         if (rc < aa->aa_requested_nob)
1574                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1575
1576         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1577                 static int cksum_counter;
1578                 __u32      server_cksum = body->oa.o_cksum;
1579                 char      *via;
1580                 char      *router;
1581                 cksum_type_t cksum_type;
1582
1583                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1584                                                body->oa.o_flags : 0);
1585                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1586                                                  aa->aa_ppga, OST_READ,
1587                                                  cksum_type);
1588
1589                 if (peer->nid == req->rq_bulk->bd_sender) {
1590                         via = router = "";
1591                 } else {
1592                         via = " via ";
1593                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1594                 }
1595
1596                 if (server_cksum == ~0 && rc > 0) {
1597                         CERROR("Protocol error: server %s set the 'checksum' "
1598                                "bit, but didn't send a checksum.  Not fatal, "
1599                                "but please notify on http://bugs.whamcloud.com/\n",
1600                                libcfs_nid2str(peer->nid));
1601                 } else if (server_cksum != client_cksum) {
1602                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1603                                            "%s%s%s inode "DFID" object "DOSTID
1604                                            " extent ["LPU64"-"LPU64"]\n",
1605                                            req->rq_import->imp_obd->obd_name,
1606                                            libcfs_nid2str(peer->nid),
1607                                            via, router,
1608                                            body->oa.o_valid & OBD_MD_FLFID ?
1609                                                 body->oa.o_parent_seq : (__u64)0,
1610                                            body->oa.o_valid & OBD_MD_FLFID ?
1611                                                 body->oa.o_parent_oid : 0,
1612                                            body->oa.o_valid & OBD_MD_FLFID ?
1613                                                 body->oa.o_parent_ver : 0,
1614                                            POSTID(&body->oa.o_oi),
1615                                            aa->aa_ppga[0]->off,
1616                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1617                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1618                                                                         1);
1619                         CERROR("client %x, server %x, cksum_type %x\n",
1620                                client_cksum, server_cksum, cksum_type);
1621                         cksum_counter = 0;
1622                         aa->aa_oa->o_cksum = client_cksum;
1623                         rc = -EAGAIN;
1624                 } else {
1625                         cksum_counter++;
1626                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1627                         rc = 0;
1628                 }
1629         } else if (unlikely(client_cksum)) {
1630                 static int cksum_missed;
1631
1632                 cksum_missed++;
1633                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1634                         CERROR("Checksum %u requested from %s but not sent\n",
1635                                cksum_missed, libcfs_nid2str(peer->nid));
1636         } else {
1637                 rc = 0;
1638         }
1639 out:
1640         if (rc >= 0)
1641                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1642                                      aa->aa_oa, &body->oa);
1643
1644         RETURN(rc);
1645 }
1646
1647 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1648                             struct lov_stripe_md *lsm,
1649                             obd_count page_count, struct brw_page **pga,
1650                             struct obd_capa *ocapa)
1651 {
1652         struct ptlrpc_request *req;
1653         int                 rc;
1654         wait_queue_head_t           waitq;
1655         int                 generation, resends = 0;
1656         struct l_wait_info     lwi;
1657
1658         ENTRY;
1659
1660         init_waitqueue_head(&waitq);
1661         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1662
1663 restart_bulk:
1664         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1665                                   page_count, pga, &req, ocapa, 0, resends);
1666         if (rc != 0)
1667                 return (rc);
1668
1669         if (resends) {
1670                 req->rq_generation_set = 1;
1671                 req->rq_import_generation = generation;
1672                 req->rq_sent = cfs_time_current_sec() + resends;
1673         }
1674
1675         rc = ptlrpc_queue_wait(req);
1676
1677         if (rc == -ETIMEDOUT && req->rq_resend) {
1678                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1679                 ptlrpc_req_finished(req);
1680                 goto restart_bulk;
1681         }
1682
1683         rc = osc_brw_fini_request(req, rc);
1684
1685         ptlrpc_req_finished(req);
1686         /* When server return -EINPROGRESS, client should always retry
1687          * regardless of the number of times the bulk was resent already.*/
1688         if (osc_recoverable_error(rc)) {
1689                 resends++;
1690                 if (rc != -EINPROGRESS &&
1691                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1692                         CERROR("%s: too many resend retries for object: "
1693                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1694                                POSTID(&oa->o_oi), rc);
1695                         goto out;
1696                 }
1697                 if (generation !=
1698                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1699                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1700                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1701                                POSTID(&oa->o_oi), rc);
1702                         goto out;
1703                 }
1704
1705                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1706                                        NULL);
1707                 l_wait_event(waitq, 0, &lwi);
1708
1709                 goto restart_bulk;
1710         }
1711 out:
1712         if (rc == -EAGAIN || rc == -EINPROGRESS)
1713                 rc = -EIO;
1714         RETURN (rc);
1715 }
1716
/* Rebuild and resend an async bulk BRW request after a recoverable error.
 *
 * A new request is prepared from the async args of the failed one; the
 * oap/extent lists, per-oap request references and the capability are
 * moved onto the new request, which is then queued on the same ptlrpcd.
 *
 * \retval 0 on success (new request queued); negative errno otherwise
 *         (-EINTR if any page in the old request was interrupted).
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are routine, log them at trace level only */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* abort the resend if any async page was interrupted meanwhile */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* each oap that held a reference on the old request must now
         * reference the new one instead */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capability ownership transfers to the new request's args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1789
/*
 * We want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll
 * do fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
1797 static void sort_brw_pages(struct brw_page **array, int num)
1798 {
1799         int stride, i, j;
1800         struct brw_page *tmp;
1801
1802         if (num == 1)
1803                 return;
1804         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1805                 ;
1806
1807         do {
1808                 stride /= 3;
1809                 for (i = stride ; i < num ; i++) {
1810                         tmp = array[i];
1811                         j = i;
1812                         while (j >= stride && array[j - stride]->off > tmp->off) {
1813                                 array[j] = array[j - stride];
1814                                 j -= stride;
1815                         }
1816                         array[j] = tmp;
1817                 }
1818         } while (stride > 1);
1819 }
1820
1821 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1822 {
1823         int count = 1;
1824         int offset;
1825         int i = 0;
1826
1827         LASSERT (pages > 0);
1828         offset = pg[i]->off & ~CFS_PAGE_MASK;
1829
1830         for (;;) {
1831                 pages--;
1832                 if (pages == 0)  /* that's all */
1833                         return count;
1834
1835                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1836                         return count;   /* doesn't end on page boundary */
1837
1838                 i++;
1839                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1840                 if (offset != 0)        /* doesn't start on page boundary */
1841                         return count;
1842
1843                 count++;
1844         }
1845 }
1846
1847 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1848 {
1849         struct brw_page **ppga;
1850         int i;
1851
1852         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1853         if (ppga == NULL)
1854                 return NULL;
1855
1856         for (i = 0; i < count; i++)
1857                 ppga[i] = pga + i;
1858         return ppga;
1859 }
1860
1861 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1862 {
1863         LASSERT(ppga != NULL);
1864         OBD_FREE(ppga, sizeof(*ppga) * count);
1865 }
1866
/* Synchronous bulk I/O entry point.
 *
 * Builds a sorted pointer array over \a pga, then issues one blocking
 * BRW RPC per chunk of at most cl_max_pages_per_rpc unfragmented pages
 * until all pages are done or an RPC fails.  OBD_BRW_CHECK in \a cmd
 * only probes whether I/O could succeed (import not invalid).
 *
 * \retval 0 on success; negative errno from the first failing step.
 */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk so it doesn't span a fragmented boundary */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                /* advance past the chunk just transferred */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        /* free the original array; ppga itself has been advanced */
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1940
/* Reply-interpret callback for async bulk BRW RPCs (set as
 * rq_interpret_reply by osc_build_rpc()).
 *
 * Handles recoverable errors by redoing the request (unless the import
 * generation changed or the resend limit is hit), finishes all extents,
 * propagates server-returned attributes to the cl_object on success,
 * releases the resources held by the async args and updates the
 * in-flight RPC accounting.
 *
 * \retval 0 if a redo was queued; otherwise the final rc of the I/O.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct cl_object  *obj = NULL;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* never resend across an eviction */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        /* rc == 0 here means a new request was queued */
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        /* finish every extent; keep a reference on the (single) object so
         * its attributes can be updated after the list is drained */
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                if (obj == NULL && rc == 0) {
                        obj = osc2cl(ext->oe_obj);
                        cl_object_get(obj);
                }

                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        if (obj != NULL) {
                /* copy the attributes the server reported back into the
                 * client object; obj is only set when rc == 0 above */
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;

                LASSERT(rc == 0);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }
                if (valid != 0) {
                        cl_object_attr_lock(obj);
                        cl_object_attr_set(env, obj, attr, valid);
                        cl_object_attr_unlock(obj);
                }
                cl_object_put(env, obj);
        }
        OBDO_FREE(aa->aa_oa);

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* kick any I/O that was waiting on this RPC slot */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
2045
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the request is handed to a ptlrpcd thread (policy \a pol)
 * with brw_interpret() as its reply handler, and the extents/pages are
 * moved onto the request's async args.  On failure every extent is
 * finished with the error and all partially built state is released.
 *
 * \retval 0 on success, negative errno on failure.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct ldlm_lock                *lock = NULL;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        int                             i;
        int                             rc;
        LIST_HEAD(rpc_list);

        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the byte range the RPC will cover; interior
                         * pages must be full pages (asserted below) */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        /* under memory pressure, mark this context so allocations below
         * can dip into reserves; restored in the out: path */
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        /* allocate the cl_req lazily off the first page */
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                        lock = oap->oap_ldlm_lock;
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* update in-flight accounting and lprocfs histograms under the
         * client obd list lock */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                /* on success the request owns oa/pga/extents; on failure we
                 * must release everything built so far ourselves */
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                             oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
2262
2263 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2264                                         struct ldlm_enqueue_info *einfo)
2265 {
2266         void *data = einfo->ei_cbdata;
2267         int set = 0;
2268
2269         LASSERT(lock != NULL);
2270         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2271         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2272         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2273         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2274
2275         lock_res_and_lock(lock);
2276         spin_lock(&osc_ast_guard);
2277
2278         if (lock->l_ast_data == NULL)
2279                 lock->l_ast_data = data;
2280         if (lock->l_ast_data == data)
2281                 set = 1;
2282
2283         spin_unlock(&osc_ast_guard);
2284         unlock_res_and_lock(lock);
2285
2286         return set;
2287 }
2288
2289 static int osc_set_data_with_check(struct lustre_handle *lockh,
2290                                    struct ldlm_enqueue_info *einfo)
2291 {
2292         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2293         int set = 0;
2294
2295         if (lock != NULL) {
2296                 set = osc_set_lock_data_with_check(lock, einfo);
2297                 LDLM_LOCK_PUT(lock);
2298         } else
2299                 CERROR("lockh %p, data %p - client evicted?\n",
2300                        lockh, einfo->ei_cbdata);
2301         return set;
2302 }
2303
2304 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2305                              ldlm_iterator_t replace, void *data)
2306 {
2307         struct ldlm_res_id res_id;
2308         struct obd_device *obd = class_exp2obd(exp);
2309
2310         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2311         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2312         return 0;
2313 }
2314
2315 /* find any ldlm lock of the inode in osc
2316  * return 0    not find
2317  *      1    find one
2318  *      < 0    error */
2319 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2320                            ldlm_iterator_t replace, void *data)
2321 {
2322         struct ldlm_res_id res_id;
2323         struct obd_device *obd = class_exp2obd(exp);
2324         int rc = 0;
2325
2326         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2327         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2328         if (rc == LDLM_ITER_STOP)
2329                 return(1);
2330         if (rc == LDLM_ITER_CONTINUE)
2331                 return(0);
2332         return(rc);
2333 }
2334
2335 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2336                             obd_enqueue_update_f upcall, void *cookie,
2337                             __u64 *flags, int agl, int rc)
2338 {
2339         int intent = *flags & LDLM_FL_HAS_INTENT;
2340         ENTRY;
2341
2342         if (intent) {
2343                 /* The request was created before ldlm_cli_enqueue call. */
2344                 if (rc == ELDLM_LOCK_ABORTED) {
2345                         struct ldlm_reply *rep;
2346                         rep = req_capsule_server_get(&req->rq_pill,
2347                                                      &RMF_DLM_REP);
2348
2349                         LASSERT(rep != NULL);
2350                         rep->lock_policy_res1 =
2351                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2352                         if (rep->lock_policy_res1)
2353                                 rc = rep->lock_policy_res1;
2354                 }
2355         }
2356
2357         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2358             (rc == 0)) {
2359                 *flags |= LDLM_FL_LVB_READY;
2360                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2361                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2362         }
2363
2364         /* Call the update callback. */
2365         rc = (*upcall)(cookie, rc);
2366         RETURN(rc);
2367 }
2368
/* Reply interpreter for asynchronous osc lock enqueues.
 *
 * Completes the ldlm side of the enqueue, runs the osc upcall, and then
 * drops the lock references this path is responsible for.  @env is
 * unused here.  Note the careful reference juggling below: an extra
 * reference is held across the upcall so a blocking AST cannot fire
 * before the upcall has run.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* An aborted AGL (async glimpse) enqueue carries no LVB data. */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff: update LVB state and run the upcall. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above, then the handle2lock ref. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2430
/* Process the result of an extent lock enqueue for one stripe: copy the
 * returned LVB into @loi, extend the known minimum size (KMS) up to the
 * end of the granted extent, and allow the lock to be matched by future
 * ldlm_lock_match() calls.  An aborted intent enqueue (a glimpse) also
 * delivers LVB data and is downgraded to success.  On any other failure
 * the lock is marked unmatchable.
 */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        /* KMS already covers more than this lock grants;
                         * leave it alone. */
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                /* Glimpse case: no lock granted, but the LVB is valid. */
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                rc = ELDLM_OK;
        }

        if (lock != NULL) {
                if (rc != ELDLM_OK)
                        /* Wake up waiters and make them re-evaluate. */
                        ldlm_lock_fail_match(lock);

                LDLM_LOCK_PUT(lock);
        }
}
EXPORT_SYMBOL(osc_update_enqueue);
2475
/* Sentinel value: passing PTLRPCD_SET as the request set to
 * osc_enqueue_base() means "hand the request to a ptlrpcd daemon"
 * instead of adding it to a caller-owned set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2477
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
/* Enqueue an extent lock on @res_id, first trying to reuse a compatible
 * cached lock.  @agl != 0 marks an async glimpse lock; @async selects
 * non-blocking enqueue and @rqset (possibly PTLRPCD_SET) receives the
 * request in that case.  @upcall(cookie, rc) is invoked once the result
 * is known. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL must not wait for the LVB, so don't require it on match. */
        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Lock belongs to another cookie; fall through to a
                         * fresh enqueue. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueues need a pre-built request with room for
                 * the server-returned LVB. */
                LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        /* Async path: stash the completion context and let
                         * osc_enqueue_interpret() finish the job. */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Sync path: run the completion inline. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2629
2630 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2631                        struct ldlm_enqueue_info *einfo,
2632                        struct ptlrpc_request_set *rqset)
2633 {
2634         struct ldlm_res_id res_id;
2635         int rc;
2636         ENTRY;
2637
2638         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2639         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2640                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2641                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2642                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2643                               rqset, rqset != NULL, 0);
2644         RETURN(rc);
2645 }
2646
2647 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2648                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2649                    int *flags, void *data, struct lustre_handle *lockh,
2650                    int unref)
2651 {
2652         struct obd_device *obd = exp->exp_obd;
2653         int lflags = *flags;
2654         ldlm_mode_t rc;
2655         ENTRY;
2656
2657         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2658                 RETURN(-EIO);
2659
2660         /* Filesystem lock extents are extended to page boundaries so that
2661          * dealing with the page cache is a little smoother */
2662         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2663         policy->l_extent.end |= ~CFS_PAGE_MASK;
2664
2665         /* Next, search for already existing extent locks that will cover us */
2666         /* If we're trying to read, we also search for an existing PW lock.  The
2667          * VFS and page cache already protect us locally, so lots of readers/
2668          * writers can share a single PW lock. */
2669         rc = mode;
2670         if (mode == LCK_PR)
2671                 rc |= LCK_PW;
2672         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2673                              res_id, type, policy, rc, lockh, unref);
2674         if (rc) {
2675                 if (data != NULL) {
2676                         if (!osc_set_data_with_check(lockh, data)) {
2677                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2678                                         ldlm_lock_decref(lockh, rc);
2679                                 RETURN(0);
2680                         }
2681                 }
2682                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2683                         ldlm_lock_addref(lockh, LCK_PR);
2684                         ldlm_lock_decref(lockh, LCK_PW);
2685                 }
2686                 RETURN(rc);
2687         }
2688         RETURN(rc);
2689 }
2690
2691 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2692 {
2693         ENTRY;
2694
2695         if (unlikely(mode == LCK_GROUP))
2696                 ldlm_lock_decref_and_cancel(lockh, mode);
2697         else
2698                 ldlm_lock_decref(lockh, mode);
2699
2700         RETURN(0);
2701 }
2702
2703 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2704                       __u32 mode, struct lustre_handle *lockh)
2705 {
2706         ENTRY;
2707         RETURN(osc_cancel_base(lockh, mode));
2708 }
2709
2710 static int osc_cancel_unused(struct obd_export *exp,
2711                              struct lov_stripe_md *lsm,
2712                              ldlm_cancel_flags_t flags,
2713                              void *opaque)
2714 {
2715         struct obd_device *obd = class_exp2obd(exp);
2716         struct ldlm_res_id res_id, *resp = NULL;
2717
2718         if (lsm != NULL) {
2719                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2720                 resp = &res_id;
2721         }
2722
2723         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2724 }
2725
2726 static int osc_statfs_interpret(const struct lu_env *env,
2727                                 struct ptlrpc_request *req,
2728                                 struct osc_async_args *aa, int rc)
2729 {
2730         struct obd_statfs *msfs;
2731         ENTRY;
2732
2733         if (rc == -EBADR)
2734                 /* The request has in fact never been sent
2735                  * due to issues at a higher level (LOV).
2736                  * Exit immediately since the caller is
2737                  * aware of the problem and takes care
2738                  * of the clean up */
2739                  RETURN(rc);
2740
2741         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2742             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2743                 GOTO(out, rc = 0);
2744
2745         if (rc != 0)
2746                 GOTO(out, rc);
2747
2748         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2749         if (msfs == NULL) {
2750                 GOTO(out, rc = -EPROTO);
2751         }
2752
2753         *aa->aa_oi->oi_osfs = *msfs;
2754 out:
2755         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2756         RETURN(rc);
2757 }
2758
2759 static int osc_statfs_async(struct obd_export *exp,
2760                             struct obd_info *oinfo, __u64 max_age,
2761                             struct ptlrpc_request_set *rqset)
2762 {
2763         struct obd_device     *obd = class_exp2obd(exp);
2764         struct ptlrpc_request *req;
2765         struct osc_async_args *aa;
2766         int                 rc;
2767         ENTRY;
2768
2769         /* We could possibly pass max_age in the request (as an absolute
2770          * timestamp or a "seconds.usec ago") so the target can avoid doing
2771          * extra calls into the filesystem if that isn't necessary (e.g.
2772          * during mount that would help a bit).  Having relative timestamps
2773          * is not so great if request processing is slow, while absolute
2774          * timestamps are not ideal because they need time synchronization. */
2775         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2776         if (req == NULL)
2777                 RETURN(-ENOMEM);
2778
2779         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2780         if (rc) {
2781                 ptlrpc_request_free(req);
2782                 RETURN(rc);
2783         }
2784         ptlrpc_request_set_replen(req);
2785         req->rq_request_portal = OST_CREATE_PORTAL;
2786         ptlrpc_at_set_req_timeout(req);
2787
2788         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2789                 /* procfs requests not want stat in wait for avoid deadlock */
2790                 req->rq_no_resend = 1;
2791                 req->rq_no_delay = 1;
2792         }
2793
2794         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2795         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2796         aa = ptlrpc_req_async_args(req);
2797         aa->aa_oi = oinfo;
2798
2799         ptlrpc_set_add_req(rqset, req);
2800         RETURN(0);
2801 }
2802
/* Synchronous OST_STATFS: fill @osfs from the target.
 *
 * @max_age is currently unused on the wire (see the comment below);
 * @flags may carry OBD_STATFS_NODELAY.  Returns 0 on success or a
 * negative errno.
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request might also come from lprocfs, so take a reference
         * on the import under cl_sem to synchronize against
         * client_disconnect_export() (bug 15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request (if any) now pins the import itself; drop ours. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * or they could deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2866
2867 /* Retrieve object striping information.
2868  *
2869  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2870  * the maximum number of OST indices which will fit in the user buffer.
2871  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2872  */
2873 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2874 {
2875         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2876         struct lov_user_md_v3 lum, *lumk;
2877         struct lov_user_ost_data_v1 *lmm_objects;
2878         int rc = 0, lum_size;
2879         ENTRY;
2880
2881         if (!lsm)
2882                 RETURN(-ENODATA);
2883
2884         /* we only need the header part from user space to get lmm_magic and
2885          * lmm_stripe_count, (the header part is common to v1 and v3) */
2886         lum_size = sizeof(struct lov_user_md_v1);
2887         if (copy_from_user(&lum, lump, lum_size))
2888                 RETURN(-EFAULT);
2889
2890         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2891             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2892                 RETURN(-EINVAL);
2893
2894         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2895         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2896         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2897         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2898
2899         /* we can use lov_mds_md_size() to compute lum_size
2900          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2901         if (lum.lmm_stripe_count > 0) {
2902                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2903                 OBD_ALLOC(lumk, lum_size);
2904                 if (!lumk)
2905                         RETURN(-ENOMEM);
2906
2907                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2908                         lmm_objects =
2909                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2910                 else
2911                         lmm_objects = &(lumk->lmm_objects[0]);
2912                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2913         } else {
2914                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2915                 lumk = &lum;
2916         }
2917
2918         lumk->lmm_oi = lsm->lsm_oi;
2919         lumk->lmm_stripe_count = 1;
2920
2921         if (copy_to_user(lump, lumk, lum_size))
2922                 rc = -EFAULT;
2923
2924         if (lumk != &lum)
2925                 OBD_FREE(lumk, lum_size);
2926
2927         RETURN(rc);
2928 }
2929
2930
/* ioctl dispatcher for the OSC device.
 *
 * @karg is the kernel-side copy of the ioctl data, @uarg the original
 * user pointer (some commands re-fetch from it).  A module reference is
 * held across the call so the module cannot unload mid-ioctl.  Returns
 * 0 on success or a negative errno (-ENOTTY for unknown commands).
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Re-fetch the ioctl payload from user space into buf;
                 * obd_ioctl_getdata() allocates it and sets len. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* inlbuf1 must hold a lov_desc, inlbuf2 a uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* A bare OSC presents itself as a one-target "LOV". */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                /* copy_to_user() returns bytes NOT copied; fold into -EFAULT. */
                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3016
/* obd_get_info() handler: answer synchronous "get" queries from the upper
 * layers (LOV/llite).
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - map a lock to a stripe index; an OSC object is a
 *                        single stripe, so the answer is always 0.
 *   KEY_LAST_ID        - fetch the last allocated object id from the OST
 *                        via a synchronous OST_GET_INFO RPC.
 *   KEY_FIEMAP         - fetch the file extent map from the OST, taking
 *                        (or reusing) a PR extent DLM lock first when the
 *                        caller asked for synced data (FIEMAP_FLAG_SYNC).
 *
 * \param[in]     env    execution environment (unused here)
 * \param[in]     exp    export to query
 * \param[in]     keylen length of \a key in bytes
 * \param[in]     key    key identifying the information requested
 * \param[in,out] vallen size of the \a val buffer; updated for some keys
 * \param[in,out] val    buffer receiving the result
 * \param[in]     lsm    stripe metadata (unused here)
 *
 * \retval 0 on success, -EINVAL for an unrecognised key, other negative
 * errno on allocation or RPC failure.
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* An OSC object is one stripe: always stripe 0. */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id          *reply;
                char              *tmp;
                int                 rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* Fail fast rather than block recovery: no delay, no
                 * resend for this informational RPC. */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                /* Without FIEMAP_FLAG_SYNC no lock is needed; the mapping
                 * may be stale with respect to cached dirty data. */
                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Build a page-aligned extent covering the requested
                 * range, clamping to EOF on overflow. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* Downgrade a matched PW lock to PR for
                                 * the duration of the fiemap RPC. */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                /* Release the PR reference taken (or converted) above. */
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3152
/* obd_set_info_async() handler: apply a "set" request locally or forward
 * it to the OST.
 *
 * Locally handled keys: KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX,
 * KEY_CACHE_SET and KEY_CACHE_LRU_SHRINK.  Anything else is packed into
 * an OST_SET_INFO RPC: KEY_GRANT_SHRINK requests are dispatched through
 * ptlrpcd with an interpret callback, all other keys require the caller
 * to supply a request \a set to collect the RPC.
 *
 * \retval 0 on success, negative errno on failure.
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char              *tmp;
        int                 rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                /* Toggle BRW checksumming for this client. */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                /* Attach this OSC to the shared client page cache passed
                 * in \a val and register it on the cache's LRU list. */
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                /* Shrink this client's LRU by up to half of its in-list
                 * pages, capped by the caller's remaining target; report
                 * back how many pages were actually freed. */
                struct client_obd *cli = &obd->u.cli;
                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;

                nr = osc_lru_shrink(cli, min(nr, target));
                *(int *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                /* Grant-shrink requests carry an ost_body instead of a
                 * generic value buffer, so only size the value field for
                 * the generic case. */
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                /* Stash a copy of the obdo in the async args so the
                 * interpret callback can process the grant reply after
                 * the caller's buffer is gone. */
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
3270
3271
/* obd_ops llog-init hook.  The OSC no longer hosts llog origination when
 * stacked under LOD/OSP, so reaching this path is a logic error; crash
 * loudly instead of silently doing nothing.  Slated for removal. */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        LBUG();
        return 0;
}
3280
3281 static int osc_llog_finish(struct obd_device *obd, int count)
3282 {
3283         struct llog_ctxt *ctxt;
3284
3285         ENTRY;
3286
3287         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3288         if (ctxt) {
3289                 llog_cat_close(NULL, ctxt->loc_handle);
3290                 llog_cleanup(NULL, ctxt);
3291         }
3292
3293         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3294         if (ctxt)
3295                 llog_cleanup(NULL, ctxt);
3296         RETURN(0);
3297 }
3298
3299 static int osc_reconnect(const struct lu_env *env,
3300                          struct obd_export *exp, struct obd_device *obd,
3301                          struct obd_uuid *cluuid,
3302                          struct obd_connect_data *data,
3303                          void *localdata)
3304 {
3305         struct client_obd *cli = &obd->u.cli;
3306
3307         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3308                 long lost_grant;
3309
3310                 client_obd_list_lock(&cli->cl_loi_list_lock);
3311                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3312                                 2 * cli_brw_size(obd);
3313                 lost_grant = cli->cl_lost_grant;
3314                 cli->cl_lost_grant = 0;
3315                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3316
3317                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3318                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3319                        data->ocd_version, data->ocd_grant, lost_grant);
3320         }
3321
3322         RETURN(0);
3323 }
3324
/* obd_ops disconnect hook: flush pending llog cancels on the last
 * disconnect, tear down the export, and finally detach from the grant
 * shrink list (ordering is critical -- see the BUG18662 note below).
 *
 * \retval result of client_disconnect_export(). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                   disconnect p2
         *   ptlrpc_connect_import
         *     ...............         class_manual_cleanup
         *                                   osc_disconnect
         *                                   del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                    cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
3366
/* obd_ops import-event hook: react to state changes of the import that
 * backs this OSC (disconnect, invalidation, (de)activation, connect-data
 * negotiation) and propagate notifications to the observer obd.
 *
 * \retval 0 or the error from the observer/notification path; unknown
 * events are a fatal logic error (LBUG). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Connection lost: any grant we held is void. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                /* Import invalidated: flush cached I/O (which will fail
                 * against the dead import) and drop all local DLM locks. */
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env    *env;
                int                 refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data negotiated: (re)initialize grant state and
                 * pick the request portal the server asked for. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3440
3441 /**
3442  * Determine whether the lock can be canceled before replaying the lock
3443  * during recovery, see bug16774 for detailed information.
3444  *
3445  * \retval zero the lock can't be canceled
3446  * \retval other ok to cancel
3447  */
3448 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3449 {
3450         check_res_locked(lock->l_resource);
3451
3452         /*
3453          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3454          *
3455          * XXX as a future improvement, we can also cancel unused write lock
3456          * if it doesn't have dirty data and active mmaps.
3457          */
3458         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3459             (lock->l_granted_mode == LCK_PR ||
3460              lock->l_granted_mode == LCK_CR) &&
3461             (osc_dlm_lock_pageref(lock) == 0))
3462                 RETURN(1);
3463
3464         RETURN(0);
3465 }
3466
3467 static int brw_queue_work(const struct lu_env *env, void *data)
3468 {
3469         struct client_obd *cli = data;
3470
3471         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3472
3473         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3474         RETURN(0);
3475 }
3476
/* obd_ops setup hook: initialize an OSC device from a lustre_cfg.
 *
 * Acquires a ptlrpcd reference, performs generic client setup, creates
 * the writeback work item, initializes quota and procfs, preallocates a
 * request pool and registers the recovery-cancel callback.  Failures
 * unwind in reverse order through the goto labels at the bottom.
 *
 * \retval 0 on success, negative errno on failure. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct lprocfs_static_vars lvars = { 0 };
        struct client_obd         *cli = &obd->u.cli;
        void                   *handler;
        int                     rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
        lprocfs_osc_init_vars(&lvars);
        /* procfs registration is best-effort: setup continues even if it
         * fails. */
        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        /* NOTE(review): the pool pointer is not checked here; presumably a
         * NULL pool is tolerated by the request allocator -- confirm. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
        RETURN(rc);

out_ptlrpcd_work:
        ptlrpcd_destroy_work(handler);
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3532
/* obd_ops precleanup hook: staged teardown before osc_cleanup().
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the writeback work item and unregisters
 * procfs/llog state.  Other stages are no-ops.
 *
 * \retval 0, or the llog-finish error (which is only logged). */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3577
/* obd_ops cleanup hook: final teardown of an OSC device.
 *
 * Detaches from the shared LRU cache (if attached via KEY_CACHE_SET),
 * frees the quota cache, runs generic client cleanup and drops the
 * ptlrpcd reference taken in osc_setup().
 *
 * \retval result of client_obd_cleanup(). */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3604
3605 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3606 {
3607         struct lprocfs_static_vars lvars = { 0 };
3608         int rc = 0;
3609
3610         lprocfs_osc_init_vars(&lvars);
3611
3612         switch (lcfg->lcfg_command) {
3613         default:
3614                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3615                                               lcfg, obd);
3616                 if (rc > 0)
3617                         rc = 0;
3618                 break;
3619         }
3620
3621         return(rc);
3622 }
3623
/* obd_ops process_config hook; \a buf is a struct lustre_cfg *. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3628
3629 struct obd_ops osc_obd_ops = {
3630         .o_owner                = THIS_MODULE,
3631         .o_setup                = osc_setup,
3632         .o_precleanup      = osc_precleanup,
3633         .o_cleanup            = osc_cleanup,
3634         .o_add_conn          = client_import_add_conn,
3635         .o_del_conn          = client_import_del_conn,
3636         .o_connect            = client_connect_import,
3637         .o_reconnect        = osc_reconnect,
3638         .o_disconnect      = osc_disconnect,
3639         .o_statfs              = osc_statfs,
3640         .o_statfs_async  = osc_statfs_async,
3641         .o_packmd              = osc_packmd,
3642         .o_unpackmd          = osc_unpackmd,
3643         .o_create              = osc_create,
3644         .o_destroy            = osc_destroy,
3645         .o_getattr            = osc_getattr,
3646         .o_getattr_async        = osc_getattr_async,
3647         .o_setattr            = osc_setattr,
3648         .o_setattr_async        = osc_setattr_async,
3649         .o_brw            = osc_brw,
3650         .o_punch                = osc_punch,
3651         .o_sync          = osc_sync,
3652         .o_enqueue            = osc_enqueue,
3653         .o_change_cbdata        = osc_change_cbdata,
3654         .o_find_cbdata    = osc_find_cbdata,
3655         .o_cancel              = osc_cancel,
3656         .o_cancel_unused        = osc_cancel_unused,
3657         .o_iocontrol        = osc_iocontrol,
3658         .o_get_info          = osc_get_info,
3659         .o_set_info_async       = osc_set_info_async,
3660         .o_import_event  = osc_import_event,
3661         .o_llog_init        = osc_llog_init,
3662         .o_llog_finish    = osc_llog_finish,
3663         .o_process_config       = osc_process_config,
3664         .o_quotactl          = osc_quotactl,
3665         .o_quotacheck      = osc_quotacheck,
3666 };
3667
3668 extern struct lu_kmem_descr osc_caches[];
3669 extern spinlock_t osc_ast_guard;
3670 extern struct lock_class_key osc_ast_guard_class;
3671
3672 int __init osc_init(void)
3673 {
3674         struct lprocfs_static_vars lvars = { 0 };
3675         int rc;
3676         ENTRY;
3677
3678         /* print an address of _any_ initialized kernel symbol from this
3679          * module, to allow debugging with gdb that doesn't support data
3680          * symbols from modules.*/
3681         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3682
3683         rc = lu_kmem_init(osc_caches);
3684
3685         lprocfs_osc_init_vars(&lvars);
3686
3687         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3688                                  LUSTRE_OSC_NAME, &osc_device_type);
3689         if (rc) {
3690                 lu_kmem_fini(osc_caches);
3691                 RETURN(rc);
3692         }
3693
3694         spin_lock_init(&osc_ast_guard);
3695         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3696
3697         RETURN(rc);
3698 }
3699
/* Module exit point: unregister the OSC obd type and release the kmem
 * caches set up in osc_init().  (__exit is commented out, presumably so
 * the symbol stays available in built-in configurations -- TODO confirm.) */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3705
3706 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3707 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3708 MODULE_LICENSE("GPL");
3709
3710 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);