git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/mdc/mdc_locks.c
staging/lustre: quiet console permission error messages
[karo-tx-linux.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40
41 #include <linux/lustre_intent.h>
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_dlm.h>
45 #include <lustre_fid.h> /* fid_res_name_eq() */
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include "mdc_internal.h"
50
/*
 * Bundle of an export together with an md enqueue info and an ldlm
 * enqueue info.
 * NOTE(review): presumably the argument block of an asynchronous getattr
 * enqueue completion handler defined further down in this file -- confirm.
 */
struct mdc_getattr_args {
	struct obd_export *ga_exp;		/* export */
	struct md_enqueue_info *ga_minfo;	/* md enqueue info */
	struct ldlm_enqueue_info *ga_einfo;	/* ldlm enqueue info */
};
56
57 int it_disposition(struct lookup_intent *it, int flag)
58 {
59         return it->d.lustre.it_disposition & flag;
60 }
61 EXPORT_SYMBOL(it_disposition);
62
63 void it_set_disposition(struct lookup_intent *it, int flag)
64 {
65         it->d.lustre.it_disposition |= flag;
66 }
67 EXPORT_SYMBOL(it_set_disposition);
68
69 void it_clear_disposition(struct lookup_intent *it, int flag)
70 {
71         it->d.lustre.it_disposition &= ~flag;
72 }
73 EXPORT_SYMBOL(it_clear_disposition);
74
75 int it_open_error(int phase, struct lookup_intent *it)
76 {
77         if (it_disposition(it, DISP_OPEN_LEASE)) {
78                 if (phase >= DISP_OPEN_LEASE)
79                         return it->d.lustre.it_status;
80                 else
81                         return 0;
82         }
83         if (it_disposition(it, DISP_OPEN_OPEN)) {
84                 if (phase >= DISP_OPEN_OPEN)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_OPEN_CREATE)) {
91                 if (phase >= DISP_OPEN_CREATE)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98                 if (phase >= DISP_LOOKUP_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103
104         if (it_disposition(it, DISP_IT_EXECD)) {
105                 if (phase >= DISP_IT_EXECD)
106                         return it->d.lustre.it_status;
107                 else
108                         return 0;
109         }
110         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111                it->d.lustre.it_status);
112         LBUG();
113         return 0;
114 }
115 EXPORT_SYMBOL(it_open_error);
116
117 /* this must be called on a lockh that is known to have a referenced lock */
118 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
119                       __u64 *bits)
120 {
121         struct ldlm_lock *lock;
122         struct inode *new_inode = data;
123
124         if (bits)
125                 *bits = 0;
126
127         if (!*lockh)
128                 return 0;
129
130         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132         LASSERT(lock != NULL);
133         lock_res_and_lock(lock);
134         if (lock->l_resource->lr_lvb_inode &&
135             lock->l_resource->lr_lvb_inode != data) {
136                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
137                 LASSERTF(old_inode->i_state & I_FREEING,
138                          "Found existing inode %p/%lu/%u state %lu in lock: "
139                          "setting data to %p/%lu/%u\n", old_inode,
140                          old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state,
142                          new_inode, new_inode->i_ino, new_inode->i_generation);
143         }
144         lock->l_resource->lr_lvb_inode = new_inode;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         return 0;
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161
162         fid_build_reg_res_name(fid, &res_id);
163         /* LU-4405: Clear bits not supported by server */
164         policy->l_inodebits.bits &= exp_connect_ibits(exp);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         return rc;
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         fid_build_reg_res_name(fid, &res_id);
182         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183                                              policy, mode, flags, opaque);
184         return rc;
185 }
186
187 int mdc_null_inode(struct obd_export *exp,
188                    const struct lu_fid *fid)
189 {
190         struct ldlm_res_id res_id;
191         struct ldlm_resource *res;
192         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
193
194         LASSERTF(ns != NULL, "no namespace passed\n");
195
196         fid_build_reg_res_name(fid, &res_id);
197
198         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
199         if (res == NULL)
200                 return 0;
201
202         lock_res(res);
203         res->lr_lvb_inode = NULL;
204         unlock_res(res);
205
206         ldlm_resource_putref(res);
207         return 0;
208 }
209
210 /* find any ldlm lock of the inode in mdc
211  * return 0    not find
212  *      1    find one
213  *      < 0    error */
214 int mdc_find_cbdata(struct obd_export *exp,
215                     const struct lu_fid *fid,
216                     ldlm_iterator_t it, void *data)
217 {
218         struct ldlm_res_id res_id;
219         int rc = 0;
220
221         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
222         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223                                    it, data);
224         if (rc == LDLM_ITER_STOP)
225                 return 1;
226         else if (rc == LDLM_ITER_CONTINUE)
227                 return 0;
228         return rc;
229 }
230
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 {
233         /* Don't hold error requests for replay. */
234         if (req->rq_replay) {
235                 spin_lock(&req->rq_lock);
236                 req->rq_replay = 0;
237                 spin_unlock(&req->rq_lock);
238         }
239         if (rc && req->rq_transno != 0) {
240                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241                 LBUG();
242         }
243 }
244
245 /* Save a large LOV EA into the request buffer so that it is available
246  * for replay.  We don't do this in the initial request because the
247  * original request doesn't need this buffer (at most it sends just the
248  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249  * buffer and may also be difficult to allocate and save a very large
250  * request buffer for each open. (bug 5707)
251  *
252  * OOM here may cause recovery failure if lmm is needed (only for the
253  * original open if the MDS crashed just when this client also OOM'd)
254  * but this is incredibly unlikely, and questionable whether the client
255  * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257                                 struct mdt_body *body)
258 {
259         int     rc;
260
261         /* FIXME: remove this explicit offset. */
262         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
263                                         body->eadatasize);
264         if (rc) {
265                 CERROR("Can't enlarge segment %d size to %d\n",
266                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
267                 body->valid &= ~OBD_MD_FLEASIZE;
268                 body->eadatasize = 0;
269         }
270 }
271
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273                                                    struct lookup_intent *it,
274                                                    struct md_op_data *op_data,
275                                                    void *lmm, int lmmsize,
276                                                    void *cb_data)
277 {
278         struct ptlrpc_request *req;
279         struct obd_device     *obddev = class_exp2obd(exp);
280         struct ldlm_intent    *lit;
281         LIST_HEAD(cancels);
282         int                 count = 0;
283         int                 mode;
284         int                 rc;
285
286         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287
288         /* XXX: openlock is not cancelled for cross-refs. */
289         /* If inode is known, cancel conflicting OPEN locks. */
290         if (fid_is_sane(&op_data->op_fid2)) {
291                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
292                         if (it->it_flags & FMODE_WRITE)
293                                 mode = LCK_EX;
294                         else
295                                 mode = LCK_PR;
296                 } else {
297                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298                                 mode = LCK_CW;
299 #ifdef FMODE_EXEC
300                         else if (it->it_flags & FMODE_EXEC)
301                                 mode = LCK_PR;
302 #endif
303                         else
304                                 mode = LCK_CR;
305                 }
306                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
307                                                 &cancels, mode,
308                                                 MDS_INODELOCK_OPEN);
309         }
310
311         /* If CREATE, cancel parent's UPDATE lock. */
312         if (it->it_op & IT_CREAT)
313                 mode = LCK_EX;
314         else
315                 mode = LCK_CR;
316         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
317                                          &cancels, mode,
318                                          MDS_INODELOCK_UPDATE);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
321                                    &RQF_LDLM_INTENT_OPEN);
322         if (req == NULL) {
323                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
324                 return ERR_PTR(-ENOMEM);
325         }
326
327         /* parent capability */
328         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
329         /* child capability, reserve the size according to parent capa, it will
330          * be filled after we get the reply */
331         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
332
333         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
334                              op_data->op_namelen + 1);
335         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
336                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
337
338         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
339         if (rc < 0) {
340                 ptlrpc_request_free(req);
341                 return ERR_PTR(rc);
342         }
343
344         spin_lock(&req->rq_lock);
345         req->rq_replay = req->rq_import->imp_replayable;
346         spin_unlock(&req->rq_lock);
347
348         /* pack the intent */
349         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
350         lit->opc = (__u64)it->it_op;
351
352         /* pack the intended request */
353         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
354                       lmmsize);
355
356         /* for remote client, fetch remote perm for current user */
357         if (client_is_remote(exp))
358                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
359                                      sizeof(struct mdt_remote_perm));
360         ptlrpc_request_set_replen(req);
361         return req;
362 }
363
364 static struct ptlrpc_request *
365 mdc_intent_getxattr_pack(struct obd_export *exp,
366                          struct lookup_intent *it,
367                          struct md_op_data *op_data)
368 {
369         struct ptlrpc_request   *req;
370         struct ldlm_intent      *lit;
371         int                     rc, count = 0, maxdata;
372         LIST_HEAD(cancels);
373
374
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
377                                         &RQF_LDLM_INTENT_GETXATTR);
378         if (req == NULL)
379                 return ERR_PTR(-ENOMEM);
380
381         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
382
383         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 return ERR_PTR(rc);
387         }
388
389         /* pack the intent */
390         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
391         lit->opc = IT_GETXATTR;
392
393         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
394
395         /* pack the intended request */
396         mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
397                         op_data->op_valid, maxdata, -1, 0);
398
399         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
400                                 RCL_SERVER, maxdata);
401
402         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
403                                 RCL_SERVER, maxdata);
404
405         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
406                                 RCL_SERVER, maxdata);
407
408         ptlrpc_request_set_replen(req);
409
410         return req;
411 }
412
413 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
414                                                      struct lookup_intent *it,
415                                                      struct md_op_data *op_data)
416 {
417         struct ptlrpc_request *req;
418         struct obd_device     *obddev = class_exp2obd(exp);
419         struct ldlm_intent    *lit;
420         int                 rc;
421
422         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
423                                    &RQF_LDLM_INTENT_UNLINK);
424         if (req == NULL)
425                 return ERR_PTR(-ENOMEM);
426
427         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
428         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
429                              op_data->op_namelen + 1);
430
431         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
432         if (rc) {
433                 ptlrpc_request_free(req);
434                 return ERR_PTR(rc);
435         }
436
437         /* pack the intent */
438         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
439         lit->opc = (__u64)it->it_op;
440
441         /* pack the intended request */
442         mdc_unlink_pack(req, op_data);
443
444         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
445                              obddev->u.cli.cl_max_mds_easize);
446         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
447                              obddev->u.cli.cl_max_mds_cookiesize);
448         ptlrpc_request_set_replen(req);
449         return req;
450 }
451
452 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
453                                                       struct lookup_intent *it,
454                                                       struct md_op_data *op_data)
455 {
456         struct ptlrpc_request *req;
457         struct obd_device     *obddev = class_exp2obd(exp);
458         obd_valid             valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
459                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
460                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
461                                        (client_is_remote(exp) ?
462                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
463         struct ldlm_intent    *lit;
464         int                 rc;
465
466         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467                                    &RQF_LDLM_INTENT_GETATTR);
468         if (req == NULL)
469                 return ERR_PTR(-ENOMEM);
470
471         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
472         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
473                              op_data->op_namelen + 1);
474
475         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
476         if (rc) {
477                 ptlrpc_request_free(req);
478                 return ERR_PTR(rc);
479         }
480
481         /* pack the intent */
482         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
483         lit->opc = (__u64)it->it_op;
484
485         /* pack the intended request */
486         mdc_getattr_pack(req, valid, it->it_flags, op_data,
487                          obddev->u.cli.cl_max_mds_easize);
488
489         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
490                              obddev->u.cli.cl_max_mds_easize);
491         if (client_is_remote(exp))
492                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
493                                      sizeof(struct mdt_remote_perm));
494         ptlrpc_request_set_replen(req);
495         return req;
496 }
497
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499                                                      struct lookup_intent *it,
500                                                      struct md_op_data *unused)
501 {
502         struct obd_device     *obd = class_exp2obd(exp);
503         struct ptlrpc_request *req;
504         struct ldlm_intent    *lit;
505         struct layout_intent  *layout;
506         int rc;
507
508         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
509                                 &RQF_LDLM_INTENT_LAYOUT);
510         if (req == NULL)
511                 return ERR_PTR(-ENOMEM);
512
513         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
514         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
515         if (rc) {
516                 ptlrpc_request_free(req);
517                 return ERR_PTR(rc);
518         }
519
520         /* pack the intent */
521         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
522         lit->opc = (__u64)it->it_op;
523
524         /* pack the layout intent request */
525         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
526         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
527          * set for replication */
528         layout->li_opc = LAYOUT_INTENT_ACCESS;
529
530         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
531                         obd->u.cli.cl_max_mds_easize);
532         ptlrpc_request_set_replen(req);
533         return req;
534 }
535
536 static struct ptlrpc_request *
537 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
538 {
539         struct ptlrpc_request *req;
540         int rc;
541
542         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
543         if (req == NULL)
544                 return ERR_PTR(-ENOMEM);
545
546         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
547         if (rc) {
548                 ptlrpc_request_free(req);
549                 return ERR_PTR(rc);
550         }
551
552         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
553         ptlrpc_request_set_replen(req);
554         return req;
555 }
556
557 static int mdc_finish_enqueue(struct obd_export *exp,
558                               struct ptlrpc_request *req,
559                               struct ldlm_enqueue_info *einfo,
560                               struct lookup_intent *it,
561                               struct lustre_handle *lockh,
562                               int rc)
563 {
564         struct req_capsule  *pill = &req->rq_pill;
565         struct ldlm_request *lockreq;
566         struct ldlm_reply   *lockrep;
567         struct lustre_intent_data *intent = &it->d.lustre;
568         struct ldlm_lock    *lock;
569         void            *lvb_data = NULL;
570         int               lvb_len = 0;
571
572         LASSERT(rc >= 0);
573         /* Similarly, if we're going to replay this request, we don't want to
574          * actually get a lock, just perform the intent. */
575         if (req->rq_transno || req->rq_replay) {
576                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
577                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
578         }
579
580         if (rc == ELDLM_LOCK_ABORTED) {
581                 einfo->ei_mode = 0;
582                 memset(lockh, 0, sizeof(*lockh));
583                 rc = 0;
584         } else { /* rc = 0 */
585                 lock = ldlm_handle2lock(lockh);
586                 LASSERT(lock != NULL);
587
588                 /* If the server gave us back a different lock mode, we should
589                  * fix up our variables. */
590                 if (lock->l_req_mode != einfo->ei_mode) {
591                         ldlm_lock_addref(lockh, lock->l_req_mode);
592                         ldlm_lock_decref(lockh, einfo->ei_mode);
593                         einfo->ei_mode = lock->l_req_mode;
594                 }
595                 LDLM_LOCK_PUT(lock);
596         }
597
598         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
599         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
600
601         intent->it_disposition = (int)lockrep->lock_policy_res1;
602         intent->it_status = (int)lockrep->lock_policy_res2;
603         intent->it_lock_mode = einfo->ei_mode;
604         intent->it_lock_handle = lockh->cookie;
605         intent->it_data = req;
606
607         /* Technically speaking rq_transno must already be zero if
608          * it_status is in error, so the check is a bit redundant */
609         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
610                 mdc_clear_replay_flag(req, intent->it_status);
611
612         /* If we're doing an IT_OPEN which did not result in an actual
613          * successful open, then we need to remove the bit which saves
614          * this request for unconditional replay.
615          *
616          * It's important that we do this first!  Otherwise we might exit the
617          * function without doing so, and try to replay a failed create
618          * (bug 3440) */
619         if (it->it_op & IT_OPEN && req->rq_replay &&
620             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
621                 mdc_clear_replay_flag(req, intent->it_status);
622
623         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
624                   it->it_op, intent->it_disposition, intent->it_status);
625
626         /* We know what to expect, so we do any byte flipping required here */
627         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
628                 struct mdt_body *body;
629
630                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
631                 if (body == NULL) {
632                         CERROR ("Can't swab mdt_body\n");
633                         return -EPROTO;
634                 }
635
636                 if (it_disposition(it, DISP_OPEN_OPEN) &&
637                     !it_open_error(DISP_OPEN_OPEN, it)) {
638                         /*
639                          * If this is a successful OPEN request, we need to set
640                          * replay handler and data early, so that if replay
641                          * happens immediately after swabbing below, new reply
642                          * is swabbed by that handler correctly.
643                          */
644                         mdc_set_open_replay_data(NULL, NULL, it);
645                 }
646
647                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
648                         void *eadata;
649
650                         mdc_update_max_ea_from_body(exp, body);
651
652                         /*
653                          * The eadata is opaque; just check that it is there.
654                          * Eventually, obd_unpackmd() will check the contents.
655                          */
656                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
657                                                               body->eadatasize);
658                         if (eadata == NULL)
659                                 return -EPROTO;
660
661                         /* save lvb data and length in case this is for layout
662                          * lock */
663                         lvb_data = eadata;
664                         lvb_len = body->eadatasize;
665
666                         /*
667                          * We save the reply LOV EA in case we have to replay a
668                          * create for recovery.  If we didn't allocate a large
669                          * enough request buffer above we need to reallocate it
670                          * here to hold the actual LOV EA.
671                          *
672                          * To not save LOV EA if request is not going to replay
673                          * (for example error one).
674                          */
675                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
676                                 void *lmm;
677                                 if (req_capsule_get_size(pill, &RMF_EADATA,
678                                                          RCL_CLIENT) <
679                                     body->eadatasize)
680                                         mdc_realloc_openmsg(req, body);
681                                 else
682                                         req_capsule_shrink(pill, &RMF_EADATA,
683                                                            body->eadatasize,
684                                                            RCL_CLIENT);
685
686                                 req_capsule_set_size(pill, &RMF_EADATA,
687                                                      RCL_CLIENT,
688                                                      body->eadatasize);
689
690                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
691                                 if (lmm)
692                                         memcpy(lmm, eadata, body->eadatasize);
693                         }
694                 }
695
696                 if (body->valid & OBD_MD_FLRMTPERM) {
697                         struct mdt_remote_perm *perm;
698
699                         LASSERT(client_is_remote(exp));
700                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
701                                                 lustre_swab_mdt_remote_perm);
702                         if (perm == NULL)
703                                 return -EPROTO;
704                 }
705                 if (body->valid & OBD_MD_FLMDSCAPA) {
706                         struct lustre_capa *capa, *p;
707
708                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
709                         if (capa == NULL)
710                                 return -EPROTO;
711
712                         if (it->it_op & IT_OPEN) {
713                                 /* client fid capa will be checked in replay */
714                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
715                                 LASSERT(p);
716                                 *p = *capa;
717                         }
718                 }
719                 if (body->valid & OBD_MD_FLOSSCAPA) {
720                         struct lustre_capa *capa;
721
722                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
723                         if (capa == NULL)
724                                 return -EPROTO;
725                 }
726         } else if (it->it_op & IT_LAYOUT) {
727                 /* maybe the lock was granted right away and layout
728                  * is packed into RMF_DLM_LVB of req */
729                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
730                 if (lvb_len > 0) {
731                         lvb_data = req_capsule_server_sized_get(pill,
732                                                         &RMF_DLM_LVB, lvb_len);
733                         if (lvb_data == NULL)
734                                 return -EPROTO;
735                 }
736         }
737
738         /* fill in stripe data for layout lock */
739         lock = ldlm_handle2lock(lockh);
740         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
741                 void *lmm;
742
743                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
744                         ldlm_it2str(it->it_op), lvb_len);
745
746                 OBD_ALLOC_LARGE(lmm, lvb_len);
747                 if (lmm == NULL) {
748                         LDLM_LOCK_PUT(lock);
749                         return -ENOMEM;
750                 }
751                 memcpy(lmm, lvb_data, lvb_len);
752
753                 /* install lvb_data */
754                 lock_res_and_lock(lock);
755                 if (lock->l_lvb_data == NULL) {
756                         lock->l_lvb_type = LVB_T_LAYOUT;
757                         lock->l_lvb_data = lmm;
758                         lock->l_lvb_len = lvb_len;
759                         lmm = NULL;
760                 }
761                 unlock_res_and_lock(lock);
762                 if (lmm != NULL)
763                         OBD_FREE_LARGE(lmm, lvb_len);
764         }
765         if (lock != NULL)
766                 LDLM_LOCK_PUT(lock);
767
768         return rc;
769 }
770
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
/*
 * Enqueue an LDLM lock on the resource named after op_data->op_fid1,
 * optionally carrying an intent.
 *
 * \param exp               export the request is sent through
 * \param einfo             enqueue parameters; must be LDLM_IBITS when \a it
 *                          is set and LDLM_FLOCK when it is not (both asserted)
 * \param it                intent, or NULL for a flock request
 * \param op_data           operation data; op_fid1 names the resource
 * \param lockh             handle passed to ldlm_cli_enqueue(); zeroed again
 *                          if mdc_finish_enqueue() fails
 * \param lmm, lmmsize      passed to mdc_intent_open_pack() for IT_OPEN;
 *                          for flock, \a lmm is the lock policy and
 *                          \a lmmsize must be 0
 * \param reqp              must be NULL (asserted)
 * \param extra_lock_flags  extra LDLM flags used on every attempt
 *
 * \retval negative errno on failure, including -EOPNOTSUPP for IT_LAYOUT when
 *         imp_connect_lvb_type() is false on the import, and -EIO when the
 *         import generation changed while retrying a create after -EINPROGRESS
 * \retval otherwise the value returned by mdc_finish_enqueue() (or by
 *         ldlm_cli_enqueue() directly for flock requests)
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct md_op_data *op_data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                struct ptlrpc_request **reqp, __u64 extra_lock_flags)
{
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req = NULL;
        __u64             flags, saved_flags = extra_lock_flags;
        int                 rc;
        struct ldlm_res_id res_id;
        static const ldlm_policy_data_t lookup_policy =
                            { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const ldlm_policy_data_t update_policy =
                            { .l_inodebits = { MDS_INODELOCK_UPDATE } };
        static const ldlm_policy_data_t layout_policy =
                            { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
        static const ldlm_policy_data_t getxattr_policy = {
                              .l_inodebits = { MDS_INODELOCK_XATTR } };
        ldlm_policy_data_t const *policy = &lookup_policy;
        int                 generation, resends = 0;
        struct ldlm_reply     *lockrep;
        enum lvb_type          lvb_type = 0;

        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        /* Pick the inodebits policy from the intent; LOOKUP is the default. */
        if (it) {
                saved_flags |= LDLM_FL_HAS_INTENT;
                if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
                else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
                        policy = &getxattr_policy;
        }

        LASSERT(reqp == NULL);

        /* Remember the import generation so a resend can detect an
         * eviction that happened in between. */
        generation = obddev->u.cli.cl_import->imp_generation;
resend:
        /* Every attempt starts from the same flag set. */
        flags = saved_flags;
        if (!it) {
                /* The only way right now is FLOCK, in this case we hide flock
                   policy as lmm, but lmmsize is 0 */
                LASSERT(lmm && lmmsize == 0);
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                policy = (ldlm_policy_data_t *)lmm;
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
                                           einfo->ei_cbdata);
                policy = &update_policy;
                /* Cleared after packing, so a later resend passes NULL for
                 * both the callback data and lmm. */
                einfo->ei_cbdata = NULL;
                lmm = NULL;
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, op_data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                req = mdc_intent_getattr_pack(exp, it, op_data);
        } else if (it->it_op & IT_READDIR) {
                req = mdc_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_LAYOUT) {
                if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
                        return -EOPNOTSUPP;
                req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
        } else if (it->it_op & IT_GETXATTR) {
                req = mdc_intent_getxattr_pack(exp, it, op_data);
        } else {
                LBUG();
                return -EINVAL;
        }

        if (IS_ERR(req))
                return PTR_ERR(req);

        if (req != NULL && it && it->it_op & IT_CREAT)
                /* ask ptlrpc not to resend on EINPROGRESS since we have our own
                 * retry logic */
                req->rq_no_retry_einprogress = 1;

        /* On a retry, pin the request to the original import generation and
         * set rq_sent to the current time plus the number of resends so far. */
        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        /* It is important to obtain rpc_lock first (if applicable), so that
         * threads that are serialised with rpc_lock are not polluting our
         * rpcs in flight counter. We do not do flock request limiting, though*/
        if (it) {
                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                rc = mdc_enter_request(&obddev->u.cli);
                if (rc != 0) {
                        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                        mdc_clear_replay_flag(req, 0);
                        ptlrpc_req_finished(req);
                        return rc;
                }
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lvb_type, lockh, 0);
        if (!it) {
                /* For flock requests we immediately return without further
                   delay and let caller deal with the rest, since rest of
                   this function metadata processing makes no sense for flock
                   requests anyway. But in case of problem during comms with
                   Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
                   can not rely on caller and this mainly for F_UNLCKs
                   (explicits or automatically generated by Kernel to clean
                   current FLocks upon exit) that can't be trashed */
                if ((rc == -EINTR) || (rc == -ETIMEDOUT))
                        goto resend;
                return rc;
        }

        /* Intent path only: undo mdc_enter_request()/mdc_get_rpc_lock(). */
        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

        if (rc < 0) {
                /* -EACCES and -EIDRM are logged at D_INFO; every other
                 * failure is logged at D_ERROR. */
                CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
                             "%s: ldlm_cli_enqueue failed: rc = %d\n",
                             obddev->obd_name, rc);

                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                return rc;
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        /* Convert the intent status carried in the reply from its wire
         * representation before it is examined below. */
        lockrep->lock_policy_res2 =
                ptlrpc_status_ntoh(lockrep->lock_policy_res2);

        /* Retry the create infinitely when we get -EINPROGRESS from
         * server. This is required by the new quota design. */
        if (it && it->it_op & IT_CREAT &&
            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                resends++;

                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                       obddev->obd_name, resends, it->it_op,
                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

                /* Only retry while the import generation is unchanged. */
                if (generation == obddev->u.cli.cl_import->imp_generation) {
                        goto resend;
                } else {
                        CDEBUG(D_HA, "resend cross eviction\n");
                        return -EIO;
                }
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc < 0) {
                /* Drop the lock reference taken by the enqueue and zero the
                 * handle before finishing the request. */
                if (lustre_handle_is_used(lockh)) {
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        memset(lockh, 0, sizeof(*lockh));
                }
                ptlrpc_req_finished(req);
        }
        return rc;
}
941
942 static int mdc_finish_intent_lock(struct obd_export *exp,
943                                   struct ptlrpc_request *request,
944                                   struct md_op_data *op_data,
945                                   struct lookup_intent *it,
946                                   struct lustre_handle *lockh)
947 {
948         struct lustre_handle old_lock;
949         struct mdt_body *mdt_body;
950         struct ldlm_lock *lock;
951         int rc;
952
953         LASSERT(request != NULL);
954         LASSERT(request != LP_POISON);
955         LASSERT(request->rq_repmsg != LP_POISON);
956
957         if (!it_disposition(it, DISP_IT_EXECD)) {
958                 /* The server failed before it even started executing the
959                  * intent, i.e. because it couldn't unpack the request. */
960                 LASSERT(it->d.lustre.it_status != 0);
961                 return it->d.lustre.it_status;
962         }
963         rc = it_open_error(DISP_IT_EXECD, it);
964         if (rc)
965                 return rc;
966
967         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
968         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
969
970         /* If we were revalidating a fid/name pair, mark the intent in
971          * case we fail and get called again from lookup */
972         if (fid_is_sane(&op_data->op_fid2) &&
973             it->it_create_mode & M_CHECK_STALE &&
974             it->it_op != IT_GETATTR) {
975
976                 /* Also: did we find the same inode? */
977                 /* sever can return one of two fids:
978                  * op_fid2 - new allocated fid - if file is created.
979                  * op_fid3 - existent fid - if file only open.
980                  * op_fid3 is saved in lmv_intent_open */
981                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
982                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
983                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
984                                "\n", PFID(&op_data->op_fid2),
985                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
986                         return -ESTALE;
987                 }
988         }
989
990         rc = it_open_error(DISP_LOOKUP_EXECD, it);
991         if (rc)
992                 return rc;
993
994         /* keep requests around for the multiple phases of the call
995          * this shows the DISP_XX must guarantee we make it into the call
996          */
997         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
998             it_disposition(it, DISP_OPEN_CREATE) &&
999             !it_open_error(DISP_OPEN_CREATE, it)) {
1000                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1001                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1002         }
1003         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1004             it_disposition(it, DISP_OPEN_OPEN) &&
1005             !it_open_error(DISP_OPEN_OPEN, it)) {
1006                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1007                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1008                 /* BUG 11546 - eviction in the middle of open rpc processing */
1009                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1010         }
1011
1012         if (it->it_op & IT_CREAT) {
1013                 /* XXX this belongs in ll_create_it */
1014         } else if (it->it_op == IT_OPEN) {
1015                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1016         } else {
1017                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1018         }
1019
1020         /* If we already have a matching lock, then cancel the new
1021          * one.  We have to set the data here instead of in
1022          * mdc_enqueue, because we need to use the child's inode as
1023          * the l_ast_data to match, and that's not available until
1024          * intent_finish has performed the iget().) */
1025         lock = ldlm_handle2lock(lockh);
1026         if (lock) {
1027                 ldlm_policy_data_t policy = lock->l_policy_data;
1028                 LDLM_DEBUG(lock, "matching against this");
1029
1030                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1031                                          &lock->l_resource->lr_name),
1032                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1033                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1034                 LDLM_LOCK_PUT(lock);
1035
1036                 memcpy(&old_lock, lockh, sizeof(*lockh));
1037                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1038                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1039                         ldlm_lock_decref_and_cancel(lockh,
1040                                                     it->d.lustre.it_lock_mode);
1041                         memcpy(lockh, &old_lock, sizeof(old_lock));
1042                         it->d.lustre.it_lock_handle = lockh->cookie;
1043                 }
1044         }
1045         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1046                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1047                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1048         return rc;
1049 }
1050
1051 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1052                         struct lu_fid *fid, __u64 *bits)
1053 {
1054         /* We could just return 1 immediately, but since we should only
1055          * be called in revalidate_it if we already have a lock, let's
1056          * verify that. */
1057         struct ldlm_res_id res_id;
1058         struct lustre_handle lockh;
1059         ldlm_policy_data_t policy;
1060         ldlm_mode_t mode;
1061
1062         if (it->d.lustre.it_lock_handle) {
1063                 lockh.cookie = it->d.lustre.it_lock_handle;
1064                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1065         } else {
1066                 fid_build_reg_res_name(fid, &res_id);
1067                 switch (it->it_op) {
1068                 case IT_GETATTR:
1069                         /* File attributes are held under multiple bits:
1070                          * nlink is under lookup lock, size and times are
1071                          * under UPDATE lock and recently we've also got
1072                          * a separate permissions lock for owner/group/acl that
1073                          * were protected by lookup lock before.
1074                          * Getattr must provide all of that information,
1075                          * so we need to ensure we have all of those locks.
1076                          * Unfortunately, if the bits are split across multiple
1077                          * locks, there's no easy way to match all of them here,
1078                          * so an extra RPC would be performed to fetch all
1079                          * of those bits at once for now. */
1080                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1081                          * but for old MDTs (< 2.4), permission is covered
1082                          * by LOOKUP lock, so it needs to match all bits here.*/
1083                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1084                                                   MDS_INODELOCK_LOOKUP |
1085                                                   MDS_INODELOCK_PERM;
1086                         break;
1087                 case IT_LAYOUT:
1088                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1089                         break;
1090                 default:
1091                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1092                         break;
1093                 }
1094
1095                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1096                                        LDLM_IBITS, &policy,
1097                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1098                                       &lockh);
1099         }
1100
1101         if (mode) {
1102                 it->d.lustre.it_lock_handle = lockh.cookie;
1103                 it->d.lustre.it_lock_mode = mode;
1104         } else {
1105                 it->d.lustre.it_lock_handle = 0;
1106                 it->d.lustre.it_lock_mode = 0;
1107         }
1108
1109         return !!mode;
1110 }
1111
1112 /*
1113  * This long block is all about fixing up the lock and request state
1114  * so that it is correct as of the moment _before_ the operation was
1115  * applied; that way, the VFS will think that everything is normal and
1116  * call Lustre's regular VFS methods.
1117  *
1118  * If we're performing a creation, that means that unless the creation
1119  * failed with EEXIST, we should fake up a negative dentry.
1120  *
1121  * For everything else, we want to lookup to succeed.
1122  *
1123  * One additional note: if CREATE or OPEN succeeded, we add an extra
1124  * reference to the request because we need to keep it around until
1125  * ll_create/ll_open gets called.
1126  *
1127  * The server will return to us, in it_disposition, an indication of
1128  * exactly what d.lustre.it_status refers to.
1129  *
1130  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1131  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1132  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1133  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1134  * was successful.
1135  *
1136  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1137  * child lookup.
1138  */
1139 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1140                     void *lmm, int lmmsize, struct lookup_intent *it,
1141                     int lookup_flags, struct ptlrpc_request **reqp,
1142                     ldlm_blocking_callback cb_blocking,
1143                     __u64 extra_lock_flags)
1144 {
1145         struct ldlm_enqueue_info einfo = {
1146                 .ei_type        = LDLM_IBITS,
1147                 .ei_mode        = it_to_lock_mode(it),
1148                 .ei_cb_bl       = cb_blocking,
1149                 .ei_cb_cp       = ldlm_completion_ast,
1150         };
1151         struct lustre_handle lockh;
1152         int rc = 0;
1153
1154         LASSERT(it);
1155
1156         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1157                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1158                 op_data->op_name, PFID(&op_data->op_fid2),
1159                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1160                 it->it_flags);
1161
1162         lockh.cookie = 0;
1163         if (fid_is_sane(&op_data->op_fid2) &&
1164             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1165                 /* We could just return 1 immediately, but since we should only
1166                  * be called in revalidate_it if we already have a lock, let's
1167                  * verify that. */
1168                 it->d.lustre.it_lock_handle = 0;
1169                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1170                 /* Only return failure if it was not GETATTR by cfid
1171                    (from inode_revalidate) */
1172                 if (rc || op_data->op_namelen != 0)
1173                         return rc;
1174         }
1175
1176         /* For case if upper layer did not alloc fid, do it now. */
1177         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1178                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1179                 if (rc < 0) {
1180                         CERROR("Can't alloc new fid, rc %d\n", rc);
1181                         return rc;
1182                 }
1183         }
1184         rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1185                          extra_lock_flags);
1186         if (rc < 0)
1187                 return rc;
1188
1189         *reqp = it->d.lustre.it_data;
1190         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1191         return rc;
1192 }
1193
1194 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1195                                               struct ptlrpc_request *req,
1196                                               void *args, int rc)
1197 {
1198         struct mdc_getattr_args  *ga = args;
1199         struct obd_export       *exp = ga->ga_exp;
1200         struct md_enqueue_info   *minfo = ga->ga_minfo;
1201         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1202         struct lookup_intent     *it;
1203         struct lustre_handle     *lockh;
1204         struct obd_device       *obddev;
1205         struct ldlm_reply        *lockrep;
1206         __u64                flags = LDLM_FL_HAS_INTENT;
1207
1208         it    = &minfo->mi_it;
1209         lockh = &minfo->mi_lockh;
1210
1211         obddev = class_exp2obd(exp);
1212
1213         mdc_exit_request(&obddev->u.cli);
1214         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1215                 rc = -ETIMEDOUT;
1216
1217         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1218                                    &flags, NULL, 0, lockh, rc);
1219         if (rc < 0) {
1220                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1221                 mdc_clear_replay_flag(req, rc);
1222                 GOTO(out, rc);
1223         }
1224
1225         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1226         LASSERT(lockrep != NULL);
1227
1228         lockrep->lock_policy_res2 =
1229                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1230
1231         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1232         if (rc)
1233                 GOTO(out, rc);
1234
1235         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1236
1237 out:
1238         OBD_FREE_PTR(einfo);
1239         minfo->mi_cb(req, minfo, rc);
1240         return 0;
1241 }
1242
1243 int mdc_intent_getattr_async(struct obd_export *exp,
1244                              struct md_enqueue_info *minfo,
1245                              struct ldlm_enqueue_info *einfo)
1246 {
1247         struct md_op_data       *op_data = &minfo->mi_data;
1248         struct lookup_intent    *it = &minfo->mi_it;
1249         struct ptlrpc_request   *req;
1250         struct mdc_getattr_args *ga;
1251         struct obd_device       *obddev = class_exp2obd(exp);
1252         struct ldlm_res_id       res_id;
1253         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1254          *     for statahead currently. Consider CMD in future, such two bits
1255          *     maybe managed by different MDS, should be adjusted then. */
1256         ldlm_policy_data_t       policy = {
1257                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1258                                                          MDS_INODELOCK_UPDATE }
1259                                  };
1260         int                   rc = 0;
1261         __u64               flags = LDLM_FL_HAS_INTENT;
1262
1263         CDEBUG(D_DLMTRACE,
1264                 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1265                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1266                 ldlm_it2str(it->it_op), it->it_flags);
1267
1268         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1269         req = mdc_intent_getattr_pack(exp, it, op_data);
1270         if (IS_ERR(req))
1271                 return PTR_ERR(req);
1272
1273         rc = mdc_enter_request(&obddev->u.cli);
1274         if (rc != 0) {
1275                 ptlrpc_req_finished(req);
1276                 return rc;
1277         }
1278
1279         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1280                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1281         if (rc < 0) {
1282                 mdc_exit_request(&obddev->u.cli);
1283                 ptlrpc_req_finished(req);
1284                 return rc;
1285         }
1286
1287         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1288         ga = ptlrpc_req_async_args(req);
1289         ga->ga_exp = exp;
1290         ga->ga_minfo = minfo;
1291         ga->ga_einfo = einfo;
1292
1293         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1294         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1295
1296         return 0;
1297 }