]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/ptlrpc/llog_server.c
Merge branch 'for-3.12' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/libata
[karo-tx-linux.git] / drivers / staging / lustre / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/llog_server.c
37  *
38  * remote api for llog - server side
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOG
44
45
46 #include <obd_class.h>
47 #include <lustre_log.h>
48 #include <lustre_net.h>
49 #include <lustre_fsfilt.h>
50
51 #if  defined(LUSTRE_LOG_SERVER)
52 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
53 {
54         if (lgh->lgh_hdr != NULL && lgh->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
55                 return llog_cat_close(env, lgh);
56         else
57                 return llog_close(env, lgh);
58 }
59
60 /* Only open is supported, no new llog can be created remotely */
61 int llog_origin_handle_open(struct ptlrpc_request *req)
62 {
63         struct obd_export       *exp = req->rq_export;
64         struct obd_device       *obd = exp->exp_obd;
65         struct obd_device       *disk_obd;
66         struct lvfs_run_ctxt     saved;
67         struct llog_handle      *loghandle;
68         struct llogd_body       *body;
69         struct llog_logid       *logid = NULL;
70         struct llog_ctxt        *ctxt;
71         char                    *name = NULL;
72         int                      rc;
73
74         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
75         if (body == NULL)
76                 return -EFAULT;
77
78         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
79                 logid = &body->lgd_logid;
80
81         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
82                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
83                 if (name == NULL)
84                         return -EFAULT;
85                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
86         }
87
88         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
89         if (ctxt == NULL) {
90                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
91                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
92                 return -ENODEV;
93         }
94         disk_obd = ctxt->loc_exp->exp_obd;
95         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
96
97         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
98                        name, LLOG_OPEN_EXISTS);
99         if (rc)
100                 GOTO(out_pop, rc);
101
102         rc = req_capsule_server_pack(&req->rq_pill);
103         if (rc)
104                 GOTO(out_close, rc = -ENOMEM);
105
106         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
107         body->lgd_logid = loghandle->lgh_id;
108
109 out_close:
110         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
111 out_pop:
112         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
113         llog_ctxt_put(ctxt);
114         return rc;
115 }
116 EXPORT_SYMBOL(llog_origin_handle_open);
117
118 int llog_origin_handle_destroy(struct ptlrpc_request *req)
119 {
120         struct obd_device       *disk_obd;
121         struct lvfs_run_ctxt     saved;
122         struct llogd_body       *body;
123         struct llog_logid       *logid = NULL;
124         struct llog_ctxt        *ctxt;
125         int                      rc;
126
127         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
128         if (body == NULL)
129                 return -EFAULT;
130
131         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
132                 logid = &body->lgd_logid;
133
134         if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
135                 CERROR("%s: wrong llog flags %x\n",
136                        req->rq_export->exp_obd->obd_name, body->lgd_llh_flags);
137
138         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
139         if (ctxt == NULL)
140                 return -ENODEV;
141
142         disk_obd = ctxt->loc_exp->exp_obd;
143         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
144
145         rc = req_capsule_server_pack(&req->rq_pill);
146         /* erase only if no error and logid is valid */
147         if (rc == 0)
148                 rc = llog_erase(req->rq_svc_thread->t_env, ctxt, logid, NULL);
149         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
150         llog_ctxt_put(ctxt);
151         return rc;
152 }
153 EXPORT_SYMBOL(llog_origin_handle_destroy);
154
155 int llog_origin_handle_next_block(struct ptlrpc_request *req)
156 {
157         struct obd_device   *disk_obd;
158         struct llog_handle  *loghandle;
159         struct llogd_body   *body;
160         struct llogd_body   *repbody;
161         struct lvfs_run_ctxt saved;
162         struct llog_ctxt    *ctxt;
163         __u32           flags;
164         void            *ptr;
165         int               rc;
166
167         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
168         if (body == NULL)
169                 return -EFAULT;
170
171         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
172         if (ctxt == NULL)
173                 return -ENODEV;
174
175         disk_obd = ctxt->loc_exp->exp_obd;
176         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
177
178         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
179                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
180         if (rc)
181                 GOTO(out_pop, rc);
182
183         flags = body->lgd_llh_flags;
184         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
185                               NULL);
186         if (rc)
187                 GOTO(out_close, rc);
188
189         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
190                              LLOG_CHUNK_SIZE);
191         rc = req_capsule_server_pack(&req->rq_pill);
192         if (rc)
193                 GOTO(out_close, rc = -ENOMEM);
194
195         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
196         *repbody = *body;
197
198         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
199         rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
200                              &repbody->lgd_saved_index, repbody->lgd_index,
201                              &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
202         if (rc)
203                 GOTO(out_close, rc);
204 out_close:
205         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
206 out_pop:
207         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
208         llog_ctxt_put(ctxt);
209         return rc;
210 }
211 EXPORT_SYMBOL(llog_origin_handle_next_block);
212
213 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
214 {
215         struct llog_handle   *loghandle;
216         struct llogd_body    *body;
217         struct llogd_body    *repbody;
218         struct obd_device    *disk_obd;
219         struct lvfs_run_ctxt  saved;
220         struct llog_ctxt     *ctxt;
221         __u32            flags;
222         void             *ptr;
223         int                rc;
224
225         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
226         if (body == NULL)
227                 return -EFAULT;
228
229         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
230         if (ctxt == NULL)
231                 return -ENODEV;
232
233         disk_obd = ctxt->loc_exp->exp_obd;
234         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
235
236         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
237                          &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
238         if (rc)
239                 GOTO(out_pop, rc);
240
241         flags = body->lgd_llh_flags;
242         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
243                               NULL);
244         if (rc)
245                 GOTO(out_close, rc);
246
247         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
248                              LLOG_CHUNK_SIZE);
249         rc = req_capsule_server_pack(&req->rq_pill);
250         if (rc)
251                 GOTO(out_close, rc = -ENOMEM);
252
253         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
254         *repbody = *body;
255
256         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
257         rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
258                              body->lgd_index, ptr, LLOG_CHUNK_SIZE);
259         if (rc)
260                 GOTO(out_close, rc);
261
262 out_close:
263         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
264 out_pop:
265         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
266         llog_ctxt_put(ctxt);
267         return rc;
268 }
269 EXPORT_SYMBOL(llog_origin_handle_prev_block);
270
271 int llog_origin_handle_read_header(struct ptlrpc_request *req)
272 {
273         struct obd_device    *disk_obd;
274         struct llog_handle   *loghandle;
275         struct llogd_body    *body;
276         struct llog_log_hdr  *hdr;
277         struct lvfs_run_ctxt  saved;
278         struct llog_ctxt     *ctxt;
279         __u32            flags;
280         int                rc;
281
282         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
283         if (body == NULL)
284                 return -EFAULT;
285
286         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
287         if (ctxt == NULL)
288                 return -ENODEV;
289
290         disk_obd = ctxt->loc_exp->exp_obd;
291         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
292
293         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
294                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
295         if (rc)
296                 GOTO(out_pop, rc);
297
298         /*
299          * llog_init_handle() reads the llog header
300          */
301         flags = body->lgd_llh_flags;
302         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
303                               NULL);
304         if (rc)
305                 GOTO(out_close, rc);
306         flags = loghandle->lgh_hdr->llh_flags;
307
308         rc = req_capsule_server_pack(&req->rq_pill);
309         if (rc)
310                 GOTO(out_close, rc = -ENOMEM);
311
312         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
313         *hdr = *loghandle->lgh_hdr;
314 out_close:
315         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
316 out_pop:
317         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
318         llog_ctxt_put(ctxt);
319         return rc;
320 }
321 EXPORT_SYMBOL(llog_origin_handle_read_header);
322
323 int llog_origin_handle_close(struct ptlrpc_request *req)
324 {
325         /* Nothing to do */
326         return 0;
327 }
328 EXPORT_SYMBOL(llog_origin_handle_close);
329
330 int llog_origin_handle_cancel(struct ptlrpc_request *req)
331 {
332         int num_cookies, rc = 0, err, i, failed = 0;
333         struct obd_device *disk_obd;
334         struct llog_cookie *logcookies;
335         struct llog_ctxt *ctxt = NULL;
336         struct lvfs_run_ctxt saved;
337         struct llog_handle *cathandle;
338         struct inode *inode;
339         void *handle;
340
341         logcookies = req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES);
342         num_cookies = req_capsule_get_size(&req->rq_pill, &RMF_LOGCOOKIES,
343                                            RCL_CLIENT) / sizeof(*logcookies);
344         if (logcookies == NULL || num_cookies == 0) {
345                 DEBUG_REQ(D_HA, req, "No llog cookies sent");
346                 return -EFAULT;
347         }
348
349         ctxt = llog_get_context(req->rq_export->exp_obd,
350                                 logcookies->lgc_subsys);
351         if (ctxt == NULL)
352                 return -ENODEV;
353
354         disk_obd = ctxt->loc_exp->exp_obd;
355         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
356         for (i = 0; i < num_cookies; i++, logcookies++) {
357                 cathandle = ctxt->loc_handle;
358                 LASSERT(cathandle != NULL);
359                 inode = cathandle->lgh_file->f_dentry->d_inode;
360
361                 handle = fsfilt_start_log(disk_obd, inode,
362                                           FSFILT_OP_CANCEL_UNLINK, NULL, 1);
363                 if (IS_ERR(handle)) {
364                         CERROR("fsfilt_start_log() failed: %ld\n",
365                                PTR_ERR(handle));
366                         GOTO(pop_ctxt, rc = PTR_ERR(handle));
367                 }
368
369                 rc = llog_cat_cancel_records(req->rq_svc_thread->t_env,
370                                              cathandle, 1, logcookies);
371
372                 /*
373                  * Do not raise -ENOENT errors for resent rpcs. This rec already
374                  * might be killed.
375                  */
376                 if (rc == -ENOENT &&
377                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
378                         /*
379                          * Do not change this message, reply-single.sh test_59b
380                          * expects to find this in log.
381                          */
382                         CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
383                                req);
384                         rc = 0;
385                 } else if (rc == 0) {
386                         CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n",
387                                num_cookies);
388                 }
389
390                 err = fsfilt_commit(disk_obd, inode, handle, 0);
391                 if (err) {
392                         CERROR("Error committing transaction: %d\n", err);
393                         if (!rc)
394                                 rc = err;
395                         failed++;
396                         GOTO(pop_ctxt, rc);
397                 } else if (rc)
398                         failed++;
399         }
400         GOTO(pop_ctxt, rc);
401 pop_ctxt:
402         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
403         if (rc)
404                 CERROR("Cancel %d of %d llog-records failed: %d\n",
405                        failed, num_cookies, rc);
406
407         llog_ctxt_put(ctxt);
408         return rc;
409 }
410 EXPORT_SYMBOL(llog_origin_handle_cancel);
411
412 #else /* !__KERNEL__ */
413 int llog_origin_handle_open(struct ptlrpc_request *req)
414 {
415         LBUG();
416         return 0;
417 }
418
419 int llog_origin_handle_destroy(struct ptlrpc_request *req)
420 {
421         LBUG();
422         return 0;
423 }
424
425 int llog_origin_handle_next_block(struct ptlrpc_request *req)
426 {
427         LBUG();
428         return 0;
429 }
430 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
431 {
432         LBUG();
433         return 0;
434 }
435 int llog_origin_handle_read_header(struct ptlrpc_request *req)
436 {
437         LBUG();
438         return 0;
439 }
440 int llog_origin_handle_close(struct ptlrpc_request *req)
441 {
442         LBUG();
443         return 0;
444 }
445 int llog_origin_handle_cancel(struct ptlrpc_request *req)
446 {
447         LBUG();
448         return 0;
449 }
450 #endif