]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/ptlrpc/llog_client.c
Merge remote-tracking branch 'xfs/for-next'
[karo-tx-linux.git] / drivers / staging / lustre / lustre / ptlrpc / llog_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/llog_client.c
37  *
38  * remote api for llog - client side
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOG
44
45 #include <linux/libcfs/libcfs.h>
46
47 #include <obd_class.h>
48 #include <lustre_log.h>
49 #include <lustre_net.h>
50 #include <linux/list.h>
51
/*
 * LLOG_CLIENT_ENTRY - take a reference on the import attached to @ctxt.
 *
 * With ctxt->loc_mutex held, stores class_import_get(ctxt->loc_imp) in @imp.
 * If the context has no import, an error is logged, @imp is set to NULL and
 * the ENCLOSING FUNCTION returns -EINVAL.  The macro may therefore only be
 * used in functions returning int, and before anything that would need
 * cleanup has been acquired.
 *
 * @ctxt is expanded several times, so it must be free of side effects.
 * Every successful entry must be balanced by LLOG_CLIENT_EXIT().
 */
#define LLOG_CLIENT_ENTRY(ctxt, imp) do {                                     \
        mutex_lock(&(ctxt)->loc_mutex);                                       \
        if ((ctxt)->loc_imp) {                                                \
                (imp) = class_import_get((ctxt)->loc_imp);                    \
        } else {                                                              \
                CERROR("ctxt->loc_imp == NULL for context idx %d. "           \
                       "Unable to complete MDS/OSS recovery, "                \
                       "but I'll try again next time.  Not fatal.\n",         \
                       (ctxt)->loc_idx);                                      \
                (imp) = NULL;                                                 \
                mutex_unlock(&(ctxt)->loc_mutex);                             \
                return -EINVAL;                                               \
        }                                                                     \
        mutex_unlock(&(ctxt)->loc_mutex);                                     \
} while (0)
67
/*
 * LLOG_CLIENT_EXIT - drop the import reference taken by LLOG_CLIENT_ENTRY().
 *
 * With ctxt->loc_mutex held, warns if the context's import is no longer the
 * one referenced by @imp, then releases @imp with class_import_put().
 * The warning reports the import we were holding first ("from") and the
 * context's current import second ("to").
 *
 * @ctxt and @imp are expanded more than once and must be free of side
 * effects.
 */
#define LLOG_CLIENT_EXIT(ctxt, imp) do {                                      \
        mutex_lock(&(ctxt)->loc_mutex);                                       \
        if ((ctxt)->loc_imp != (imp))                                         \
                CWARN("loc_imp has changed from %p to %p\n",                  \
                      (imp), (ctxt)->loc_imp);                                \
        class_import_put(imp);                                                \
        mutex_unlock(&(ctxt)->loc_mutex);                                     \
} while (0)
76
77 /* This is a callback from the llog_* functions.
78  * Assumes caller has already pushed us into the kernel context. */
79 static int llog_client_open(const struct lu_env *env,
80                             struct llog_handle *lgh, struct llog_logid *logid,
81                             char *name, enum llog_open_param open_param)
82 {
83         struct obd_import     *imp;
84         struct llogd_body     *body;
85         struct llog_ctxt      *ctxt = lgh->lgh_ctxt;
86         struct ptlrpc_request *req = NULL;
87         int                 rc;
88
89         LLOG_CLIENT_ENTRY(ctxt, imp);
90
91         /* client cannot create llog */
92         LASSERTF(open_param != LLOG_OPEN_NEW, "%#x\n", open_param);
93         LASSERT(lgh);
94
95         req = ptlrpc_request_alloc(imp, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
96         if (req == NULL)
97                 GOTO(out, rc = -ENOMEM);
98
99         if (name)
100                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
101                                      strlen(name) + 1);
102
103         rc = ptlrpc_request_pack(req, LUSTRE_LOG_VERSION,
104                                  LLOG_ORIGIN_HANDLE_CREATE);
105         if (rc) {
106                 ptlrpc_request_free(req);
107                 req = NULL;
108                 GOTO(out, rc);
109         }
110         ptlrpc_request_set_replen(req);
111
112         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
113         if (logid)
114                 body->lgd_logid = *logid;
115         body->lgd_ctxt_idx = ctxt->loc_idx - 1;
116
117         if (name) {
118                 char *tmp;
119                 tmp = req_capsule_client_sized_get(&req->rq_pill, &RMF_NAME,
120                                                    strlen(name) + 1);
121                 LASSERT(tmp);
122                 strcpy(tmp, name);
123         }
124
125         rc = ptlrpc_queue_wait(req);
126         if (rc)
127                 GOTO(out, rc);
128
129         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
130         if (body == NULL)
131                 GOTO(out, rc = -EFAULT);
132
133         lgh->lgh_id = body->lgd_logid;
134         lgh->lgh_ctxt = ctxt;
135 out:
136         LLOG_CLIENT_EXIT(ctxt, imp);
137         ptlrpc_req_finished(req);
138         return rc;
139 }
140
141 static int llog_client_destroy(const struct lu_env *env,
142                                struct llog_handle *loghandle)
143 {
144         struct obd_import     *imp;
145         struct ptlrpc_request *req = NULL;
146         struct llogd_body     *body;
147         int                 rc;
148
149         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
150         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_DESTROY,
151                                         LUSTRE_LOG_VERSION,
152                                         LLOG_ORIGIN_HANDLE_DESTROY);
153         if (req == NULL)
154                 GOTO(err_exit, rc =-ENOMEM);
155
156         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
157         body->lgd_logid = loghandle->lgh_id;
158         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
159
160         if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
161                 CERROR("%s: wrong llog flags %x\n", imp->imp_obd->obd_name,
162                        body->lgd_llh_flags);
163
164         ptlrpc_request_set_replen(req);
165         rc = ptlrpc_queue_wait(req);
166
167         ptlrpc_req_finished(req);
168 err_exit:
169         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
170         return rc;
171 }
172
173
174 static int llog_client_next_block(const struct lu_env *env,
175                                   struct llog_handle *loghandle,
176                                   int *cur_idx, int next_idx,
177                                   __u64 *cur_offset, void *buf, int len)
178 {
179         struct obd_import     *imp;
180         struct ptlrpc_request *req = NULL;
181         struct llogd_body     *body;
182         void              *ptr;
183         int                 rc;
184
185         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
186         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK,
187                                         LUSTRE_LOG_VERSION,
188                                         LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
189         if (req == NULL)
190                 GOTO(err_exit, rc =-ENOMEM);
191
192         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
193         body->lgd_logid = loghandle->lgh_id;
194         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
195         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
196         body->lgd_index = next_idx;
197         body->lgd_saved_index = *cur_idx;
198         body->lgd_len = len;
199         body->lgd_cur_offset = *cur_offset;
200
201         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
202         ptlrpc_request_set_replen(req);
203         rc = ptlrpc_queue_wait(req);
204         if (rc)
205                 GOTO(out, rc);
206
207         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
208         if (body == NULL)
209                 GOTO(out, rc =-EFAULT);
210
211         /* The log records are swabbed as they are processed */
212         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
213         if (ptr == NULL)
214                 GOTO(out, rc =-EFAULT);
215
216         *cur_idx = body->lgd_saved_index;
217         *cur_offset = body->lgd_cur_offset;
218
219         memcpy(buf, ptr, len);
220 out:
221         ptlrpc_req_finished(req);
222 err_exit:
223         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
224         return rc;
225 }
226
227 static int llog_client_prev_block(const struct lu_env *env,
228                                   struct llog_handle *loghandle,
229                                   int prev_idx, void *buf, int len)
230 {
231         struct obd_import     *imp;
232         struct ptlrpc_request *req = NULL;
233         struct llogd_body     *body;
234         void              *ptr;
235         int                 rc;
236
237         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
238         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK,
239                                         LUSTRE_LOG_VERSION,
240                                         LLOG_ORIGIN_HANDLE_PREV_BLOCK);
241         if (req == NULL)
242                 GOTO(err_exit, rc = -ENOMEM);
243
244         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
245         body->lgd_logid = loghandle->lgh_id;
246         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
247         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
248         body->lgd_index = prev_idx;
249         body->lgd_len = len;
250
251         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
252         ptlrpc_request_set_replen(req);
253
254         rc = ptlrpc_queue_wait(req);
255         if (rc)
256                 GOTO(out, rc);
257
258         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
259         if (body == NULL)
260                 GOTO(out, rc =-EFAULT);
261
262         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
263         if (ptr == NULL)
264                 GOTO(out, rc =-EFAULT);
265
266         memcpy(buf, ptr, len);
267 out:
268         ptlrpc_req_finished(req);
269 err_exit:
270         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
271         return rc;
272 }
273
274 static int llog_client_read_header(const struct lu_env *env,
275                                    struct llog_handle *handle)
276 {
277         struct obd_import     *imp;
278         struct ptlrpc_request *req = NULL;
279         struct llogd_body     *body;
280         struct llog_log_hdr   *hdr;
281         struct llog_rec_hdr   *llh_hdr;
282         int                 rc;
283
284         LLOG_CLIENT_ENTRY(handle->lgh_ctxt, imp);
285         req = ptlrpc_request_alloc_pack(imp,&RQF_LLOG_ORIGIN_HANDLE_READ_HEADER,
286                                         LUSTRE_LOG_VERSION,
287                                         LLOG_ORIGIN_HANDLE_READ_HEADER);
288         if (req == NULL)
289                 GOTO(err_exit, rc = -ENOMEM);
290
291         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
292         body->lgd_logid = handle->lgh_id;
293         body->lgd_ctxt_idx = handle->lgh_ctxt->loc_idx - 1;
294         body->lgd_llh_flags = handle->lgh_hdr->llh_flags;
295
296         ptlrpc_request_set_replen(req);
297         rc = ptlrpc_queue_wait(req);
298         if (rc)
299                 GOTO(out, rc);
300
301         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
302         if (hdr == NULL)
303                 GOTO(out, rc =-EFAULT);
304
305         memcpy(handle->lgh_hdr, hdr, sizeof (*hdr));
306         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
307
308         /* sanity checks */
309         llh_hdr = &handle->lgh_hdr->llh_hdr;
310         if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
311                 CERROR("bad log header magic: %#x (expecting %#x)\n",
312                        llh_hdr->lrh_type, LLOG_HDR_MAGIC);
313                 rc = -EIO;
314         } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
315                 CERROR("incorrectly sized log header: %#x "
316                        "(expecting %#x)\n",
317                        llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
318                 CERROR("you may need to re-run lconf --write_conf.\n");
319                 rc = -EIO;
320         }
321 out:
322         ptlrpc_req_finished(req);
323 err_exit:
324         LLOG_CLIENT_EXIT(handle->lgh_ctxt, imp);
325         return rc;
326 }
327
/*
 * llog_client_close - close a remote llog handle.
 *
 * Nothing is sent: this doesn't call LLOG_ORIGIN_HANDLE_CLOSE because the
 * servers all close the file at the end of every other LLOG_ RPC.
 * Neither argument is used.
 *
 * Return: always 0.
 */
static int llog_client_close(const struct lu_env *env,
                             struct llog_handle *handle)
{
        return 0;
}
336
337 struct llog_operations llog_client_ops = {
338         .lop_next_block         = llog_client_next_block,
339         .lop_prev_block         = llog_client_prev_block,
340         .lop_read_header        = llog_client_read_header,
341         .lop_open               = llog_client_open,
342         .lop_destroy            = llog_client_destroy,
343         .lop_close              = llog_client_close,
344 };
345 EXPORT_SYMBOL(llog_client_ops);