1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
10 *******************************************************************************
11 ******************************************************************************/
13 /* Central locking logic has four stages:
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
54 L: send_xxxx() -> R: receive_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
63 #include "requestqueue.h"
67 #include "lockspace.h"
72 #include "lvb_table.h"
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 void dlm_timeout_warn(struct dlm_lkb *lkb);
94 * Lock compatibility matrix - thanks Steve
95 * UN = Unlocked state. Not really a state, used as a flag
96 * PD = Padding. Used to make the matrix a nice power of two in size
97 * Other states are the same as the VMS DLM.
98 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
/*
 * Lock-mode compatibility table: entry [gr+1][rq+1] is 1 when a lock
 * granted in mode gr is compatible with a request for mode rq.
 * UN is the "unlocked" pseudo-mode; PD pads the table to 8x8.
 */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
114 * This defines the direction of transfer of LVB data.
115 * Granted mode is the row; requested mode is the column.
116 * Usage: matrix[grmode+1][rqmode+1]
117 * 1 = LVB is returned to the caller
118 * 0 = LVB is written to the resource
119 * -1 = nothing happens to the LVB
/*
 * Direction of LVB data transfer for a grant, indexed [grmode+1][rqmode+1]:
 *   1  = LVB is returned to the caller
 *   0  = LVB is written to the resource
 *  -1  = nothing happens to the LVB
 */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
/* True when gr's granted mode does not conflict with rq's requested mode. */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 int dlm_modes_compat(int mode1, int mode2)
139 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
143 * Compatibility matrix for conversions with QUECVT set.
144 * Granted mode is the row; requested mode is the column.
145 * Usage: matrix[grmode+1][rqmode+1]
/*
 * Compatibility table used for conversions with DLM_LKF_QUECVT set,
 * indexed [grmode+1][rqmode+1]; 1 means the queued conversion is allowed.
 */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
160 void dlm_print_lkb(struct dlm_lkb *lkb)
162 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
164 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
169 void dlm_print_rsb(struct dlm_rsb *r)
171 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172 r->res_nodeid, r->res_flags, r->res_first_lkid,
173 r->res_recover_locks_count, r->res_name);
176 void dlm_dump_rsb(struct dlm_rsb *r)
182 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184 printk(KERN_ERR "rsb lookup list\n");
185 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
187 printk(KERN_ERR "rsb grant queue:\n");
188 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
190 printk(KERN_ERR "rsb convert queue:\n");
191 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
193 printk(KERN_ERR "rsb wait queue:\n");
194 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
198 /* Threads cannot use the lockspace while it's being recovered */
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
202 down_read(&ls->ls_in_recovery);
205 void dlm_unlock_recovery(struct dlm_ls *ls)
207 up_read(&ls->ls_in_recovery);
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
212 return down_read_trylock(&ls->ls_in_recovery);
215 static inline int can_be_queued(struct dlm_lkb *lkb)
217 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
222 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
225 static inline int is_demoted(struct dlm_lkb *lkb)
227 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
230 static inline int is_altmode(struct dlm_lkb *lkb)
232 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
235 static inline int is_granted(struct dlm_lkb *lkb)
237 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
240 static inline int is_remote(struct dlm_rsb *r)
242 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243 return !!r->res_nodeid;
246 static inline int is_process_copy(struct dlm_lkb *lkb)
248 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
251 static inline int is_master_copy(struct dlm_lkb *lkb)
253 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
258 static inline int middle_conversion(struct dlm_lkb *lkb)
260 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
266 static inline int down_conversion(struct dlm_lkb *lkb)
268 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
273 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
278 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
281 static inline int is_overlap(struct dlm_lkb *lkb)
283 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284 DLM_IFL_OVERLAP_CANCEL));
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
289 if (is_master_copy(lkb))
294 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297 timeout caused the cancel then return -ETIMEDOUT */
298 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303 lkb->lkb_lksb->sb_status = rv;
304 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
306 dlm_add_ast(lkb, AST_COMP);
309 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
317 if (is_master_copy(lkb))
318 send_bast(r, lkb, rqmode);
320 lkb->lkb_bastmode = rqmode;
321 dlm_add_ast(lkb, AST_BAST);
326 * Basic operations on rsb's and lkb's
329 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
333 r = allocate_rsb(ls, len);
339 memcpy(r->res_name, name, len);
340 mutex_init(&r->res_mutex);
342 INIT_LIST_HEAD(&r->res_lookup);
343 INIT_LIST_HEAD(&r->res_grantqueue);
344 INIT_LIST_HEAD(&r->res_convertqueue);
345 INIT_LIST_HEAD(&r->res_waitqueue);
346 INIT_LIST_HEAD(&r->res_root_list);
347 INIT_LIST_HEAD(&r->res_recover_list);
352 static int search_rsb_list(struct list_head *head, char *name, int len,
353 unsigned int flags, struct dlm_rsb **r_ret)
358 list_for_each_entry(r, head, res_hashchain) {
359 if (len == r->res_length && !memcmp(name, r->res_name, len))
365 if (r->res_nodeid && (flags & R_MASTER))
371 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
372 unsigned int flags, struct dlm_rsb **r_ret)
377 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
379 kref_get(&r->res_ref);
382 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
386 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
388 if (dlm_no_directory(ls))
391 if (r->res_nodeid == -1) {
392 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
393 r->res_first_lkid = 0;
394 } else if (r->res_nodeid > 0) {
395 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
396 r->res_first_lkid = 0;
398 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
399 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
406 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
407 unsigned int flags, struct dlm_rsb **r_ret)
410 write_lock(&ls->ls_rsbtbl[b].lock);
411 error = _search_rsb(ls, name, len, b, flags, r_ret);
412 write_unlock(&ls->ls_rsbtbl[b].lock);
417 * Find rsb in rsbtbl and potentially create/add one
419 * Delaying the release of rsb's has a similar benefit to applications keeping
420 * NL locks on an rsb, but without the guarantee that the cached master value
421 * will still be valid when the rsb is reused. Apps aren't always smart enough
422 * to keep NL locks on an rsb that they may lock again shortly; this can lead
423 * to excessive master lookups and removals if we don't delay the release.
425 * Searching for an rsb means looking through both the normal list and toss
426 * list. When found on the toss list the rsb is moved to the normal list with
427 * ref count of 1; when found on normal list the ref count is incremented.
430 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
431 unsigned int flags, struct dlm_rsb **r_ret)
433 struct dlm_rsb *r, *tmp;
434 uint32_t hash, bucket;
437 if (dlm_no_directory(ls))
440 hash = jhash(name, namelen, 0);
441 bucket = hash & (ls->ls_rsbtbl_size - 1);
443 error = search_rsb(ls, name, namelen, bucket, flags, &r);
447 if (error == -EBADR && !(flags & R_CREATE))
450 /* the rsb was found but wasn't a master copy */
451 if (error == -ENOTBLK)
455 r = create_rsb(ls, name, namelen);
460 r->res_bucket = bucket;
462 kref_init(&r->res_ref);
464 /* With no directory, the master can be set immediately */
465 if (dlm_no_directory(ls)) {
466 int nodeid = dlm_dir_nodeid(r);
467 if (nodeid == dlm_our_nodeid())
469 r->res_nodeid = nodeid;
472 write_lock(&ls->ls_rsbtbl[bucket].lock);
473 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
475 write_unlock(&ls->ls_rsbtbl[bucket].lock);
480 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
481 write_unlock(&ls->ls_rsbtbl[bucket].lock);
/* Exported entry point for rsb lookup/creation. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
494 /* This is only called to add a reference when the code already holds
495 a valid reference to the rsb, so there's no need for locking. */
497 static inline void hold_rsb(struct dlm_rsb *r)
499 kref_get(&r->res_ref);
502 void dlm_hold_rsb(struct dlm_rsb *r)
507 static void toss_rsb(struct kref *kref)
509 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
510 struct dlm_ls *ls = r->res_ls;
512 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
513 kref_init(&r->res_ref);
514 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
515 r->res_toss_time = jiffies;
517 free_lvb(r->res_lvbptr);
518 r->res_lvbptr = NULL;
522 /* When all references to the rsb are gone it's transfered to
523 the tossed list for later disposal. */
525 static void put_rsb(struct dlm_rsb *r)
527 struct dlm_ls *ls = r->res_ls;
528 uint32_t bucket = r->res_bucket;
530 write_lock(&ls->ls_rsbtbl[bucket].lock);
531 kref_put(&r->res_ref, toss_rsb);
532 write_unlock(&ls->ls_rsbtbl[bucket].lock);
535 void dlm_put_rsb(struct dlm_rsb *r)
540 /* See comment for unhold_lkb */
542 static void unhold_rsb(struct dlm_rsb *r)
545 rv = kref_put(&r->res_ref, toss_rsb);
546 DLM_ASSERT(!rv, dlm_dump_rsb(r););
549 static void kill_rsb(struct kref *kref)
551 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
553 /* All work is done after the return from kref_put() so we
554 can release the write_lock before the remove and free. */
556 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
557 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
558 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
559 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
560 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
561 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
564 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565 The rsb must exist as long as any lkb's for it do. */
567 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
570 lkb->lkb_resource = r;
573 static void detach_lkb(struct dlm_lkb *lkb)
575 if (lkb->lkb_resource) {
576 put_rsb(lkb->lkb_resource);
577 lkb->lkb_resource = NULL;
581 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
583 struct dlm_lkb *lkb, *tmp;
587 lkb = allocate_lkb(ls);
591 lkb->lkb_nodeid = -1;
592 lkb->lkb_grmode = DLM_LOCK_IV;
593 kref_init(&lkb->lkb_ref);
594 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
595 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
596 INIT_LIST_HEAD(&lkb->lkb_time_list);
598 get_random_bytes(&bucket, sizeof(bucket));
599 bucket &= (ls->ls_lkbtbl_size - 1);
601 write_lock(&ls->ls_lkbtbl[bucket].lock);
603 /* counter can roll over so we must verify lkid is not in use */
606 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
608 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
610 if (tmp->lkb_id != lkid)
618 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
619 write_unlock(&ls->ls_lkbtbl[bucket].lock);
625 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
628 uint16_t bucket = (lkid >> 16);
630 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
631 if (lkb->lkb_id == lkid)
637 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
640 uint16_t bucket = (lkid >> 16);
642 if (bucket >= ls->ls_lkbtbl_size)
645 read_lock(&ls->ls_lkbtbl[bucket].lock);
646 lkb = __find_lkb(ls, lkid);
648 kref_get(&lkb->lkb_ref);
649 read_unlock(&ls->ls_lkbtbl[bucket].lock);
652 return lkb ? 0 : -ENOENT;
655 static void kill_lkb(struct kref *kref)
657 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
659 /* All work is done after the return from kref_put() so we
660 can release the write_lock before the detach_lkb */
662 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
665 /* __put_lkb() is used when an lkb may not have an rsb attached to
666 it so we need to provide the lockspace explicitly */
668 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
670 uint16_t bucket = (lkb->lkb_id >> 16);
672 write_lock(&ls->ls_lkbtbl[bucket].lock);
673 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
674 list_del(&lkb->lkb_idtbl_list);
675 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679 /* for local/process lkbs, lvbptr points to caller's lksb */
680 if (lkb->lkb_lvbptr && is_master_copy(lkb))
681 free_lvb(lkb->lkb_lvbptr);
685 write_unlock(&ls->ls_lkbtbl[bucket].lock);
690 int dlm_put_lkb(struct dlm_lkb *lkb)
694 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
695 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
697 ls = lkb->lkb_resource->res_ls;
698 return __put_lkb(ls, lkb);
701 /* This is only called to add a reference when the code already holds
702 a valid reference to the lkb, so there's no need for locking. */
704 static inline void hold_lkb(struct dlm_lkb *lkb)
706 kref_get(&lkb->lkb_ref);
709 /* This is called when we need to remove a reference and are certain
710 it's not the last ref. e.g. del_lkb is always called between a
711 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
712 put_lkb would work fine, but would involve unnecessary locking */
714 static inline void unhold_lkb(struct dlm_lkb *lkb)
717 rv = kref_put(&lkb->lkb_ref, kill_lkb);
718 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
721 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
724 struct dlm_lkb *lkb = NULL;
726 list_for_each_entry(lkb, head, lkb_statequeue)
727 if (lkb->lkb_rqmode < mode)
731 list_add_tail(new, head);
733 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
736 /* add/remove lkb to rsb's grant/convert/wait queue */
738 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
740 kref_get(&lkb->lkb_ref);
742 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
744 lkb->lkb_status = status;
747 case DLM_LKSTS_WAITING:
748 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
749 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
751 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
753 case DLM_LKSTS_GRANTED:
754 /* convention says granted locks kept in order of grmode */
755 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
758 case DLM_LKSTS_CONVERT:
759 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
760 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
762 list_add_tail(&lkb->lkb_statequeue,
763 &r->res_convertqueue);
766 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
770 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
773 list_del(&lkb->lkb_statequeue);
/* Move an lkb between rsb queues; the temporary hold keeps the lkb
   alive across the del/add pair. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
785 static int msg_reply_type(int mstype)
788 case DLM_MSG_REQUEST:
789 return DLM_MSG_REQUEST_REPLY;
790 case DLM_MSG_CONVERT:
791 return DLM_MSG_CONVERT_REPLY;
793 return DLM_MSG_UNLOCK_REPLY;
795 return DLM_MSG_CANCEL_REPLY;
797 return DLM_MSG_LOOKUP_REPLY;
802 /* add/remove lkb from global waiters list of lkb's waiting for
803 a reply from a remote node */
805 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
807 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
810 mutex_lock(&ls->ls_waiters_mutex);
812 if (is_overlap_unlock(lkb) ||
813 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
818 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
821 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
824 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830 lkb->lkb_wait_count++;
833 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
834 lkb->lkb_id, lkb->lkb_wait_type, mstype,
835 lkb->lkb_wait_count, lkb->lkb_flags);
839 DLM_ASSERT(!lkb->lkb_wait_count,
841 printk("wait_count %d\n", lkb->lkb_wait_count););
843 lkb->lkb_wait_count++;
844 lkb->lkb_wait_type = mstype;
846 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
849 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
850 lkb->lkb_id, error, lkb->lkb_flags, mstype,
851 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
852 mutex_unlock(&ls->ls_waiters_mutex);
856 /* We clear the RESEND flag because we might be taking an lkb off the waiters
857 list as part of process_requestqueue (e.g. a lookup that has an optimized
858 request reply on the requestqueue) between dlm_recover_waiters_pre() which
859 set RESEND and dlm_recover_waiters_post() */
861 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
863 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
864 int overlap_done = 0;
866 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
867 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
872 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
873 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
878 /* N.B. type of reply may not always correspond to type of original
879 msg due to lookup->request optimization, verify others? */
881 if (lkb->lkb_wait_type) {
882 lkb->lkb_wait_type = 0;
886 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
887 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
891 /* the force-unlock/cancel has completed and we haven't recvd a reply
892 to the op that was in progress prior to the unlock/cancel; we
893 give up on any reply to the earlier op. FIXME: not sure when/how
896 if (overlap_done && lkb->lkb_wait_type) {
897 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
898 lkb->lkb_id, mstype, lkb->lkb_wait_type);
899 lkb->lkb_wait_count--;
900 lkb->lkb_wait_type = 0;
903 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
905 lkb->lkb_flags &= ~DLM_IFL_RESEND;
906 lkb->lkb_wait_count--;
907 if (!lkb->lkb_wait_count)
908 list_del_init(&lkb->lkb_wait_reply);
913 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
915 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
918 mutex_lock(&ls->ls_waiters_mutex);
919 error = _remove_from_waiters(lkb, mstype);
920 mutex_unlock(&ls->ls_waiters_mutex);
924 /* Handles situations where we might be processing a "fake" or "stub" reply in
925 which we can't try to take waiters_mutex again. */
927 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
929 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
932 if (ms != &ls->ls_stub_ms)
933 mutex_lock(&ls->ls_waiters_mutex);
934 error = _remove_from_waiters(lkb, ms->m_type);
935 if (ms != &ls->ls_stub_ms)
936 mutex_unlock(&ls->ls_waiters_mutex);
940 static void dir_remove(struct dlm_rsb *r)
944 if (dlm_no_directory(r->res_ls))
947 to_nodeid = dlm_dir_nodeid(r);
948 if (to_nodeid != dlm_our_nodeid())
951 dlm_dir_remove_entry(r->res_ls, to_nodeid,
952 r->res_name, r->res_length);
955 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
956 found since they are in order of newest to oldest? */
958 static int shrink_bucket(struct dlm_ls *ls, int b)
961 int count = 0, found;
965 write_lock(&ls->ls_rsbtbl[b].lock);
966 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
968 if (!time_after_eq(jiffies, r->res_toss_time +
969 dlm_config.ci_toss_secs * HZ))
976 write_unlock(&ls->ls_rsbtbl[b].lock);
980 if (kref_put(&r->res_ref, kill_rsb)) {
981 list_del(&r->res_hashchain);
982 write_unlock(&ls->ls_rsbtbl[b].lock);
989 write_unlock(&ls->ls_rsbtbl[b].lock);
990 log_error(ls, "tossed rsb in use %s", r->res_name);
997 void dlm_scan_rsbs(struct dlm_ls *ls)
1001 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1002 shrink_bucket(ls, i);
1003 if (dlm_locking_stopped(ls))
1009 static void add_timeout(struct dlm_lkb *lkb)
1011 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1013 if (is_master_copy(lkb))
1016 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1019 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1020 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1021 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1027 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1028 mutex_lock(&ls->ls_timeout_mutex);
1030 lkb->lkb_timestamp = jiffies;
1031 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1032 mutex_unlock(&ls->ls_timeout_mutex);
1035 static void del_timeout(struct dlm_lkb *lkb)
1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1039 mutex_lock(&ls->ls_timeout_mutex);
1040 if (!list_empty(&lkb->lkb_time_list)) {
1041 list_del_init(&lkb->lkb_time_list);
1044 mutex_unlock(&ls->ls_timeout_mutex);
1047 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1048 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1049 and then lock rsb because of lock ordering in add_timeout. We may need
1050 to specify some special timeout-related bits in the lkb that are just to
1051 be accessed under the timeout_mutex. */
1053 void dlm_scan_timeout(struct dlm_ls *ls)
1056 struct dlm_lkb *lkb;
1057 int do_cancel, do_warn;
1060 if (dlm_locking_stopped(ls))
1065 mutex_lock(&ls->ls_timeout_mutex);
1066 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1068 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1069 time_after_eq(jiffies, lkb->lkb_timestamp +
1070 lkb->lkb_timeout_cs * HZ/100))
1073 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1074 time_after_eq(jiffies, lkb->lkb_timestamp +
1075 dlm_config.ci_timewarn_cs * HZ/100))
1078 if (!do_cancel && !do_warn)
1083 mutex_unlock(&ls->ls_timeout_mutex);
1085 if (!do_cancel && !do_warn)
1088 r = lkb->lkb_resource;
1093 /* clear flag so we only warn once */
1094 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1095 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1097 dlm_timeout_warn(lkb);
1101 log_debug("timeout cancel %x node %d %s", lkb->lkb_id,
1102 lkb->lkb_nodeid, r->res_name);
1103 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1104 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1106 _cancel_lock(r, lkb);
1115 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1116 dlm_recoverd before checking/setting ls_recover_begin. */
1118 void dlm_adjust_timeouts(struct dlm_ls *ls)
1120 struct dlm_lkb *lkb;
1121 long adj = jiffies - ls->ls_recover_begin;
1123 ls->ls_recover_begin = 0;
1124 mutex_lock(&ls->ls_timeout_mutex);
1125 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1126 lkb->lkb_timestamp += adj;
1127 mutex_unlock(&ls->ls_timeout_mutex);
1130 /* lkb is master or local copy */
1132 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1134 int b, len = r->res_ls->ls_lvblen;
1136 /* b=1 lvb returned to caller
1137 b=0 lvb written to rsb or invalidated
1140 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1143 if (!lkb->lkb_lvbptr)
1146 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1152 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1153 lkb->lkb_lvbseq = r->res_lvbseq;
1155 } else if (b == 0) {
1156 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1157 rsb_set_flag(r, RSB_VALNOTVALID);
1161 if (!lkb->lkb_lvbptr)
1164 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1168 r->res_lvbptr = allocate_lvb(r->res_ls);
1173 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1175 lkb->lkb_lvbseq = r->res_lvbseq;
1176 rsb_clear_flag(r, RSB_VALNOTVALID);
1179 if (rsb_flag(r, RSB_VALNOTVALID))
1180 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1183 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1185 if (lkb->lkb_grmode < DLM_LOCK_PW)
1188 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1189 rsb_set_flag(r, RSB_VALNOTVALID);
1193 if (!lkb->lkb_lvbptr)
1196 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1200 r->res_lvbptr = allocate_lvb(r->res_ls);
1205 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1207 rsb_clear_flag(r, RSB_VALNOTVALID);
1210 /* lkb is process copy (pc) */
1212 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1213 struct dlm_message *ms)
1217 if (!lkb->lkb_lvbptr)
1220 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1223 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1225 int len = receive_extralen(ms);
1226 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1227 lkb->lkb_lvbseq = ms->m_lvbseq;
1231 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1232 remove_lock -- used for unlock, removes lkb from granted
1233 revert_lock -- used for cancel, moves lkb from convert to granted
1234 grant_lock -- used for request and convert, adds lkb to granted or
1235 moves lkb from convert or waiting to granted
1237 Each of these is used for master or local copy lkb's. There is
1238 also a _pc() variation used to make the corresponding change on
1239 a process copy (pc) lkb. */
1241 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1244 lkb->lkb_grmode = DLM_LOCK_IV;
1245 /* this unhold undoes the original ref from create_lkb()
1246 so this leads to the lkb being freed */
/* Unlock on the master/local copy: store the LVB, then remove. */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
/* Unlock on a process copy: no LVB handling, just remove. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1261 /* returns: 0 did nothing
1262 1 moved lock to granted
1265 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1269 lkb->lkb_rqmode = DLM_LOCK_IV;
1271 switch (lkb->lkb_status) {
1272 case DLM_LKSTS_GRANTED:
1274 case DLM_LKSTS_CONVERT:
1275 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1278 case DLM_LKSTS_WAITING:
1280 lkb->lkb_grmode = DLM_LOCK_IV;
1281 /* this unhold undoes the original ref from create_lkb()
1282 so this leads to the lkb being freed */
1287 log_print("invalid status for revert %d", lkb->lkb_status);
/* Process-copy variant of revert_lock(); identical behavior. */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1297 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1299 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1300 lkb->lkb_grmode = lkb->lkb_rqmode;
1301 if (lkb->lkb_status)
1302 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1304 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1307 lkb->lkb_rqmode = DLM_LOCK_IV;
1310 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1312 set_lvb_lock(r, lkb);
1313 _grant_lock(r, lkb);
1314 lkb->lkb_highbast = 0;
/* Grant on a process copy, taking the LVB from the reply message. */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1324 /* called by grant_pending_locks() which means an async grant message must
1325 be sent to the requesting node in addition to granting the lock if the
1326 lkb belongs to a remote node. */
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}
1337 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1338 change the granted/requested modes. We're munging things accordingly in
1340 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1342 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1343 compatible with other granted locks */

/* Apply a master-side demotion (CONVDEADLK) to our copy of the lock.
   Only a convert reply may carry a demotion, and both modes must be
   valid; otherwise log and bail.  On success the granted mode is forced
   down to NL. */
1345 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1347 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1348 log_print("munge_demoted %x invalid reply type %d",
1349 lkb->lkb_id, ms->m_type);
1353 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1354 log_print("munge_demoted %x invalid modes gr %d rq %d",
1355 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1359 lkb->lkb_grmode = DLM_LOCK_NL;

/* Apply a master-side alternate mode (ALTPR/ALTCW) to our copy.  Only a
   request reply or a grant message may carry it; rqmode is rewritten to
   the alternate mode selected by the caller's exflags. */
1362 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1364 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1365 ms->m_type != DLM_MSG_GRANT) {
1366 log_print("munge_altmode %x invalid reply type %d",
1367 lkb->lkb_id, ms->m_type);
1371 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1372 lkb->lkb_rqmode = DLM_LOCK_PR;
1373 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1374 lkb->lkb_rqmode = DLM_LOCK_CW;
1376 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
/* Return nonzero if lkb is the first entry on the given status queue,
   compared by lock id. */
1381 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1383 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1385 if (lkb->lkb_id == first->lkb_id)
1391 /* Check if the given lkb conflicts with another lkb on the queue. */

/* Walk the queue and report whether any lkb on it holds a mode that is
   incompatible with the given lkb's requested mode (the lkb itself is
   presumably skipped by a test elided from this excerpt). */
1393 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1395 struct dlm_lkb *this;
1397 list_for_each_entry(this, head, lkb_statequeue) {
1400 if (!modes_compat(this, lkb))
1407 * "A conversion deadlock arises with a pair of lock requests in the converting
1408 * queue for one resource. The granted mode of each lock blocks the requested
1409 * mode of the other lock."
1411 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1412 * convert queue from being granted, then demote lkb (set grmode to NL).
1413 * This second form requires that we check for conv-deadlk even when
1414 * now == 0 in _can_be_granted().
1417 * Granted Queue: empty
1418 * Convert Queue: NL->EX (first lock)
1419 * PR->EX (second lock)
1421 * The first lock can't be granted because of the granted mode of the second
1422 * lock and the second lock can't be granted because it's not first in the
1423 * list. We demote the granted mode of the second lock (the lkb passed to this
1426 * After the resolution, the "grant pending" function needs to go back and try
1427 * to grant locks on the convert queue again since the first lock can now be
1431 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1433 struct dlm_lkb *this, *first = NULL, *self = NULL;
/* Scan the convert queue recording the first entry and whether lkb itself
   is queued; a mutual incompatibility between lkb and another entry is the
   classic pairwise conversion deadlock. */
1435 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1443 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1447 /* if lkb is on the convert queue and is preventing the first
1448 from being granted, then there's deadlock and we demote lkb.
1449 multiple converting locks may need to do this before the first
1450 converting lock can be granted. */
1452 if (self && self != first) {
1453 if (!modes_compat(lkb, first) &&
1454 !queue_conflict(&rsb->res_grantqueue, first))
1462 * Return 1 if the lock can be granted, 0 otherwise.
1463 * Also detect and resolve conversion deadlocks.
1465 * lkb is the lock to be granted
1467 * now is 1 if the function is being called in the context of the
1468 * immediate request, it is 0 if called later, after the lock has been
1471 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1474 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
/* conv is true when this lkb is a conversion (it already holds a real
   granted mode) rather than a brand-new request. */
1476 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1479 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1480 * a new request for a NL mode lock being blocked.
1482 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1483 * request, then it would be granted. In essence, the use of this flag
1484 * tells the Lock Manager to expedite this request by not considering
1485 * what may be in the CONVERTING or WAITING queues... As of this
1486 * writing, the EXPEDITE flag can be used only with new requests for NL
1487 * mode locks. This flag is not valid for conversion requests.
1489 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1490 * conversion or used with a non-NL requested mode. We also know an
1491 * EXPEDITE request is always granted immediately, so now must always
1492 * be 1. The full condition to grant an expedite request: (now &&
1493 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1494 * therefore be shortened to just checking the flag.
1497 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1501 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1502 * added to the remaining conditions.
1505 if (queue_conflict(&r->res_grantqueue, lkb))
1509 * 6-3: By default, a conversion request is immediately granted if the
1510 * requested mode is compatible with the modes of all other granted
1514 if (queue_conflict(&r->res_convertqueue, lkb))
1518 * 6-5: But the default algorithm for deciding whether to grant or
1519 * queue conversion requests does not by itself guarantee that such
1520 * requests are serviced on a "first come first serve" basis. This, in
1521 * turn, can lead to a phenomenon known as "indefinite postponement".
1523 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1524 * the system service employed to request a lock conversion. This flag
1525 * forces certain conversion requests to be queued, even if they are
1526 * compatible with the granted modes of other locks on the same
1527 * resource. Thus, the use of this flag results in conversion requests
1528 * being ordered on a "first come first serve" basis.
1530 * DCT: This condition is all about new conversions being able to occur
1531 * "in place" while the lock remains on the granted queue (assuming
1532 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1533 * doesn't _have_ to go onto the convert queue where it's processed in
1534 * order. The "now" variable is necessary to distinguish converts
1535 * being received and processed for the first time now, because once a
1536 * convert is moved to the conversion queue the condition below applies
1537 * requiring fifo granting.
1540 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1544 * The NOORDER flag is set to avoid the standard vms rules on grant
1548 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1552 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1553 * granted until all other conversion requests ahead of it are granted
1557 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1561 * 6-4: By default, a new request is immediately granted only if all
1562 * three of the following conditions are satisfied when the request is
1564 * - The queue of ungranted conversion requests for the resource is
1566 * - The queue of ungranted new requests for the resource is empty.
1567 * - The mode of the new request is compatible with the most
1568 * restrictive mode of all granted locks on the resource.
1571 if (now && !conv && list_empty(&r->res_convertqueue) &&
1572 list_empty(&r->res_waitqueue))
1576 * 6-4: Once a lock request is in the queue of ungranted new requests,
1577 * it cannot be granted until the queue of ungranted conversion
1578 * requests is empty, all ungranted new requests ahead of it are
1579 * granted and/or canceled, and it is compatible with the granted mode
1580 * of the most restrictive lock granted on the resource.
1583 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1584 first_in_list(lkb, &r->res_waitqueue))
1589 * The following, enabled by CONVDEADLK, departs from VMS.
/* Conversion deadlock resolution: demote our granted mode to NL and tell
   the caller (via DLM_SBF_DEMOTED in sbflags) that this happened. */
1592 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1593 conversion_deadlock_detect(r, lkb)) {
1594 lkb->lkb_grmode = DLM_LOCK_NL;
1595 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1602 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1603 * simple way to provide a big optimization to applications that can use them.
/* Wrapper around _can_be_granted() that additionally tries the caller's
   alternate mode (ALTPR/ALTCW).  If the original rqmode can't be granted,
   temporarily substitute the alternate mode and retry; on success record
   DLM_SBF_ALTMODE so the caller learns which mode it actually got.  The
   original rqmode is restored when the alternate also fails. */
1606 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1608 uint32_t flags = lkb->lkb_exflags;
1610 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1612 rv = _can_be_granted(r, lkb, now);
/* A CONVDEADLK demotion counts as handled; don't also try altmode. */
1616 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1619 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1621 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1625 lkb->lkb_rqmode = alt;
1626 rv = _can_be_granted(r, lkb, now);
1628 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1630 lkb->lkb_rqmode = rqmode;
/* Try to grant every lock on the convert queue.  Granting one conversion
   (or a CONVDEADLK demotion) may make earlier-blocked conversions
   grantable, so the scan restarts while demotions keep occurring.  Returns
   the highest requested mode seen among locks left ungranted (at least
   "high" from the caller), used later for blocking ASTs. */
1636 static int grant_pending_convert(struct dlm_rsb *r, int high)
1638 struct dlm_lkb *lkb, *s;
1639 int hi, demoted, quit, grant_restart, demote_restart;
1647 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1648 demoted = is_demoted(lkb);
1649 if (can_be_granted(r, lkb, 0)) {
1650 grant_lock_pending(r, lkb);
1653 hi = max_t(int, lkb->lkb_rqmode, hi);
/* can_be_granted() may demote this lkb as a side effect; note the
   transition so the queue is rescanned. */
1654 if (!demoted && is_demoted(lkb))
1661 if (demote_restart && !quit) {
1666 return max_t(int, high, hi);

/* Try to grant every lock on the wait (new request) queue; returns the
   highest requested mode among those still ungranted. */
1669 static int grant_pending_wait(struct dlm_rsb *r, int high)
1671 struct dlm_lkb *lkb, *s;
1673 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1674 if (can_be_granted(r, lkb, 0))
1675 grant_lock_pending(r, lkb);
1677 high = max_t(int, lkb->lkb_rqmode, high);
/* Master-only: after some state change on the rsb, grant whatever pending
   conversions and requests have become grantable, then send blocking ASTs
   to granted locks that block the highest still-ungranted requested mode. */
1683 static void grant_pending_locks(struct dlm_rsb *r)
1685 struct dlm_lkb *lkb, *s;
1686 int high = DLM_LOCK_IV;
1688 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1690 high = grant_pending_convert(r, high);
1691 high = grant_pending_wait(r, high);
/* DLM_LOCK_IV here means nothing is left waiting; no basts needed. */
1693 if (high == DLM_LOCK_IV)
1697 * If there are locks left on the wait/convert queue then send blocking
1698 * ASTs to granted locks based on the largest requested mode (high)
1699 * found above. FIXME: highbast < high comparison not valid for PR/CW.
/* lkb_highbast throttles duplicate basts for modes already signalled. */
1702 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1703 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1704 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1705 queue_bast(r, lkb, high);
1706 lkb->lkb_highbast = high;
/* Queue a blocking AST for each lock on the given queue whose granted
   mode is incompatible with lkb's requested mode and which hasn't already
   been basted at that mode (gr->lkb_highbast throttles repeats). */
1711 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1712 struct dlm_lkb *lkb)
1716 list_for_each_entry(gr, head, lkb_statequeue) {
1717 if (gr->lkb_bastaddr &&
1718 gr->lkb_highbast < lkb->lkb_rqmode &&
1719 !modes_compat(gr, lkb)) {
1720 queue_bast(r, gr, lkb->lkb_rqmode);
1721 gr->lkb_highbast = lkb->lkb_rqmode;

/* Bast only the granted queue (normal case for a new blocked request). */
1726 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1728 send_bast_queue(r, &r->res_grantqueue, lkb);

/* Bast both granted and converting locks (forced/NOQUEUEBAST case). */
1731 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1733 send_bast_queue(r, &r->res_grantqueue, lkb);
1734 send_bast_queue(r, &r->res_convertqueue, lkb);
1737 /* set_master(r, lkb) -- set the master nodeid of a resource
1739 The purpose of this function is to set the nodeid field in the given
1740 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1741 known, it can just be copied to the lkb and the function will return
1742 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1743 before it can be copied to the lkb.
1745 When the rsb nodeid is being looked up remotely, the initial lkb
1746 causing the lookup is kept on the ls_waiters list waiting for the
1747 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1748 on the rsb's res_lookup list until the master is verified.
1751 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1752 1: the rsb master is not available and the lkb has been placed on
1756 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1758 struct dlm_ls *ls = r->res_ls;
1759 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
/* A previously-known master may be stale after recovery; use this lkb's
   request to re-confirm it (first_lkid marks the confirming request). */
1761 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1762 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1763 r->res_first_lkid = lkb->lkb_id;
1764 lkb->lkb_nodeid = r->res_nodeid;
/* A lookup/confirmation initiated by another lkb is in flight; park this
   one on res_lookup until the master is verified. */
1768 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1769 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
/* res_nodeid 0 = we are the master; >0 = known remote master. */
1773 if (r->res_nodeid == 0) {
1774 lkb->lkb_nodeid = 0;
1778 if (r->res_nodeid > 0) {
1779 lkb->lkb_nodeid = r->res_nodeid;
1783 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1785 dir_nodeid = dlm_dir_nodeid(r);
/* Directory node is remote: send an async lookup and make this lkb the
   first_lkid waiting on the reply. */
1787 if (dir_nodeid != our_nodeid) {
1788 r->res_first_lkid = lkb->lkb_id;
1789 send_lookup(r, lkb);
1794 /* It's possible for dlm_scand to remove an old rsb for
1795 this same resource from the toss list, us to create
1796 a new one, look up the master locally, and find it
1797 already exists just before dlm_scand does the
1798 dir_remove() on the previous rsb. */
1800 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1801 r->res_length, &ret_nodeid);
1804 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1808 if (ret_nodeid == our_nodeid) {
1809 r->res_first_lkid = 0;
1811 lkb->lkb_nodeid = 0;
1813 r->res_first_lkid = lkb->lkb_id;
1814 r->res_nodeid = ret_nodeid;
1815 lkb->lkb_nodeid = ret_nodeid;
/* Re-issue every lkb that was parked on the rsb's lookup list now that
   the master nodeid is known. */
1820 static void process_lookup_list(struct dlm_rsb *r)
1822 struct dlm_lkb *lkb, *safe;
1824 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1825 list_del_init(&lkb->lkb_rsb_lookup);
1826 _request_lock(r, lkb);

1831 /* confirm_master -- confirm (or deny) an rsb's master nodeid */

/* Called with the result of the first request sent to a presumed master.
   On success the master is confirmed and parked lookups proceed; on a
   NOQUEUE-style rejection the next waiting lkb becomes the confirming
   request (first_lkid) and is re-driven. */
1833 static void confirm_master(struct dlm_rsb *r, int error)
1835 struct dlm_lkb *lkb;
1837 if (!r->res_first_lkid)
1843 r->res_first_lkid = 0;
1844 process_lookup_list(r);
1848 /* the remote master didn't queue our NOQUEUE request;
1849 make a waiting lkb the first_lkid */
1851 r->res_first_lkid = 0;
1853 if (!list_empty(&r->res_lookup)) {
1854 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1856 list_del_init(&lkb->lkb_rsb_lookup);
1857 r->res_first_lkid = lkb->lkb_id;
1858 _request_lock(r, lkb);
1864 log_error(r->res_ls, "confirm_master unknown error %d", error);
/* Validate dlm_lock() arguments and pack them into a dlm_args struct.
   Pure argument checking; nothing is copied into the lkb yet (see the
   comment before the assignments below).  Returns 0 or -EINVAL. */
1868 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1869 int namelen, unsigned long timeout_cs, void *ast,
1870 void *astarg, void *bast, struct dlm_args *args)
1874 /* check for invalid arg usage */
1876 if (mode < 0 || mode > DLM_LOCK_EX)
1879 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
/* CANCEL belongs to dlm_unlock(), never dlm_lock(). */
1882 if (flags & DLM_LKF_CANCEL)
/* QUECVT/CONVDEADLK only make sense on conversions. */
1885 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1888 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1891 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
/* EXPEDITE is only valid on a new NL-mode request (see _can_be_granted). */
1894 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1897 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1900 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1903 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1909 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
/* A conversion must reference an existing lock id. */
1912 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1915 /* these args will be copied to the lkb in validate_lock_args,
1916 it cannot be done now because when converting locks, fields in
1917 an active lkb cannot be modified before locking the rsb */
1919 args->flags = flags;
1920 args->astaddr = ast;
1921 args->astparam = (long) astarg;
1922 args->bastaddr = bast;
1923 args->timeout = timeout_cs;

/* Validate dlm_unlock() flags and pack them into dlm_args.
   Returns 0 or -EINVAL. */
1931 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1933 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1934 DLM_LKF_FORCEUNLOCK))
/* CANCEL and FORCEUNLOCK are mutually exclusive. */
1937 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1940 args->flags = flags;
1941 args->astparam = (long) astarg;
/* Second-stage validation done under the rsb lock, then copy the checked
   args into the lkb.  Conversion-specific checks: not a master copy, the
   grmode->rqmode transition is legal for QUECVT, the lock is currently
   granted, and no other operation is outstanding on it. */
1945 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1946 struct dlm_args *args)
1950 if (args->flags & DLM_LKF_CONVERT) {
1951 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1954 if (args->flags & DLM_LKF_QUECVT &&
1955 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1959 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
/* A pending request/convert/unlock reply is still outstanding. */
1962 if (lkb->lkb_wait_type)
1965 if (is_overlap(lkb))
/* All checks passed: commit the args to the lkb. */
1969 lkb->lkb_exflags = args->flags;
1970 lkb->lkb_sbflags = 0;
1971 lkb->lkb_astaddr = args->astaddr;
1972 lkb->lkb_astparam = args->astparam;
1973 lkb->lkb_bastaddr = args->bastaddr;
1974 lkb->lkb_rqmode = args->mode;
1975 lkb->lkb_lksb = args->lksb;
1976 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1977 lkb->lkb_ownpid = (int) current->pid;
1978 lkb->lkb_timeout_cs = args->timeout;
1984 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1987 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1988 because there may be a lookup in progress and it's valid to do
1989 cancel/unlockf on it */

/* Validate an unlock/cancel/force-unlock against the lkb's current state
   and record the args.  The tricky part is overlap handling: if a prior
   request/convert/unlock is still waiting for a reply, the cancel or
   force-unlock is marked as "overlapping" (DLM_IFL_OVERLAP_*) instead of
   being sent, and resolved when that reply arrives. */
1991 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1993 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
/* Unlocking a master copy directly is a caller bug. */
1996 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1997 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2002 /* an lkb may still exist even though the lock is EOL'ed due to a
2003 cancel, unlock or failed noqueue request; an app can't use these
2004 locks; return same error as if the lkid had not been found at all */
2006 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2007 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2012 /* an lkb may be waiting for an rsb lookup to complete where the
2013 lookup was initiated by another lock */
2015 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2016 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2017 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2018 list_del_init(&lkb->lkb_rsb_lookup);
/* Never sent anywhere, so complete it locally right now. */
2019 queue_cast(lkb->lkb_resource, lkb,
2020 args->flags & DLM_LKF_CANCEL ?
2021 -DLM_ECANCEL : -DLM_EUNLOCK);
2022 unhold_lkb(lkb); /* undoes create_lkb() */
2028 /* cancel not allowed with another cancel/unlock in progress */
2030 if (args->flags & DLM_LKF_CANCEL) {
2031 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2034 if (is_overlap(lkb))
2037 /* don't let scand try to do a cancel */
2040 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2041 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
/* Which in-flight operation the cancel overlaps decides its fate. */
2046 switch (lkb->lkb_wait_type) {
2047 case DLM_MSG_LOOKUP:
2048 case DLM_MSG_REQUEST:
2049 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2052 case DLM_MSG_UNLOCK:
2053 case DLM_MSG_CANCEL:
2056 /* add_to_waiters() will set OVERLAP_CANCEL */
2060 /* do we need to allow a force-unlock if there's a normal unlock
2061 already in progress? in what conditions could the normal unlock
2062 fail such that we'd want to send a force-unlock to be sure? */
2064 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2065 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2068 if (is_overlap_unlock(lkb))
2071 /* don't let scand try to do a cancel */
2074 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2075 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2080 switch (lkb->lkb_wait_type) {
2081 case DLM_MSG_LOOKUP:
2082 case DLM_MSG_REQUEST:
2083 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2086 case DLM_MSG_UNLOCK:
2089 /* add_to_waiters() will set OVERLAP_UNLOCK */
2093 /* normal unlock not allowed if there's any op in progress */
2095 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2099 /* an overlapping op shouldn't blow away exflags from other op */
2100 lkb->lkb_exflags |= args->flags;
2101 lkb->lkb_sbflags = 0;
2102 lkb->lkb_astparam = args->astparam;
2106 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2107 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2108 args->flags, lkb->lkb_wait_type,
2109 lkb->lkb_resource->res_name);
2114 * Four stage 4 varieties:
2115 * do_request(), do_convert(), do_unlock(), do_cancel()
2116 * These are called on the master node for the given lock and
2117 * from the central locking logic.

/* Master-side new request: grant immediately if possible, otherwise
   queue on the wait queue (unless NOQUEUE) and bast conflicting holders.
   Returns 0 (granted), -EINPROGRESS (queued) or -EAGAIN (NOQUEUE miss). */
2120 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2124 if (can_be_granted(r, lkb, 1)) {
2126 queue_cast(r, lkb, 0);
2130 if (can_be_queued(lkb)) {
2131 error = -EINPROGRESS;
2132 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2133 send_blocking_asts(r, lkb);
/* NOQUEUE failure: optionally bast everyone, then fail with -EAGAIN. */
2139 if (force_blocking_asts(lkb))
2140 send_blocking_asts_all(r, lkb);
2141 queue_cast(r, lkb, -EAGAIN);

/* Master-side conversion; same result conventions as do_request(). */
2147 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2151 /* changing an existing lock may allow others to be granted */
2153 if (can_be_granted(r, lkb, 1)) {
2155 queue_cast(r, lkb, 0);
2156 grant_pending_locks(r);
2160 /* is_demoted() means the can_be_granted() above set the grmode
2161 to NL, and left us on the granted queue. This auto-demotion
2162 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2163 now grantable. We have to try to grant other converting locks
2164 before we try again to grant this one. */
2166 if (is_demoted(lkb)) {
2167 grant_pending_convert(r, DLM_LOCK_IV);
2168 if (_can_be_granted(r, lkb, 1)) {
2170 queue_cast(r, lkb, 0);
2171 grant_pending_locks(r);
2174 /* else fall through and move to convert queue */
2177 if (can_be_queued(lkb)) {
2178 error = -EINPROGRESS;
2180 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2181 send_blocking_asts(r, lkb);
2187 if (force_blocking_asts(lkb))
2188 send_blocking_asts_all(r, lkb);
2189 queue_cast(r, lkb, -EAGAIN);

/* Master-side unlock: always succeeds, returns -DLM_EUNLOCK and retries
   pending grants now that the lock is gone. */
2195 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197 remove_lock(r, lkb);
2198 queue_cast(r, lkb, -DLM_EUNLOCK);
2199 grant_pending_locks(r);
2200 return -DLM_EUNLOCK;

2203 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */

/* Master-side cancel: revert the lock if it is still waiting/converting;
   a revert_lock() result of 0 means there was nothing to cancel. */
2205 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2209 error = revert_lock(r, lkb);
2211 queue_cast(r, lkb, -DLM_ECANCEL);
2212 grant_pending_locks(r);
2213 return -DLM_ECANCEL;
2219 * Four stage 3 varieties:
2220 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()

2223 /* add a new lkb to a possibly new rsb, called by requesting process */

/* Stage 3 request: resolve the master via set_master(), then either send
   the request to the remote master or run do_request() locally. */
2225 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2229 /* set_master: sets lkb nodeid from r */
2231 error = set_master(r, lkb);
2240 /* receive_request() calls do_request() on remote node */
2241 error = send_request(r, lkb);
2243 error = do_request(r, lkb);

2248 /* change some property of an existing lkb, e.g. mode */

/* Stage 3 convert: master is already known; dispatch remote or local. */
2250 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2255 /* receive_convert() calls do_convert() on remote node */
2256 error = send_convert(r, lkb);
2258 error = do_convert(r, lkb);

2263 /* remove an existing lkb from the granted queue */

/* Stage 3 unlock: dispatch remote or local. */
2265 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2270 /* receive_unlock() calls do_unlock() on remote node */
2271 error = send_unlock(r, lkb);
2273 error = do_unlock(r, lkb);

2278 /* remove an existing lkb from the convert or wait queue */

/* Stage 3 cancel: dispatch remote or local. */
2280 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2285 /* receive_cancel() calls do_cancel() on remote node */
2286 error = send_cancel(r, lkb);
2288 error = do_cancel(r, lkb);
2294 * Four stage 2 varieties:
2295 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()

/* Stage 2 request: validate args, find/create the rsb, publish the new
   lock id to the caller's lksb, then hand off to _request_lock(). */
2298 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2299 int len, struct dlm_args *args)
2304 error = validate_lock_args(ls, lkb, args);
2308 error = find_rsb(ls, name, len, R_CREATE, &r);
2315 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2317 error = _request_lock(r, lkb);

/* Stage 2 convert: the rsb comes from the existing lkb. */
2326 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2327 struct dlm_args *args)
2332 r = lkb->lkb_resource;
2337 error = validate_lock_args(ls, lkb, args);
2341 error = _convert_lock(r, lkb);

/* Stage 2 unlock: validate unlock args against lkb state, then drive. */
2348 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2349 struct dlm_args *args)
2354 r = lkb->lkb_resource;
2359 error = validate_unlock_args(lkb, args);
2363 error = _unlock_lock(r, lkb);

/* Stage 2 cancel: same validation path as unlock. */
2370 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2371 struct dlm_args *args)
2376 r = lkb->lkb_resource;
2381 error = validate_unlock_args(lkb, args);
2385 error = _cancel_lock(r, lkb);
2393 * Two stage 1 varieties: dlm_lock() and dlm_unlock()

/* Public entry point for lock requests and conversions.  Looks up (for a
   convert) or creates (for a new request) the lkb, packs the args, and
   dispatches to convert_lock()/request_lock().  -EINPROGRESS and -EAGAIN
   are expected outcomes (async completion / NOQUEUE miss), normalized
   before returning.  Runs under the lockspace's recovery lock. */
2396 int dlm_lock(dlm_lockspace_t *lockspace,
2398 struct dlm_lksb *lksb,
2401 unsigned int namelen,
2402 uint32_t parent_lkid,
2403 void (*ast) (void *astarg),
2405 void (*bast) (void *astarg, int mode))
2408 struct dlm_lkb *lkb;
2409 struct dlm_args args;
2410 int error, convert = flags & DLM_LKF_CONVERT;
2412 ls = dlm_find_lockspace_local(lockspace);
/* Block out recovery while the operation is set up. */
2416 dlm_lock_recovery(ls);
2419 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2421 error = create_lkb(ls, &lkb);
2426 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2427 astarg, bast, &args);
2432 error = convert_lock(ls, lkb, &args);
2434 error = request_lock(ls, lkb, name, namelen, &args);
/* -EINPROGRESS means "queued, completion comes via AST" -> success. */
2436 if (error == -EINPROGRESS)
2439 if (convert || error)
2441 if (error == -EAGAIN)
2444 dlm_unlock_recovery(ls);
2445 dlm_put_lockspace(ls);
/* Public entry point for unlock and cancel.  -DLM_EUNLOCK/-DLM_ECANCEL
   are the normal success returns from the do_* layer and are normalized
   to 0; -EBUSY with CANCEL/FORCEUNLOCK is also treated as success (see
   the comment above validate_unlock_args). */
2449 int dlm_unlock(dlm_lockspace_t *lockspace,
2452 struct dlm_lksb *lksb,
2456 struct dlm_lkb *lkb;
2457 struct dlm_args args;
2460 ls = dlm_find_lockspace_local(lockspace);
2464 dlm_lock_recovery(ls);
2466 error = find_lkb(ls, lkid, &lkb);
2470 error = set_unlock_args(flags, astarg, &args);
2474 if (flags & DLM_LKF_CANCEL)
2475 error = cancel_lock(ls, lkb, &args);
2477 error = unlock_lock(ls, lkb, &args);
2479 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2481 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2486 dlm_unlock_recovery(ls);
2487 dlm_put_lockspace(ls);
2492 * send/receive routines for remote operations and replies
2496 * send_request receive_request
2497 * send_convert receive_convert
2498 * send_unlock receive_unlock
2499 * send_cancel receive_cancel
2500 * send_grant receive_grant
2501 * send_bast receive_bast
2502 * send_lookup receive_lookup
2503 * send_remove receive_remove
2506 * receive_request_reply send_request_reply
2507 * receive_convert_reply send_convert_reply
2508 * receive_unlock_reply send_unlock_reply
2509 * receive_cancel_reply send_cancel_reply
2510 * receive_lookup_reply send_lookup_reply

/* Allocate an outgoing message buffer from lowcomms, zero it, and fill
   in the common dlm_header fields plus the message type.  Returns the
   message body (*ms_ret) and the commit handle (*mh_ret). */
2513 static int _create_message(struct dlm_ls *ls, int mb_len,
2514 int to_nodeid, int mstype,
2515 struct dlm_message **ms_ret,
2516 struct dlm_mhandle **mh_ret)
2518 struct dlm_message *ms;
2519 struct dlm_mhandle *mh;
2522 /* get_buffer gives us a message handle (mh) that we need to
2523 pass into lowcomms_commit and a message buffer (mb) that we
2524 write our data into */
2526 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2530 memset(mb, 0, mb_len);
2532 ms = (struct dlm_message *) mb;
2534 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2535 ms->m_header.h_lockspace = ls->ls_global_id;
2536 ms->m_header.h_nodeid = dlm_our_nodeid();
2537 ms->m_header.h_length = mb_len;
2538 ms->m_header.h_cmd = DLM_MSG;
2540 ms->m_type = mstype;
/* Size an outgoing message for its type and allocate it.  Messages that
   carry the resource name get res_length of extra space; messages that
   may carry an LVB get ls_lvblen extra (keep this switch in sync with the
   one in send_args()). */
2547 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2548 int to_nodeid, int mstype,
2549 struct dlm_message **ms_ret,
2550 struct dlm_mhandle **mh_ret)
2552 int mb_len = sizeof(struct dlm_message);
2555 case DLM_MSG_REQUEST:
2556 case DLM_MSG_LOOKUP:
2557 case DLM_MSG_REMOVE:
2558 mb_len += r->res_length;
2560 case DLM_MSG_CONVERT:
2561 case DLM_MSG_UNLOCK:
2562 case DLM_MSG_REQUEST_REPLY:
2563 case DLM_MSG_CONVERT_REPLY:
2565 if (lkb && lkb->lkb_lvbptr)
2566 mb_len += r->res_ls->ls_lvblen;
2570 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2574 /* further lowcomms enhancements or alternate implementations may make
2575 the return value from this function useful at some point */

/* Byte-swap the message to wire order and hand it to lowcomms for
   transmission.  Always reports success today (see comment above). */
2577 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2579 dlm_message_out(ms);
2580 dlm_lowcomms_commit_buffer(mh);
/* Copy the lkb's state into an outgoing message.  Every field that any
   message type might need is sent; the receiver picks out what it wants
   (see the comment above receive_flags()).  The trailing m_extra area
   carries either the resource name or the LVB depending on type. */
2584 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2585 struct dlm_message *ms)
2587 ms->m_nodeid = lkb->lkb_nodeid;
2588 ms->m_pid = lkb->lkb_ownpid;
2589 ms->m_lkid = lkb->lkb_id;
2590 ms->m_remid = lkb->lkb_remid;
2591 ms->m_exflags = lkb->lkb_exflags;
2592 ms->m_sbflags = lkb->lkb_sbflags;
2593 ms->m_flags = lkb->lkb_flags;
2594 ms->m_lvbseq = lkb->lkb_lvbseq;
2595 ms->m_status = lkb->lkb_status;
2596 ms->m_grmode = lkb->lkb_grmode;
2597 ms->m_rqmode = lkb->lkb_rqmode;
2598 ms->m_hash = r->res_hash;
2600 /* m_result and m_bastmode are set from function args,
2601 not from lkb fields */
/* Advertise which ASTs the owner registered so the master knows what
   callbacks it may trigger remotely. */
2603 if (lkb->lkb_bastaddr)
2604 ms->m_asts |= AST_BAST;
2605 if (lkb->lkb_astaddr)
2606 ms->m_asts |= AST_COMP;
2608 /* compare with switch in create_message; send_remove() doesn't
2611 switch (ms->m_type) {
2612 case DLM_MSG_REQUEST:
2613 case DLM_MSG_LOOKUP:
2614 memcpy(ms->m_extra, r->res_name, r->res_length);
2616 case DLM_MSG_CONVERT:
2617 case DLM_MSG_UNLOCK:
2618 case DLM_MSG_REQUEST_REPLY:
2619 case DLM_MSG_CONVERT_REPLY:
2621 if (!lkb->lkb_lvbptr)
2623 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
/* Common path for request/convert/unlock/cancel sends: register the lkb
   on the waiters list (so the eventual reply can be matched), build and
   send the message to the rsb's master; on send failure the lkb is taken
   back off the waiters list. */
2628 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2630 struct dlm_message *ms;
2631 struct dlm_mhandle *mh;
2632 int to_nodeid, error;
2634 error = add_to_waiters(lkb, mstype);
2638 to_nodeid = r->res_nodeid;
2640 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2644 send_args(r, lkb, ms);
2646 error = send_message(mh, ms);
/* Send failed: undo the waiters registration. */
2652 remove_from_waiters(lkb, msg_reply_type(mstype));

2656 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2658 return send_common(r, lkb, DLM_MSG_REQUEST);

/* Send a convert; down-conversions are special-cased: the master always
   grants them, so instead of waiting for a real reply we synthesize a
   stub convert reply locally and process it immediately. */
2661 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2665 error = send_common(r, lkb, DLM_MSG_CONVERT);
2667 /* down conversions go without a reply from the master */
2668 if (!error && down_conversion(lkb)) {
2669 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2670 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2671 r->res_ls->ls_stub_ms.m_result = 0;
2672 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2673 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);

2679 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2680 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2681 that the master is still correct. */

2683 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2685 return send_common(r, lkb, DLM_MSG_UNLOCK);

2688 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2690 return send_common(r, lkb, DLM_MSG_CANCEL);
/* Tell the (remote) lock owner its lock has been granted.  No waiters
   entry: grants are one-way, no reply expected. */
2693 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2695 struct dlm_message *ms;
2696 struct dlm_mhandle *mh;
2697 int to_nodeid, error;
2699 to_nodeid = lkb->lkb_nodeid;
2701 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2705 send_args(r, lkb, ms);
2709 error = send_message(mh, ms);

/* Deliver a blocking AST to a remote lock owner; "mode" is the conflicting
   requested mode being blocked.  Also one-way. */
2714 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2716 struct dlm_message *ms;
2717 struct dlm_mhandle *mh;
2718 int to_nodeid, error;
2720 to_nodeid = lkb->lkb_nodeid;
2722 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2726 send_args(r, lkb, ms);
2728 ms->m_bastmode = mode;
2730 error = send_message(mh, ms);
/* Ask the resource's directory node who the master is.  The lkb waits on
   the waiters list for the lookup reply; the registration is undone if
   the send fails. */
2735 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2737 struct dlm_message *ms;
2738 struct dlm_mhandle *mh;
2739 int to_nodeid, error;
2741 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2745 to_nodeid = dlm_dir_nodeid(r);
2747 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2751 send_args(r, lkb, ms);
2753 error = send_message(mh, ms);
2759 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);

/* Tell the directory node we no longer master this resource so it can
   drop the directory entry.  No lkb involved, no reply expected. */
2763 static int send_remove(struct dlm_rsb *r)
2765 struct dlm_message *ms;
2766 struct dlm_mhandle *mh;
2767 int to_nodeid, error;
2769 to_nodeid = dlm_dir_nodeid(r);
2771 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2775 memcpy(ms->m_extra, r->res_name, r->res_length);
2776 ms->m_hash = r->res_hash;
2778 error = send_message(mh, ms);
/* Common path for sending an operation reply back to the lock owner;
   rv (elided from this excerpt's assignments) carries the operation's
   result in m_result. */
2783 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2786 struct dlm_message *ms;
2787 struct dlm_mhandle *mh;
2788 int to_nodeid, error;
2790 to_nodeid = lkb->lkb_nodeid;
2792 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2796 send_args(r, lkb, ms);
2800 error = send_message(mh, ms);

/* Thin per-type wrappers over send_common_reply(). */
2805 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2807 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);

2810 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2812 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);

2815 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2817 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);

2820 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2822 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);

/* Answer a directory lookup: echo the requester's lkid back and report
   the master nodeid (and rv as the result).  Uses the lockspace's stub
   rsb since no real rsb exists on the directory node. */
2825 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2826 int ret_nodeid, int rv)
2828 struct dlm_rsb *r = &ls->ls_stub_rsb;
2829 struct dlm_message *ms;
2830 struct dlm_mhandle *mh;
2831 int error, nodeid = ms_in->m_header.h_nodeid;
2833 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2837 ms->m_lkid = ms_in->m_lkid;
2839 ms->m_nodeid = ret_nodeid;
2841 error = send_message(mh, ms);
2846 /* which args we save from a received message depends heavily on the type
2847 of message, unlike the send side where we can safely send everything about
2848 the lkb for any type of message */
/* Copy flag fields from an incoming message into the lkb.  Only the low
   16 bits of lkb_flags travel on the wire; the high 16 bits are
   node-local (internal) flags and are preserved. */
2850 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2852 	lkb->lkb_exflags = ms->m_exflags;
2853 	lkb->lkb_sbflags = ms->m_sbflags;
2854 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2855 			 (ms->m_flags & 0x0000FFFF);
/* Same as receive_flags() but for replies: exflags are not updated,
   since the local node already owns the external flags of its own lock. */
2858 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2860 	lkb->lkb_sbflags = ms->m_sbflags;
2861 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2862 			 (ms->m_flags & 0x0000FFFF);
/* Number of bytes of variable-length data (resource name or lvb) that
   follow the fixed-size dlm_message structure in this message. */
2865 static int receive_extralen(struct dlm_message *ms)
2867 	return (ms->m_header.h_length - sizeof(struct dlm_message));
/* Copy the lock value block carried in the message's extra area into the
   lkb, allocating lkb_lvbptr on demand.  Only applies when the lock was
   requested with DLM_LKF_VALBLK.
   NOTE(review): len comes straight from the message header; assumes it
   does not exceed the allocated lvb size — confirm against full source. */
2870 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2871 		       struct dlm_message *ms)
2875 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2876 		if (!lkb->lkb_lvbptr)
2877 			lkb->lkb_lvbptr = allocate_lvb(ls);
/* second check covers allocation failure */
2878 		if (!lkb->lkb_lvbptr)
2880 		len = receive_extralen(ms);
2881 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
/* Fill in a freshly created master-copy lkb from an incoming request
   message: remote node/pid/lkid, requested mode, and which asts the
   remote holder registered.  grmode starts at IV because nothing has
   been granted yet. */
2886 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2887 				struct dlm_message *ms)
2889 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2890 	lkb->lkb_ownpid = ms->m_pid;
2891 	lkb->lkb_remid = ms->m_lkid;
2892 	lkb->lkb_grmode = DLM_LOCK_IV;
2893 	lkb->lkb_rqmode = ms->m_rqmode;
/* remote ast pointers are only used as presence flags on this side */
2894 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2895 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2897 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2899 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2900 		/* lkb was just created so there won't be an lvb yet */
2901 		lkb->lkb_lvbptr = allocate_lvb(ls);
2902 		if (!lkb->lkb_lvbptr)
/* Validate and apply an incoming convert request against an existing
   master-copy lkb: the sender must match the lkb's nodeid, the lkb must
   be a master copy in the GRANTED state, then the new rqmode/lvb/lvbseq
   are taken from the message. */
2909 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2910 				struct dlm_message *ms)
2912 	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2913 		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2914 			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2915 			  lkb->lkb_id, lkb->lkb_remid);
2919 	if (!is_master_copy(lkb))
/* can only convert a lock that is currently granted */
2922 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2925 	if (receive_lvb(ls, lkb, ms))
2928 	lkb->lkb_rqmode = ms->m_rqmode;
2929 	lkb->lkb_lvbseq = ms->m_lvbseq;
/* Validate an incoming unlock against a master-copy lkb and pick up any
   lvb the holder sent along with the unlock. */
2934 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2935 				struct dlm_message *ms)
2937 	if (!is_master_copy(lkb))
2939 	if (receive_lvb(ls, lkb, ms))
2944 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2945 uses to send a reply and that the remote end uses to process the reply. */
/* Prepare the lockspace's stub lkb so an error reply can be sent when we
   failed to find/create a real lkb: the remote end only needs its own
   lkid and our nodeid routing to process the failure. */
2947 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2949 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2950 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2951 	lkb->lkb_remid = ms->m_lkid;
/* Master node handling of DLM_MSG_REQUEST: create a master-copy lkb from
   the message, find/create the rsb (we must be master: R_MASTER), run
   do_request() and reply with its result.  On any failure before the lkb
   exists, reply via the stub lkb so the requester still gets an error. */
2954 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2956 	struct dlm_lkb *lkb;
2960 	error = create_lkb(ls, &lkb);
2964 	receive_flags(lkb, ms);
/* mark this lkb as the master's copy of a remote lock */
2965 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
2966 	error = receive_request_args(ls, lkb, ms);
/* resource name travels in the message's extra area */
2972 	namelen = receive_extralen(ms);
2974 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2983 	error = do_request(r, lkb);
2984 	send_request_reply(r, lkb, error);
/* -EINPROGRESS means the request was queued, not an error to the caller */
2989 	if (error == -EINPROGRESS)
2996 	setup_stub_lkb(ls, ms);
2997 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Master node handling of DLM_MSG_CONVERT on an existing master-copy lkb.
   Down-conversions are granted immediately and asynchronously on the
   sending side, so no reply is sent for them (reply = !down_conversion). */
3000 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3002 	struct dlm_lkb *lkb;
3004 	int error, reply = 1;
/* m_remid is our lkid for this lock (assigned when the request arrived) */
3006 	error = find_lkb(ls, ms->m_remid, &lkb);
3010 	r = lkb->lkb_resource;
3015 	receive_flags(lkb, ms);
3016 	error = receive_convert_args(ls, lkb, ms);
3019 	reply = !down_conversion(lkb);
3021 	error = do_convert(r, lkb);
3024 	send_convert_reply(r, lkb, error);
/* failure before the lkb was found: reply through the stub lkb */
3032 	setup_stub_lkb(ls, ms);
3033 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Master node handling of DLM_MSG_UNLOCK on an existing master-copy lkb:
   apply flags/lvb from the message, run do_unlock() and reply. */
3036 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3038 	struct dlm_lkb *lkb;
3042 	error = find_lkb(ls, ms->m_remid, &lkb);
3046 	r = lkb->lkb_resource;
3051 	receive_flags(lkb, ms);
3052 	error = receive_unlock_args(ls, lkb, ms);
3056 	error = do_unlock(r, lkb);
3058 	send_unlock_reply(r, lkb, error);
/* failure before the lkb was found: reply through the stub lkb */
3066 	setup_stub_lkb(ls, ms);
3067 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Master node handling of DLM_MSG_CANCEL: cancel a queued/converting
   master-copy lkb and reply with do_cancel()'s result. */
3070 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3072 	struct dlm_lkb *lkb;
3076 	error = find_lkb(ls, ms->m_remid, &lkb);
3080 	receive_flags(lkb, ms);
3082 	r = lkb->lkb_resource;
3087 	error = do_cancel(r, lkb);
3088 	send_cancel_reply(r, lkb, error);
/* failure before the lkb was found: reply through the stub lkb */
3096 	setup_stub_lkb(ls, ms);
3097 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Process-copy side of an async grant from the master: update flags,
   handle an alternate-mode grant, move the lock to the grant queue and
   queue the completion ast for the holder. */
3100 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3102 	struct dlm_lkb *lkb;
3106 	error = find_lkb(ls, ms->m_remid, &lkb);
3108 		log_error(ls, "receive_grant no lkb");
/* only the local holder's (process) copy should ever receive a grant */
3111 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3113 	r = lkb->lkb_resource;
3118 	receive_flags_reply(lkb, ms);
3119 	if (is_altmode(lkb))
3120 		munge_altmode(lkb, ms);
3121 	grant_lock_pc(r, lkb, ms);
3122 	queue_cast(r, lkb, 0);
/* Process-copy side of an async blocking ast from the master: queue the
   bast callback with the mode the blocked request wants. */
3129 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3131 	struct dlm_lkb *lkb;
3135 	error = find_lkb(ls, ms->m_remid, &lkb);
3137 		log_error(ls, "receive_bast no lkb");
3140 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3142 	r = lkb->lkb_resource;
3147 	queue_bast(r, lkb, ms->m_bastmode);
/* Directory-node handling of DLM_MSG_LOOKUP: verify we really are the
   directory node for this name's hash, look up (or assign) the master
   nodeid, and reply.  If the lookup says we ourselves are the master,
   treat the lookup directly as a request (saves a round trip). */
3154 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3156 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3158 	from_nodeid = ms->m_header.h_nodeid;
3159 	our_nodeid = dlm_our_nodeid();
/* resource name is in the message's extra area */
3161 	len = receive_extralen(ms);
3163 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3164 	if (dir_nodeid != our_nodeid) {
3165 		log_error(ls, "lookup dir_nodeid %d from %d",
3166 			  dir_nodeid, from_nodeid);
3172 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3174 	/* Optimization: we're master so treat lookup as a request */
3175 	if (!error && ret_nodeid == our_nodeid) {
3176 		receive_request(ls, ms);
3180 	send_lookup_reply(ls, ms, ret_nodeid, error);
/* Directory-node handling of DLM_MSG_REMOVE: delete the directory entry
   for the named resource.  No reply is sent for removes. */
3183 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3185 	int len, dir_nodeid, from_nodeid;
3187 	from_nodeid = ms->m_header.h_nodeid;
3189 	len = receive_extralen(ms);
/* sanity check: the hash must map to us, otherwise the sender is confused */
3191 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3192 	if (dir_nodeid != dlm_our_nodeid()) {
3193 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3194 			  dir_nodeid, from_nodeid);
3198 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
/* Purge all locks owned by the given remote nodeid/pid pair (sent when a
   userspace process on another node goes away). */
3201 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3203 	do_purge(ls, ms->m_nodeid, ms->m_pid);
/* Process-copy side of DLM_MSG_REQUEST_REPLY.  Handles the normal
   queued/granted/EAGAIN results, the "dir node was also master" shortcut
   (our lookup was answered with a request reply), the "master moved"
   retry case, and the overlap-unlock/cancel cases where the user asked
   to unlock/cancel while the request was still in flight. */
3206 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3208 	struct dlm_lkb *lkb;
3210 	int error, mstype, result;
3212 	error = find_lkb(ls, ms->m_remid, &lkb);
3214 		log_error(ls, "receive_request_reply no lkb");
3217 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3219 	r = lkb->lkb_resource;
/* capture wait_type before remove_from_waiters() clears it */
3223 	mstype = lkb->lkb_wait_type;
3224 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3228 	/* Optimization: the dir node was also the master, so it took our
3229 	   lookup as a request and sent request reply instead of lookup reply */
3230 	if (mstype == DLM_MSG_LOOKUP) {
3231 		r->res_nodeid = ms->m_header.h_nodeid;
3232 		lkb->lkb_nodeid = r->res_nodeid;
3235 	/* this is the value returned from do_request() on the master */
3236 	result = ms->m_result;
3240 		/* request would block (be queued) on remote master */
3241 		queue_cast(r, lkb, -EAGAIN);
3242 		confirm_master(r, -EAGAIN);
3243 		unhold_lkb(lkb); /* undoes create_lkb() */
3248 		/* request was queued or granted on remote master */
3249 		receive_flags_reply(lkb, ms);
/* remember the master's lkid so later messages can reference it */
3250 		lkb->lkb_remid = ms->m_lkid;
3251 		if (is_altmode(lkb))
3252 			munge_altmode(lkb, ms);
3254 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3257 			grant_lock_pc(r, lkb, ms);
3258 			queue_cast(r, lkb, 0);
3260 		confirm_master(r, result);
3265 		/* find_rsb failed to find rsb or rsb wasn't master */
3266 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3267 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
/* forget the stale master; _request_lock() will redo the lookup */
3269 		lkb->lkb_nodeid = -1;
3271 		if (is_overlap(lkb)) {
3272 			/* we'll ignore error in cancel/unlock reply */
3273 			queue_cast_overlap(r, lkb);
3274 			unhold_lkb(lkb); /* undoes create_lkb() */
3276 			_request_lock(r, lkb);
3280 		log_error(ls, "receive_request_reply %x error %d",
3281 			  lkb->lkb_id, result);
/* user unlocked/cancelled while the request was outstanding: now that
   the request completed on the master, send the deferred op */
3284 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3285 		log_debug(ls, "receive_request_reply %x result %d unlock",
3286 			  lkb->lkb_id, result);
3287 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3288 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3289 		send_unlock(r, lkb);
3290 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3291 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3292 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3293 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3294 		send_cancel(r, lkb);
/* overlap became moot (e.g. request already failed): just clear flags */
3296 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3297 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
/* Apply the master's convert result to our process-copy lkb: EAGAIN is
   cast back to the caller, EINPROGRESS re-queues the lock as converting,
   and success grants it (handling any demotion the master applied). */
3305 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3306 				    struct dlm_message *ms)
3308 	/* this is the value returned from do_convert() on the master */
3309 	switch (ms->m_result) {
3311 		/* convert would block (be queued) on remote master */
3312 		queue_cast(r, lkb, -EAGAIN);
3316 		/* convert was queued on remote master */
3317 		receive_flags_reply(lkb, ms);
3318 		if (is_demoted(lkb))
3319 			munge_demoted(lkb, ms);
3321 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3326 		/* convert was granted on remote master */
3327 		receive_flags_reply(lkb, ms);
3328 		if (is_demoted(lkb))
3329 			munge_demoted(lkb, ms);
3330 		grant_lock_pc(r, lkb, ms);
3331 		queue_cast(r, lkb, 0);
3335 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3336 			  lkb->lkb_id, ms->m_result);
/* Middle layer for convert replies: remove the lkb from the waiters list
   (via the ms-variant, safe for stub replies generated during recovery
   with waiters_mutex already held) then apply the result. */
3340 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3342 	struct dlm_rsb *r = lkb->lkb_resource;
3348 	/* stub reply can happen with waiters_mutex held */
3349 	error = remove_from_waiters_ms(lkb, ms);
3353 	__receive_convert_reply(r, lkb, ms);
/* Entry point for DLM_MSG_CONVERT_REPLY: look up our lkb by the lkid
   echoed in m_remid and hand off to _receive_convert_reply(). */
3359 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3361 	struct dlm_lkb *lkb;
3364 	error = find_lkb(ls, ms->m_remid, &lkb);
3366 		log_error(ls, "receive_convert_reply no lkb");
3369 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3371 	_receive_convert_reply(lkb, ms);
/* Apply the master's unlock result: on -DLM_EUNLOCK the lock is removed
   locally and the unlock completion ast is queued.  Works for stub
   replies generated during recovery (uses the ms-variant of
   remove_from_waiters). */
3375 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3377 	struct dlm_rsb *r = lkb->lkb_resource;
3383 	/* stub reply can happen with waiters_mutex held */
3384 	error = remove_from_waiters_ms(lkb, ms);
3388 	/* this is the value returned from do_unlock() on the master */
3390 	switch (ms->m_result) {
3392 		receive_flags_reply(lkb, ms);
3393 		remove_lock_pc(r, lkb);
3394 		queue_cast(r, lkb, -DLM_EUNLOCK);
3399 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3400 			  lkb->lkb_id, ms->m_result);
/* Entry point for DLM_MSG_UNLOCK_REPLY: look up our lkb and hand off to
   _receive_unlock_reply(). */
3407 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3409 	struct dlm_lkb *lkb;
3412 	error = find_lkb(ls, ms->m_remid, &lkb);
3414 		log_error(ls, "receive_unlock_reply no lkb");
3417 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3419 	_receive_unlock_reply(lkb, ms);
/* Apply the master's cancel result: on -DLM_ECANCEL the in-flight
   request/convert is reverted locally and the cancel completion ast is
   queued.  Safe for stub replies during recovery. */
3423 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3425 	struct dlm_rsb *r = lkb->lkb_resource;
3431 	/* stub reply can happen with waiters_mutex held */
3432 	error = remove_from_waiters_ms(lkb, ms);
3436 	/* this is the value returned from do_cancel() on the master */
3438 	switch (ms->m_result) {
3440 		receive_flags_reply(lkb, ms);
3441 		revert_lock_pc(r, lkb);
3443 			queue_cast(r, lkb, -DLM_ECANCEL);
3448 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3449 			  lkb->lkb_id, ms->m_result);
/* Entry point for DLM_MSG_CANCEL_REPLY: look up our lkb and hand off to
   _receive_cancel_reply(). */
3456 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3458 	struct dlm_lkb *lkb;
3461 	error = find_lkb(ls, ms->m_remid, &lkb);
3463 		log_error(ls, "receive_cancel_reply no lkb");
3466 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3468 	_receive_cancel_reply(lkb, ms);
/* Handle the directory node's answer to our DLM_MSG_LOOKUP: record the
   master nodeid on the rsb, then either complete a pending overlap
   unlock/cancel or resend the original request to the (now known)
   master, and finally kick any other lookups queued on this rsb. */
3472 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3474 	struct dlm_lkb *lkb;
3476 	int error, ret_nodeid;
/* a lookup reply echoes our lkid in m_lkid (not m_remid) */
3478 	error = find_lkb(ls, ms->m_lkid, &lkb);
3480 		log_error(ls, "receive_lookup_reply no lkb");
3484 	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
3485 	   FIXME: will a non-zero error ever be returned? */
3487 	r = lkb->lkb_resource;
3491 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3495 	ret_nodeid = ms->m_nodeid;
3496 	if (ret_nodeid == dlm_our_nodeid()) {
/* we are the master; res_first_lkid gating is no longer needed */
3499 		r->res_first_lkid = 0;
3501 		/* set_master() will copy res_nodeid to lkb_nodeid */
3502 		r->res_nodeid = ret_nodeid;
3505 	if (is_overlap(lkb)) {
3506 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3507 			  lkb->lkb_id, lkb->lkb_flags);
3508 		queue_cast_overlap(r, lkb);
3509 		unhold_lkb(lkb); /* undoes create_lkb() */
3513 	_request_lock(r, lkb);
/* let other lkbs waiting on this lookup proceed */
3517 		process_lookup_list(r);
/* Top-level dispatcher for all incoming DLM messages.  Finds the target
   lockspace, defers messages to the requestqueue while recovery is in
   progress (dlm_recvd must never block on the recovery lock), then
   routes by message type to the receive_xxxx() handlers above. */
3524 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3526 	struct dlm_message *ms = (struct dlm_message *) hd;
3533 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3535 		log_print("drop message %d from %d for unknown lockspace %d",
3536 			  ms->m_type, nodeid, hd->h_lockspace);
3540 	/* recovery may have just ended leaving a bunch of backed-up requests
3541 	   in the requestqueue; wait while dlm_recoverd clears them */
3544 		dlm_wait_requestqueue(ls);
3546 	/* recovery may have just started while there were a bunch of
3547 	   in-flight requests -- save them in requestqueue to be processed
3548 	   after recovery.  we can't let dlm_recvd block on the recovery
3549 	   lock.  if dlm_recoverd is calling this function to clear the
3550 	   requestqueue, it needs to be interrupted (-EINTR) if another
3551 	   recovery operation is starting. */
3554 	if (dlm_locking_stopped(ls)) {
3559 		error = dlm_add_requestqueue(ls, nodeid, hd);
3560 		if (error == -EAGAIN)
3568 	if (dlm_lock_recovery_try(ls))
3573 	switch (ms->m_type) {
3575 	/* messages sent to a master node */
3577 	case DLM_MSG_REQUEST:
3578 		receive_request(ls, ms);
3581 	case DLM_MSG_CONVERT:
3582 		receive_convert(ls, ms);
3585 	case DLM_MSG_UNLOCK:
3586 		receive_unlock(ls, ms);
3589 	case DLM_MSG_CANCEL:
3590 		receive_cancel(ls, ms);
3593 	/* messages sent from a master node (replies to above) */
3595 	case DLM_MSG_REQUEST_REPLY:
3596 		receive_request_reply(ls, ms);
3599 	case DLM_MSG_CONVERT_REPLY:
3600 		receive_convert_reply(ls, ms);
3603 	case DLM_MSG_UNLOCK_REPLY:
3604 		receive_unlock_reply(ls, ms);
3607 	case DLM_MSG_CANCEL_REPLY:
3608 		receive_cancel_reply(ls, ms);
3611 	/* messages sent from a master node (only two types of async msg) */
3614 		receive_grant(ls, ms);
3618 		receive_bast(ls, ms);
3621 	/* messages sent to a dir node */
3623 	case DLM_MSG_LOOKUP:
3624 		receive_lookup(ls, ms);
3627 	case DLM_MSG_REMOVE:
3628 		receive_remove(ls, ms);
3631 	/* messages sent from a dir node (remove has no reply) */
3633 	case DLM_MSG_LOOKUP_REPLY:
3634 		receive_lookup_reply(ls, ms);
3637 	/* other messages */
3640 		receive_purge(ls, ms);
3644 		log_error(ls, "unknown message type %d", ms->m_type);
3647 	dlm_unlock_recovery(ls);
3649 	dlm_put_lockspace(ls);
/* Recovery handling for an lkb waiting on a convert reply from a dead
   master.  PR<->CW (middle) conversions get a faked -EINPROGRESS reply
   and their real granted mode is resolved later (recover_conversion);
   up-conversions are simply flagged for resend after recovery. */
3659 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3661 	if (middle_conversion(lkb)) {
/* fake a convert reply from the dead master via the stub message */
3663 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3664 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3665 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3666 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3668 		/* Same special case as in receive_rcom_lock_args() */
3669 		lkb->lkb_grmode = DLM_LOCK_IV;
3670 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3673 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3674 		lkb->lkb_flags |= DLM_IFL_RESEND;
3677 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3678 	   conversions are async; there's no reply from the remote master */
3681 /* A waiting lkb needs recovery if the master node has failed, or
3682 the master node is changing (only when no directory is used) */
/* True if this waiting lkb needs recovery action: its master node is
   gone, or (when no directory is used) the directory-derived master has
   changed. */
3684 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3686 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3689 	if (!dlm_no_directory(ls))
3692 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3698 /* Recovery for locks that are waiting for replies from nodes that are now
3699 gone. We can just complete unlocks and cancels by faking a reply from the
3700 dead node. Requests and up-conversions we flag to be resent after
3701 recovery. Down-conversions can just be completed with a fake reply like
3702 unlocks. Conversions between PR and CW need special attention. */
/* First recovery pass over the waiters list: flag lookups and requests
   for resend, fake replies for unlocks/cancels (and special-case
   converts) whose master is gone, so no operation stays blocked on a
   dead node. */
3704 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3706 	struct dlm_lkb *lkb, *safe;
3708 	mutex_lock(&ls->ls_waiters_mutex);
3710 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3711 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3712 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3714 		/* all outstanding lookups, regardless of destination  will be
3715 		   resent after recovery is done */
3717 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3718 			lkb->lkb_flags |= DLM_IFL_RESEND;
3722 		if (!waiter_needs_recovery(ls, lkb))
3725 		switch (lkb->lkb_wait_type) {
3727 		case DLM_MSG_REQUEST:
3728 			lkb->lkb_flags |= DLM_IFL_RESEND;
3731 		case DLM_MSG_CONVERT:
3732 			recover_convert_waiter(ls, lkb);
/* unlocks and cancels are simply completed with a faked success reply
   from the dead master */
3735 		case DLM_MSG_UNLOCK:
3737 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3738 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3739 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3740 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3744 		case DLM_MSG_CANCEL:
3746 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3747 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3748 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3749 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3754 			log_error(ls, "invalid lkb wait_type %d",
3755 				  lkb->lkb_wait_type);
3759 	mutex_unlock(&ls->ls_waiters_mutex);
/* Find and return the next lkb on the waiters list that was flagged
   DLM_IFL_RESEND by dlm_recover_waiters_pre(); NULL when none remain.
   NOTE(review): presumably takes a reference on the lkb before dropping
   the mutex — the hold is not visible here, confirm in full source. */
3762 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3764 	struct dlm_lkb *lkb;
3767 	mutex_lock(&ls->ls_waiters_mutex);
3768 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3769 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3775 	mutex_unlock(&ls->ls_waiters_mutex);
3782 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3783 master or dir-node for r. Processing the lkb may result in it being placed
3786 /* We do this after normal locking has been enabled and any saved messages
3787 (in requestqueue) have been processed. We should be confident that at
3788 this point we won't get or process a reply to any of these waiting
3789 operations. But, new ops may be coming in on the rsbs/locks here from
3790 userspace or remotely. */
3792 /* there may have been an overlap unlock/cancel prior to recovery or after
3793 recovery. if before, the lkb may still have a pos wait_count; if after, the
3794 overlap flag would just have been set and nothing new sent. we can be
3795 confident here than any replies to either the initial op or overlap ops
3796 prior to recovery have been received. */
/* Second recovery pass: for each RESEND-flagged lkb, clear all waiter
   state, then either complete a pending overlap unlock/cancel in place
   of resending, or resend the original lookup/request/convert to the
   new master.  Aborts with -EINTR if another recovery starts. */
3798 int dlm_recover_waiters_post(struct dlm_ls *ls)
3800 	struct dlm_lkb *lkb;
3802 	int error = 0, mstype, err, oc, ou;
3805 		if (dlm_locking_stopped(ls)) {
3806 			log_debug(ls, "recover_waiters_post aborted");
3811 		lkb = find_resend_waiter(ls);
3815 		r = lkb->lkb_resource;
/* capture op type and overlap state before clearing them below */
3819 		mstype = lkb->lkb_wait_type;
3820 		oc = is_overlap_cancel(lkb);
3821 		ou = is_overlap_unlock(lkb);
3824 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3825 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3827 		/* At this point we assume that we won't get a reply to any
3828 		   previous op or overlap op on this lock.  First, do a big
3829 		   remove_from_waiters() for all previous ops. */
3831 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
3832 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3833 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3834 		lkb->lkb_wait_type = 0;
3835 		lkb->lkb_wait_count = 0;
3836 		mutex_lock(&ls->ls_waiters_mutex);
3837 		list_del_init(&lkb->lkb_wait_reply);
3838 		mutex_unlock(&ls->ls_waiters_mutex);
3839 		unhold_lkb(lkb); /* for waiters list */
3842 			/* do an unlock or cancel instead of resending */
3844 			case DLM_MSG_LOOKUP:
3845 			case DLM_MSG_REQUEST:
3846 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3848 				unhold_lkb(lkb); /* undoes create_lkb() */
3850 			case DLM_MSG_CONVERT:
3852 					queue_cast(r, lkb, -DLM_ECANCEL);
/* overlap unlock of a convert: force the full unlock through */
3854 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3855 					_unlock_lock(r, lkb);
/* no overlap op pending: resend the original operation */
3863 			case DLM_MSG_LOOKUP:
3864 			case DLM_MSG_REQUEST:
3865 				_request_lock(r, lkb);
3867 					confirm_master(r, 0);
3869 			case DLM_MSG_CONVERT:
3870 				_convert_lock(r, lkb);
3878 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
3879 				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
/* Walk one rsb queue and release every lkb the test predicate matches,
   marking the rsb RSB_LOCKS_PURGED so pending locks can be re-granted
   later (dlm_grant_after_purge). */
3888 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3889 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3891 	struct dlm_ls *ls = r->res_ls;
3892 	struct dlm_lkb *lkb, *safe;
3894 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3895 		if (test(ls, lkb)) {
3896 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3898 			/* this put should free the lkb */
3899 			if (!dlm_put_lkb(lkb))
3900 				log_error(ls, "purged lkb not released");
/* Predicate: master-copy lkb whose owning node has left the lockspace. */
3905 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3907 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
/* Predicate: any master-copy lkb (used when we stop being master). */
3910 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3912 	return is_master_copy(lkb);
/* Purge locks held by dead nodes from all three queues of one rsb. */
3915 static void purge_dead_locks(struct dlm_rsb *r)
3917 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3918 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3919 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
/* Purge all master-copy locks from an rsb's three queues (called when
   mastership of the rsb moves away from this node). */
3922 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3924 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3925 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3926 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3929 /* Get rid of locks held by nodes that are gone. */
/* Recovery step: walk every root rsb and drop the locks held by nodes
   that are no longer members of the lockspace. */
3931 int dlm_purge_locks(struct dlm_ls *ls)
3935 	log_debug(ls, "dlm_purge_locks");
3937 	down_write(&ls->ls_root_sem);
3938 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3942 		purge_dead_locks(r);
3948 	up_write(&ls->ls_root_sem);
/* Scan one hash bucket for an rsb flagged RSB_LOCKS_PURGED; clear the
   flag and return it (NULL if the bucket has none). */
3953 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3955 	struct dlm_rsb *r, *r_ret = NULL;
3957 	read_lock(&ls->ls_rsbtbl[bucket].lock);
3958 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3959 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
3962 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3966 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
/* After purging dead locks, walk every hash bucket and re-run grant
   processing on each rsb that had locks purged — removing a holder may
   make queued/converting locks grantable. */
3970 void dlm_grant_after_purge(struct dlm_ls *ls)
3976 		r = find_purged_rsb(ls, bucket);
/* no more purged rsbs in this bucket: advance (or stop at last bucket) */
3978 			if (bucket == ls->ls_rsbtbl_size - 1)
3985 		grant_pending_locks(r);
3986 		confirm_master(r, 0);
/* Find an lkb on one rsb queue by its (owning nodeid, remote lkid) pair. */
3994 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3997 	struct dlm_lkb *lkb;
3999 	list_for_each_entry(lkb, head, lkb_statequeue) {
4000 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
/* Search all three queues of an rsb for an lkb matching nodeid/remid
   (used to detect locks already rebuilt by an earlier aborted recovery). */
4006 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4009 	struct dlm_lkb *lkb;
4011 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4014 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4017 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
/* Rebuild a master-copy lkb on the new master from the rcom_lock data a
   lock holder sent during recovery: identity, flags, modes, lvb and ast
   presence bits, plus the middle-conversion special case. */
4023 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4024 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4026 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4029 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4030 	lkb->lkb_ownpid = rl->rl_ownpid;
4031 	lkb->lkb_remid = rl->rl_lkid;
4032 	lkb->lkb_exflags = rl->rl_exflags;
/* only the wire half of the flags; this copy is the master's */
4033 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4034 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4035 	lkb->lkb_lvbseq = rl->rl_lvbseq;
4036 	lkb->lkb_rqmode = rl->rl_rqmode;
4037 	lkb->lkb_grmode = rl->rl_grmode;
4038 	/* don't set lkb_status because add_lkb wants to itself */
4040 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4041 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4043 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4044 		lkb->lkb_lvbptr = allocate_lvb(ls);
4045 		if (!lkb->lkb_lvbptr)
/* lvb bytes trail the rcom_lock struct in the rcom buffer */
4047 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4048 			 sizeof(struct rcom_lock);
4049 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4052 	/* Conversions between PR and CW (middle modes) need special handling.
4053 	   The real granted mode of these converting locks cannot be determined
4054 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4056 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4057 		rl->rl_status = DLM_LKSTS_CONVERT;
4058 		lkb->lkb_grmode = DLM_LOCK_IV;
4059 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4065 /* This lkb may have been recovered in a previous aborted recovery so we need
4066 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4067 If so we just send back a standard reply. If not, we create a new lkb with
4068 the given values and send back our lkid. We send back our lkid by sending
4069 back the rcom_lock struct we got but with the remid field filled in. */
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */
4071 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4073 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4075 	struct dlm_lkb *lkb;
/* parent (hierarchical) locks were never implemented */
4078 	if (rl->rl_parent_lkid) {
4079 		error = -EOPNOTSUPP;
4083 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
/* already rebuilt by a previous (aborted) recovery? */
4089 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
4095 	error = create_lkb(ls, &lkb);
4099 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4106 	add_lkb(r, lkb, rl->rl_status);
4110 	/* this is the new value returned to the lock holder for
4111 	   saving in its process-copy lkb */
4112 	rl->rl_remid = lkb->lkb_id;
4119 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
4120 	rl->rl_result = error;
/* Handle the new master's reply to the rcom lock we sent during
   recovery: store the master's lkid in our process copy, retry if the
   master wasn't ready, and ack the lock for dlm_recover_locks(). */
4124 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4126 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4128 	struct dlm_lkb *lkb;
4131 	error = find_lkb(ls, rl->rl_lkid, &lkb);
4133 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
4137 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4139 	error = rl->rl_result;
4141 	r = lkb->lkb_resource;
4147 		/* There's a chance the new master received our lock before
4148 		   dlm_recover_master_reply(), this wouldn't happen if we did
4149 		   a barrier between recover_masters and recover_locks. */
4150 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4151 			  (unsigned long)r, r->res_name);
/* resend; the master will be ready to accept it eventually */
4152 		dlm_send_rcom_lock(r, lkb);
4155 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
/* success: remember the lkid the new master assigned to our lock */
4158 		lkb->lkb_remid = rl->rl_remid;
4161 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4162 			  error, lkb->lkb_id);
4165 	/* an ack for dlm_recover_locks() which waits for replies from
4166 	   all the locks it sends to new masters */
4167 	dlm_recovered_lock(r);
/* Userspace (device interface) entry point for acquiring a new lock:
   create an lkb, optionally allocate a user lvb buffer, attach the
   dlm_user_args, run the normal request path, and track the lock on the
   owning process's per-proc list. */
4176 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4177 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4178 		     unsigned long timeout_cs)
4180 	struct dlm_lkb *lkb;
4181 	struct dlm_args args;
4184 	dlm_lock_recovery(ls);
4186 	error = create_lkb(ls, &lkb);
4192 	if (flags & DLM_LKF_VALBLK) {
4193 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4194 		if (!ua->lksb.sb_lvbptr) {
4202 	/* After ua is attached to lkb it will be freed by free_lkb().
4203 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4204 	   lock and that lkb_astparam is the dlm_user_args structure. */
4206 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4207 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4208 	lkb->lkb_flags |= DLM_IFL_USER;
4209 	ua->old_mode = DLM_LOCK_IV;
4216 	error = request_lock(ls, lkb, name, namelen, &args);
4232 	/* add this new lkb to the per-process list of locks */
4233 	spin_lock(&ua->proc->locks_spin);
4235 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4236 	spin_unlock(&ua->proc->locks_spin);
4238 	dlm_unlock_recovery(ls);
/* Userspace entry point for converting an existing lock.  The caller may
   change the ast params and add an lvb that didn't exist before; the
   per-call fields are refreshed from ua_tmp into the attached ua. */
4242 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4243 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4244 		     unsigned long timeout_cs)
4246 	struct dlm_lkb *lkb;
4247 	struct dlm_args args;
4248 	struct dlm_user_args *ua;
4251 	dlm_lock_recovery(ls);
4253 	error = find_lkb(ls, lkid, &lkb);
4257 	/* user can change the params on its lock when it converts it, or
4258 	   add an lvb that didn't exist before */
4260 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4262 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4263 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4264 		if (!ua->lksb.sb_lvbptr) {
4269 	if (lvb_in && ua->lksb.sb_lvbptr)
4270 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
/* refresh per-call callback info from the temporary args */
4272 	ua->xid = ua_tmp->xid;
4273 	ua->castparam = ua_tmp->castparam;
4274 	ua->castaddr = ua_tmp->castaddr;
4275 	ua->bastparam = ua_tmp->bastparam;
4276 	ua->bastaddr = ua_tmp->bastaddr;
4277 	ua->user_lksb = ua_tmp->user_lksb;
/* remember the mode before conversion for the user's completion ast */
4278 	ua->old_mode = lkb->lkb_grmode;
4280 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4281 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4285 	error = convert_lock(ls, lkb, &args);
/* in-progress/EAGAIN are normal outcomes for a conversion, not errors */
4287 	if (error == -EINPROGRESS || error == -EAGAIN)
4292 	dlm_unlock_recovery(ls);
/* Userspace entry point for unlocking: optionally store a final lvb,
   run the normal unlock path, and move the lkb to the proc's
   "unlocking" list until the completion ast is delivered. */
4297 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4298 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4300 	struct dlm_lkb *lkb;
4301 	struct dlm_args args;
4302 	struct dlm_user_args *ua;
4305 	dlm_lock_recovery(ls);
4307 	error = find_lkb(ls, lkid, &lkb);
4311 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4313 	if (lvb_in && ua->lksb.sb_lvbptr)
4314 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4315 	ua->castparam = ua_tmp->castparam;
4316 	ua->user_lksb = ua_tmp->user_lksb;
4318 	error = set_unlock_args(flags, ua, &args);
4322 	error = unlock_lock(ls, lkb, &args);
/* -DLM_EUNLOCK is the success code for an unlock */
4324 	if (error == -DLM_EUNLOCK)
4326 	/* from validate_unlock_args() */
4327 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4332 	spin_lock(&ua->proc->locks_spin);
4333 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4334 	if (!list_empty(&lkb->lkb_ownqueue))
4335 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4336 	spin_unlock(&ua->proc->locks_spin);
4340 	dlm_unlock_recovery(ls);
/* Userspace entry point for cancelling an in-progress request/convert.
   Unlike unlock, a cancelled lkb stays on the proc's locks list. */
4345 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4346 		    uint32_t flags, uint32_t lkid)
4348 	struct dlm_lkb *lkb;
4349 	struct dlm_args args;
4350 	struct dlm_user_args *ua;
4353 	dlm_lock_recovery(ls);
4355 	error = find_lkb(ls, lkid, &lkb);
4359 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4360 	ua->castparam = ua_tmp->castparam;
4361 	ua->user_lksb = ua_tmp->user_lksb;
4363 	error = set_unlock_args(flags, ua, &args);
4367 	error = cancel_lock(ls, lkb, &args);
/* -DLM_ECANCEL is the success code for a cancel */
4369 	if (error == -DLM_ECANCEL)
4371 	/* from validate_unlock_args() */
4372 	if (error == -EBUSY)
4377 	dlm_unlock_recovery(ls);
4382 /* lkb's that are removed from the waiters list by revert are just left on the
4383 orphans list with the granted orphan locks, to be freed by purge */
/* Move a persistent lock of an exiting process onto the lockspace's
   orphans list and cancel any in-flight operation on it; the granted
   lock itself survives as an orphan. */
4385 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4387 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4388 	struct dlm_args args;
4392 	mutex_lock(&ls->ls_orphans_mutex);
4393 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4394 	mutex_unlock(&ls->ls_orphans_mutex);
4396 	set_unlock_args(0, ua, &args);
4398 	error = cancel_lock(ls, lkb, &args);
4399 	if (error == -DLM_ECANCEL)
4404 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4405 Regardless of what rsb queue the lock is on, it's removed and freed. */
/* The force flag allows the unlock to go ahead even if the lkb isn't
   granted.  Regardless of what rsb queue the lock is on, it's removed
   and freed. */
4407 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4409 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4410 	struct dlm_args args;
4413 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4415 	error = unlock_lock(ls, lkb, &args);
/* -DLM_EUNLOCK is the success code for an unlock */
4416 	if (error == -DLM_EUNLOCK)
4421 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4422 (which does lock_rsb) due to deadlock with receiving a message that does
4423 lock_rsb followed by dlm_user_add_ast() */
/* Detach the first lock from a dying process's locks list under the
   clear_proc_locks mutex, marking it ORPHAN (persistent locks) or DEAD.
   Returns NULL when the list is empty.  The caller does the actual
   unlock/orphan outside the mutex (see comment above). */
4425 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4426 				     struct dlm_user_proc *proc)
4428 	struct dlm_lkb *lkb = NULL;
4430 	mutex_lock(&ls->ls_clear_proc_locks);
4431 	if (list_empty(&proc->locks))
4434 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4435 	list_del_init(&lkb->lkb_ownqueue);
4437 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4438 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4440 		lkb->lkb_flags |= DLM_IFL_DEAD;
4442 	mutex_unlock(&ls->ls_clear_proc_locks);
4446 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4447 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4448 which we clear here. */
4450 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4451 list, and no more device_writes should add lkb's to proc->locks list; so we
4452 shouldn't need to take asts_spin or locks_spin here. this assumes that
4453 device reads/writes/closes are serialized -- FIXME: we may need to serialize
/* Release every lock a dying user process still holds in this lockspace:
   persistent locks become orphans, the rest are force-unlocked, and any
   in-progress unlocks and pending asts are marked dead / dropped.  Runs
   with recovery excluded for the whole sweep. */
4456 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4458 struct dlm_lkb *lkb, *safe;
4460 dlm_lock_recovery(ls);
/* presumably looped until del_proc_lock() returns NULL; the loop
   construct itself is not visible here -- TODO confirm */
4463 lkb = del_proc_lock(ls, proc);
4466 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4467 orphan_proc_lock(ls, lkb);
4469 unlock_proc_lock(ls, lkb);
4471 /* this removes the reference for the proc->locks list
4472 added by dlm_user_request, it may result in the lkb
/* remaining lists are cleared under the mutex; see the comment above on
   why asts_spin/locks_spin are not needed here */
4478 mutex_lock(&ls->ls_clear_proc_locks);
4480 /* in-progress unlocks */
4481 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4482 list_del_init(&lkb->lkb_ownqueue);
4483 lkb->lkb_flags |= DLM_IFL_DEAD;
/* drop queued asts; their lkb references are released here (the put is
   in the elided lines) -- TODO confirm */
4487 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4488 list_del(&lkb->lkb_astqueue);
4492 mutex_unlock(&ls->ls_clear_proc_locks);
4493 dlm_unlock_recovery(ls);
/* Like dlm_clear_proc_locks() but invoked via the purge ioctl path for a
   still-live proc, so proc->locks_spin / asts_spin must be taken (other
   device reads/writes can race).  Every lock is marked dead and force-
   unlocked; no orphan handling here. */
4496 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4498 struct dlm_lkb *lkb, *safe;
/* pop locks one at a time; the spinlock is dropped before
   unlock_proc_lock() since that path takes lock_rsb */
4502 spin_lock(&proc->locks_spin);
4503 if (!list_empty(&proc->locks)) {
4504 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4506 list_del_init(&lkb->lkb_ownqueue);
4508 spin_unlock(&proc->locks_spin);
4513 lkb->lkb_flags |= DLM_IFL_DEAD;
4514 unlock_proc_lock(ls, lkb);
4515 dlm_put_lkb(lkb); /* ref from proc->locks list */
/* abandon in-progress unlocks: just mark them dead */
4518 spin_lock(&proc->locks_spin);
4519 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4520 list_del_init(&lkb->lkb_ownqueue);
4521 lkb->lkb_flags |= DLM_IFL_DEAD;
4524 spin_unlock(&proc->locks_spin);
/* discard pending asts (lkb refs presumably dropped in elided lines) */
4526 spin_lock(&proc->asts_spin);
4527 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4528 list_del(&lkb->lkb_astqueue);
4531 spin_unlock(&proc->asts_spin);
4534 /* pid of 0 means purge all orphans */
/* Free orphaned locks left on ls_orphans by dead processes, matching on
   the owning pid (pid 0 purges all orphans -- see comment above).
   Runs on the node that holds the orphans; the nodeid arg is not used in
   the visible lines. */
4536 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4538 struct dlm_lkb *lkb, *safe;
4540 mutex_lock(&ls->ls_orphans_mutex);
4541 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
/* nonzero pid: skip orphans owned by other pids */
4542 if (pid && lkb->lkb_ownpid != pid)
4544 unlock_proc_lock(ls, lkb);
4545 list_del_init(&lkb->lkb_ownqueue);
4548 mutex_unlock(&ls->ls_orphans_mutex);
/* Ask a remote node to run do_purge() for the given pid by sending it a
   DLM_MSG_PURGE message.  Returns 0 or a negative errno from message
   creation/send. */
4551 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4553 struct dlm_message *ms;
4554 struct dlm_mhandle *mh;
4557 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4558 DLM_MSG_PURGE, &ms, &mh);
/* m_nodeid carries the purge target; m_pid is presumably set in an
   elided line -- TODO confirm */
4561 ms->m_nodeid = nodeid;
4564 return send_message(mh, ms);
4567 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4568 int nodeid, int pid)
4572 if (nodeid != dlm_our_nodeid()) {
4573 error = send_purge(ls, nodeid, pid);
4575 dlm_lock_recovery(ls);
4576 if (pid == current->pid)
4577 purge_proc_locks(ls, proc);
4579 do_purge(ls, nodeid, pid);
4580 dlm_unlock_recovery(ls);