]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/mdc/mdc_request.c
staging/lustre/mdc: Adjust NULL comparison codestyle
[karo-tx-linux.git] / drivers / staging / lustre / lustre / mdc / mdc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40 # include <linux/pagemap.h>
41 # include <linux/miscdevice.h>
42 # include <linux/init.h>
43 # include <linux/utsname.h>
44
45 #include "../include/lustre_acl.h"
46 #include "../include/obd_class.h"
47 #include "../include/lustre_fid.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_param.h"
50 #include "../include/lustre_log.h"
51 #include "../include/lustre_kernelcomm.h"
52
53 #include "mdc_internal.h"
54
55 #define REQUEST_MINOR 244
56
57 static int mdc_cleanup(struct obd_device *obd);
58
59 static inline int mdc_queue_wait(struct ptlrpc_request *req)
60 {
61         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
62         int rc;
63
64         /* mdc_enter_request() ensures that this client has no more
65          * than cl_max_rpcs_in_flight RPCs simultaneously inf light
66          * against an MDT. */
67         rc = mdc_enter_request(cli);
68         if (rc != 0)
69                 return rc;
70
71         rc = ptlrpc_queue_wait(req);
72         mdc_exit_request(cli);
73
74         return rc;
75 }
76
77 static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid)
78 {
79         struct ptlrpc_request *req;
80         struct mdt_body       *body;
81         int                 rc;
82
83         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
84                                         &RQF_MDS_GETSTATUS,
85                                         LUSTRE_MDS_VERSION, MDS_GETSTATUS);
86         if (!req)
87                 return -ENOMEM;
88
89         mdc_pack_body(req, NULL, 0, 0, -1, 0);
90         req->rq_send_state = LUSTRE_IMP_FULL;
91
92         ptlrpc_request_set_replen(req);
93
94         rc = ptlrpc_queue_wait(req);
95         if (rc)
96                 goto out;
97
98         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
99         if (!body) {
100                 rc = -EPROTO;
101                 goto out;
102         }
103
104         *rootfid = body->fid1;
105         CDEBUG(D_NET,
106                "root fid="DFID", last_committed=%llu\n",
107                PFID(rootfid),
108                lustre_msg_get_last_committed(req->rq_repmsg));
109 out:
110         ptlrpc_req_finished(req);
111         return rc;
112 }
113
114 /*
115  * This function now is known to always saying that it will receive 4 buffers
116  * from server. Even for cases when acl_size and md_size is zero, RPC header
117  * will contain 4 fields and RPC itself will contain zero size fields. This is
118  * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
119  * and thus zero, it shrinks it, making zero size. The same story about
120  * md_size. And this is course of problem when client waits for smaller number
121  * of fields. This issue will be fixed later when client gets aware of RPC
122  * layouts.  --umka
123  */
124 static int mdc_getattr_common(struct obd_export *exp,
125                               struct ptlrpc_request *req)
126 {
127         struct req_capsule *pill = &req->rq_pill;
128         struct mdt_body    *body;
129         void           *eadata;
130         int              rc;
131
132         /* Request message already built. */
133         rc = ptlrpc_queue_wait(req);
134         if (rc != 0)
135                 return rc;
136
137         /* sanity check for the reply */
138         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
139         if (!body)
140                 return -EPROTO;
141
142         CDEBUG(D_NET, "mode: %o\n", body->mode);
143
144         if (body->eadatasize != 0) {
145                 mdc_update_max_ea_from_body(exp, body);
146
147                 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
148                                                       body->eadatasize);
149                 if (!eadata)
150                         return -EPROTO;
151         }
152
153         if (body->valid & OBD_MD_FLRMTPERM) {
154                 struct mdt_remote_perm *perm;
155
156                 LASSERT(client_is_remote(exp));
157                 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
158                                                 lustre_swab_mdt_remote_perm);
159                 if (!perm)
160                         return -EPROTO;
161         }
162
163         return 0;
164 }
165
166 static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
167                 struct ptlrpc_request **request)
168 {
169         struct ptlrpc_request *req;
170         int                 rc;
171
172         /* Single MDS without an LMV case */
173         if (op_data->op_flags & MF_GET_MDT_IDX) {
174                 op_data->op_mds = 0;
175                 return 0;
176         }
177         *request = NULL;
178         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
179         if (!req)
180                 return -ENOMEM;
181
182         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
183         if (rc) {
184                 ptlrpc_request_free(req);
185                 return rc;
186         }
187
188         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
189                       op_data->op_mode, -1, 0);
190
191         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
192                              op_data->op_mode);
193         if (op_data->op_valid & OBD_MD_FLRMTPERM) {
194                 LASSERT(client_is_remote(exp));
195                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
196                                      sizeof(struct mdt_remote_perm));
197         }
198         ptlrpc_request_set_replen(req);
199
200         rc = mdc_getattr_common(exp, req);
201         if (rc)
202                 ptlrpc_req_finished(req);
203         else
204                 *request = req;
205         return rc;
206 }
207
208 static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
209                      struct ptlrpc_request **request)
210 {
211         struct ptlrpc_request *req;
212         int                 rc;
213
214         *request = NULL;
215         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
216                                    &RQF_MDS_GETATTR_NAME);
217         if (!req)
218                 return -ENOMEM;
219
220         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
221                              op_data->op_namelen + 1);
222
223         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
224         if (rc) {
225                 ptlrpc_request_free(req);
226                 return rc;
227         }
228
229         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
230                       op_data->op_mode, op_data->op_suppgids[0], 0);
231
232         if (op_data->op_name) {
233                 char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
234
235                 LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
236                                 op_data->op_namelen);
237                 memcpy(name, op_data->op_name, op_data->op_namelen);
238         }
239
240         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
241                              op_data->op_mode);
242         ptlrpc_request_set_replen(req);
243
244         rc = mdc_getattr_common(exp, req);
245         if (rc)
246                 ptlrpc_req_finished(req);
247         else
248                 *request = req;
249         return rc;
250 }
251
252 static int mdc_is_subdir(struct obd_export *exp,
253                          const struct lu_fid *pfid,
254                          const struct lu_fid *cfid,
255                          struct ptlrpc_request **request)
256 {
257         struct ptlrpc_request  *req;
258         int                  rc;
259
260         *request = NULL;
261         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
262                                         &RQF_MDS_IS_SUBDIR, LUSTRE_MDS_VERSION,
263                                         MDS_IS_SUBDIR);
264         if (!req)
265                 return -ENOMEM;
266
267         mdc_is_subdir_pack(req, pfid, cfid, 0);
268         ptlrpc_request_set_replen(req);
269
270         rc = ptlrpc_queue_wait(req);
271         if (rc && rc != -EREMOTE)
272                 ptlrpc_req_finished(req);
273         else
274                 *request = req;
275         return rc;
276 }
277
278 static int mdc_xattr_common(struct obd_export *exp,
279                             const struct req_format *fmt,
280                             const struct lu_fid *fid,
281                             int opcode, u64 valid,
282                             const char *xattr_name, const char *input,
283                             int input_size, int output_size, int flags,
284                             __u32 suppgid, struct ptlrpc_request **request)
285 {
286         struct ptlrpc_request *req;
287         int   xattr_namelen = 0;
288         char *tmp;
289         int   rc;
290
291         *request = NULL;
292         req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
293         if (!req)
294                 return -ENOMEM;
295
296         if (xattr_name) {
297                 xattr_namelen = strlen(xattr_name) + 1;
298                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
299                                      xattr_namelen);
300         }
301         if (input_size) {
302                 LASSERT(input);
303                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                                      input_size);
305         }
306
307         /* Flush local XATTR locks to get rid of a possible cancel RPC */
308         if (opcode == MDS_REINT && fid_is_sane(fid) &&
309             exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) {
310                 LIST_HEAD(cancels);
311                 int count;
312
313                 /* Without that packing would fail */
314                 if (input_size == 0)
315                         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
316                                              RCL_CLIENT, 0);
317
318                 count = mdc_resource_get_unused(exp, fid,
319                                                 &cancels, LCK_EX,
320                                                 MDS_INODELOCK_XATTR);
321
322                 rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
323                 if (rc) {
324                         ptlrpc_request_free(req);
325                         return rc;
326                 }
327         } else {
328                 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
329                 if (rc) {
330                         ptlrpc_request_free(req);
331                         return rc;
332                 }
333         }
334
335         if (opcode == MDS_REINT) {
336                 struct mdt_rec_setxattr *rec;
337
338                 CLASSERT(sizeof(struct mdt_rec_setxattr) ==
339                          sizeof(struct mdt_rec_reint));
340                 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
341                 rec->sx_opcode = REINT_SETXATTR;
342                 rec->sx_fsuid  = from_kuid(&init_user_ns, current_fsuid());
343                 rec->sx_fsgid  = from_kgid(&init_user_ns, current_fsgid());
344                 rec->sx_cap    = cfs_curproc_cap_pack();
345                 rec->sx_suppgid1 = suppgid;
346                 rec->sx_suppgid2 = -1;
347                 rec->sx_fid    = *fid;
348                 rec->sx_valid  = valid | OBD_MD_FLCTIME;
349                 rec->sx_time   = ktime_get_real_seconds();
350                 rec->sx_size   = output_size;
351                 rec->sx_flags  = flags;
352
353         } else {
354                 mdc_pack_body(req, fid, valid, output_size, suppgid, flags);
355         }
356
357         if (xattr_name) {
358                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
359                 memcpy(tmp, xattr_name, xattr_namelen);
360         }
361         if (input_size) {
362                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
363                 memcpy(tmp, input, input_size);
364         }
365
366         if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER))
367                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
368                                      RCL_SERVER, output_size);
369         ptlrpc_request_set_replen(req);
370
371         /* make rpc */
372         if (opcode == MDS_REINT)
373                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
374
375         rc = ptlrpc_queue_wait(req);
376
377         if (opcode == MDS_REINT)
378                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
379
380         if (rc)
381                 ptlrpc_req_finished(req);
382         else
383                 *request = req;
384         return rc;
385 }
386
387 static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
388                         u64 valid, const char *xattr_name,
389                         const char *input, int input_size, int output_size,
390                         int flags, __u32 suppgid,
391                         struct ptlrpc_request **request)
392 {
393         return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
394                                 fid, MDS_REINT, valid, xattr_name,
395                                 input, input_size, output_size, flags,
396                                 suppgid, request);
397 }
398
399 static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
400                         u64 valid, const char *xattr_name,
401                         const char *input, int input_size, int output_size,
402                         int flags, struct ptlrpc_request **request)
403 {
404         return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
405                                 fid, MDS_GETXATTR, valid, xattr_name,
406                                 input, input_size, output_size, flags,
407                                 -1, request);
408 }
409
#ifdef CONFIG_FS_POSIX_ACL
/*
 * Convert the ACL buffer of the reply in @req into a posix_acl and store it
 * in md->posix_acl.
 *
 * Returns 0 when there is no ACL (aclsize is zero or the conversion yields
 * NULL) or when the ACL was stored; -EPROTO when the reply lacks the ACL
 * buffer; otherwise the conversion or validation error. md->posix_acl is
 * only written on success.
 */
static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
{
	struct mdt_body *body = md->body;
	struct posix_acl *acl;
	void *xattr;
	int rc;

	if (body->aclsize == 0)
		return 0;

	xattr = req_capsule_server_sized_get(&req->rq_pill, &RMF_ACL,
					     body->aclsize);
	if (!xattr)
		return -EPROTO;

	acl = posix_acl_from_xattr(&init_user_ns, xattr, body->aclsize);
	if (!acl)
		return 0;

	if (IS_ERR(acl)) {
		rc = PTR_ERR(acl);
		CERROR("convert xattr to acl: %d\n", rc);
		return rc;
	}

	rc = posix_acl_valid(acl);
	if (rc == 0) {
		md->posix_acl = acl;
		return 0;
	}

	CERROR("validate acl: %d\n", rc);
	posix_acl_release(acl);
	return rc;
}
#else
#define mdc_unpack_acl(req, md) 0
#endif
450
451 static int mdc_get_lustre_md(struct obd_export *exp,
452                              struct ptlrpc_request *req,
453                              struct obd_export *dt_exp,
454                              struct obd_export *md_exp,
455                              struct lustre_md *md)
456 {
457         struct req_capsule *pill = &req->rq_pill;
458         int rc;
459
460         LASSERT(md);
461         memset(md, 0, sizeof(*md));
462
463         md->body = req_capsule_server_get(pill, &RMF_MDT_BODY);
464
465         if (md->body->valid & OBD_MD_FLEASIZE) {
466                 int lmmsize;
467                 struct lov_mds_md *lmm;
468
469                 if (!S_ISREG(md->body->mode)) {
470                         CDEBUG(D_INFO,
471                                "OBD_MD_FLEASIZE set, should be a regular file, but is not\n");
472                         rc = -EPROTO;
473                         goto out;
474                 }
475
476                 if (md->body->eadatasize == 0) {
477                         CDEBUG(D_INFO,
478                                "OBD_MD_FLEASIZE set, but eadatasize 0\n");
479                         rc = -EPROTO;
480                         goto out;
481                 }
482                 lmmsize = md->body->eadatasize;
483                 lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize);
484                 if (!lmm) {
485                         rc = -EPROTO;
486                         goto out;
487                 }
488
489                 rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
490                 if (rc < 0)
491                         goto out;
492
493                 if (rc < sizeof(*md->lsm)) {
494                         CDEBUG(D_INFO,
495                                "lsm size too small: rc < sizeof (*md->lsm) (%d < %d)\n",
496                                rc, (int)sizeof(*md->lsm));
497                         rc = -EPROTO;
498                         goto out;
499                 }
500
501         } else if (md->body->valid & OBD_MD_FLDIREA) {
502                 int lmvsize;
503                 struct lov_mds_md *lmv;
504
505                 if (!S_ISDIR(md->body->mode)) {
506                         CDEBUG(D_INFO,
507                                "OBD_MD_FLDIREA set, should be a directory, but is not\n");
508                         rc = -EPROTO;
509                         goto out;
510                 }
511
512                 if (md->body->eadatasize == 0) {
513                         CDEBUG(D_INFO,
514                                "OBD_MD_FLDIREA is set, but eadatasize 0\n");
515                         return -EPROTO;
516                 }
517                 if (md->body->valid & OBD_MD_MEA) {
518                         lmvsize = md->body->eadatasize;
519                         lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
520                                                            lmvsize);
521                         if (!lmv) {
522                                 rc = -EPROTO;
523                                 goto out;
524                         }
525
526                         rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
527                                           lmvsize);
528                         if (rc < 0)
529                                 goto out;
530
531                         if (rc < sizeof(*md->mea)) {
532                                 CDEBUG(D_INFO,
533                                        "size too small: rc < sizeof(*md->mea) (%d < %d)\n",
534                                         rc, (int)sizeof(*md->mea));
535                                 rc = -EPROTO;
536                                 goto out;
537                         }
538                 }
539         }
540         rc = 0;
541
542         if (md->body->valid & OBD_MD_FLRMTPERM) {
543                 /* remote permission */
544                 LASSERT(client_is_remote(exp));
545                 md->remote_perm = req_capsule_server_swab_get(pill, &RMF_ACL,
546                                                 lustre_swab_mdt_remote_perm);
547                 if (!md->remote_perm) {
548                         rc = -EPROTO;
549                         goto out;
550                 }
551         } else if (md->body->valid & OBD_MD_FLACL) {
552                 /* for ACL, it's possible that FLACL is set but aclsize is zero.
553                  * only when aclsize != 0 there's an actual segment for ACL
554                  * in reply buffer.
555                  */
556                 if (md->body->aclsize) {
557                         rc = mdc_unpack_acl(req, md);
558                         if (rc)
559                                 goto out;
560 #ifdef CONFIG_FS_POSIX_ACL
561                 } else {
562                         md->posix_acl = NULL;
563 #endif
564                 }
565         }
566
567 out:
568         if (rc) {
569 #ifdef CONFIG_FS_POSIX_ACL
570                 posix_acl_release(md->posix_acl);
571 #endif
572                 if (md->lsm)
573                         obd_free_memmd(dt_exp, &md->lsm);
574         }
575         return rc;
576 }
577
/*
 * Counterpart of mdc_get_lustre_md(). Nothing is released at this layer:
 * both arguments are ignored and 0 is always returned.
 */
static int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
{
	return 0;
}
582
583 /**
584  * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
585  * RPC chains.
586  */
587 void mdc_replay_open(struct ptlrpc_request *req)
588 {
589         struct md_open_data *mod = req->rq_cb_data;
590         struct ptlrpc_request *close_req;
591         struct obd_client_handle *och;
592         struct lustre_handle old;
593         struct mdt_body *body;
594
595         if (!mod) {
596                 DEBUG_REQ(D_ERROR, req,
597                           "Can't properly replay without open data.");
598                 return;
599         }
600
601         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
602
603         och = mod->mod_och;
604         if (och) {
605                 struct lustre_handle *file_fh;
606
607                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
608
609                 file_fh = &och->och_fh;
610                 CDEBUG(D_HA, "updating handle from %#llx to %#llx\n",
611                        file_fh->cookie, body->handle.cookie);
612                 old = *file_fh;
613                 *file_fh = body->handle;
614         }
615         close_req = mod->mod_close_req;
616         if (close_req) {
617                 __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
618                 struct mdt_ioepoch *epoch;
619
620                 LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
621                 epoch = req_capsule_client_get(&close_req->rq_pill,
622                                                &RMF_MDT_EPOCH);
623                 LASSERT(epoch);
624
625                 if (och)
626                         LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
627                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
628                 epoch->handle = body->handle;
629         }
630 }
631
632 void mdc_commit_open(struct ptlrpc_request *req)
633 {
634         struct md_open_data *mod = req->rq_cb_data;
635
636         if (!mod)
637                 return;
638
639         /**
640          * No need to touch md_open_data::mod_och, it holds a reference on
641          * \var mod and will zero references to each other, \var mod will be
642          * freed after that when md_open_data::mod_och will put the reference.
643          */
644
645         /**
646          * Do not let open request to disappear as it still may be needed
647          * for close rpc to happen (it may happen on evict only, otherwise
648          * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
649          * called), just mark this rpc as committed to distinguish these 2
650          * cases, see mdc_close() for details. The open request reference will
651          * be put along with freeing \var mod.
652          */
653         ptlrpc_request_addref(req);
654         spin_lock(&req->rq_lock);
655         req->rq_committed = 1;
656         spin_unlock(&req->rq_lock);
657         req->rq_cb_data = NULL;
658         obd_mod_put(mod);
659 }
660
661 int mdc_set_open_replay_data(struct obd_export *exp,
662                              struct obd_client_handle *och,
663                              struct lookup_intent *it)
664 {
665         struct md_open_data   *mod;
666         struct mdt_rec_create *rec;
667         struct mdt_body       *body;
668         struct ptlrpc_request *open_req = it->d.lustre.it_data;
669         struct obd_import     *imp = open_req->rq_import;
670
671         if (!open_req->rq_replay)
672                 return 0;
673
674         rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT);
675         body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
676         LASSERT(rec);
677         /* Incoming message in my byte order (it's been swabbed). */
678         /* Outgoing messages always in my byte order. */
679         LASSERT(body);
680
681         /* Only if the import is replayable, we set replay_open data */
682         if (och && imp->imp_replayable) {
683                 mod = obd_mod_alloc();
684                 if (!mod) {
685                         DEBUG_REQ(D_ERROR, open_req,
686                                   "Can't allocate md_open_data");
687                         return 0;
688                 }
689
690                 /**
691                  * Take a reference on \var mod, to be freed on mdc_close().
692                  * It protects \var mod from being freed on eviction (commit
693                  * callback is called despite rq_replay flag).
694                  * Another reference for \var och.
695                  */
696                 obd_mod_get(mod);
697                 obd_mod_get(mod);
698
699                 spin_lock(&open_req->rq_lock);
700                 och->och_mod = mod;
701                 mod->mod_och = och;
702                 mod->mod_is_create = it_disposition(it, DISP_OPEN_CREATE) ||
703                                      it_disposition(it, DISP_OPEN_STRIPE);
704                 mod->mod_open_req = open_req;
705                 open_req->rq_cb_data = mod;
706                 open_req->rq_commit_cb = mdc_commit_open;
707                 spin_unlock(&open_req->rq_lock);
708         }
709
710         rec->cr_fid2 = body->fid1;
711         rec->cr_ioepoch = body->ioepoch;
712         rec->cr_old_handle.cookie = body->handle.cookie;
713         open_req->rq_replay_cb = mdc_replay_open;
714         if (!fid_is_sane(&body->fid1)) {
715                 DEBUG_REQ(D_ERROR, open_req,
716                           "Saving replay request with insane fid");
717                 LBUG();
718         }
719
720         DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
721         return 0;
722 }
723
724 static void mdc_free_open(struct md_open_data *mod)
725 {
726         int committed = 0;
727
728         if (mod->mod_is_create == 0 &&
729             imp_connect_disp_stripe(mod->mod_open_req->rq_import))
730                 committed = 1;
731
732         LASSERT(mod->mod_open_req->rq_replay == 0);
733
734         DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request\n");
735
736         ptlrpc_request_committed(mod->mod_open_req, committed);
737         if (mod->mod_close_req)
738                 ptlrpc_request_committed(mod->mod_close_req, committed);
739 }
740
741 static int mdc_clear_open_replay_data(struct obd_export *exp,
742                                       struct obd_client_handle *och)
743 {
744         struct md_open_data *mod = och->och_mod;
745
746         /**
747          * It is possible to not have \var mod in a case of eviction between
748          * lookup and ll_file_open().
749          **/
750         if (!mod)
751                 return 0;
752
753         LASSERT(mod != LP_POISON);
754         LASSERT(mod->mod_open_req);
755         mdc_free_open(mod);
756
757         mod->mod_och = NULL;
758         och->och_mod = NULL;
759         obd_mod_put(mod);
760
761         return 0;
762 }
763
764 /* Prepares the request for the replay by the given reply */
765 static void mdc_close_handle_reply(struct ptlrpc_request *req,
766                                    struct md_op_data *op_data, int rc) {
767         struct mdt_body  *repbody;
768         struct mdt_ioepoch *epoch;
769
770         if (req && rc == -EAGAIN) {
771                 repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
772                 epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
773
774                 epoch->flags |= MF_SOM_AU;
775                 if (repbody->valid & OBD_MD_FLGETATTRLOCK)
776                         op_data->op_flags |= MF_GETATTR_LOCK;
777         }
778 }
779
780 static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
781                      struct md_open_data *mod, struct ptlrpc_request **request)
782 {
783         struct obd_device     *obd = class_exp2obd(exp);
784         struct ptlrpc_request *req;
785         struct req_format     *req_fmt;
786         int                    rc;
787         int                    saved_rc = 0;
788
789         req_fmt = &RQF_MDS_CLOSE;
790         if (op_data->op_bias & MDS_HSM_RELEASE) {
791                 req_fmt = &RQF_MDS_RELEASE_CLOSE;
792
793                 /* allocate a FID for volatile file */
794                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
795                 if (rc < 0) {
796                         CERROR("%s: "DFID" failed to allocate FID: %d\n",
797                                obd->obd_name, PFID(&op_data->op_fid1), rc);
798                         /* save the errcode and proceed to close */
799                         saved_rc = rc;
800                 }
801         }
802
803         *request = NULL;
804         req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt);
805         if (!req)
806                 return -ENOMEM;
807
808         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
809         if (rc) {
810                 ptlrpc_request_free(req);
811                 return rc;
812         }
813
814         /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
815          * portal whose threads are not taking any DLM locks and are therefore
816          * always progressing */
817         req->rq_request_portal = MDS_READPAGE_PORTAL;
818         ptlrpc_at_set_req_timeout(req);
819
820         /* Ensure that this close's handle is fixed up during replay. */
821         if (likely(mod)) {
822                 LASSERTF(mod->mod_open_req &&
823                          mod->mod_open_req->rq_type != LI_POISON,
824                          "POISONED open %p!\n", mod->mod_open_req);
825
826                 mod->mod_close_req = req;
827
828                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
829                 /* We no longer want to preserve this open for replay even
830                  * though the open was committed. b=3632, b=3633 */
831                 spin_lock(&mod->mod_open_req->rq_lock);
832                 mod->mod_open_req->rq_replay = 0;
833                 spin_unlock(&mod->mod_open_req->rq_lock);
834         } else {
835                  CDEBUG(D_HA,
836                         "couldn't find open req; expecting close error\n");
837         }
838
839         mdc_close_pack(req, op_data);
840
841         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
842                              obd->u.cli.cl_default_mds_easize);
843         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
844                              obd->u.cli.cl_default_mds_cookiesize);
845
846         ptlrpc_request_set_replen(req);
847
848         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
849         rc = ptlrpc_queue_wait(req);
850         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
851
852         if (!req->rq_repmsg) {
853                 CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
854                        req->rq_status);
855                 if (rc == 0)
856                         rc = req->rq_status ?: -EIO;
857         } else if (rc == 0 || rc == -EAGAIN) {
858                 struct mdt_body *body;
859
860                 rc = lustre_msg_get_status(req->rq_repmsg);
861                 if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
862                         DEBUG_REQ(D_ERROR, req,
863                                   "type == PTL_RPC_MSG_ERR, err = %d", rc);
864                         if (rc > 0)
865                                 rc = -rc;
866                 }
867                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
868                 if (!body)
869                         rc = -EPROTO;
870         } else if (rc == -ESTALE) {
871                 /**
872                  * it can be allowed error after 3633 if open was committed and
873                  * server failed before close was sent. Let's check if mod
874                  * exists and return no error in that case
875                  */
876                 if (mod) {
877                         DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
878                         if (mod->mod_open_req->rq_committed)
879                                 rc = 0;
880                 }
881         }
882
883         if (mod) {
884                 if (rc != 0)
885                         mod->mod_close_req = NULL;
886                 /* Since now, mod is accessed through open_req only,
887                  * thus close req does not keep a reference on mod anymore. */
888                 obd_mod_put(mod);
889         }
890         *request = req;
891         mdc_close_handle_reply(req, op_data, rc);
892         return rc < 0 ? rc : saved_rc;
893 }
894
895 static int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
896                             struct md_open_data *mod)
897 {
898         struct obd_device     *obd = class_exp2obd(exp);
899         struct ptlrpc_request *req;
900         int                 rc;
901
902         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
903                                    &RQF_MDS_DONE_WRITING);
904         if (!req)
905                 return -ENOMEM;
906
907         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
908         if (rc) {
909                 ptlrpc_request_free(req);
910                 return rc;
911         }
912
913         if (mod) {
914                 LASSERTF(mod->mod_open_req &&
915                          mod->mod_open_req->rq_type != LI_POISON,
916                          "POISONED setattr %p!\n", mod->mod_open_req);
917
918                 mod->mod_close_req = req;
919                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
920                 /* We no longer want to preserve this setattr for replay even
921                  * though the open was committed. b=3632, b=3633 */
922                 spin_lock(&mod->mod_open_req->rq_lock);
923                 mod->mod_open_req->rq_replay = 0;
924                 spin_unlock(&mod->mod_open_req->rq_lock);
925         }
926
927         mdc_close_pack(req, op_data);
928         ptlrpc_request_set_replen(req);
929
930         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
931         rc = ptlrpc_queue_wait(req);
932         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
933
934         if (rc == -ESTALE) {
935                 /**
936                  * it can be allowed error after 3633 if open or setattr were
937                  * committed and server failed before close was sent.
938                  * Let's check if mod exists and return no error in that case
939                  */
940                 if (mod) {
941                         if (mod->mod_open_req->rq_committed)
942                                 rc = 0;
943                 }
944         }
945
946         if (mod) {
947                 if (rc != 0)
948                         mod->mod_close_req = NULL;
949                 LASSERT(mod->mod_open_req);
950                 mdc_free_open(mod);
951
952                 /* Since now, mod is accessed through setattr req only,
953                  * thus DW req does not keep a reference on mod anymore. */
954                 obd_mod_put(mod);
955         }
956
957         mdc_close_handle_reply(req, op_data, rc);
958         ptlrpc_req_finished(req);
959         return rc;
960 }
961
962 static int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data,
963                         struct page **pages, struct ptlrpc_request **request)
964 {
965         struct ptlrpc_request   *req;
966         struct ptlrpc_bulk_desc *desc;
967         int                   i;
968         wait_queue_head_t             waitq;
969         int                   resends = 0;
970         struct l_wait_info       lwi;
971         int                   rc;
972
973         *request = NULL;
974         init_waitqueue_head(&waitq);
975
976 restart_bulk:
977         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
978         if (!req)
979                 return -ENOMEM;
980
981         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
982         if (rc) {
983                 ptlrpc_request_free(req);
984                 return rc;
985         }
986
987         req->rq_request_portal = MDS_READPAGE_PORTAL;
988         ptlrpc_at_set_req_timeout(req);
989
990         desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
991                                     MDS_BULK_PORTAL);
992         if (!desc) {
993                 ptlrpc_request_free(req);
994                 return -ENOMEM;
995         }
996
997         /* NB req now owns desc and will free it when it gets freed */
998         for (i = 0; i < op_data->op_npages; i++)
999                 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
1000
1001         mdc_readdir_pack(req, op_data->op_offset,
1002                          PAGE_CACHE_SIZE * op_data->op_npages,
1003                          &op_data->op_fid1);
1004
1005         ptlrpc_request_set_replen(req);
1006         rc = ptlrpc_queue_wait(req);
1007         if (rc) {
1008                 ptlrpc_req_finished(req);
1009                 if (rc != -ETIMEDOUT)
1010                         return rc;
1011
1012                 resends++;
1013                 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1014                         CERROR("too many resend retries, returning error\n");
1015                         return -EIO;
1016                 }
1017                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends),
1018                                        NULL, NULL, NULL);
1019                 l_wait_event(waitq, 0, &lwi);
1020
1021                 goto restart_bulk;
1022         }
1023
1024         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1025                                           req->rq_bulk->bd_nob_transferred);
1026         if (rc < 0) {
1027                 ptlrpc_req_finished(req);
1028                 return rc;
1029         }
1030
1031         if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
1032                 CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
1033                         req->rq_bulk->bd_nob_transferred,
1034                         PAGE_CACHE_SIZE * op_data->op_npages);
1035                 ptlrpc_req_finished(req);
1036                 return -EPROTO;
1037         }
1038
1039         *request = req;
1040         return 0;
1041 }
1042
1043 static int mdc_statfs(const struct lu_env *env,
1044                       struct obd_export *exp, struct obd_statfs *osfs,
1045                       __u64 max_age, __u32 flags)
1046 {
1047         struct obd_device     *obd = class_exp2obd(exp);
1048         struct ptlrpc_request *req;
1049         struct obd_statfs     *msfs;
1050         struct obd_import     *imp = NULL;
1051         int                 rc;
1052
1053         /*
1054          * Since the request might also come from lprocfs, so we need
1055          * sync this with client_disconnect_export Bug15684
1056          */
1057         down_read(&obd->u.cli.cl_sem);
1058         if (obd->u.cli.cl_import)
1059                 imp = class_import_get(obd->u.cli.cl_import);
1060         up_read(&obd->u.cli.cl_sem);
1061         if (!imp)
1062                 return -ENODEV;
1063
1064         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS,
1065                                         LUSTRE_MDS_VERSION, MDS_STATFS);
1066         if (!req) {
1067                 rc = -ENOMEM;
1068                 goto output;
1069         }
1070
1071         ptlrpc_request_set_replen(req);
1072
1073         if (flags & OBD_STATFS_NODELAY) {
1074                 /* procfs requests not want stay in wait for avoid deadlock */
1075                 req->rq_no_resend = 1;
1076                 req->rq_no_delay = 1;
1077         }
1078
1079         rc = ptlrpc_queue_wait(req);
1080         if (rc) {
1081                 /* check connection error first */
1082                 if (imp->imp_connect_error)
1083                         rc = imp->imp_connect_error;
1084                 goto out;
1085         }
1086
1087         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1088         if (!msfs) {
1089                 rc = -EPROTO;
1090                 goto out;
1091         }
1092
1093         *osfs = *msfs;
1094 out:
1095         ptlrpc_req_finished(req);
1096 output:
1097         class_import_put(imp);
1098         return rc;
1099 }
1100
1101 static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
1102 {
1103         __u32 keylen, vallen;
1104         void *key;
1105         int rc;
1106
1107         if (gf->gf_pathlen > PATH_MAX)
1108                 return -ENAMETOOLONG;
1109         if (gf->gf_pathlen < 2)
1110                 return -EOVERFLOW;
1111
1112         /* Key is KEY_FID2PATH + getinfo_fid2path description */
1113         keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf);
1114         key = kzalloc(keylen, GFP_NOFS);
1115         if (!key)
1116                 return -ENOMEM;
1117         memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH));
1118         memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf));
1119
1120         CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
1121                PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno);
1122
1123         if (!fid_is_sane(&gf->gf_fid)) {
1124                 rc = -EINVAL;
1125                 goto out;
1126         }
1127
1128         /* Val is struct getinfo_fid2path result plus path */
1129         vallen = sizeof(*gf) + gf->gf_pathlen;
1130
1131         rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
1132         if (rc != 0 && rc != -EREMOTE)
1133                 goto out;
1134
1135         if (vallen <= sizeof(*gf)) {
1136                 rc = -EPROTO;
1137                 goto out;
1138         } else if (vallen > sizeof(*gf) + gf->gf_pathlen) {
1139                 rc = -EOVERFLOW;
1140                 goto out;
1141         }
1142
1143         CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n%s\n",
1144                PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path);
1145
1146 out:
1147         kfree(key);
1148         return rc;
1149 }
1150
1151 static int mdc_ioc_hsm_progress(struct obd_export *exp,
1152                                 struct hsm_progress_kernel *hpk)
1153 {
1154         struct obd_import               *imp = class_exp2cliimp(exp);
1155         struct hsm_progress_kernel      *req_hpk;
1156         struct ptlrpc_request           *req;
1157         int                              rc;
1158
1159         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS,
1160                                         LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS);
1161         if (!req) {
1162                 rc = -ENOMEM;
1163                 goto out;
1164         }
1165
1166         mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1167
1168         /* Copy hsm_progress struct */
1169         req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
1170         if (!req_hpk) {
1171                 rc = -EPROTO;
1172                 goto out;
1173         }
1174
1175         *req_hpk = *hpk;
1176         req_hpk->hpk_errval = lustre_errno_hton(hpk->hpk_errval);
1177
1178         ptlrpc_request_set_replen(req);
1179
1180         rc = mdc_queue_wait(req);
1181 out:
1182         ptlrpc_req_finished(req);
1183         return rc;
1184 }
1185
1186 static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
1187 {
1188         __u32                   *archive_mask;
1189         struct ptlrpc_request   *req;
1190         int                      rc;
1191
1192         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER,
1193                                         LUSTRE_MDS_VERSION,
1194                                         MDS_HSM_CT_REGISTER);
1195         if (!req) {
1196                 rc = -ENOMEM;
1197                 goto out;
1198         }
1199
1200         mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1201
1202         /* Copy hsm_progress struct */
1203         archive_mask = req_capsule_client_get(&req->rq_pill,
1204                                               &RMF_MDS_HSM_ARCHIVE);
1205         if (!archive_mask) {
1206                 rc = -EPROTO;
1207                 goto out;
1208         }
1209
1210         *archive_mask = archives;
1211
1212         ptlrpc_request_set_replen(req);
1213
1214         rc = mdc_queue_wait(req);
1215 out:
1216         ptlrpc_req_finished(req);
1217         return rc;
1218 }
1219
1220 static int mdc_ioc_hsm_current_action(struct obd_export *exp,
1221                                       struct md_op_data *op_data)
1222 {
1223         struct hsm_current_action       *hca = op_data->op_data;
1224         struct hsm_current_action       *req_hca;
1225         struct ptlrpc_request           *req;
1226         int                              rc;
1227
1228         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1229                                    &RQF_MDS_HSM_ACTION);
1230         if (!req)
1231                 return -ENOMEM;
1232
1233         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
1234         if (rc) {
1235                 ptlrpc_request_free(req);
1236                 return rc;
1237         }
1238
1239         mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0,
1240                       op_data->op_suppgids[0], 0);
1241
1242         ptlrpc_request_set_replen(req);
1243
1244         rc = mdc_queue_wait(req);
1245         if (rc)
1246                 goto out;
1247
1248         req_hca = req_capsule_server_get(&req->rq_pill,
1249                                          &RMF_MDS_HSM_CURRENT_ACTION);
1250         if (!req_hca) {
1251                 rc = -EPROTO;
1252                 goto out;
1253         }
1254
1255         *hca = *req_hca;
1256
1257 out:
1258         ptlrpc_req_finished(req);
1259         return rc;
1260 }
1261
1262 static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
1263 {
1264         struct ptlrpc_request   *req;
1265         int                      rc;
1266
1267         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_UNREGISTER,
1268                                         LUSTRE_MDS_VERSION,
1269                                         MDS_HSM_CT_UNREGISTER);
1270         if (!req) {
1271                 rc = -ENOMEM;
1272                 goto out;
1273         }
1274
1275         mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1276
1277         ptlrpc_request_set_replen(req);
1278
1279         rc = mdc_queue_wait(req);
1280 out:
1281         ptlrpc_req_finished(req);
1282         return rc;
1283 }
1284
1285 static int mdc_ioc_hsm_state_get(struct obd_export *exp,
1286                                  struct md_op_data *op_data)
1287 {
1288         struct hsm_user_state   *hus = op_data->op_data;
1289         struct hsm_user_state   *req_hus;
1290         struct ptlrpc_request   *req;
1291         int                      rc;
1292
1293         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1294                                    &RQF_MDS_HSM_STATE_GET);
1295         if (!req)
1296                 return -ENOMEM;
1297
1298         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
1299         if (rc != 0) {
1300                 ptlrpc_request_free(req);
1301                 return rc;
1302         }
1303
1304         mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0,
1305                       op_data->op_suppgids[0], 0);
1306
1307         ptlrpc_request_set_replen(req);
1308
1309         rc = mdc_queue_wait(req);
1310         if (rc)
1311                 goto out;
1312
1313         req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
1314         if (!req_hus) {
1315                 rc = -EPROTO;
1316                 goto out;
1317         }
1318
1319         *hus = *req_hus;
1320
1321 out:
1322         ptlrpc_req_finished(req);
1323         return rc;
1324 }
1325
1326 static int mdc_ioc_hsm_state_set(struct obd_export *exp,
1327                                  struct md_op_data *op_data)
1328 {
1329         struct hsm_state_set    *hss = op_data->op_data;
1330         struct hsm_state_set    *req_hss;
1331         struct ptlrpc_request   *req;
1332         int                      rc;
1333
1334         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1335                                    &RQF_MDS_HSM_STATE_SET);
1336         if (!req)
1337                 return -ENOMEM;
1338
1339         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
1340         if (rc) {
1341                 ptlrpc_request_free(req);
1342                 return rc;
1343         }
1344
1345         mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0,
1346                       op_data->op_suppgids[0], 0);
1347
1348         /* Copy states */
1349         req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
1350         if (!req_hss) {
1351                 rc = -EPROTO;
1352                 goto out;
1353         }
1354         *req_hss = *hss;
1355
1356         ptlrpc_request_set_replen(req);
1357
1358         rc = mdc_queue_wait(req);
1359 out:
1360         ptlrpc_req_finished(req);
1361         return rc;
1362 }
1363
1364 static int mdc_ioc_hsm_request(struct obd_export *exp,
1365                                struct hsm_user_request *hur)
1366 {
1367         struct obd_import       *imp = class_exp2cliimp(exp);
1368         struct ptlrpc_request   *req;
1369         struct hsm_request      *req_hr;
1370         struct hsm_user_item    *req_hui;
1371         char                    *req_opaque;
1372         int                      rc;
1373
1374         req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
1375         if (!req) {
1376                 rc = -ENOMEM;
1377                 goto out;
1378         }
1379
1380         req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
1381                              hur->hur_request.hr_itemcount
1382                              * sizeof(struct hsm_user_item));
1383         req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
1384                              hur->hur_request.hr_data_len);
1385
1386         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
1387         if (rc) {
1388                 ptlrpc_request_free(req);
1389                 return rc;
1390         }
1391
1392         mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1393
1394         /* Copy hsm_request struct */
1395         req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
1396         if (!req_hr) {
1397                 rc = -EPROTO;
1398                 goto out;
1399         }
1400         *req_hr = hur->hur_request;
1401
1402         /* Copy hsm_user_item structs */
1403         req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
1404         if (!req_hui) {
1405                 rc = -EPROTO;
1406                 goto out;
1407         }
1408         memcpy(req_hui, hur->hur_user_item,
1409                hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
1410
1411         /* Copy opaque field */
1412         req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
1413         if (!req_opaque) {
1414                 rc = -EPROTO;
1415                 goto out;
1416         }
1417         memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
1418
1419         ptlrpc_request_set_replen(req);
1420
1421         rc = mdc_queue_wait(req);
1422 out:
1423         ptlrpc_req_finished(req);
1424         return rc;
1425 }
1426
1427 static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
1428 {
1429         struct kuc_hdr *lh = (struct kuc_hdr *)buf;
1430
1431         LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE);
1432
1433         lh->kuc_magic = KUC_MAGIC;
1434         lh->kuc_transport = KUC_TRANSPORT_CHANGELOG;
1435         lh->kuc_flags = flags;
1436         lh->kuc_msgtype = CL_RECORD;
1437         lh->kuc_msglen = len;
1438         return lh;
1439 }
1440
1441 #define D_CHANGELOG 0
1442
/*
 * State handed from mdc_ioc_changelog_send() to the changelog sender
 * thread; the thread frees it when done.
 */
struct changelog_show {
	__u64           cs_startrec;	/* records with a lower index are skipped */
	__u32           cs_flags;	/* stored as kuc_flags in each message header */
	struct file     *cs_fp;		/* file the messages are put to */
	char            *cs_buf;	/* message buffer, allocated by the thread */
	struct obd_device *cs_obd;	/* device whose changelog is read */
};
1450
1451 static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
1452                              struct llog_rec_hdr *hdr, void *data)
1453 {
1454         struct changelog_show *cs = data;
1455         struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
1456         struct kuc_hdr *lh;
1457         int len, rc;
1458
1459         if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
1460                 rc = -EINVAL;
1461                 CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
1462                        cs->cs_obd->obd_name, rec->cr_hdr.lrh_type,
1463                        rec->cr.cr_type, rc);
1464                 return rc;
1465         }
1466
1467         if (rec->cr.cr_index < cs->cs_startrec) {
1468                 /* Skip entries earlier than what we are interested in */
1469                 CDEBUG(D_CHANGELOG, "rec=%llu start=%llu\n",
1470                        rec->cr.cr_index, cs->cs_startrec);
1471                 return 0;
1472         }
1473
1474         CDEBUG(D_CHANGELOG, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID
1475                 " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
1476                 changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
1477                 rec->cr.cr_flags & CLF_FLAGMASK,
1478                 PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
1479                 rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
1480
1481         len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
1482
1483         /* Set up the message */
1484         lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags);
1485         memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
1486
1487         rc = libcfs_kkuc_msg_put(cs->cs_fp, lh);
1488         CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len, rc);
1489
1490         return rc;
1491 }
1492
1493 static int mdc_changelog_send_thread(void *csdata)
1494 {
1495         struct changelog_show *cs = csdata;
1496         struct llog_ctxt *ctxt = NULL;
1497         struct llog_handle *llh = NULL;
1498         struct kuc_hdr *kuch;
1499         int rc;
1500
1501         CDEBUG(D_CHANGELOG, "changelog to fp=%p start %llu\n",
1502                cs->cs_fp, cs->cs_startrec);
1503
1504         cs->cs_buf = kzalloc(KUC_CHANGELOG_MSG_MAXSIZE, GFP_NOFS);
1505         if (!cs->cs_buf) {
1506                 rc = -ENOMEM;
1507                 goto out;
1508         }
1509
1510         /* Set up the remote catalog handle */
1511         ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
1512         if (!ctxt) {
1513                 rc = -ENOENT;
1514                 goto out;
1515         }
1516         rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
1517                        LLOG_OPEN_EXISTS);
1518         if (rc) {
1519                 CERROR("%s: fail to open changelog catalog: rc = %d\n",
1520                        cs->cs_obd->obd_name, rc);
1521                 goto out;
1522         }
1523         rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL);
1524         if (rc) {
1525                 CERROR("llog_init_handle failed %d\n", rc);
1526                 goto out;
1527         }
1528
1529         rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
1530
1531         /* Send EOF no matter what our result */
1532         kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags);
1533         if (kuch) {
1534                 kuch->kuc_msgtype = CL_EOF;
1535                 libcfs_kkuc_msg_put(cs->cs_fp, kuch);
1536         }
1537
1538 out:
1539         fput(cs->cs_fp);
1540         if (llh)
1541                 llog_cat_close(NULL, llh);
1542         if (ctxt)
1543                 llog_ctxt_put(ctxt);
1544         kfree(cs->cs_buf);
1545         kfree(cs);
1546         return rc;
1547 }
1548
1549 static int mdc_ioc_changelog_send(struct obd_device *obd,
1550                                   struct ioc_changelog *icc)
1551 {
1552         struct changelog_show *cs;
1553         struct task_struct *task;
1554         int rc;
1555
1556         /* Freed in mdc_changelog_send_thread */
1557         cs = kzalloc(sizeof(*cs), GFP_NOFS);
1558         if (!cs)
1559                 return -ENOMEM;
1560
1561         cs->cs_obd = obd;
1562         cs->cs_startrec = icc->icc_recno;
1563         /* matching fput in mdc_changelog_send_thread */
1564         cs->cs_fp = fget(icc->icc_id);
1565         cs->cs_flags = icc->icc_flags;
1566
1567         /*
1568          * New thread because we should return to user app before
1569          * writing into our pipe
1570          */
1571         task = kthread_run(mdc_changelog_send_thread, cs,
1572                            "mdc_clg_send_thread");
1573         if (IS_ERR(task)) {
1574                 rc = PTR_ERR(task);
1575                 CERROR("%s: can't start changelog thread: rc = %d\n",
1576                        obd->obd_name, rc);
1577                 kfree(cs);
1578         } else {
1579                 rc = 0;
1580                 CDEBUG(D_CHANGELOG, "%s: started changelog thread\n",
1581                        obd->obd_name);
1582         }
1583
1584         CERROR("Failed to start changelog thread: %d\n", rc);
1585         return rc;
1586 }
1587
1588 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
1589                                 struct lustre_kernelcomm *lk);
1590
1591 static int mdc_quotacheck(struct obd_device *unused, struct obd_export *exp,
1592                           struct obd_quotactl *oqctl)
1593 {
1594         struct client_obd       *cli = &exp->exp_obd->u.cli;
1595         struct ptlrpc_request   *req;
1596         struct obd_quotactl     *body;
1597         int                   rc;
1598
1599         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1600                                         &RQF_MDS_QUOTACHECK, LUSTRE_MDS_VERSION,
1601                                         MDS_QUOTACHECK);
1602         if (!req)
1603                 return -ENOMEM;
1604
1605         body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1606         *body = *oqctl;
1607
1608         ptlrpc_request_set_replen(req);
1609
1610         /* the next poll will find -ENODATA, that means quotacheck is
1611          * going on */
1612         cli->cl_qchk_stat = -ENODATA;
1613         rc = ptlrpc_queue_wait(req);
1614         if (rc)
1615                 cli->cl_qchk_stat = rc;
1616         ptlrpc_req_finished(req);
1617         return rc;
1618 }
1619
1620 static int mdc_quota_poll_check(struct obd_export *exp,
1621                                 struct if_quotacheck *qchk)
1622 {
1623         struct client_obd *cli = &exp->exp_obd->u.cli;
1624         int rc;
1625
1626         qchk->obd_uuid = cli->cl_target_uuid;
1627         memcpy(qchk->obd_type, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME));
1628
1629         rc = cli->cl_qchk_stat;
1630         /* the client is not the previous one */
1631         if (rc == CL_NOT_QUOTACHECKED)
1632                 rc = -EINTR;
1633         return rc;
1634 }
1635
1636 static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
1637                         struct obd_quotactl *oqctl)
1638 {
1639         struct ptlrpc_request   *req;
1640         struct obd_quotactl     *oqc;
1641         int                   rc;
1642
1643         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1644                                         &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION,
1645                                         MDS_QUOTACTL);
1646         if (!req)
1647                 return -ENOMEM;
1648
1649         oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1650         *oqc = *oqctl;
1651
1652         ptlrpc_request_set_replen(req);
1653         ptlrpc_at_set_req_timeout(req);
1654         req->rq_no_resend = 1;
1655
1656         rc = ptlrpc_queue_wait(req);
1657         if (rc)
1658                 CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
1659
1660         if (req->rq_repmsg) {
1661                 oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1662                 if (oqc) {
1663                         *oqctl = *oqc;
1664                 } else if (!rc) {
1665                         CERROR("Can't unpack obd_quotactl\n");
1666                         rc = -EPROTO;
1667                 }
1668         } else if (!rc) {
1669                 CERROR("Can't unpack obd_quotactl\n");
1670                 rc = -EPROTO;
1671         }
1672         ptlrpc_req_finished(req);
1673
1674         return rc;
1675 }
1676
1677 static int mdc_ioc_swap_layouts(struct obd_export *exp,
1678                                 struct md_op_data *op_data)
1679 {
1680         LIST_HEAD(cancels);
1681         struct ptlrpc_request   *req;
1682         int                      rc, count;
1683         struct mdc_swap_layouts *msl, *payload;
1684
1685         msl = op_data->op_data;
1686
1687         /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
1688          * first thing it will do is to cancel the 2 layout
1689          * locks hold by this client.
1690          * So the client must cancel its layout locks on the 2 fids
1691          * with the request RPC to avoid extra RPC round trips
1692          */
1693         count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
1694                                         LCK_CR, MDS_INODELOCK_LAYOUT);
1695         count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
1696                                          LCK_CR, MDS_INODELOCK_LAYOUT);
1697
1698         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1699                                    &RQF_MDS_SWAP_LAYOUTS);
1700         if (!req) {
1701                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
1702                 return -ENOMEM;
1703         }
1704
1705         rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
1706         if (rc) {
1707                 ptlrpc_request_free(req);
1708                 return rc;
1709         }
1710
1711         mdc_swap_layouts_pack(req, op_data);
1712
1713         payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
1714         LASSERT(payload);
1715
1716         *payload = *msl;
1717
1718         ptlrpc_request_set_replen(req);
1719
1720         rc = ptlrpc_queue_wait(req);
1721
1722         ptlrpc_req_finished(req);
1723         return rc;
1724 }
1725
1726 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1727                          void *karg, void __user *uarg)
1728 {
1729         struct obd_device *obd = exp->exp_obd;
1730         struct obd_ioctl_data *data = karg;
1731         struct obd_import *imp = obd->u.cli.cl_import;
1732         int rc;
1733
1734         if (!try_module_get(THIS_MODULE)) {
1735                 CERROR("Can't get module. Is it alive?");
1736                 return -EINVAL;
1737         }
1738         switch (cmd) {
1739         case OBD_IOC_CHANGELOG_SEND:
1740                 rc = mdc_ioc_changelog_send(obd, karg);
1741                 goto out;
1742         case OBD_IOC_CHANGELOG_CLEAR: {
1743                 struct ioc_changelog *icc = karg;
1744                 struct changelog_setinfo cs = {
1745                         .cs_recno = icc->icc_recno,
1746                         .cs_id = icc->icc_id
1747                 };
1748
1749                 rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR),
1750                                         KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
1751                                         NULL);
1752                 goto out;
1753         }
1754         case OBD_IOC_FID2PATH:
1755                 rc = mdc_ioc_fid2path(exp, karg);
1756                 goto out;
1757         case LL_IOC_HSM_CT_START:
1758                 rc = mdc_ioc_hsm_ct_start(exp, karg);
1759                 /* ignore if it was already registered on this MDS. */
1760                 if (rc == -EEXIST)
1761                         rc = 0;
1762                 goto out;
1763         case LL_IOC_HSM_PROGRESS:
1764                 rc = mdc_ioc_hsm_progress(exp, karg);
1765                 goto out;
1766         case LL_IOC_HSM_STATE_GET:
1767                 rc = mdc_ioc_hsm_state_get(exp, karg);
1768                 goto out;
1769         case LL_IOC_HSM_STATE_SET:
1770                 rc = mdc_ioc_hsm_state_set(exp, karg);
1771                 goto out;
1772         case LL_IOC_HSM_ACTION:
1773                 rc = mdc_ioc_hsm_current_action(exp, karg);
1774                 goto out;
1775         case LL_IOC_HSM_REQUEST:
1776                 rc = mdc_ioc_hsm_request(exp, karg);
1777                 goto out;
1778         case OBD_IOC_CLIENT_RECOVER:
1779                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
1780                 if (rc < 0)
1781                         goto out;
1782                 rc = 0;
1783                 goto out;
1784         case IOC_OSC_SET_ACTIVE:
1785                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
1786                 goto out;
1787         case OBD_IOC_POLL_QUOTACHECK:
1788                 rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg);
1789                 goto out;
1790         case OBD_IOC_PING_TARGET:
1791                 rc = ptlrpc_obd_ping(obd);
1792                 goto out;
1793         /*
1794          * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
1795          * LMV instead of MDC. But when the cluster is upgraded from 1.8,
1796          * there'd be no LMV layer thus we might be called here. Eventually
1797          * this code should be removed.
1798          * bz20731, LU-592.
1799          */
1800         case IOC_OBD_STATFS: {
1801                 struct obd_statfs stat_buf = {0};
1802
1803                 if (*((__u32 *) data->ioc_inlbuf2) != 0) {
1804                         rc = -ENODEV;
1805                         goto out;
1806                 }
1807
1808                 /* copy UUID */
1809                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
1810                                  min_t(size_t, data->ioc_plen2,
1811                                                sizeof(struct obd_uuid)))) {
1812                         rc = -EFAULT;
1813                         goto out;
1814                 }
1815
1816                 rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
1817                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1818                                 0);
1819                 if (rc != 0)
1820                         goto out;
1821
1822                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1823                                  min_t(size_t, data->ioc_plen1,
1824                                                sizeof(stat_buf)))) {
1825                         rc = -EFAULT;
1826                         goto out;
1827                 }
1828
1829                 rc = 0;
1830                 goto out;
1831         }
1832         case OBD_IOC_QUOTACTL: {
1833                 struct if_quotactl *qctl = karg;
1834                 struct obd_quotactl *oqctl;
1835
1836                 oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
1837                 if (!oqctl) {
1838                         rc = -ENOMEM;
1839                         goto out;
1840                 }
1841
1842                 QCTL_COPY(oqctl, qctl);
1843                 rc = obd_quotactl(exp, oqctl);
1844                 if (rc == 0) {
1845                         QCTL_COPY(qctl, oqctl);
1846                         qctl->qc_valid = QC_MDTIDX;
1847                         qctl->obd_uuid = obd->u.cli.cl_target_uuid;
1848                 }
1849
1850                 kfree(oqctl);
1851                 goto out;
1852         }
1853         case LL_IOC_GET_CONNECT_FLAGS:
1854                 if (copy_to_user(uarg, exp_connect_flags_ptr(exp),
1855                                  sizeof(*exp_connect_flags_ptr(exp)))) {
1856                         rc = -EFAULT;
1857                         goto out;
1858                 }
1859
1860                 rc = 0;
1861                 goto out;
1862         case LL_IOC_LOV_SWAP_LAYOUTS:
1863                 rc = mdc_ioc_swap_layouts(exp, karg);
1864                 goto out;
1865         default:
1866                 CERROR("unrecognised ioctl: cmd = %#x\n", cmd);
1867                 rc = -ENOTTY;
1868                 goto out;
1869         }
1870 out:
1871         module_put(THIS_MODULE);
1872
1873         return rc;
1874 }
1875
1876 static int mdc_get_info_rpc(struct obd_export *exp,
1877                             u32 keylen, void *key,
1878                             int vallen, void *val)
1879 {
1880         struct obd_import      *imp = class_exp2cliimp(exp);
1881         struct ptlrpc_request  *req;
1882         char               *tmp;
1883         int                  rc = -EINVAL;
1884
1885         req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO);
1886         if (!req)
1887                 return -ENOMEM;
1888
1889         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
1890                              RCL_CLIENT, keylen);
1891         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
1892                              RCL_CLIENT, sizeof(__u32));
1893
1894         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
1895         if (rc) {
1896                 ptlrpc_request_free(req);
1897                 return rc;
1898         }
1899
1900         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
1901         memcpy(tmp, key, keylen);
1902         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
1903         memcpy(tmp, &vallen, sizeof(__u32));
1904
1905         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
1906                              RCL_SERVER, vallen);
1907         ptlrpc_request_set_replen(req);
1908
1909         rc = ptlrpc_queue_wait(req);
1910         /* -EREMOTE means the get_info result is partial, and it needs to
1911          * continue on another MDT, see fid2path part in lmv_iocontrol */
1912         if (rc == 0 || rc == -EREMOTE) {
1913                 tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
1914                 memcpy(val, tmp, vallen);
1915                 if (ptlrpc_rep_need_swab(req)) {
1916                         if (KEY_IS(KEY_FID2PATH))
1917                                 lustre_swab_fid2path(val);
1918                 }
1919         }
1920         ptlrpc_req_finished(req);
1921
1922         return rc;
1923 }
1924
1925 static void lustre_swab_hai(struct hsm_action_item *h)
1926 {
1927         __swab32s(&h->hai_len);
1928         __swab32s(&h->hai_action);
1929         lustre_swab_lu_fid(&h->hai_fid);
1930         lustre_swab_lu_fid(&h->hai_dfid);
1931         __swab64s(&h->hai_cookie);
1932         __swab64s(&h->hai_extent.offset);
1933         __swab64s(&h->hai_extent.length);
1934         __swab64s(&h->hai_gid);
1935 }
1936
1937 static void lustre_swab_hal(struct hsm_action_list *h)
1938 {
1939         struct hsm_action_item  *hai;
1940         int                      i;
1941
1942         __swab32s(&h->hal_version);
1943         __swab32s(&h->hal_count);
1944         __swab32s(&h->hal_archive_id);
1945         __swab64s(&h->hal_flags);
1946         hai = hai_zero(h);
1947         for (i = 0; i < h->hal_count; i++, hai = hai_next(hai))
1948                 lustre_swab_hai(hai);
1949 }
1950
1951 static void lustre_swab_kuch(struct kuc_hdr *l)
1952 {
1953         __swab16s(&l->kuc_magic);
1954         /* __u8 l->kuc_transport */
1955         __swab16s(&l->kuc_msgtype);
1956         __swab16s(&l->kuc_msglen);
1957 }
1958
1959 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
1960                                 struct lustre_kernelcomm *lk)
1961 {
1962         struct obd_import  *imp = class_exp2cliimp(exp);
1963         __u32               archive = lk->lk_data;
1964         int                 rc = 0;
1965
1966         if (lk->lk_group != KUC_GRP_HSM) {
1967                 CERROR("Bad copytool group %d\n", lk->lk_group);
1968                 return -EINVAL;
1969         }
1970
1971         CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd,
1972                lk->lk_uid, lk->lk_group, lk->lk_flags);
1973
1974         if (lk->lk_flags & LK_FLG_STOP) {
1975                 /* Unregister with the coordinator */
1976                 rc = mdc_ioc_hsm_ct_unregister(imp);
1977         } else {
1978                 rc = mdc_ioc_hsm_ct_register(imp, archive);
1979         }
1980
1981         return rc;
1982 }
1983
1984 /**
1985  * Send a message to any listening copytools
1986  * @param val KUC message (kuc_hdr + hsm_action_list)
1987  * @param len total length of message
1988  */
1989 static int mdc_hsm_copytool_send(int len, void *val)
1990 {
1991         struct kuc_hdr          *lh = (struct kuc_hdr *)val;
1992         struct hsm_action_list  *hal = (struct hsm_action_list *)(lh + 1);
1993
1994         if (len < sizeof(*lh) + sizeof(*hal)) {
1995                 CERROR("Short HSM message %d < %d\n", len,
1996                        (int) (sizeof(*lh) + sizeof(*hal)));
1997                 return -EPROTO;
1998         }
1999         if (lh->kuc_magic == __swab16(KUC_MAGIC)) {
2000                 lustre_swab_kuch(lh);
2001                 lustre_swab_hal(hal);
2002         } else if (lh->kuc_magic != KUC_MAGIC) {
2003                 CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC);
2004                 return -EPROTO;
2005         }
2006
2007         CDEBUG(D_HSM,
2008                "Received message mg=%x t=%d m=%d l=%d actions=%d on %s\n",
2009                lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype,
2010                lh->kuc_msglen, hal->hal_count, hal->hal_fsname);
2011
2012         /* Broadcast to HSM listeners */
2013         return libcfs_kkuc_group_put(KUC_GRP_HSM, lh);
2014 }
2015
2016 /**
2017  * callback function passed to kuc for re-registering each HSM copytool
2018  * running on MDC, after MDT shutdown/recovery.
2019  * @param data copytool registration data
2020  * @param cb_arg callback argument (obd_import)
2021  */
2022 static int mdc_hsm_ct_reregister(void *data, void *cb_arg)
2023 {
2024         struct kkuc_ct_data     *kcd = data;
2025         struct obd_import       *imp = (struct obd_import *)cb_arg;
2026         int                      rc;
2027
2028         if (!kcd || kcd->kcd_magic != KKUC_CT_DATA_MAGIC)
2029                 return -EPROTO;
2030
2031         if (!obd_uuid_equals(&kcd->kcd_uuid, &imp->imp_obd->obd_uuid))
2032                 return 0;
2033
2034         CDEBUG(D_HA, "%s: recover copytool registration to MDT (archive=%#x)\n",
2035                imp->imp_obd->obd_name, kcd->kcd_archive);
2036         rc = mdc_ioc_hsm_ct_register(imp, kcd->kcd_archive);
2037
2038         /* ignore error if the copytool is already registered */
2039         return (rc == -EEXIST) ? 0 : rc;
2040 }
2041
2042 static int mdc_set_info_async(const struct lu_env *env,
2043                               struct obd_export *exp,
2044                               u32 keylen, void *key,
2045                               u32 vallen, void *val,
2046                               struct ptlrpc_request_set *set)
2047 {
2048         struct obd_import       *imp = class_exp2cliimp(exp);
2049         int                      rc;
2050
2051         if (KEY_IS(KEY_READ_ONLY)) {
2052                 if (vallen != sizeof(int))
2053                         return -EINVAL;
2054
2055                 spin_lock(&imp->imp_lock);
2056                 if (*((int *)val)) {
2057                         imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
2058                         imp->imp_connect_data.ocd_connect_flags |=
2059                                                         OBD_CONNECT_RDONLY;
2060                 } else {
2061                         imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
2062                         imp->imp_connect_data.ocd_connect_flags &=
2063                                                         ~OBD_CONNECT_RDONLY;
2064                 }
2065                 spin_unlock(&imp->imp_lock);
2066
2067                 rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2068                                        keylen, key, vallen, val, set);
2069                 return rc;
2070         }
2071         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2072                 sptlrpc_conf_client_adapt(exp->exp_obd);
2073                 return 0;
2074         }
2075         if (KEY_IS(KEY_FLUSH_CTX)) {
2076                 sptlrpc_import_flush_my_ctx(imp);
2077                 return 0;
2078         }
2079         if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2080                 rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2081                                        keylen, key, vallen, val, set);
2082                 return rc;
2083         }
2084         if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) {
2085                 rc = mdc_hsm_copytool_send(vallen, val);
2086                 return rc;
2087         }
2088
2089         CERROR("Unknown key %s\n", (char *)key);
2090         return -EINVAL;
2091 }
2092
2093 static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
2094                         __u32 keylen, void *key, __u32 *vallen, void *val,
2095                         struct lov_stripe_md *lsm)
2096 {
2097         int rc = -EINVAL;
2098
2099         if (KEY_IS(KEY_MAX_EASIZE)) {
2100                 int mdsize, *max_easize;
2101
2102                 if (*vallen != sizeof(int))
2103                         return -EINVAL;
2104                 mdsize = *(int *)val;
2105                 if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
2106                         exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
2107                 max_easize = val;
2108                 *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
2109                 return 0;
2110         } else if (KEY_IS(KEY_DEFAULT_EASIZE)) {
2111                 int *default_easize;
2112
2113                 if (*vallen != sizeof(int))
2114                         return -EINVAL;
2115                 default_easize = val;
2116                 *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize;
2117                 return 0;
2118         } else if (KEY_IS(KEY_CONN_DATA)) {
2119                 struct obd_import *imp = class_exp2cliimp(exp);
2120                 struct obd_connect_data *data = val;
2121
2122                 if (*vallen != sizeof(*data))
2123                         return -EINVAL;
2124
2125                 *data = imp->imp_connect_data;
2126                 return 0;
2127         } else if (KEY_IS(KEY_TGT_COUNT)) {
2128                 *((int *)val) = 1;
2129                 return 0;
2130         }
2131
2132         rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val);
2133
2134         return rc;
2135 }
2136
2137 static int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
2138                     struct ptlrpc_request **request)
2139 {
2140         struct ptlrpc_request *req;
2141         int                 rc;
2142
2143         *request = NULL;
2144         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC);
2145         if (!req)
2146                 return -ENOMEM;
2147
2148         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
2149         if (rc) {
2150                 ptlrpc_request_free(req);
2151                 return rc;
2152         }
2153
2154         mdc_pack_body(req, fid, 0, 0, -1, 0);
2155
2156         ptlrpc_request_set_replen(req);
2157
2158         rc = ptlrpc_queue_wait(req);
2159         if (rc)
2160                 ptlrpc_req_finished(req);
2161         else
2162                 *request = req;
2163         return rc;
2164 }
2165
2166 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
2167                             enum obd_import_event event)
2168 {
2169         int rc = 0;
2170
2171         LASSERT(imp->imp_obd == obd);
2172
2173         switch (event) {
2174         case IMP_EVENT_DISCON: {
2175 #if 0
2176                 /* XXX Pass event up to OBDs stack. used only for FLD now */
2177                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL);
2178 #endif
2179                 break;
2180         }
2181         case IMP_EVENT_INACTIVE: {
2182                 struct client_obd *cli = &obd->u.cli;
2183                 /*
2184                  * Flush current sequence to make client obtain new one
2185                  * from server in case of disconnect/reconnect.
2186                  */
2187                 if (cli->cl_seq)
2188                         seq_client_flush(cli->cl_seq);
2189
2190                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2191                 break;
2192         }
2193         case IMP_EVENT_INVALIDATE: {
2194                 struct ldlm_namespace *ns = obd->obd_namespace;
2195
2196                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2197
2198                 break;
2199         }
2200         case IMP_EVENT_ACTIVE:
2201                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2202                 /* redo the kuc registration after reconnecting */
2203                 if (rc == 0)
2204                         /* re-register HSM agents */
2205                         rc = libcfs_kkuc_group_foreach(KUC_GRP_HSM,
2206                                                        mdc_hsm_ct_reregister,
2207                                                        (void *)imp);
2208                 break;
2209         case IMP_EVENT_OCD:
2210                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2211                 break;
2212         case IMP_EVENT_DEACTIVATE:
2213         case IMP_EVENT_ACTIVATE:
2214                 break;
2215         default:
2216                 CERROR("Unknown import event %x\n", event);
2217                 LBUG();
2218         }
2219         return rc;
2220 }
2221
2222 int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
2223                   struct md_op_data *op_data)
2224 {
2225         struct client_obd *cli = &exp->exp_obd->u.cli;
2226         struct lu_client_seq *seq = cli->cl_seq;
2227
2228         return seq_client_alloc_fid(NULL, seq, fid);
2229 }
2230
2231 static struct obd_uuid *mdc_get_uuid(struct obd_export *exp)
2232 {
2233         struct client_obd *cli = &exp->exp_obd->u.cli;
2234
2235         return &cli->cl_target_uuid;
2236 }
2237
2238 /**
2239  * Determine whether the lock can be canceled before replaying it during
2240  * recovery, non zero value will be return if the lock can be canceled,
2241  * or zero returned for not
2242  */
2243 static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
2244 {
2245         if (lock->l_resource->lr_type != LDLM_IBITS)
2246                 return 0;
2247
2248         /* FIXME: if we ever get into a situation where there are too many
2249          * opened files with open locks on a single node, then we really
2250          * should replay these open locks to reget it */
2251         if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN)
2252                 return 0;
2253
2254         return 1;
2255 }
2256
2257 static int mdc_resource_inode_free(struct ldlm_resource *res)
2258 {
2259         if (res->lr_lvb_inode)
2260                 res->lr_lvb_inode = NULL;
2261
2262         return 0;
2263 }
2264
2265 static struct ldlm_valblock_ops inode_lvbo = {
2266         .lvbo_free = mdc_resource_inode_free,
2267 };
2268
2269 static int mdc_llog_init(struct obd_device *obd)
2270 {
2271         struct obd_llog_group   *olg = &obd->obd_olg;
2272         struct llog_ctxt        *ctxt;
2273         int                      rc;
2274
2275         rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, obd,
2276                         &llog_client_ops);
2277         if (rc)
2278                 return rc;
2279
2280         ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
2281         llog_initiator_connect(ctxt);
2282         llog_ctxt_put(ctxt);
2283
2284         return 0;
2285 }
2286
2287 static void mdc_llog_finish(struct obd_device *obd)
2288 {
2289         struct llog_ctxt *ctxt;
2290
2291         ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
2292         if (ctxt)
2293                 llog_cleanup(NULL, ctxt);
2294 }
2295
2296 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
2297 {
2298         struct client_obd *cli = &obd->u.cli;
2299         struct lprocfs_static_vars lvars = { NULL };
2300         int rc;
2301
2302         cli->cl_rpc_lock = kzalloc(sizeof(*cli->cl_rpc_lock), GFP_NOFS);
2303         if (!cli->cl_rpc_lock)
2304                 return -ENOMEM;
2305         mdc_init_rpc_lock(cli->cl_rpc_lock);
2306
2307         ptlrpcd_addref();
2308
2309         cli->cl_close_lock = kzalloc(sizeof(*cli->cl_close_lock), GFP_NOFS);
2310         if (!cli->cl_close_lock) {
2311                 rc = -ENOMEM;
2312                 goto err_rpc_lock;
2313         }
2314         mdc_init_rpc_lock(cli->cl_close_lock);
2315
2316         rc = client_obd_setup(obd, cfg);
2317         if (rc)
2318                 goto err_close_lock;
2319         lprocfs_mdc_init_vars(&lvars);
2320         lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
2321         sptlrpc_lprocfs_cliobd_attach(obd);
2322         ptlrpc_lprocfs_register_obd(obd);
2323
2324         ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
2325
2326         obd->obd_namespace->ns_lvbo = &inode_lvbo;
2327
2328         rc = mdc_llog_init(obd);
2329         if (rc) {
2330                 mdc_cleanup(obd);
2331                 CERROR("failed to setup llogging subsystems\n");
2332         }
2333
2334         return rc;
2335
2336 err_close_lock:
2337         kfree(cli->cl_close_lock);
2338 err_rpc_lock:
2339         kfree(cli->cl_rpc_lock);
2340         ptlrpcd_decref();
2341         return rc;
2342 }
2343
2344 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
2345  * us to make MDS RPCs with large enough reply buffers to hold a default
2346  * sized EA and cookie without having to calculate this (via a call into the
2347  * LOV + OSCs) each time we make an RPC.  The maximum size is also tracked
2348  * but not used to avoid wastefully vmalloc()'ing large reply buffers when
2349  * a large number of stripes is possible.  If a larger reply buffer is
2350  * required it will be reallocated in the ptlrpc layer due to overflow.
2351  */
2352 static int mdc_init_ea_size(struct obd_export *exp, int easize,
2353                             int def_easize, int cookiesize, int def_cookiesize)
2354 {
2355         struct obd_device *obd = exp->exp_obd;
2356         struct client_obd *cli = &obd->u.cli;
2357
2358         if (cli->cl_max_mds_easize < easize)
2359                 cli->cl_max_mds_easize = easize;
2360
2361         if (cli->cl_default_mds_easize < def_easize)
2362                 cli->cl_default_mds_easize = def_easize;
2363
2364         if (cli->cl_max_mds_cookiesize < cookiesize)
2365                 cli->cl_max_mds_cookiesize = cookiesize;
2366
2367         if (cli->cl_default_mds_cookiesize < def_cookiesize)
2368                 cli->cl_default_mds_cookiesize = def_cookiesize;
2369
2370         return 0;
2371 }
2372
2373 static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2374 {
2375         switch (stage) {
2376         case OBD_CLEANUP_EARLY:
2377                 break;
2378         case OBD_CLEANUP_EXPORTS:
2379                 /* Failsafe, ok if racy */
2380                 if (obd->obd_type->typ_refcnt <= 1)
2381                         libcfs_kkuc_group_rem(0, KUC_GRP_HSM);
2382
2383                 obd_cleanup_client_import(obd);
2384                 ptlrpc_lprocfs_unregister_obd(obd);
2385                 lprocfs_obd_cleanup(obd);
2386
2387                 mdc_llog_finish(obd);
2388                 break;
2389         }
2390         return 0;
2391 }
2392
2393 static int mdc_cleanup(struct obd_device *obd)
2394 {
2395         struct client_obd *cli = &obd->u.cli;
2396
2397         kfree(cli->cl_rpc_lock);
2398         kfree(cli->cl_close_lock);
2399
2400         ptlrpcd_decref();
2401
2402         return client_obd_cleanup(obd);
2403 }
2404
2405 static int mdc_process_config(struct obd_device *obd, u32 len, void *buf)
2406 {
2407         struct lustre_cfg *lcfg = buf;
2408         struct lprocfs_static_vars lvars = { NULL };
2409         int rc = 0;
2410
2411         lprocfs_mdc_init_vars(&lvars);
2412         switch (lcfg->lcfg_command) {
2413         default:
2414                 rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
2415                                               lcfg, obd);
2416                 if (rc > 0)
2417                         rc = 0;
2418                 break;
2419         }
2420         return rc;
2421 }
2422
2423 /* get remote permission for current user on fid */
2424 static int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
2425                                __u32 suppgid, struct ptlrpc_request **request)
2426 {
2427         struct ptlrpc_request  *req;
2428         int                 rc;
2429
2430         LASSERT(client_is_remote(exp));
2431
2432         *request = NULL;
2433         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
2434         if (!req)
2435                 return -ENOMEM;
2436
2437         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
2438         if (rc) {
2439                 ptlrpc_request_free(req);
2440                 return rc;
2441         }
2442
2443         mdc_pack_body(req, fid, OBD_MD_FLRMTPERM, 0, suppgid, 0);
2444
2445         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
2446                              sizeof(struct mdt_remote_perm));
2447
2448         ptlrpc_request_set_replen(req);
2449
2450         rc = ptlrpc_queue_wait(req);
2451         if (rc)
2452                 ptlrpc_req_finished(req);
2453         else
2454                 *request = req;
2455         return rc;
2456 }
2457
2458 static struct obd_ops mdc_obd_ops = {
2459         .owner          = THIS_MODULE,
2460         .setup          = mdc_setup,
2461         .precleanup     = mdc_precleanup,
2462         .cleanup        = mdc_cleanup,
2463         .add_conn       = client_import_add_conn,
2464         .del_conn       = client_import_del_conn,
2465         .connect        = client_connect_import,
2466         .disconnect     = client_disconnect_export,
2467         .iocontrol      = mdc_iocontrol,
2468         .set_info_async = mdc_set_info_async,
2469         .statfs         = mdc_statfs,
2470         .fid_init       = client_fid_init,
2471         .fid_fini       = client_fid_fini,
2472         .fid_alloc      = mdc_fid_alloc,
2473         .import_event   = mdc_import_event,
2474         .get_info       = mdc_get_info,
2475         .process_config = mdc_process_config,
2476         .get_uuid       = mdc_get_uuid,
2477         .quotactl       = mdc_quotactl,
2478         .quotacheck     = mdc_quotacheck
2479 };
2480
2481 static struct md_ops mdc_md_ops = {
2482         .getstatus              = mdc_getstatus,
2483         .null_inode             = mdc_null_inode,
2484         .find_cbdata            = mdc_find_cbdata,
2485         .close                  = mdc_close,
2486         .create                 = mdc_create,
2487         .done_writing           = mdc_done_writing,
2488         .enqueue                = mdc_enqueue,
2489         .getattr                = mdc_getattr,
2490         .getattr_name           = mdc_getattr_name,
2491         .intent_lock            = mdc_intent_lock,
2492         .link                   = mdc_link,
2493         .is_subdir              = mdc_is_subdir,
2494         .rename                 = mdc_rename,
2495         .setattr                = mdc_setattr,
2496         .setxattr               = mdc_setxattr,
2497         .getxattr               = mdc_getxattr,
2498         .sync                   = mdc_sync,
2499         .readpage               = mdc_readpage,
2500         .unlink                 = mdc_unlink,
2501         .cancel_unused          = mdc_cancel_unused,
2502         .init_ea_size           = mdc_init_ea_size,
2503         .set_lock_data          = mdc_set_lock_data,
2504         .lock_match             = mdc_lock_match,
2505         .get_lustre_md          = mdc_get_lustre_md,
2506         .free_lustre_md         = mdc_free_lustre_md,
2507         .set_open_replay_data   = mdc_set_open_replay_data,
2508         .clear_open_replay_data = mdc_clear_open_replay_data,
2509         .get_remote_perm        = mdc_get_remote_perm,
2510         .intent_getattr_async   = mdc_intent_getattr_async,
2511         .revalidate_lock        = mdc_revalidate_lock
2512 };
2513
2514 static int __init mdc_init(void)
2515 {
2516         struct lprocfs_static_vars lvars = { NULL };
2517
2518         lprocfs_mdc_init_vars(&lvars);
2519
2520         return class_register_type(&mdc_obd_ops, &mdc_md_ops,
2521                                  LUSTRE_MDC_NAME, NULL);
2522 }
2523
2524 static void /*__exit*/ mdc_exit(void)
2525 {
2526         class_unregister_type(LUSTRE_MDC_NAME);
2527 }
2528
2529 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2530 MODULE_DESCRIPTION("Lustre Metadata Client");
2531 MODULE_LICENSE("GPL");
2532
2533 module_init(mdc_init);
2534 module_exit(mdc_exit);