/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include "../../include/linux/libcfs/libcfs.h"
#include "../include/lustre_dlm.h"
#include "../include/obd_class.h"
#include <linux/list.h>
#include "ldlm_internal.h"

static int ldlm_num_threads;
module_param(ldlm_num_threads, int, 0444);
MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");

static char *ldlm_cpts;
module_param(ldlm_cpts, charp, 0444);
MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");

static struct mutex     ldlm_ref_mutex;
static int ldlm_refcount;

static struct kobject *ldlm_kobj;
struct kset *ldlm_ns_kset;
static struct kset *ldlm_svc_kset;

struct ldlm_cb_async_args {
        struct ldlm_cb_set_arg *ca_set_arg;
        struct ldlm_lock       *ca_lock;
};

/* LDLM state */

static struct ldlm_state *ldlm_state;

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

struct ldlm_bl_pool {
        spinlock_t              blp_lock;

        /*
         * blp_prio_list is used for callbacks that should be handled
         * with priority.  It is used for LDLM_FL_DISCARD_DATA requests;
         * see bug 13843.
         */
        struct list_head        blp_prio_list;

        /*
         * blp_list is used for all other callbacks which are likely
         * to take longer to process.
         */
        struct list_head        blp_list;

        wait_queue_head_t       blp_waitq;
        struct completion       blp_comp;
        atomic_t                blp_num_threads;
        atomic_t                blp_busy_threads;
        int                     blp_min_threads;
        int                     blp_max_threads;
};

struct ldlm_bl_work_item {
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
        struct list_head        blwi_head;
        int                     blwi_count;
        struct completion       blwi_comp;
        enum ldlm_cancel_flags  blwi_flags;
        int                     blwi_mem_pressure;
};

/**
 * Callback handler for receiving incoming blocking ASTs.
 *
 * This can only happen on the client side.
 */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
        int do_ast;

        LDLM_DEBUG(lock, "client blocking AST callback handler");

        lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);

        if (ldlm_is_cancel_on_block(lock))
                ldlm_set_cancel(lock);

        do_ast = !lock->l_readers && !lock->l_writers;
        unlock_res_and_lock(lock);

        if (do_ast) {
                CDEBUG(D_DLMTRACE,
                       "Lock %p already unused, calling callback (%p)\n", lock,
                       lock->l_blocking_ast);
                if (lock->l_blocking_ast)
                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
        } else {
                CDEBUG(D_DLMTRACE,
                       "Lock %p is referenced, will be cancelled later\n",
                       lock);
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        LDLM_LOCK_RELEASE(lock);
}

/**
 * Callback handler for receiving incoming completion ASTs.
 *
 * This can only happen on the client side.
 */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int lvb_len;
        LIST_HEAD(ast_list);
        int rc = 0;

        LDLM_DEBUG(lock, "client completion callback handler START");

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
                int to = cfs_time_seconds(1);

                while (to > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        schedule_timeout(to);
                        if (lock->l_granted_mode == lock->l_req_mode ||
                            ldlm_is_destroyed(lock))
                                break;
                }
        }

        lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
        if (lvb_len < 0) {
                LDLM_ERROR(lock, "Failed to get lvb_len, rc = %d", lvb_len);
                rc = lvb_len;
                goto out;
        } else if (lvb_len > 0) {
                if (lock->l_lvb_len > 0) {
                        /* for extent lock, lvb contains ost_lvb{}. */
                        LASSERT(lock->l_lvb_data);

                        if (unlikely(lock->l_lvb_len < lvb_len)) {
                                LDLM_ERROR(lock, "Replied LVB is larger than expected, expected = %d, replied = %d",
                                           lock->l_lvb_len, lvb_len);
                                rc = -EINVAL;
                                goto out;
                        }
                } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
                                                     * variable length
                                                     */
                        void *lvb_data;

                        lvb_data = kzalloc(lvb_len, GFP_NOFS);
                        if (!lvb_data) {
                                LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
                                rc = -ENOMEM;
                                goto out;
                        }

                        lock_res_and_lock(lock);
                        LASSERT(!lock->l_lvb_data);
                        lock->l_lvb_type = LVB_T_LAYOUT;
                        lock->l_lvb_data = lvb_data;
                        lock->l_lvb_len = lvb_len;
                        unlock_res_and_lock(lock);
                }
        }

        lock_res_and_lock(lock);
        if (ldlm_is_destroyed(lock) ||
            lock->l_granted_mode == lock->l_req_mode) {
                /* bug 11300: the lock has already been granted */
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "Double grant race happened");
                rc = 0;
                goto out;
        }

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents.
         */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }

        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                ldlm_convert_policy_to_local(req->rq_export,
                                          dlm_req->lock_desc.l_resource.lr_type,
                                          &dlm_req->lock_desc.l_policy_data,
                                          &lock->l_policy_data);
                LDLM_DEBUG(lock, "completion AST, new policy data");
        }

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                unlock_res_and_lock(lock);
                rc = ldlm_lock_change_resource(ns, lock,
                                &dlm_req->lock_desc.l_resource.lr_name);
                if (rc < 0) {
                        LDLM_ERROR(lock, "Failed to allocate resource");
                        goto out;
                }
                LDLM_DEBUG(lock, "completion AST, new resource");
                CERROR("change resource!\n");
                lock_res_and_lock(lock);
        }

        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                /* BL_AST locks are not needed in LRU.
                 * Let ldlm_cancel_lru() be fast.
                 */
                ldlm_lock_remove_from_lru(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }

        if (lock->l_lvb_len > 0) {
                rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
                                   lock->l_lvb_data, lvb_len);
                if (rc < 0) {
                        unlock_res_and_lock(lock);
                        goto out;
                }
        }

        ldlm_grant_lock(lock, &ast_list);
        unlock_res_and_lock(lock);

        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

        /* Let the enqueue path call osc_lock_upcall() and initialize
         * l_ast_data
         */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);

        ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);

        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);
        goto out;

out:
        if (rc < 0) {
                lock_res_and_lock(lock);
                ldlm_set_failed(lock);
                unlock_res_and_lock(lock);
                wake_up(&lock->l_waitq);
        }
        LDLM_LOCK_RELEASE(lock);
}

/**
 * Callback handler for receiving incoming glimpse ASTs.
 *
 * This can only happen on the client side.  After handling the glimpse
 * AST, we also consider dropping the lock here if it has been unused
 * locally for a long time.
 */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int rc = -ENOSYS;

        LDLM_DEBUG(lock, "client glimpse AST callback handler");

        if (lock->l_glimpse_ast)
                rc = lock->l_glimpse_ast(lock, req);

        if (req->rq_repmsg) {
                ptlrpc_reply(req);
        } else {
                req->rq_status = rc;
                ptlrpc_error(req);
        }

        lock_res_and_lock(lock);
        if (lock->l_granted_mode == LCK_PW &&
            !lock->l_readers && !lock->l_writers &&
            cfs_time_after(cfs_time_current(),
                           cfs_time_add(lock->l_last_used,
                                        cfs_time_seconds(10)))) {
                unlock_res_and_lock(lock);
                if (ldlm_bl_to_thread_lock(ns, NULL, lock))
                        ldlm_handle_bl_callback(ns, NULL, lock);

                return;
        }
        unlock_res_and_lock(lock);
        LDLM_LOCK_RELEASE(lock);
}

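/*
 * Pack (if not already packed) and send a reply with status \a rc.
 * Returns 0 immediately when the request needs no reply.
 */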
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
        if (req->rq_no_reply)
                return 0;

        req->rq_status = rc;
        if (!req->rq_packed_final) {
                rc = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc)
                        return rc;
        }
        return ptlrpc_reply(req);
}

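/*
 * Queue \a blwi on the blocking thread pool and wake a thread to process
 * it.  LDLM_FL_DISCARD_DATA work goes on the priority list, everything
 * else on the regular list.  Unless LCF_ASYNC was passed, wait here until
 * a blocking thread has completed the work item.
 */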
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
                               enum ldlm_cancel_flags cancel_flags)
{
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

        spin_lock(&blp->blp_lock);
        if (blwi->blwi_lock && ldlm_is_discard_data(blwi->blwi_lock)) {
                /* add LDLM_FL_DISCARD_DATA requests to the priority list */
                list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
        } else {
                /* other blocking callbacks are added to the regular list */
                list_add_tail(&blwi->blwi_entry, &blp->blp_list);
        }
        spin_unlock(&blp->blp_lock);

        wake_up(&blp->blp_waitq);

        /* cannot check blwi->blwi_flags here, as blwi may already have
         * been freed in LCF_ASYNC mode
         */
        if (!(cancel_flags & LCF_ASYNC))
                wait_for_completion(&blwi->blwi_comp);

        return 0;
}

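/*
 * Initialize a blocking work item with either the list of \a count lock
 * cancels or, when \a count is zero, the single \a lock to process.
 */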
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
                             struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld,
                             struct list_head *cancels, int count,
                             struct ldlm_lock *lock,
                             enum ldlm_cancel_flags cancel_flags)
{
        init_completion(&blwi->blwi_comp);
        INIT_LIST_HEAD(&blwi->blwi_head);

        if (memory_pressure_get())
                blwi->blwi_mem_pressure = 1;

        blwi->blwi_ns = ns;
        blwi->blwi_flags = cancel_flags;
        if (ld)
                blwi->blwi_ld = *ld;
        if (count) {
                list_add(&blwi->blwi_head, cancels);
                list_del_init(cancels);
                blwi->blwi_count = count;
        } else {
                blwi->blwi_lock = lock;
        }
}

/**
 * Queues a list of locks \a cancels containing \a count locks
 * for later processing by a blocking thread.  If \a count is zero,
 * then the lock referenced as \a lock is queued instead.
 *
 * The blocking thread will then call the lock's ->l_blocking_ast callback.
 * If list addition fails, an error is returned and the caller is expected
 * to call ->l_blocking_ast itself.
 */
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld,
                             struct ldlm_lock *lock,
                             struct list_head *cancels, int count,
                             enum ldlm_cancel_flags cancel_flags)
{
        if (cancels && count == 0)
                return 0;

        if (cancel_flags & LCF_ASYNC) {
                struct ldlm_bl_work_item *blwi;

                blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
                if (!blwi)
                        return -ENOMEM;
                init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);

                return __ldlm_bl_to_thread(blwi, cancel_flags);
        } else {
                /* for a synchronous call, do minimal memory allocation, as
                 * it could be triggered from the kernel shrinker
                 */
                struct ldlm_bl_work_item blwi;

                memset(&blwi, 0, sizeof(blwi));
                init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
                return __ldlm_bl_to_thread(&blwi, cancel_flags);
        }
}

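/*
 * Convenience wrappers around ldlm_bl_to_thread(): queue a single lock
 * asynchronously, or queue a list of cancels with the caller's flags.
 */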
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct ldlm_lock *lock)
{
        return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
}

int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct list_head *cancels, int count,
                           enum ldlm_cancel_flags cancel_flags)
{
        return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}

/* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
        struct obd_device *obd = req->rq_export->exp_obd;
        char *key;
        void *val;
        int keylen, vallen;
        int rc = -ENOSYS;

        DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);

        req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);

        key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        if (!key) {
                DEBUG_REQ(D_IOCTL, req, "no set_info key");
                return -EFAULT;
        }
        keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                      RCL_CLIENT);
        val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        if (!val) {
                DEBUG_REQ(D_IOCTL, req, "no set_info val");
                return -EFAULT;
        }
        vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                      RCL_CLIENT);

        /* We are responsible for swabbing the contents of val */

        if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
                /* Pass it on to mdc (the "export" in this case) */
                rc = obd_set_info_async(req->rq_svc_thread->t_env,
                                        req->rq_export,
                                        sizeof(KEY_HSM_COPYTOOL_SEND),
                                        KEY_HSM_COPYTOOL_SEND,
                                        vallen, val, NULL);
        else
                DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);

        return rc;
}

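/*
 * Log a callback error at a severity matching its impact: D_WARNING when
 * no reply was sent or sending it failed, D_DLMTRACE otherwise.
 */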
static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
                                        const char *msg, int rc,
                                        const struct lustre_handle *handle)
{
        DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
                  "%s: [nid %s] [rc %d] [lock %#llx]",
                  msg, libcfs_id2str(req->rq_peer), rc,
                  handle ? handle->cookie : 0);
        if (req->rq_no_reply)
                CWARN("No reply was sent, possibly the cause of bug 21636.\n");
        else if (rc)
                CWARN("Sending the reply failed, possibly the cause of bug 21636.\n");
}

/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        int rc;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but we are responsible for the
         * message buffers.
         */

        /* do nothing for sec context finalize */
        if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
                return 0;

        req_capsule_init(&req->rq_pill, req, RCL_SERVER);

        if (!req->rq_export) {
                rc = ldlm_callback_reply(req, -ENOTCONN);
                ldlm_callback_errmsg(req, "Operate on unconnected server",
                                     rc, NULL);
                return 0;
        }

        LASSERT(req->rq_export->exp_obd);

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
                        if (cfs_fail_err)
                                ldlm_callback_reply(req, -(int)cfs_fail_err);
                        return 0;
                }
                break;
        case LDLM_CP_CALLBACK:
                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
                        return 0;
                break;
        case LDLM_GL_CALLBACK:
                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
                        return 0;
                break;
        case LDLM_SET_INFO:
                rc = ldlm_handle_setinfo(req);
                ldlm_callback_reply(req, rc);
                return 0;
        default:
                CERROR("unknown opcode %u\n",
                       lustre_msg_get_opc(req->rq_reqmsg));
                ldlm_callback_reply(req, -EPROTO);
                return 0;
        }

        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns);

        req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);

        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        if (!dlm_req) {
                rc = ldlm_callback_reply(req, -EPROTO);
                ldlm_callback_errmsg(req, "Operate without parameter", rc,
                                     NULL);
                return 0;
        }

        /* Force a known safe race, send a cancel to the server for a lock
         * which the server has already started a blocking callback on.
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
                if (rc < 0)
                        CERROR("ldlm_cli_cancel: %d\n", rc);
        }

        lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
        if (!lock) {
                CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
                       dlm_req->lock_handle[0].cookie);
                rc = ldlm_callback_reply(req, -EINVAL);
                ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
                                     &dlm_req->lock_handle[0]);
                return 0;
        }

        if (ldlm_is_fail_loc(lock) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
                OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
        lock_res_and_lock(lock);
        lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
                                              LDLM_FL_AST_MASK);
        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                /* If somebody cancels the lock and the cache is already
                 * dropped, or the lock fails before the cp_ast is received
                 * on the client, we can tell the server we have no lock.
                 * Otherwise, we should send the cancel after dropping the
                 * cache.
                 */
                if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
                    ldlm_is_failed(lock)) {
                        LDLM_DEBUG(lock,
                                   "callback on lock %#llx - lock disappeared",
                                   dlm_req->lock_handle[0].cookie);
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_RELEASE(lock);
                        rc = ldlm_callback_reply(req, -EINVAL);
                        ldlm_callback_errmsg(req, "Operate on stale lock", rc,
                                             &dlm_req->lock_handle[0]);
                        return 0;
                }
                /* BL_AST locks are not needed in LRU.
                 * Let ldlm_cancel_lru() be fast.
                 */
                ldlm_lock_remove_from_lru(lock);
                ldlm_set_bl_ast(lock);
        }
        unlock_res_and_lock(lock);

        /* We want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback.
         *
         * But we'd also like to be able to indicate in the reply that we're
         * cancelling right now, because it's unused, or have an intent result
         * in the reply, so we might have to push the responsibility for sending
         * the reply down into the AST handlers, alas.
         */

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
                if (!ldlm_is_cancel_on_block(lock)) {
                        rc = ldlm_callback_reply(req, 0);
                        if (req->rq_no_reply || rc)
                                ldlm_callback_errmsg(req, "Normal process", rc,
                                                     &dlm_req->lock_handle[0]);
                }
                if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
                ldlm_callback_reply(req, 0);
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_GL_CALLBACK:
                CDEBUG(D_INODE, "glimpse ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                  /* checked above */
        }

        return 0;
}

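/*
 * Dequeue the next work item for a blocking thread.  Priority work is
 * normally preferred, but blp_list is serviced at least once per
 * blp_num_threads dequeues (tracked in num_bl) so that it cannot be
 * starved by a stream of priority requests.
 */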
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_work_item *blwi = NULL;
        static unsigned int num_bl;

        spin_lock(&blp->blp_lock);
        /* process a request from blp_list at least once per
         * blp_num_threads dequeues
         */
        if (!list_empty(&blp->blp_list) &&
            (list_empty(&blp->blp_prio_list) || num_bl == 0))
                blwi = list_entry(blp->blp_list.next,
                                  struct ldlm_bl_work_item, blwi_entry);
        else if (!list_empty(&blp->blp_prio_list))
                blwi = list_entry(blp->blp_prio_list.next,
                                  struct ldlm_bl_work_item,
                                  blwi_entry);

        if (blwi) {
                if (++num_bl >= atomic_read(&blp->blp_num_threads))
                        num_bl = 0;
                list_del(&blwi->blwi_entry);
        }
        spin_unlock(&blp->blp_lock);

        return blwi;
}

/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
        char                    bltd_name[CFS_CURPROC_COMM_MAX];
        struct ldlm_bl_pool     *bltd_blp;
        struct completion       bltd_comp;
        int                     bltd_num;
};

static int ldlm_bl_thread_main(void *arg);

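/*
 * Spawn one more blocking thread.  The thread data lives on this stack
 * frame, so wait until the new thread has copied what it needs before
 * returning.
 */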
static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
        struct task_struct *task;

        init_completion(&bltd.bltd_comp);
        bltd.bltd_num = atomic_read(&blp->blp_num_threads);
        snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
                 "ldlm_bl_%02d", bltd.bltd_num);
        task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
        if (IS_ERR(task)) {
                CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
                       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
                return PTR_ERR(task);
        }
        wait_for_completion(&bltd.bltd_comp);

        return 0;
}

/**
 * Main blocking requests processing thread.
 *
 * Callers put locks into its queue by calling ldlm_bl_to_thread().
 * This thread ultimately makes the actual call to ->l_blocking_ast
 * for each queued lock.
 */
static int ldlm_bl_thread_main(void *arg)
{
        struct ldlm_bl_pool *blp;

        {
                struct ldlm_bl_thread_data *bltd = arg;

                blp = bltd->bltd_blp;

                atomic_inc(&blp->blp_num_threads);
                atomic_inc(&blp->blp_busy_threads);

                complete(&bltd->bltd_comp);
                /* cannot use bltd after this, it is only on caller's stack */
        }

        while (1) {
                struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;
                int busy;

                blwi = ldlm_bl_get_work(blp);

                if (!blwi) {
                        atomic_dec(&blp->blp_busy_threads);
                        l_wait_event_exclusive(blp->blp_waitq,
                                               (blwi = ldlm_bl_get_work(blp)),
                                               &lwi);
                        busy = atomic_inc_return(&blp->blp_busy_threads);
                } else {
                        busy = atomic_read(&blp->blp_busy_threads);
                }

                if (!blwi->blwi_ns)
                        /* added by ldlm_cleanup() */
                        break;

                /* Not fatal if racy; we may end up with a few too many
                 * threads
                 */
                if (unlikely(busy < blp->blp_max_threads &&
                             busy >= atomic_read(&blp->blp_num_threads) &&
                             !blwi->blwi_mem_pressure))
                        /* discard the return value, we tried */
                        ldlm_bl_thread_start(blp);

                if (blwi->blwi_mem_pressure)
                        memory_pressure_set();

                if (blwi->blwi_count) {
                        int count;
                        /* In the special case where we cancel LRU locks
                         * asynchronously, the list of locks is passed here.
                         * The locks are thus marked LDLM_FL_CANCELING, but
                         * NOT yet canceled locally.
                         */
                        count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
                                                           blwi->blwi_count,
                                                           LCF_BL_AST);
                        ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
                                             blwi->blwi_flags);
                } else {
                        ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                blwi->blwi_lock);
                }
                if (blwi->blwi_mem_pressure)
                        memory_pressure_clr();

                if (blwi->blwi_flags & LCF_ASYNC)
                        kfree(blwi);
                else
                        complete(&blwi->blwi_comp);
        }

        atomic_dec(&blp->blp_busy_threads);
        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        return 0;
}

static int ldlm_setup(void);
static int ldlm_cleanup(void);

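/*
 * Reference-counted LDLM bring-up/teardown: the first ldlm_get_ref()
 * runs ldlm_setup(), and the last ldlm_put_ref() runs ldlm_cleanup().
 */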
int ldlm_get_ref(void)
{
        int rc = 0;

        mutex_lock(&ldlm_ref_mutex);
        if (++ldlm_refcount == 1) {
                rc = ldlm_setup();
                if (rc)
                        ldlm_refcount--;
        }
        mutex_unlock(&ldlm_ref_mutex);

        return rc;
}

void ldlm_put_ref(void)
{
        mutex_lock(&ldlm_ref_mutex);
        if (ldlm_refcount == 1) {
                int rc = ldlm_cleanup();

                if (rc)
                        CERROR("ldlm_cleanup failed: %d\n", rc);
                else
                        ldlm_refcount--;
        } else {
                ldlm_refcount--;
        }
        mutex_unlock(&ldlm_ref_mutex);
}

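/*
 * sysfs knob controlling whether unused locks are cancelled before
 * lock replay during recovery.
 */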
static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
                                                      struct attribute *attr,
                                                      char *buf)
{
        return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
}

static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
                                                       struct attribute *attr,
                                                       const char *buffer,
                                                       size_t count)
{
        int rc;
        unsigned long val;

        rc = kstrtoul(buffer, 10, &val);
        if (rc)
                return rc;

        ldlm_cancel_unused_locks_before_replay = val;

        return count;
}
LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);

/* These are for the root of /sys/fs/lustre/ldlm */
static struct attribute *ldlm_attrs[] = {
        &lustre_attr_cancel_unused_locks_before_replay.attr,
        NULL,
};

static struct attribute_group ldlm_attr_group = {
        .attrs = ldlm_attrs,
};

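/*
 * Bring up the client-side LDLM state: sysfs and debugfs entries, the
 * "ldlm_cbd" callback service, the blocking thread pool, and the lock
 * pools.  Any failure unwinds through ldlm_cleanup().
 */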
static int ldlm_setup(void)
{
        static struct ptlrpc_service_conf       conf;
        struct ldlm_bl_pool                     *blp = NULL;
        int rc = 0;
        int i;

        if (ldlm_state)
                return -EALREADY;

        ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
        if (!ldlm_state)
                return -ENOMEM;

        ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
        if (!ldlm_kobj) {
                rc = -ENOMEM;
                goto out;
        }

        rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
        if (rc)
                goto out;

        ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
        if (!ldlm_ns_kset) {
                rc = -ENOMEM;
                goto out;
        }

        ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
        if (!ldlm_svc_kset) {
                rc = -ENOMEM;
                goto out;
        }

        rc = ldlm_debugfs_setup();
        if (rc != 0)
                goto out;

        memset(&conf, 0, sizeof(conf));
        conf = (typeof(conf)) {
                .psc_name               = "ldlm_cbd",
                .psc_watchdog_factor    = 2,
                .psc_buf                = {
                        .bc_nbufs               = LDLM_CLIENT_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,
                        .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
                        .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
                },
                .psc_thr                = {
                        .tc_thr_name            = "ldlm_cb",
                        .tc_thr_factor          = LDLM_THR_FACTOR,
                        .tc_nthrs_init          = LDLM_NTHRS_INIT,
                        .tc_nthrs_base          = LDLM_NTHRS_BASE,
                        .tc_nthrs_max           = LDLM_NTHRS_MAX,
                        .tc_nthrs_user          = ldlm_num_threads,
                        .tc_cpu_affinity        = 1,
                        .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
                },
                .psc_cpt                = {
                        .cc_pattern             = ldlm_cpts,
                },
                .psc_ops                = {
                        .so_req_handler         = ldlm_callback_handler,
                },
        };
        ldlm_state->ldlm_cb_service =
                        ptlrpc_register_service(&conf, ldlm_svc_kset,
                                                ldlm_svc_debugfs_dir);
        if (IS_ERR(ldlm_state->ldlm_cb_service)) {
                CERROR("failed to start service\n");
                rc = PTR_ERR(ldlm_state->ldlm_cb_service);
                ldlm_state->ldlm_cb_service = NULL;
                goto out;
        }

        blp = kzalloc(sizeof(*blp), GFP_NOFS);
        if (!blp) {
                rc = -ENOMEM;
                goto out;
        }
        ldlm_state->ldlm_bl_pool = blp;

        spin_lock_init(&blp->blp_lock);
        INIT_LIST_HEAD(&blp->blp_list);
        INIT_LIST_HEAD(&blp->blp_prio_list);
        init_waitqueue_head(&blp->blp_waitq);
        atomic_set(&blp->blp_num_threads, 0);
        atomic_set(&blp->blp_busy_threads, 0);

        if (ldlm_num_threads == 0) {
                blp->blp_min_threads = LDLM_NTHRS_INIT;
                blp->blp_max_threads = LDLM_NTHRS_MAX;
        } else {
                blp->blp_min_threads = min_t(int, LDLM_NTHRS_MAX,
                                             max_t(int, LDLM_NTHRS_INIT,
                                                   ldlm_num_threads));

                blp->blp_max_threads = blp->blp_min_threads;
        }

        for (i = 0; i < blp->blp_min_threads; i++) {
                rc = ldlm_bl_thread_start(blp);
                if (rc < 0)
                        goto out;
        }

        rc = ldlm_pools_init();
        if (rc) {
                CERROR("Failed to initialize LDLM pools: %d\n", rc);
                goto out;
        }
        return 0;

 out:
        ldlm_cleanup();
        return rc;
}

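/*
 * Tear down everything ldlm_setup() created.  Blocking threads are shut
 * down by feeding each of them a work item with a NULL blwi_ns and
 * waiting for the thread to exit.
 */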
static int ldlm_cleanup(void)
{
        if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
            !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
                return -EBUSY;
        }

        ldlm_pools_fini();

        if (ldlm_state->ldlm_bl_pool) {
                struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

                while (atomic_read(&blp->blp_num_threads) > 0) {
                        struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

                        init_completion(&blp->blp_comp);

                        spin_lock(&blp->blp_lock);
                        list_add_tail(&blwi.blwi_entry, &blp->blp_list);
                        wake_up(&blp->blp_waitq);
                        spin_unlock(&blp->blp_lock);

                        wait_for_completion(&blp->blp_comp);
                }

                kfree(blp);
        }

        if (ldlm_state->ldlm_cb_service)
                ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);

        if (ldlm_ns_kset)
                kset_unregister(ldlm_ns_kset);
        if (ldlm_svc_kset)
                kset_unregister(ldlm_svc_kset);
        if (ldlm_kobj)
                kobject_put(ldlm_kobj);

        ldlm_debugfs_cleanup();

        kfree(ldlm_state);
        ldlm_state = NULL;

        return 0;
}

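/*
 * Module init: set up the namespace locks and the slab caches for
 * resources, locks, and interval nodes.  ldlm_locks is created with
 * SLAB_DESTROY_BY_RCU, so ldlm_exit() must wait out an RCU grace period
 * before destroying it.
 */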
int ldlm_init(void)
{
        mutex_init(&ldlm_ref_mutex);
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
        ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               SLAB_HWCACHE_ALIGN, NULL);
        if (!ldlm_resource_slab)
                return -ENOMEM;

        ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                                           sizeof(struct ldlm_lock), 0,
                                           SLAB_HWCACHE_ALIGN |
                                           SLAB_DESTROY_BY_RCU, NULL);
        if (!ldlm_lock_slab) {
                kmem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }

        ldlm_interval_slab = kmem_cache_create("interval_node",
                                               sizeof(struct ldlm_interval),
                                               0, SLAB_HWCACHE_ALIGN, NULL);
        if (!ldlm_interval_slab) {
                kmem_cache_destroy(ldlm_resource_slab);
                kmem_cache_destroy(ldlm_lock_slab);
                return -ENOMEM;
        }
#if LUSTRE_TRACKS_LOCK_EXP_REFS
        class_export_dump_hook = ldlm_dump_export_locks;
#endif
        return 0;
}

void ldlm_exit(void)
{
        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
        kmem_cache_destroy(ldlm_resource_slab);
        /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
         * synchronize_rcu() to wait for a grace period to elapse and give
         * ldlm_lock_free() a chance to be called.
         */
        synchronize_rcu();
        kmem_cache_destroy(ldlm_lock_slab);
        kmem_cache_destroy(ldlm_interval_slab);
}