/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "lockspace.h"
#include "requestqueue.h"
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
static struct mutex ls_lock;
static struct list_head lslist;
static spinlock_t lslist_lock;
static struct task_struct *scand_task;
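
/*
 * Per-lockspace sysfs attributes.  dlm_controld in user space drives
 * lockspace membership through these files and the uevents generated in
 * do_uevent() below.
 */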
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
	int n = simple_strtol(buf, NULL, 0);

	ls = dlm_find_lockspace_local(ls->ls_local_handle);

	dlm_put_lockspace(ls);

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr = {.name = "recover_status", .mode = S_IRUGO},
	.show = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
	.show = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops = &dlm_attr_ops,
	.release = lockspace_kobj_release,
};

static struct kset dlm_kset = {
	.kobj = {.name = "dlm",},
};

static int kobject_setup(struct dlm_ls *ls)
{
	char lsname[DLM_LOCKSPACE_LEN];
	int error;

	memset(lsname, 0, DLM_LOCKSPACE_LEN);
	snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);

	error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
	if (error)
		return error;

	ls->ls_kobj.kset = &dlm_kset;
	ls->ls_kobj.ktype = &dlm_ktype;
	return 0;
}

static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

	if (!error)
		error = ls->ls_uevent_result;
	if (error)
		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
			  error, ls->ls_uevent_result);
	return error;
}

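/*
 * dlm_lockspace_init()/dlm_lockspace_exit() register and unregister the
 * "dlm" kset that parents the per-lockspace kobjects, and set up the
 * global lockspace list and its locks.
 */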
int dlm_lockspace_init(void)
{
	int error;

	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	kobj_set_kset_s(&dlm_kset, kernel_subsys);
	error = kset_register(&dlm_kset);
	if (error)
		printk("dlm_lockspace_init: cannot register kset %d\n", error);
	return error;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(&dlm_kset);
}

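/*
 * dlm_scand is a kernel thread that periodically walks the lockspace list
 * and, when it can take a lockspace's recovery lock without blocking,
 * scans that lockspace for lock timeouts.
 */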
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		list_for_each_entry(ls, &lslist, ls_list) {
			if (dlm_lock_recovery_try(ls)) {
				dlm_scan_timeout(ls);
				dlm_unlock_recovery(ls);
			}
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		return PTR_ERR(p);
	scand_task = p;
	return 0;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

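/*
 * Lockspace lookup helpers: each walks the global lslist under lslist_lock
 * and matches on a different key (name, global id, local handle, or device
 * minor).  A successful lookup is later dropped with dlm_put_lockspace().
 */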
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_namelen == namelen &&
		    memcmp(ls->ls_name, name, namelen) == 0)

	spin_unlock(&lslist_lock);

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {

	spin_unlock(&lslist_lock);

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {

	spin_unlock(&lslist_lock);

struct dlm_ls *dlm_find_lockspace_device(int minor)
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {

	spin_unlock(&lslist_lock);

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}

static void remove_lockspace(struct dlm_ls *ls)
	spin_lock(&lslist_lock);
	if (ls->ls_count == 0) {
		list_del(&ls->ls_list);
		spin_unlock(&lslist_lock);
		return;
	}
	spin_unlock(&lslist_lock);

static int threads_start(void)
	/* Thread which processes lock requests for all lockspaces */
	error = dlm_astd_start();
		log_print("cannot start dlm_astd thread %d", error);

	error = dlm_scand_start();
		log_print("cannot start dlm_scand thread %d", error);

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
		log_print("cannot start dlm lowcomms %d", error);

static void threads_stop(void)

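/*
 * new_lockspace() allocates and initializes a struct dlm_ls, adds it to the
 * global list, starts dlm_recoverd and registers its kobject, then waits
 * for dlm_controld to finish the join via the uevent/sysfs handshake before
 * returning.
 */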
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
	int i, size, error = -ENOMEM;

	if (namelen > DLM_LOCKSPACE_LEN)

	if (!lvblen || (lvblen % 8))

	if (!try_module_get(THIS_MODULE))

	ls = dlm_find_lockspace_name(name, namelen);
		module_put(THIS_MODULE);

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have TIMEWARN active */
	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
	ls->ls_exflags = (flags & ~DLM_LSFL_TIMEWARN);

	if (flags & DLM_LSFL_FS)
		ls->ls_allocation = GFP_NOFS;
	else
		ls->ls_allocation = GFP_KERNEL;
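
	/*
	 * Allocate the per-lockspace hash tables: ls_rsbtbl for resources,
	 * ls_lkbtbl for lock blocks (lkb's) and ls_dirtbl for the resource
	 * directory, each sized from dlm_config.
	 */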
	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

	size = dlm_config.ci_lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);
	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
		log_error(ls, "can't start dlm_recoverd %d", error);

	error = kobject_setup(ls);

	error = kobject_register(&ls->ls_kobj);

	/* let kobject handle freeing of ls if there's an error */

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");

	dlm_clear_members(ls);
	kfree(ls->ls_node_array);

	dlm_recoverd_stop(ls);

	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);

	kfree(ls->ls_dirtbl);

	kfree(ls->ls_lkbtbl);

	kfree(ls->ls_rsbtbl);

	kobject_unregister(&ls->ls_kobj);

	module_put(THIS_MODULE);

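/*
 * dlm_new_lockspace() is the exported entry point: it serializes creation
 * under ls_lock, starts the global dlm threads, and then calls
 * new_lockspace() above.
 *
 * Rough usage sketch for a hypothetical caller (the lockspace name and lvb
 * length below are illustrative, not taken from this file):
 *
 *	void *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("myfs", strlen("myfs"), &ls,
 *				  DLM_LSFL_FS, 32);
 *	if (!error)
 *		error = dlm_release_lockspace(ls, 0);
 */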
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
		      uint32_t flags, int lvblen)
	mutex_lock(&ls_lock);

	error = threads_start();

	error = new_lockspace(name, namelen, lockspace, flags, lvblen);

	mutex_unlock(&ls_lock);

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
	struct dlm_lkb *lkb;
	int i, lkb_found = 0;

	/* NOTE: We check the lockidtbl here rather than the resource table.
	   This is because there may be LKBs queued as ASTs that have been
	   unlinked from their RSBs and are pending deletion once the AST has
	   been delivered */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		read_lock(&ls->ls_lkbtbl[i].lock);
		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
			lkb_found = 1;
			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
					    lkb_idtbl_list) {
				if (!lkb->lkb_nodeid) {
					read_unlock(&ls->ls_lkbtbl[i].lock);
					return 2;
				}
			}
		}
		read_unlock(&ls->ls_lkbtbl[i].lock);
	}
	return lkb_found;

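/*
 * Tear down a lockspace: stop dlm_recoverd, remove the lockspace from the
 * global list, and free the lkb, rsb and directory tables before dropping
 * the kobject (which frees the dlm_ls itself in its release function).
 */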
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	dlm_recoverd_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kfree(ls->ls_recover_buf);

	/* Free direntry structs. */

	kfree(ls->ls_dirtbl);

	/* Free all lkb's on lkbtbl[] lists. */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);
		}
	}

	kfree(ls->ls_lkbtbl);

	/* Free all rsb's on rsbtbl[] lists */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
		}

		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
		}
	}

	kfree(ls->ls_rsbtbl);

	/* Free structures on any other lists */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_unregister(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	mutex_lock(&ls_lock);

	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */
int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);
	return release_lockspace(ls, force);
}