1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
10 *******************************************************************************
11 ******************************************************************************/
13 /* Central locking logic has four stages:
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
54 L: send_xxxx() -> R: receive_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
63 #include "requestqueue.h"
67 #include "lockspace.h"
72 #include "lvb_table.h"
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 * Lock compatibility matrix - thanks Steve
92 * UN = Unlocked state. Not really a state, used as a flag
93 * PD = Padding. Used to make the matrix a nice power of two in size
94 * Other states are the same as the VMS DLM.
95 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
/* 1 = the two modes may be held concurrently, 0 = they conflict.
   Indexed by mode+1 so DLM_LOCK_IV/"unlocked" (-1) maps to row/col 0. */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
111 * This defines the direction of transfer of LVB data.
112 * Granted mode is the row; requested mode is the column.
113 * Usage: matrix[grmode+1][rqmode+1]
114 * 1 = LVB is returned to the caller
115 * 0 = LVB is written to the resource
116 * -1 = nothing happens to the LVB
/*
 * Direction of LVB data transfer, indexed [grmode+1][rqmode+1]:
 *  1 = LVB is returned to the caller
 *  0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
131 #define modes_compat(gr, rq) \
132 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
134 int dlm_modes_compat(int mode1, int mode2)
136 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 * Compatibility matrix for conversions with QUECVT set.
141 * Granted mode is the row; requested mode is the column.
142 * Usage: matrix[grmode+1][rqmode+1]
/* Conversion requests with DLM_LKF_QUECVT set are forced onto the convert
   queue when this matrix entry is 1; indexed [grmode+1][rqmode+1]. */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
157 void dlm_print_lkb(struct dlm_lkb *lkb)
159 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
160 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
161 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
162 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
163 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 void dlm_print_rsb(struct dlm_rsb *r)
168 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
169 r->res_nodeid, r->res_flags, r->res_first_lkid,
170 r->res_recover_locks_count, r->res_name);
173 void dlm_dump_rsb(struct dlm_rsb *r)
179 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
180 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
181 printk(KERN_ERR "rsb lookup list\n");
182 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
184 printk(KERN_ERR "rsb grant queue:\n");
185 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
187 printk(KERN_ERR "rsb convert queue:\n");
188 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
190 printk(KERN_ERR "rsb wait queue:\n");
191 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195 /* Threads cannot use the lockspace while it's being recovered */
197 static inline void dlm_lock_recovery(struct dlm_ls *ls)
199 down_read(&ls->ls_in_recovery);
202 void dlm_unlock_recovery(struct dlm_ls *ls)
204 up_read(&ls->ls_in_recovery);
207 int dlm_lock_recovery_try(struct dlm_ls *ls)
209 return down_read_trylock(&ls->ls_in_recovery);
212 static inline int can_be_queued(struct dlm_lkb *lkb)
214 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 static inline int force_blocking_asts(struct dlm_lkb *lkb)
219 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 static inline int is_demoted(struct dlm_lkb *lkb)
224 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 static inline int is_altmode(struct dlm_lkb *lkb)
229 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 static inline int is_granted(struct dlm_lkb *lkb)
234 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 static inline int is_remote(struct dlm_rsb *r)
239 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
240 return !!r->res_nodeid;
243 static inline int is_process_copy(struct dlm_lkb *lkb)
245 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 static inline int is_master_copy(struct dlm_lkb *lkb)
250 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
251 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
252 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 static inline int middle_conversion(struct dlm_lkb *lkb)
257 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
258 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
263 static inline int down_conversion(struct dlm_lkb *lkb)
265 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
270 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
275 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 static inline int is_overlap(struct dlm_lkb *lkb)
280 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
281 DLM_IFL_OVERLAP_CANCEL));
284 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
286 if (is_master_copy(lkb))
289 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
291 lkb->lkb_lksb->sb_status = rv;
292 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
294 dlm_add_ast(lkb, AST_COMP);
297 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
300 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
303 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
305 if (is_master_copy(lkb))
306 send_bast(r, lkb, rqmode);
308 lkb->lkb_bastmode = rqmode;
309 dlm_add_ast(lkb, AST_BAST);
314 * Basic operations on rsb's and lkb's
317 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
321 r = allocate_rsb(ls, len);
327 memcpy(r->res_name, name, len);
328 mutex_init(&r->res_mutex);
330 INIT_LIST_HEAD(&r->res_lookup);
331 INIT_LIST_HEAD(&r->res_grantqueue);
332 INIT_LIST_HEAD(&r->res_convertqueue);
333 INIT_LIST_HEAD(&r->res_waitqueue);
334 INIT_LIST_HEAD(&r->res_root_list);
335 INIT_LIST_HEAD(&r->res_recover_list);
340 static int search_rsb_list(struct list_head *head, char *name, int len,
341 unsigned int flags, struct dlm_rsb **r_ret)
346 list_for_each_entry(r, head, res_hashchain) {
347 if (len == r->res_length && !memcmp(name, r->res_name, len))
353 if (r->res_nodeid && (flags & R_MASTER))
359 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
360 unsigned int flags, struct dlm_rsb **r_ret)
365 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
367 kref_get(&r->res_ref);
370 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
374 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
376 if (dlm_no_directory(ls))
379 if (r->res_nodeid == -1) {
380 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
381 r->res_first_lkid = 0;
382 } else if (r->res_nodeid > 0) {
383 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
384 r->res_first_lkid = 0;
386 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
387 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
394 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
395 unsigned int flags, struct dlm_rsb **r_ret)
398 write_lock(&ls->ls_rsbtbl[b].lock);
399 error = _search_rsb(ls, name, len, b, flags, r_ret);
400 write_unlock(&ls->ls_rsbtbl[b].lock);
405 * Find rsb in rsbtbl and potentially create/add one
407 * Delaying the release of rsb's has a similar benefit to applications keeping
408 * NL locks on an rsb, but without the guarantee that the cached master value
409 * will still be valid when the rsb is reused. Apps aren't always smart enough
410 * to keep NL locks on an rsb that they may lock again shortly; this can lead
411 * to excessive master lookups and removals if we don't delay the release.
413 * Searching for an rsb means looking through both the normal list and toss
414 * list. When found on the toss list the rsb is moved to the normal list with
415 * ref count of 1; when found on normal list the ref count is incremented.
418 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
419 unsigned int flags, struct dlm_rsb **r_ret)
/* Look up (and with R_CREATE, create) the rsb for a resource name.
   NOTE(review): this listing is incomplete — several lines (error paths,
   braces, the duplicate-race handling) are missing; confirm against
   upstream fs/dlm/lock.c before relying on details below. */
421 struct dlm_rsb *r, *tmp;
422 uint32_t hash, bucket;
/* presumably with no directory every lookup may create — TODO confirm */
425 if (dlm_no_directory(ls))
/* hash the name to pick a bucket; table size is a power of two */
428 hash = jhash(name, namelen, 0);
429 bucket = hash & (ls->ls_rsbtbl_size - 1);
431 error = search_rsb(ls, name, namelen, bucket, flags, &r);
/* -EBADR = not found; without R_CREATE that's final */
435 if (error == -EBADR && !(flags & R_CREATE))
438 /* the rsb was found but wasn't a master copy */
439 if (error == -ENOTBLK)
443 r = create_rsb(ls, name, namelen);
448 r->res_bucket = bucket;
450 kref_init(&r->res_ref);
452 /* With no directory, the master can be set immediately */
453 if (dlm_no_directory(ls)) {
454 int nodeid = dlm_dir_nodeid(r);
455 if (nodeid == dlm_our_nodeid())
457 r->res_nodeid = nodeid;
/* re-search under the bucket lock: another thread may have added the
   same rsb while we were allocating ours */
460 write_lock(&ls->ls_rsbtbl[bucket].lock);
461 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
463 write_unlock(&ls->ls_rsbtbl[bucket].lock);
468 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
469 write_unlock(&ls->ls_rsbtbl[bucket].lock);
/* Public wrapper for find_rsb(); used by recovery/directory code. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
                 unsigned int flags, struct dlm_rsb **r_ret)
{
        return find_rsb(ls, name, namelen, flags, r_ret);
}
482 /* This is only called to add a reference when the code already holds
483 a valid reference to the rsb, so there's no need for locking. */
485 static inline void hold_rsb(struct dlm_rsb *r)
487 kref_get(&r->res_ref);
/* Exported form of hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
        hold_rsb(r);
}
495 static void toss_rsb(struct kref *kref)
497 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
498 struct dlm_ls *ls = r->res_ls;
500 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
501 kref_init(&r->res_ref);
502 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
503 r->res_toss_time = jiffies;
505 free_lvb(r->res_lvbptr);
506 r->res_lvbptr = NULL;
510 /* When all references to the rsb are gone it's transfered to
511 the tossed list for later disposal. */
513 static void put_rsb(struct dlm_rsb *r)
515 struct dlm_ls *ls = r->res_ls;
516 uint32_t bucket = r->res_bucket;
518 write_lock(&ls->ls_rsbtbl[bucket].lock);
519 kref_put(&r->res_ref, toss_rsb);
520 write_unlock(&ls->ls_rsbtbl[bucket].lock);
/* Exported form of put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}
528 /* See comment for unhold_lkb */
530 static void unhold_rsb(struct dlm_rsb *r)
533 rv = kref_put(&r->res_ref, toss_rsb);
534 DLM_ASSERT(!rv, dlm_dump_rsb(r););
537 static void kill_rsb(struct kref *kref)
539 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
541 /* All work is done after the return from kref_put() so we
542 can release the write_lock before the remove and free. */
544 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
545 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
546 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
547 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
548 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
549 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
552 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
553 The rsb must exist as long as any lkb's for it do. */
555 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
558 lkb->lkb_resource = r;
561 static void detach_lkb(struct dlm_lkb *lkb)
563 if (lkb->lkb_resource) {
564 put_rsb(lkb->lkb_resource);
565 lkb->lkb_resource = NULL;
569 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
571 struct dlm_lkb *lkb, *tmp;
575 lkb = allocate_lkb(ls);
579 lkb->lkb_nodeid = -1;
580 lkb->lkb_grmode = DLM_LOCK_IV;
581 kref_init(&lkb->lkb_ref);
582 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
585 get_random_bytes(&bucket, sizeof(bucket));
586 bucket &= (ls->ls_lkbtbl_size - 1);
588 write_lock(&ls->ls_lkbtbl[bucket].lock);
590 /* counter can roll over so we must verify lkid is not in use */
593 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
595 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
597 if (tmp->lkb_id != lkid)
605 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
606 write_unlock(&ls->ls_lkbtbl[bucket].lock);
612 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
615 uint16_t bucket = (lkid >> 16);
617 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
618 if (lkb->lkb_id == lkid)
624 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
627 uint16_t bucket = (lkid >> 16);
629 if (bucket >= ls->ls_lkbtbl_size)
632 read_lock(&ls->ls_lkbtbl[bucket].lock);
633 lkb = __find_lkb(ls, lkid);
635 kref_get(&lkb->lkb_ref);
636 read_unlock(&ls->ls_lkbtbl[bucket].lock);
639 return lkb ? 0 : -ENOENT;
642 static void kill_lkb(struct kref *kref)
644 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
646 /* All work is done after the return from kref_put() so we
647 can release the write_lock before the detach_lkb */
649 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
652 /* __put_lkb() is used when an lkb may not have an rsb attached to
653 it so we need to provide the lockspace explicitly */
655 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
657 uint16_t bucket = (lkb->lkb_id >> 16);
659 write_lock(&ls->ls_lkbtbl[bucket].lock);
660 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
661 list_del(&lkb->lkb_idtbl_list);
662 write_unlock(&ls->ls_lkbtbl[bucket].lock);
666 /* for local/process lkbs, lvbptr points to caller's lksb */
667 if (lkb->lkb_lvbptr && is_master_copy(lkb))
668 free_lvb(lkb->lkb_lvbptr);
672 write_unlock(&ls->ls_lkbtbl[bucket].lock);
677 int dlm_put_lkb(struct dlm_lkb *lkb)
681 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
682 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
684 ls = lkb->lkb_resource->res_ls;
685 return __put_lkb(ls, lkb);
688 /* This is only called to add a reference when the code already holds
689 a valid reference to the lkb, so there's no need for locking. */
691 static inline void hold_lkb(struct dlm_lkb *lkb)
693 kref_get(&lkb->lkb_ref);
696 /* This is called when we need to remove a reference and are certain
697 it's not the last ref. e.g. del_lkb is always called between a
698 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
699 put_lkb would work fine, but would involve unnecessary locking */
701 static inline void unhold_lkb(struct dlm_lkb *lkb)
704 rv = kref_put(&lkb->lkb_ref, kill_lkb);
705 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
708 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
711 struct dlm_lkb *lkb = NULL;
713 list_for_each_entry(lkb, head, lkb_statequeue)
714 if (lkb->lkb_rqmode < mode)
718 list_add_tail(new, head);
720 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
723 /* add/remove lkb to rsb's grant/convert/wait queue */
725 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
727 kref_get(&lkb->lkb_ref);
729 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
731 lkb->lkb_status = status;
734 case DLM_LKSTS_WAITING:
735 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
736 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
738 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
740 case DLM_LKSTS_GRANTED:
741 /* convention says granted locks kept in order of grmode */
742 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
745 case DLM_LKSTS_CONVERT:
746 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
747 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
749 list_add_tail(&lkb->lkb_statequeue,
750 &r->res_convertqueue);
753 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
757 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
760 list_del(&lkb->lkb_statequeue);
/* Move an lkb between state queues; the hold/unhold pair keeps the lkb
   alive across the del (which drops the old queue's reference). */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}
772 static int msg_reply_type(int mstype)
775 case DLM_MSG_REQUEST:
776 return DLM_MSG_REQUEST_REPLY;
777 case DLM_MSG_CONVERT:
778 return DLM_MSG_CONVERT_REPLY;
780 return DLM_MSG_UNLOCK_REPLY;
782 return DLM_MSG_CANCEL_REPLY;
784 return DLM_MSG_LOOKUP_REPLY;
789 /* add/remove lkb from global waiters list of lkb's waiting for
790 a reply from a remote node */
792 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
/* Register an lkb as waiting for a remote reply of type mstype.
   NOTE(review): this listing is incomplete — braces, error assignments
   and early-exit paths are missing; confirm the overlap handling against
   upstream fs/dlm/lock.c. */
794 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
797 mutex_lock(&ls->ls_waiters_mutex);
/* a second unlock, or a second cancel, while one is outstanding is an
   error — presumably returns -EBUSY; verify */
799 if (is_overlap_unlock(lkb) ||
800 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
/* an unlock/cancel overlapping an in-flight op sets an overlap flag
   instead of adding a second waiters entry */
805 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
808 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
811 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
817 lkb->lkb_wait_count++;
820 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
821 lkb->lkb_id, lkb->lkb_wait_type, mstype,
822 lkb->lkb_wait_count, lkb->lkb_flags);
/* normal path: this must be the first outstanding op for the lkb */
826 DLM_ASSERT(!lkb->lkb_wait_count,
828 printk("wait_count %d\n", lkb->lkb_wait_count););
830 lkb->lkb_wait_count++;
831 lkb->lkb_wait_type = mstype;
833 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
836 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
837 lkb->lkb_id, error, lkb->lkb_flags, mstype,
838 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
839 mutex_unlock(&ls->ls_waiters_mutex);
843 /* We clear the RESEND flag because we might be taking an lkb off the waiters
844 list as part of process_requestqueue (e.g. a lookup that has an optimized
845 request reply on the requestqueue) between dlm_recover_waiters_pre() which
846 set RESEND and dlm_recover_waiters_post() */
848 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
/* Match an incoming reply of type mstype against the lkb's outstanding
   op(s) and drop the corresponding waiters reference.
   NOTE(review): this listing is incomplete — goto labels, braces and the
   return value are missing; confirm against upstream fs/dlm/lock.c. */
850 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
851 int overlap_done = 0;
/* an unlock reply completes an overlapped force-unlock */
853 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
854 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
/* a cancel reply completes an overlapped cancel */
859 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
865 /* N.B. type of reply may not always correspond to type of original
866 msg due to lookup->request optimization, verify others? */
868 if (lkb->lkb_wait_type) {
869 lkb->lkb_wait_type = 0;
/* reply matched nothing outstanding — log and bail */
873 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
874 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
878 /* the force-unlock/cancel has completed and we haven't recvd a reply
879 to the op that was in progress prior to the unlock/cancel; we
880 give up on any reply to the earlier op. FIXME: not sure when/how
883 if (overlap_done && lkb->lkb_wait_type) {
884 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
885 lkb->lkb_id, mstype, lkb->lkb_wait_type);
886 lkb->lkb_wait_count--;
887 lkb->lkb_wait_type = 0;
890 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
/* clear RESEND: see the comment above this function in the file */
892 lkb->lkb_flags &= ~DLM_IFL_RESEND;
893 lkb->lkb_wait_count--;
894 if (!lkb->lkb_wait_count)
895 list_del_init(&lkb->lkb_wait_reply);
900 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
902 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
905 mutex_lock(&ls->ls_waiters_mutex);
906 error = _remove_from_waiters(lkb, mstype);
907 mutex_unlock(&ls->ls_waiters_mutex);
911 /* Handles situations where we might be processing a "fake" or "stub" reply in
912 which we can't try to take waiters_mutex again. */
914 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
916 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
919 if (ms != &ls->ls_stub_ms)
920 mutex_lock(&ls->ls_waiters_mutex);
921 error = _remove_from_waiters(lkb, ms->m_type);
922 if (ms != &ls->ls_stub_ms)
923 mutex_unlock(&ls->ls_waiters_mutex);
927 static void dir_remove(struct dlm_rsb *r)
931 if (dlm_no_directory(r->res_ls))
934 to_nodeid = dlm_dir_nodeid(r);
935 if (to_nodeid != dlm_our_nodeid())
938 dlm_dir_remove_entry(r->res_ls, to_nodeid,
939 r->res_name, r->res_length);
942 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
943 found since they are in order of newest to oldest? */
945 static int shrink_bucket(struct dlm_ls *ls, int b)
/* Free toss-list rsbs in bucket b whose toss_time has aged past the
   configured ci_toss_secs. NOTE(review): this listing is incomplete —
   the found/retry loop structure, dir_remove/free_rsb calls and return
   are missing; confirm against upstream fs/dlm/lock.c. */
948 int count = 0, found;
952 write_lock(&ls->ls_rsbtbl[b].lock);
/* scan oldest-first: toss list is ordered newest to oldest */
953 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
955 if (!time_after_eq(jiffies, r->res_toss_time +
956 dlm_config.ci_toss_secs * HZ))
963 write_unlock(&ls->ls_rsbtbl[b].lock);
/* final put: unhash and free outside the bucket lock */
967 if (kref_put(&r->res_ref, kill_rsb)) {
968 list_del(&r->res_hashchain);
969 write_unlock(&ls->ls_rsbtbl[b].lock);
/* raced with someone reviving the rsb — leave it alone */
976 write_unlock(&ls->ls_rsbtbl[b].lock);
977 log_error(ls, "tossed rsb in use %s", r->res_name);
984 void dlm_scan_rsbs(struct dlm_ls *ls)
988 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
989 shrink_bucket(ls, i);
990 if (dlm_locking_stopped(ls))
996 /* lkb is master or local copy */
998 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* Apply LVB semantics when granting a master/local-copy lock: copy the
   resource LVB to the caller, or the caller's LVB to the resource,
   according to dlm_lvb_operations. NOTE(review): this listing is
   incomplete — the b==1 branch opening, res_lvbptr NULL checks, lvbseq
   increment and returns are missing; confirm against upstream. */
1000 int b, len = r->res_ls->ls_lvblen;
1002 /* b=1 lvb returned to caller
1003 b=0 lvb written to rsb or invalidated
1006 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1009 if (!lkb->lkb_lvbptr)
1012 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
/* b == 1: return the resource's LVB to the caller */
1018 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1019 lkb->lkb_lvbseq = r->res_lvbseq;
1021 } else if (b == 0) {
1022 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1023 rsb_set_flag(r, RSB_VALNOTVALID);
1027 if (!lkb->lkb_lvbptr)
1030 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
/* allocate the resource LVB lazily on first write */
1034 r->res_lvbptr = allocate_lvb(r->res_ls);
1039 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1041 lkb->lkb_lvbseq = r->res_lvbseq;
1042 rsb_clear_flag(r, RSB_VALNOTVALID);
/* fallthrough status reporting for invalidated LVBs */
1045 if (rsb_flag(r, RSB_VALNOTVALID))
1046 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1049 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1051 if (lkb->lkb_grmode < DLM_LOCK_PW)
1054 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1055 rsb_set_flag(r, RSB_VALNOTVALID);
1059 if (!lkb->lkb_lvbptr)
1062 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1066 r->res_lvbptr = allocate_lvb(r->res_ls);
1071 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1073 rsb_clear_flag(r, RSB_VALNOTVALID);
1076 /* lkb is process copy (pc) */
1078 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1079 struct dlm_message *ms)
1083 if (!lkb->lkb_lvbptr)
1086 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1089 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1091 int len = receive_extralen(ms);
1092 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1093 lkb->lkb_lvbseq = ms->m_lvbseq;
1097 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1098 remove_lock -- used for unlock, removes lkb from granted
1099 revert_lock -- used for cancel, moves lkb from convert to granted
1100 grant_lock -- used for request and convert, adds lkb to granted or
1101 moves lkb from convert or waiting to granted
1103 Each of these is used for master or local copy lkb's. There is
1104 also a _pc() variation used to make the corresponding change on
1105 a process copy (pc) lkb. */
1107 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1110 lkb->lkb_grmode = DLM_LOCK_IV;
1111 /* this unhold undoes the original ref from create_lkb()
1112 so this leads to the lkb being freed */
/* Unlock on the master/local copy: flush the LVB, then remove the lock. */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}
/* Unlock on a process copy: no LVB handling, just remove the lock. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
1127 /* returns: 0 did nothing
1128 1 moved lock to granted
1131 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1135 lkb->lkb_rqmode = DLM_LOCK_IV;
1137 switch (lkb->lkb_status) {
1138 case DLM_LKSTS_GRANTED:
1140 case DLM_LKSTS_CONVERT:
1141 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1144 case DLM_LKSTS_WAITING:
1146 lkb->lkb_grmode = DLM_LOCK_IV;
1147 /* this unhold undoes the original ref from create_lkb()
1148 so this leads to the lkb being freed */
1153 log_print("invalid status for revert %d", lkb->lkb_status);
/* Process-copy cancel — identical to the master-copy path. */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return revert_lock(r, lkb);
}
1163 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1165 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1166 lkb->lkb_grmode = lkb->lkb_rqmode;
1167 if (lkb->lkb_status)
1168 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1170 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1173 lkb->lkb_rqmode = DLM_LOCK_IV;
1176 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1178 set_lvb_lock(r, lkb);
1179 _grant_lock(r, lkb);
1180 lkb->lkb_highbast = 0;
/* Grant on a process copy using LVB data carried in the reply message. */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);
        _grant_lock(r, lkb);
}
1190 /* called by grant_pending_locks() which means an async grant message must
1191 be sent to the requesting node in addition to granting the lock if the
1192 lkb belongs to a remote node. */
1194 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1197 if (is_master_copy(lkb))
1200 queue_cast(r, lkb, 0);
1203 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1204 change the granted/requested modes. We're munging things accordingly in
1206 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1208 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1209 compatible with other granted locks */
1211 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1213 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1214 log_print("munge_demoted %x invalid reply type %d",
1215 lkb->lkb_id, ms->m_type);
1219 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1220 log_print("munge_demoted %x invalid modes gr %d rq %d",
1221 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1225 lkb->lkb_grmode = DLM_LOCK_NL;
1228 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1230 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1231 ms->m_type != DLM_MSG_GRANT) {
1232 log_print("munge_altmode %x invalid reply type %d",
1233 lkb->lkb_id, ms->m_type);
1237 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1238 lkb->lkb_rqmode = DLM_LOCK_PR;
1239 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1240 lkb->lkb_rqmode = DLM_LOCK_CW;
1242 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1247 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1249 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1251 if (lkb->lkb_id == first->lkb_id)
1257 /* Check if the given lkb conflicts with another lkb on the queue. */
1259 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1261 struct dlm_lkb *this;
1263 list_for_each_entry(this, head, lkb_statequeue) {
1266 if (!modes_compat(this, lkb))
1273 * "A conversion deadlock arises with a pair of lock requests in the converting
1274 * queue for one resource. The granted mode of each lock blocks the requested
1275 * mode of the other lock."
1277 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1278 * convert queue from being granted, then demote lkb (set grmode to NL).
1279 * This second form requires that we check for conv-deadlk even when
1280 * now == 0 in _can_be_granted().
1283 * Granted Queue: empty
1284 * Convert Queue: NL->EX (first lock)
1285 * PR->EX (second lock)
1287 * The first lock can't be granted because of the granted mode of the second
1288 * lock and the second lock can't be granted because it's not first in the
1289 * list. We demote the granted mode of the second lock (the lkb passed to this
1292 * After the resolution, the "grant pending" function needs to go back and try
1293 * to grant locks on the convert queue again since the first lock can now be
1297 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
/* Detect the two conversion-deadlock forms described in the comment
   above. NOTE(review): this listing is incomplete — the loop body that
   records "first"/"self" and the return statements are missing; confirm
   against upstream fs/dlm/lock.c. */
1299 struct dlm_lkb *this, *first = NULL, *self = NULL;
1301 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
/* mutual incompatibility between two converting locks = deadlock */
1309 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1313 /* if lkb is on the convert queue and is preventing the first
1314 from being granted, then there's deadlock and we demote lkb.
1315 multiple converting locks may need to do this before the first
1316 converting lock can be granted. */
1318 if (self && self != first) {
1319 if (!modes_compat(lkb, first) &&
1320 !queue_conflict(&rsb->res_grantqueue, first))
1328 * Return 1 if the lock can be granted, 0 otherwise.
1329 * Also detect and resolve conversion deadlocks.
1331 * lkb is the lock to be granted
1333 * now is 1 if the function is being called in the context of the
1334 * immediate request, it is 0 if called later, after the lock has been
1337 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1340 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1342 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1345 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1346 * a new request for a NL mode lock being blocked.
1348 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1349 * request, then it would be granted. In essence, the use of this flag
1350 * tells the Lock Manager to expedite this request by not considering
1351 * what may be in the CONVERTING or WAITING queues... As of this
1352 * writing, the EXPEDITE flag can be used only with new requests for NL
1353 * mode locks. This flag is not valid for conversion requests.
1355 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1356 * conversion or used with a non-NL requested mode. We also know an
1357 * EXPEDITE request is always granted immediately, so now must always
1358 * be 1. The full condition to grant an expedite request: (now &&
1359 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1360 * therefore be shortened to just checking the flag.
1363 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1367 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1368 * added to the remaining conditions.
1371 if (queue_conflict(&r->res_grantqueue, lkb))
1375 * 6-3: By default, a conversion request is immediately granted if the
1376 * requested mode is compatible with the modes of all other granted
1380 if (queue_conflict(&r->res_convertqueue, lkb))
1384 * 6-5: But the default algorithm for deciding whether to grant or
1385 * queue conversion requests does not by itself guarantee that such
1386 * requests are serviced on a "first come first serve" basis. This, in
1387 * turn, can lead to a phenomenon known as "indefinate postponement".
1389 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1390 * the system service employed to request a lock conversion. This flag
1391 * forces certain conversion requests to be queued, even if they are
1392 * compatible with the granted modes of other locks on the same
1393 * resource. Thus, the use of this flag results in conversion requests
1394 * being ordered on a "first come first servce" basis.
1396 * DCT: This condition is all about new conversions being able to occur
1397 * "in place" while the lock remains on the granted queue (assuming
1398 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1399 * doesn't _have_ to go onto the convert queue where it's processed in
1400 * order. The "now" variable is necessary to distinguish converts
1401 * being received and processed for the first time now, because once a
1402 * convert is moved to the conversion queue the condition below applies
1403 * requiring fifo granting.
1406 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1410 * The NOORDER flag is set to avoid the standard vms rules on grant
1414 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1418 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1419 * granted until all other conversion requests ahead of it are granted
1423 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1427 * 6-4: By default, a new request is immediately granted only if all
1428 * three of the following conditions are satisfied when the request is
1430 * - The queue of ungranted conversion requests for the resource is
1432 * - The queue of ungranted new requests for the resource is empty.
1433 * - The mode of the new request is compatible with the most
1434 * restrictive mode of all granted locks on the resource.
1437 if (now && !conv && list_empty(&r->res_convertqueue) &&
1438 list_empty(&r->res_waitqueue))
1442 * 6-4: Once a lock request is in the queue of ungranted new requests,
1443 * it cannot be granted until the queue of ungranted conversion
1444 * requests is empty, all ungranted new requests ahead of it are
1445 * granted and/or canceled, and it is compatible with the granted mode
1446 * of the most restrictive lock granted on the resource.
1449 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1450 first_in_list(lkb, &r->res_waitqueue))
1455 * The following, enabled by CONVDEADLK, departs from VMS.
1458 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1459 conversion_deadlock_detect(r, lkb)) {
1460 lkb->lkb_grmode = DLM_LOCK_NL;
1461 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1468 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1469 * simple way to provide a big optimization to applications that can use them.
/* Wrapper around _can_be_granted() that, on failure, retries with the
 * ALTPR/ALTCW alternate mode (setting DLM_SBF_ALTMODE when the alternate is
 * granted) and restores lkb_rqmode afterwards.
 * NOTE(review): listing is line-sampled; the branch structure between the
 * visible lines (and the final return) is not shown here. */
1472 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1474 	uint32_t flags = lkb->lkb_exflags;
1476 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1478 	rv = _can_be_granted(r, lkb, now);
1482 	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
	/* pick the alternate mode: PR if ALTPR requested and not already PR,
	   else CW if ALTCW requested and not already CW */
1485 	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1487 	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1491 		lkb->lkb_rqmode = alt;
1492 		rv = _can_be_granted(r, lkb, now);
1494 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1496 			lkb->lkb_rqmode = rqmode;
/* Walk the convert queue granting whatever can be granted; restart the scan
 * when a CONVDEADLK demotion may have made earlier entries grantable.
 * Returns the highest requested mode seen (max of "high" and queue entries).
 * NOTE(review): line-sampled — the restart/quit bookkeeping between the
 * visible lines is not shown. */
1502 static int grant_pending_convert(struct dlm_rsb *r, int high)
1504 	struct dlm_lkb *lkb, *s;
1505 	int hi, demoted, quit, grant_restart, demote_restart;
1513 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1514 		demoted = is_demoted(lkb);
1515 		if (can_be_granted(r, lkb, 0)) {
1516 			grant_lock_pending(r, lkb);
1519 			hi = max_t(int, lkb->lkb_rqmode, hi);
		/* can_be_granted() newly demoted this lkb -> need a rescan */
1520 			if (!demoted && is_demoted(lkb))
1527 	if (demote_restart && !quit) {
1532 	return max_t(int, high, hi);
/* Grant whatever wait-queue (new request) entries can now be granted and
 * return the highest requested mode among the entries still ungranted
 * (folded into the incoming "high"). */
1535 static int grant_pending_wait(struct dlm_rsb *r, int high)
1537 	struct dlm_lkb *lkb, *s;
1539 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1540 		if (can_be_granted(r, lkb, 0))
1541 			grant_lock_pending(r, lkb);
1543 			high = max_t(int, lkb->lkb_rqmode, high);
/* Master-side only (asserted): grant everything grantable on the convert and
 * wait queues, then send blocking ASTs to granted locks whose mode conflicts
 * with the highest still-pending requested mode. */
1549 static void grant_pending_locks(struct dlm_rsb *r)
1551 	struct dlm_lkb *lkb, *s;
1552 	int high = DLM_LOCK_IV;
1554 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1556 	high = grant_pending_convert(r, high);
1557 	high = grant_pending_wait(r, high);
	/* DLM_LOCK_IV means nothing is left pending -> no basts needed */
1559 	if (high == DLM_LOCK_IV)
1563 	 * If there are locks left on the wait/convert queue then send blocking
1564 	 * ASTs to granted locks based on the largest requested mode (high)
1565 	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
1568 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
	/* only bast locks that registered a bast callback, haven't already
	   been basted at this mode, and actually conflict per the matrix */
1569 		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1570 		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1571 			queue_bast(r, lkb, high);
1572 			lkb->lkb_highbast = high;
/* Queue blocking ASTs to every lock on "head" that has a bast callback,
 * conflicts with lkb's requested mode, and hasn't already been basted at
 * that mode (lkb_highbast caches the last basted mode to avoid repeats). */
1577 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1578 			    struct dlm_lkb *lkb)
1582 	list_for_each_entry(gr, head, lkb_statequeue) {
1583 		if (gr->lkb_bastaddr &&
1584 		    gr->lkb_highbast < lkb->lkb_rqmode &&
1585 		    !modes_compat(gr, lkb)) {
1586 			queue_bast(r, gr, lkb->lkb_rqmode);
1587 			gr->lkb_highbast = lkb->lkb_rqmode;
/* bast holders of granted locks that block lkb */
1592 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1594 	send_bast_queue(r, &r->res_grantqueue, lkb);
/* bast holders on both the granted and convert queues that block lkb */
1597 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1599 	send_bast_queue(r, &r->res_grantqueue, lkb);
1600 	send_bast_queue(r, &r->res_convertqueue, lkb);
1603 /* set_master(r, lkb) -- set the master nodeid of a resource
1605 The purpose of this function is to set the nodeid field in the given
1606 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1607 known, it can just be copied to the lkb and the function will return
1608 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1609 before it can be copied to the lkb.
1611 When the rsb nodeid is being looked up remotely, the initial lkb
1612 causing the lookup is kept on the ls_waiters list waiting for the
1613 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1614 on the rsb's res_lookup list until the master is verified.
1617 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1618 1: the rsb master is not available and the lkb has been placed on
/* See the block comment above: copy the rsb's master nodeid into the lkb,
 * initiating a (possibly remote) directory lookup when it isn't known.
 * res_nodeid convention (from the visible branches): 0 = we are master,
 * >0 = remote master, -1 = unknown.
 * NOTE(review): line-sampled — the return statements after each branch are
 * not visible here. */
1622 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1624 	struct dlm_ls *ls = r->res_ls;
1625 	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
	/* first use after recovery: take the stored nodeid but force this
	   lkb to re-confirm it (it becomes first_lkid) */
1627 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1628 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1629 		r->res_first_lkid = lkb->lkb_id;
1630 		lkb->lkb_nodeid = r->res_nodeid;
	/* another lkb's lookup is in flight: park this one on res_lookup */
1634 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1635 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1639 	if (r->res_nodeid == 0) {
1640 		lkb->lkb_nodeid = 0;
1644 	if (r->res_nodeid > 0) {
1645 		lkb->lkb_nodeid = r->res_nodeid;
1649 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1651 	dir_nodeid = dlm_dir_nodeid(r);
	/* directory node is remote: ask it who the master is */
1653 	if (dir_nodeid != our_nodeid) {
1654 		r->res_first_lkid = lkb->lkb_id;
1655 		send_lookup(r, lkb);
1666 	error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1667 			       r->res_length, &ret_nodeid);
1670 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1674 	if (ret_nodeid == our_nodeid) {
1675 		r->res_first_lkid = 0;
1677 		lkb->lkb_nodeid = 0;
1679 		r->res_first_lkid = lkb->lkb_id;
1680 		r->res_nodeid = ret_nodeid;
1681 		lkb->lkb_nodeid = ret_nodeid;
/* Re-drive _request_lock() for every lkb that was parked on res_lookup
 * while a master lookup was in flight. */
1686 static void process_lookup_list(struct dlm_rsb *r)
1688 	struct dlm_lkb *lkb, *safe;
1690 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1691 		list_del_init(&lkb->lkb_rsb_lookup);
1692 		_request_lock(r, lkb);
1697 /* confirm_master -- confirm (or deny) an rsb's master nodeid */

/* NOTE(review): line-sampled — the switch/case structure dispatching on
 * "error" is not visible; the branches below appear to handle (a) success,
 * (b) a rejected NOQUEUE request, and (c) an unexpected error. */
1699 static void confirm_master(struct dlm_rsb *r, int error)
1701 	struct dlm_lkb *lkb;
1703 	if (!r->res_first_lkid)
	/* master confirmed: release first_lkid and re-drive parked lkbs */
1709 		r->res_first_lkid = 0;
1710 		process_lookup_list(r);
1714 		/* the remote master didn't queue our NOQUEUE request;
1715 		   make a waiting lkb the first_lkid */
1717 		r->res_first_lkid = 0;
1719 		if (!list_empty(&r->res_lookup)) {
1720 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1722 			list_del_init(&lkb->lkb_rsb_lookup);
1723 			r->res_first_lkid = lkb->lkb_id;
1724 			_request_lock(r, lkb);
1730 		log_error(r->res_ls, "confirm_master unknown error %d", error);
/* Validate dlm_lock() arguments and pack them into *args for the later
 * copy into the lkb (done in validate_lock_args, under the rsb lock).
 * NOTE(review): line-sampled — the -EINVAL returns between the checks are
 * not visible here. */
1734 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1735 			 int namelen, uint32_t parent_lkid, void *ast,
1736 			 void *astarg, void *bast, struct dlm_args *args)
1740 	/* check for invalid arg usage */
1742 	if (mode < 0 || mode > DLM_LOCK_EX)
1745 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1748 	if (flags & DLM_LKF_CANCEL)
1751 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1754 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1757 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1760 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1763 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1766 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1769 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1775 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1778 	/* parent/child locks not yet supported */
1782 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1785 	/* these args will be copied to the lkb in validate_lock_args,
1786 	   it cannot be done now because when converting locks, fields in
1787 	   an active lkb cannot be modified before locking the rsb */
1789 	args->flags = flags;
1790 	args->astaddr = ast;
1791 	args->astparam = (long) astarg;
1792 	args->bastaddr = bast;

/* Validate dlm_unlock() flags and pack them into *args; only CANCEL,
 * VALBLK, IVVALBLK and FORCEUNLOCK are permitted, and CANCEL+FORCEUNLOCK
 * is rejected as contradictory. */
1800 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1802 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1803 		      DLM_LKF_FORCEUNLOCK))
1806 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1809 	args->flags = flags;
1810 	args->astparam = (long) astarg;
/* Check that args are legal for this particular lkb (extra rules apply to
 * conversions), then copy the args fields into the lkb.
 * NOTE(review): line-sampled — the error returns after each check and the
 * final "return 0" are not visible here. */
1814 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1815 			      struct dlm_args *args)
1819 	if (args->flags & DLM_LKF_CONVERT) {
	/* can't convert a master-copy lkb: it is owned by a remote node */
1820 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
	/* QUECVT is only legal for mode transitions the matrix permits */
1823 		if (args->flags & DLM_LKF_QUECVT &&
1824 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1828 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
	/* a prior operation is still awaiting a remote reply */
1831 		if (lkb->lkb_wait_type)
1834 		if (is_overlap(lkb))
1838 	lkb->lkb_exflags = args->flags;
1839 	lkb->lkb_sbflags = 0;
1840 	lkb->lkb_astaddr = args->astaddr;
1841 	lkb->lkb_astparam = args->astparam;
1842 	lkb->lkb_bastaddr = args->bastaddr;
1843 	lkb->lkb_rqmode = args->mode;
1844 	lkb->lkb_lksb = args->lksb;
1845 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1846 	lkb->lkb_ownpid = (int) current->pid;
1852 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1855 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1856 because there may be a lookup in progress and it's valid to do
1857 cancel/unlockf on it */
/* Check that an unlock/cancel/force-unlock is legal for this lkb, handling
 * the overlap cases where a cancel or force-unlock races with an operation
 * already in flight.  See the comments at lines 1852-1857 above for the
 * -EBUSY/EOL conventions.
 * NOTE(review): line-sampled — the rv assignments and goto targets between
 * the visible lines are not shown here. */
1859 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1861 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	/* master-copy lkbs belong to a remote node; local unlock is a bug */
1864 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1865 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1870 	/* an lkb may still exist even though the lock is EOL'ed due to a
1871 	   cancel, unlock or failed noqueue request; an app can't use these
1872 	   locks; return same error as if the lkid had not been found at all */
1874 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1875 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1880 	/* an lkb may be waiting for an rsb lookup to complete where the
1881 	   lookup was initiated by another lock */
1883 	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1884 		if (!list_empty(&lkb->lkb_rsb_lookup)) {
1885 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1886 			list_del_init(&lkb->lkb_rsb_lookup);
1887 			queue_cast(lkb->lkb_resource, lkb,
1888 				   args->flags & DLM_LKF_CANCEL ?
1889 				   -DLM_ECANCEL : -DLM_EUNLOCK);
1890 			unhold_lkb(lkb); /* undoes create_lkb() */
1896 	/* cancel not allowed with another cancel/unlock in progress */
1898 	if (args->flags & DLM_LKF_CANCEL) {
1899 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1902 		if (is_overlap(lkb))
	/* op is being resent after recovery: mark overlap instead of sending */
1905 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
1906 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1911 		switch (lkb->lkb_wait_type) {
1912 		case DLM_MSG_LOOKUP:
1913 		case DLM_MSG_REQUEST:
1914 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1917 		case DLM_MSG_UNLOCK:
1918 		case DLM_MSG_CANCEL:
1921 		/* add_to_waiters() will set OVERLAP_CANCEL */
1925 	/* do we need to allow a force-unlock if there's a normal unlock
1926 	   already in progress? in what conditions could the normal unlock
1927 	   fail such that we'd want to send a force-unlock to be sure? */
1929 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
1930 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1933 		if (is_overlap_unlock(lkb))
1936 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
1937 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1942 		switch (lkb->lkb_wait_type) {
1943 		case DLM_MSG_LOOKUP:
1944 		case DLM_MSG_REQUEST:
1945 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1948 		case DLM_MSG_UNLOCK:
1951 		/* add_to_waiters() will set OVERLAP_UNLOCK */
1955 	/* normal unlock not allowed if there's any op in progress */
1957 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
	/* an overlapping op shouldn't blow away exflags from other op */
1962 	lkb->lkb_exflags |= args->flags;
1963 	lkb->lkb_sbflags = 0;
1964 	lkb->lkb_astparam = args->astparam;
1968 	log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1969 		  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1970 		  args->flags, lkb->lkb_wait_type,
1971 		  lkb->lkb_resource->res_name);
1976 * Four stage 4 varieties:
1977 * do_request(), do_convert(), do_unlock(), do_cancel()
1978 * These are called on the master node for the given lock and
1979 * from the central locking logic.
/* Stage-4 new request on the master: grant immediately if possible, else
 * queue on WAITING (unless NOQUEUE), else fail with -EAGAIN.
 * NOTE(review): line-sampled — grant_lock()/goto glue between the visible
 * lines is not shown. */
1982 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1986 	if (can_be_granted(r, lkb, 1)) {
1988 		queue_cast(r, lkb, 0);
1992 	if (can_be_queued(lkb)) {
1993 		error = -EINPROGRESS;
1994 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
1995 		send_blocking_asts(r, lkb);
	/* NOQUEUE and not grantable: -EAGAIN back to the caller */
2000 	if (force_blocking_asts(lkb))
2001 		send_blocking_asts_all(r, lkb);
2002 	queue_cast(r, lkb, -EAGAIN);

/* Stage-4 conversion on the master: grant in place, handle CONVDEADLK
 * auto-demotion, else queue on CONVERT or fail with -EAGAIN. */
2008 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2012 	/* changing an existing lock may allow others to be granted */
2014 	if (can_be_granted(r, lkb, 1)) {
2016 		queue_cast(r, lkb, 0);
2017 		grant_pending_locks(r);
2021 	/* is_demoted() means the can_be_granted() above set the grmode
2022 	   to NL, and left us on the granted queue.  This auto-demotion
2023 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2024 	   now grantable.  We have to try to grant other converting locks
2025 	   before we try again to grant this one. */
2027 	if (is_demoted(lkb)) {
2028 		grant_pending_convert(r, DLM_LOCK_IV);
2029 		if (_can_be_granted(r, lkb, 1)) {
2031 			queue_cast(r, lkb, 0);
2032 			grant_pending_locks(r);
2035 		/* else fall through and move to convert queue */
2038 	if (can_be_queued(lkb)) {
2039 		error = -EINPROGRESS;
2041 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2042 		send_blocking_asts(r, lkb);
2047 	if (force_blocking_asts(lkb))
2048 		send_blocking_asts_all(r, lkb);
2049 	queue_cast(r, lkb, -EAGAIN);

/* Stage-4 unlock on the master: always succeeds with -DLM_EUNLOCK and may
 * make other pending locks grantable. */
2055 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2057 	remove_lock(r, lkb);
2058 	queue_cast(r, lkb, -DLM_EUNLOCK);
2059 	grant_pending_locks(r);
2060 	return -DLM_EUNLOCK;

2063 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */

2065 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2069 	error = revert_lock(r, lkb);
	/* only cast/grant when revert_lock() actually undid something */
2071 		queue_cast(r, lkb, -DLM_ECANCEL);
2072 		grant_pending_locks(r);
2073 		return -DLM_ECANCEL;
2079 * Four stage 3 varieties:
2080 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2083 /* add a new lkb to a possibly new rsb, called by requesting process */
/* Stage-3 routers: each decides local vs remote and calls do_xxxx() or
 * send_xxxx() accordingly (see the stage overview at the top of the file).
 * NOTE(review): line-sampled — the is_remote()/error checks between the
 * visible lines are not shown. */
2085 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2089 	/* set_master: sets lkb nodeid from r */
2091 	error = set_master(r, lkb);
2100 		/* receive_request() calls do_request() on remote node */
2101 		error = send_request(r, lkb);
2103 		error = do_request(r, lkb);

2108 /* change some property of an existing lkb, e.g. mode */

2110 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2115 		/* receive_convert() calls do_convert() on remote node */
2116 		error = send_convert(r, lkb);
2118 		error = do_convert(r, lkb);

2123 /* remove an existing lkb from the granted queue */

2125 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2130 		/* receive_unlock() calls do_unlock() on remote node */
2131 		error = send_unlock(r, lkb);
2133 		error = do_unlock(r, lkb);

2138 /* remove an existing lkb from the convert or wait queue */

2140 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2145 		/* receive_cancel() calls do_cancel() on remote node */
2146 		error = send_cancel(r, lkb);
2148 		error = do_cancel(r, lkb);
2154 * Four stage 2 varieties:
2155 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
/* Stage-2 entry points: validate args, find/lock the rsb, then hand off to
 * the stage-3 _xxxx_lock() functions.
 * NOTE(review): line-sampled — the lock_rsb()/unlock_rsb()/put_rsb glue and
 * error gotos between the visible lines are not shown. */
2158 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2159 			int len, struct dlm_args *args)
2164 	error = validate_lock_args(ls, lkb, args);
	/* new requests may create the rsb if it doesn't exist yet */
2168 	error = find_rsb(ls, name, len, R_CREATE, &r);
	/* hand the kernel-assigned lock id back to the caller's lksb */
2175 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2177 	error = _request_lock(r, lkb);

2186 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2187 			struct dlm_args *args)
	/* conversions act on the lkb's existing rsb */
2192 	r = lkb->lkb_resource;
2197 	error = validate_lock_args(ls, lkb, args);
2201 	error = _convert_lock(r, lkb);

2208 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2209 		       struct dlm_args *args)
2214 	r = lkb->lkb_resource;
2219 	error = validate_unlock_args(lkb, args);
2223 	error = _unlock_lock(r, lkb);

2230 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2231 		       struct dlm_args *args)
2236 	r = lkb->lkb_resource;
2241 	error = validate_unlock_args(lkb, args);
2245 	error = _cancel_lock(r, lkb);
2253 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
/* Public stage-1 entry: dlm_lock().  Splits into convert_lock() when
 * DLM_LKF_CONVERT is set, else request_lock(); -EINPROGRESS/-EAGAIN from
 * below are normalized for the caller.
 * NOTE(review): line-sampled — the goto-based cleanup (put_lkb etc.) between
 * the visible lines is not shown. */
2256 int dlm_lock(dlm_lockspace_t *lockspace,
2258 	     struct dlm_lksb *lksb,
2261 	     unsigned int namelen,
2262 	     uint32_t parent_lkid,
2263 	     void (*ast) (void *astarg),
2265 	     void (*bast) (void *astarg, int mode))
2268 	struct dlm_lkb *lkb;
2269 	struct dlm_args args;
2270 	int error, convert = flags & DLM_LKF_CONVERT;
2272 	ls = dlm_find_lockspace_local(lockspace);
	/* block out recovery while the operation runs */
2276 	dlm_lock_recovery(ls);
	/* convert reuses the existing lkb (by sb_lkid); request creates one */
2279 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2281 		error = create_lkb(ls, &lkb);
2286 	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2287 			      astarg, bast, &args);
2292 		error = convert_lock(ls, lkb, &args);
2294 		error = request_lock(ls, lkb, name, namelen, &args);
	/* async completion pending is success from the API's viewpoint */
2296 	if (error == -EINPROGRESS)
2299 	if (convert || error)
2301 	if (error == -EAGAIN)
2304 	dlm_unlock_recovery(ls);
2305 	dlm_put_lockspace(ls);
/* Public stage-1 entry: dlm_unlock().  Splits into cancel_lock() when
 * DLM_LKF_CANCEL is set, else unlock_lock(); -DLM_EUNLOCK/-DLM_ECANCEL
 * are success, and -EBUSY with CANCEL/FORCEUNLOCK returns 0 (see the
 * comment at line 1852 above).
 * NOTE(review): line-sampled — the goto cleanup paths are not shown. */
2309 int dlm_unlock(dlm_lockspace_t *lockspace,
2312 	       struct dlm_lksb *lksb,
2316 	struct dlm_lkb *lkb;
2317 	struct dlm_args args;
2320 	ls = dlm_find_lockspace_local(lockspace);
2324 	dlm_lock_recovery(ls);
2326 	error = find_lkb(ls, lkid, &lkb);
2330 	error = set_unlock_args(flags, astarg, &args);
2334 	if (flags & DLM_LKF_CANCEL)
2335 		error = cancel_lock(ls, lkb, &args);
2337 		error = unlock_lock(ls, lkb, &args);
2339 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2341 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2346 	dlm_unlock_recovery(ls);
2347 	dlm_put_lockspace(ls);
2352 * send/receive routines for remote operations and replies
2356 * send_request receive_request
2357 * send_convert receive_convert
2358 * send_unlock receive_unlock
2359 * send_cancel receive_cancel
2360 * send_grant receive_grant
2361 * send_bast receive_bast
2362 * send_lookup receive_lookup
2363 * send_remove receive_remove
2366 * receive_request_reply send_request_reply
2367 * receive_convert_reply send_convert_reply
2368 * receive_unlock_reply send_unlock_reply
2369 * receive_cancel_reply send_cancel_reply
2370 * receive_lookup_reply send_lookup_reply
/* Allocate a lowcomms send buffer, zero it, and fill in the common DLM
 * message header; *ms_ret/*mh_ret are handed back for send_args() and the
 * eventual dlm_lowcomms_commit_buffer().
 * NOTE(review): line-sampled — the -ENOBUFS path when the buffer can't be
 * obtained is not visible here. */
2373 static int _create_message(struct dlm_ls *ls, int mb_len,
2374 			   int to_nodeid, int mstype,
2375 			   struct dlm_message **ms_ret,
2376 			   struct dlm_mhandle **mh_ret)
2378 	struct dlm_message *ms;
2379 	struct dlm_mhandle *mh;
2382 	/* get_buffer gives us a message handle (mh) that we need to
2383 	   pass into lowcomms_commit and a message buffer (mb) that we
2384 	   write our data into */
2386 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2390 	memset(mb, 0, mb_len);
2392 	ms = (struct dlm_message *) mb;
2394 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2395 	ms->m_header.h_lockspace = ls->ls_global_id;
2396 	ms->m_header.h_nodeid = dlm_our_nodeid();
2397 	ms->m_header.h_length = mb_len;
2398 	ms->m_header.h_cmd = DLM_MSG;
2400 	ms->m_type = mstype;

/* Size the message by type (resource name for request/lookup/remove, LVB
 * space for convert/unlock and their replies) and delegate to
 * _create_message(). */
2407 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2408 			  int to_nodeid, int mstype,
2409 			  struct dlm_message **ms_ret,
2410 			  struct dlm_mhandle **mh_ret)
2412 	int mb_len = sizeof(struct dlm_message);
2415 	case DLM_MSG_REQUEST:
2416 	case DLM_MSG_LOOKUP:
2417 	case DLM_MSG_REMOVE:
2418 		mb_len += r->res_length;
2420 	case DLM_MSG_CONVERT:
2421 	case DLM_MSG_UNLOCK:
2422 	case DLM_MSG_REQUEST_REPLY:
2423 	case DLM_MSG_CONVERT_REPLY:
2425 		if (lkb && lkb->lkb_lvbptr)
2426 			mb_len += r->res_ls->ls_lvblen;
2430 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2434 /* further lowcomms enhancements or alternate implementations may make
2435 the return value from this function useful at some point */
/* Byte-swap the message to wire order and commit the lowcomms buffer.
 * Always returns success today (see comment at line 2434 above). */
2437 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2439 	dlm_message_out(ms);
2440 	dlm_lowcomms_commit_buffer(mh);

/* Copy the lkb/rsb state that every outgoing message carries, plus the
 * type-dependent trailing data (resource name or LVB) in m_extra. */
2444 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2445 		      struct dlm_message *ms)
2447 	ms->m_nodeid   = lkb->lkb_nodeid;
2448 	ms->m_pid      = lkb->lkb_ownpid;
2449 	ms->m_lkid     = lkb->lkb_id;
2450 	ms->m_remid    = lkb->lkb_remid;
2451 	ms->m_exflags  = lkb->lkb_exflags;
2452 	ms->m_sbflags  = lkb->lkb_sbflags;
2453 	ms->m_flags    = lkb->lkb_flags;
2454 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2455 	ms->m_status   = lkb->lkb_status;
2456 	ms->m_grmode   = lkb->lkb_grmode;
2457 	ms->m_rqmode   = lkb->lkb_rqmode;
2458 	ms->m_hash     = r->res_hash;
2460 	/* m_result and m_bastmode are set from function args,
2461 	   not from lkb fields */
	/* encode which callbacks the caller registered as AST bits */
2463 	if (lkb->lkb_bastaddr)
2464 		ms->m_asts |= AST_BAST;
2465 	if (lkb->lkb_astaddr)
2466 		ms->m_asts |= AST_COMP;
2468 	/* compare with switch in create_message; send_remove() doesn't
	   use send_args(), so REMOVE is absent from this switch */
2471 	switch (ms->m_type) {
2472 	case DLM_MSG_REQUEST:
2473 	case DLM_MSG_LOOKUP:
2474 		memcpy(ms->m_extra, r->res_name, r->res_length);
2476 	case DLM_MSG_CONVERT:
2477 	case DLM_MSG_UNLOCK:
2478 	case DLM_MSG_REQUEST_REPLY:
2479 	case DLM_MSG_CONVERT_REPLY:
2481 		if (!lkb->lkb_lvbptr)
2483 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
/* Common path for the four lkb-carrying requests: register the lkb on the
 * waiters list (expecting a reply), build and send the message; on failure
 * the waiters entry is removed again.
 * NOTE(review): line-sampled — the goto fail / return glue between the
 * visible lines is not shown. */
2488 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2490 	struct dlm_message *ms;
2491 	struct dlm_mhandle *mh;
2492 	int to_nodeid, error;
2494 	error = add_to_waiters(lkb, mstype);
2498 	to_nodeid = r->res_nodeid;
2500 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2504 	send_args(r, lkb, ms);
2506 	error = send_message(mh, ms);
	/* undo add_to_waiters() on the error path */
2512 	remove_from_waiters(lkb, msg_reply_type(mstype));

2516 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2518 	return send_common(r, lkb, DLM_MSG_REQUEST);

2521 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2525 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2527 	/* down conversions go without a reply from the master:
	   fake the reply locally via the lockspace's stub message */
2528 	if (!error && down_conversion(lkb)) {
2529 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2530 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2531 		r->res_ls->ls_stub_ms.m_result = 0;
2532 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2533 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);

2539 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2540    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2541    that the master is still correct. */

2543 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2545 	return send_common(r, lkb, DLM_MSG_UNLOCK);

2548 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2550 	return send_common(r, lkb, DLM_MSG_CANCEL);
/* One-way notifications (no waiters entry: no reply is expected).
 * NOTE(review): line-sampled — error gotos/returns between the visible
 * lines are not shown. */
2553 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2555 	struct dlm_message *ms;
2556 	struct dlm_mhandle *mh;
2557 	int to_nodeid, error;
2559 	to_nodeid = lkb->lkb_nodeid;
2561 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2565 	send_args(r, lkb, ms);
2569 	error = send_message(mh, ms);

/* Tell the lock holder's node to deliver a blocking AST for "mode". */
2574 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2576 	struct dlm_message *ms;
2577 	struct dlm_mhandle *mh;
2578 	int to_nodeid, error;
2580 	to_nodeid = lkb->lkb_nodeid;
2582 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2586 	send_args(r, lkb, ms);
2588 	ms->m_bastmode = mode;
2590 	error = send_message(mh, ms);

/* Ask the directory node who masters this rsb; a reply is expected, so the
 * lkb is placed on the waiters list like send_common() does. */
2595 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2597 	struct dlm_message *ms;
2598 	struct dlm_mhandle *mh;
2599 	int to_nodeid, error;
2601 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2605 	to_nodeid = dlm_dir_nodeid(r);
2607 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2611 	send_args(r, lkb, ms);
2613 	error = send_message(mh, ms);
	/* undo add_to_waiters() on the error path */
2619 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);

/* Tell the directory node to drop its entry for this rsb; no lkb is
 * involved, so the name/hash are copied in directly. */
2623 static int send_remove(struct dlm_rsb *r)
2625 	struct dlm_message *ms;
2626 	struct dlm_mhandle *mh;
2627 	int to_nodeid, error;
2629 	to_nodeid = dlm_dir_nodeid(r);
2631 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2635 	memcpy(ms->m_extra, r->res_name, r->res_length);
2636 	ms->m_hash = r->res_hash;
2638 	error = send_message(mh, ms);
/* Common reply path: build a reply message of "mstype" back to the node the
 * lkb belongs to.  The rv parameter (not visible among these sampled lines)
 * presumably becomes m_result — TODO confirm against the full source.
 * NOTE(review): line-sampled — error handling between the visible lines is
 * not shown. */
2643 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2646 	struct dlm_message *ms;
2647 	struct dlm_mhandle *mh;
2648 	int to_nodeid, error;
2650 	to_nodeid = lkb->lkb_nodeid;
2652 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2656 	send_args(r, lkb, ms);
2660 	error = send_message(mh, ms);

2665 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2667 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);

2670 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2672 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);

2675 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2677 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);

2680 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2682 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);

/* Lookup replies have no lkb: answer from the stub rsb, echoing the
 * requester's m_lkid and returning the looked-up master in m_nodeid. */
2685 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2686 			     int ret_nodeid, int rv)
2688 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2689 	struct dlm_message *ms;
2690 	struct dlm_mhandle *mh;
2691 	int error, nodeid = ms_in->m_header.h_nodeid;
2693 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2697 	ms->m_lkid = ms_in->m_lkid;
2699 	ms->m_nodeid = ret_nodeid;
2701 	error = send_message(mh, ms);
2706 /* which args we save from a received message depends heavily on the type
2707 of message, unlike the send side where we can safely send everything about
2708 the lkb for any type of message */
/* Receive-side flag copy: exflags/sbflags come over wholesale; only the low
 * 16 bits of lkb_flags are wire-shared, the high 16 are node-local and
 * preserved. */
2710 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2712 	lkb->lkb_exflags = ms->m_exflags;
2713 	lkb->lkb_sbflags = ms->m_sbflags;
2714 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2715 			 (ms->m_flags & 0x0000FFFF);

/* Reply variant: same as above but exflags are not taken from the reply. */
2718 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2720 	lkb->lkb_sbflags = ms->m_sbflags;
2721 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2722 			 (ms->m_flags & 0x0000FFFF);

/* Bytes of trailing m_extra data = total wire length minus fixed header. */
2725 static int receive_extralen(struct dlm_message *ms)
2727 	return (ms->m_header.h_length - sizeof(struct dlm_message));

/* Copy an incoming LVB into the lkb when VALBLK is in effect, allocating
 * the lkb's LVB buffer on first use.
 * NOTE(review): line-sampled — the -ENOMEM return and final return are not
 * visible here. */
2730 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2731 		       struct dlm_message *ms)
2735 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2736 		if (!lkb->lkb_lvbptr)
2737 			lkb->lkb_lvbptr = allocate_lvb(ls);
2738 		if (!lkb->lkb_lvbptr)
2740 		len = receive_extralen(ms);
2741 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
/* Initialize a freshly created master-copy lkb from an incoming request
 * message.  The remote callback pointers are only recorded as present/absent
 * via the AST bits — the master never calls them directly.
 * NOTE(review): line-sampled — the -ENOMEM and success returns are not
 * visible here. */
2746 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2747 				struct dlm_message *ms)
2749 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2750 	lkb->lkb_ownpid = ms->m_pid;
2751 	lkb->lkb_remid = ms->m_lkid;
2752 	lkb->lkb_grmode = DLM_LOCK_IV;
2753 	lkb->lkb_rqmode = ms->m_rqmode;
2754 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2755 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2757 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2759 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2760 		/* lkb was just created so there won't be an lvb yet */
2761 		lkb->lkb_lvbptr = allocate_lvb(ls);
2762 		if (!lkb->lkb_lvbptr)

/* Validate and apply an incoming conversion to an existing master-copy lkb.
 * NOTE(review): line-sampled — the error-return values after each check are
 * not visible here. */
2769 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2770 				struct dlm_message *ms)
	/* sender must be the node that owns this master-copy lkb */
2772 	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2773 		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2774 			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2775 			  lkb->lkb_id, lkb->lkb_remid);
2779 	if (!is_master_copy(lkb))
2782 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2785 	if (receive_lvb(ls, lkb, ms))
2788 	lkb->lkb_rqmode = ms->m_rqmode;
2789 	lkb->lkb_lvbseq = ms->m_lvbseq;

/* Validate an incoming unlock: must target a master-copy lkb; pull in the
 * LVB if one was sent. */
2794 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2795 			       struct dlm_message *ms)
2797 	if (!is_master_copy(lkb))
2799 	if (receive_lvb(ls, lkb, ms))

2804 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2805    uses to send a reply and that the remote end uses to process the reply. */

2807 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2809 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2810 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2811 	lkb->lkb_remid = ms->m_lkid;
2814 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2816 struct dlm_lkb *lkb;
2820 error = create_lkb(ls, &lkb);
2824 receive_flags(lkb, ms);
2825 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2826 error = receive_request_args(ls, lkb, ms);
2832 namelen = receive_extralen(ms);
2834 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2843 error = do_request(r, lkb);
2844 send_request_reply(r, lkb, error);
2849 if (error == -EINPROGRESS)
2856 setup_stub_lkb(ls, ms);
2857 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Master-side handler for DLM_MSG_CONVERT: look up the master-copy lkb by
   the sender's remid, validate the convert args, run do_convert().  A down
   conversion is completed asynchronously so no reply is sent for it
   (reply = !down_conversion()).  NOTE(review): error paths and the reply
   condition between these lines are elided in this view. */
2860 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2862 	struct dlm_lkb *lkb;
2864 	int error, reply = 1;
2866 	error = find_lkb(ls, ms->m_remid, &lkb);
2870 	r = lkb->lkb_resource;
2875 	receive_flags(lkb, ms);
2876 	error = receive_convert_args(ls, lkb, ms);
	/* down conversions are async on the sender side -- no reply wanted */
2879 	reply = !down_conversion(lkb);
2881 	error = do_convert(r, lkb);
2884 	send_convert_reply(r, lkb, error);
	/* failure path: reply via the stub lkb */
2892 	setup_stub_lkb(ls, ms);
2893 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Master-side handler for DLM_MSG_UNLOCK: find the master-copy lkb,
   validate args, run do_unlock() and reply.  On lookup/validation failure
   the stub lkb carries the error back.  NOTE(review): error-goto paths
   are elided in this view. */
2896 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2898 	struct dlm_lkb *lkb;
2902 	error = find_lkb(ls, ms->m_remid, &lkb);
2906 	r = lkb->lkb_resource;
2911 	receive_flags(lkb, ms);
2912 	error = receive_unlock_args(ls, lkb, ms);
2916 	error = do_unlock(r, lkb);
2918 	send_unlock_reply(r, lkb, error);
	/* failure path: reply via the stub lkb */
2926 	setup_stub_lkb(ls, ms);
2927 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Master-side handler for DLM_MSG_CANCEL: find the master-copy lkb, run
   do_cancel() on its rsb and reply.  On failure the stub lkb carries the
   error back.  NOTE(review): error-goto paths are elided in this view. */
2930 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2932 	struct dlm_lkb *lkb;
2936 	error = find_lkb(ls, ms->m_remid, &lkb);
2940 	receive_flags(lkb, ms);
2942 	r = lkb->lkb_resource;
2947 	error = do_cancel(r, lkb);
2948 	send_cancel_reply(r, lkb, error);
	/* failure path: reply via the stub lkb */
2956 	setup_stub_lkb(ls, ms);
2957 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
/* Process-copy handler for an async grant from the master: apply the
   master's flags, munge the mode if ALTMODE was granted, move the lkb to
   the grant queue and queue the completion ast (cast) to the app. */
2960 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2962 	struct dlm_lkb *lkb;
2966 	error = find_lkb(ls, ms->m_remid, &lkb);
2968 		log_error(ls, "receive_grant no lkb");
	/* a grant message can only legitimately target our process copy */
2971 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2973 	r = lkb->lkb_resource;
2978 	receive_flags_reply(lkb, ms);
2979 	if (is_altmode(lkb))
2980 		munge_altmode(lkb, ms);
2981 	grant_lock_pc(r, lkb, ms);
2982 	queue_cast(r, lkb, 0);
/* Process-copy handler for an async blocking ast from the master: queue a
   bast callback at the mode the master says is blocked. */
2989 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2991 	struct dlm_lkb *lkb;
2995 	error = find_lkb(ls, ms->m_remid, &lkb);
2997 		log_error(ls, "receive_bast no lkb");
3000 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3002 	r = lkb->lkb_resource;
3007 	queue_bast(r, lkb, ms->m_bastmode);
/* Dir-node handler for DLM_MSG_LOOKUP: verify we really are the directory
   node for this resource hash, look up the master nodeid, and reply.  If
   the lookup says we're the master ourselves, skip the extra round trip
   and treat the lookup directly as a lock request. */
3014 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3016 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3018 	from_nodeid = ms->m_header.h_nodeid;
3019 	our_nodeid = dlm_our_nodeid();
3021 	len = receive_extralen(ms);
3023 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	/* sender computed a different dir node than we did -- misdirected msg */
3024 	if (dir_nodeid != our_nodeid) {
3025 		log_error(ls, "lookup dir_nodeid %d from %d",
3026 			  dir_nodeid, from_nodeid);
3032 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3034 	/* Optimization: we're master so treat lookup as a request */
3035 	if (!error && ret_nodeid == our_nodeid) {
3036 		receive_request(ls, ms);
3040 	send_lookup_reply(ls, ms, ret_nodeid, error);
/* Dir-node handler for DLM_MSG_REMOVE: verify we are the directory node
   for this hash, then drop the directory entry (no reply is sent). */
3043 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3045 	int len, dir_nodeid, from_nodeid;
3047 	from_nodeid = ms->m_header.h_nodeid;
3049 	len = receive_extralen(ms);
3051 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3052 	if (dir_nodeid != dlm_our_nodeid()) {
3053 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3054 			  dir_nodeid, from_nodeid);
3058 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
/* Handler for DLM_MSG_PURGE: purge orphan locks owned by the given
   remote nodeid/pid (see do_purge / send_purge below). */
3061 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3063 	do_purge(ls, ms->m_nodeid, ms->m_pid);
/* Process-copy handler for DLM_MSG_REQUEST_REPLY.  Removes the lkb from
   the waiters list, then dispatches on the master's do_request() result:
   -EAGAIN (would block, NOQUEUE), 0/-EINPROGRESS (granted or queued),
   master-moved (rsb no longer mastered there -- retry the request), or a
   hard error.  Also resolves overlapped unlock/cancel ops that raced with
   the request.  NOTE(review): the switch statement and several goto/break
   lines are elided in this view; the case boundaries below are inferred
   from the original file layout -- verify against the full file. */
3066 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3068 	struct dlm_lkb *lkb;
3070 	int error, mstype, result;
3072 	error = find_lkb(ls, ms->m_remid, &lkb);
3074 		log_error(ls, "receive_request_reply no lkb");
3077 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3079 	r = lkb->lkb_resource;
	/* remember what we were waiting for before clearing the wait state */
3083 	mstype = lkb->lkb_wait_type;
3084 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3088 	/* Optimization: the dir node was also the master, so it took our
3089 	   lookup as a request and sent request reply instead of lookup reply */
3090 	if (mstype == DLM_MSG_LOOKUP) {
3091 		r->res_nodeid = ms->m_header.h_nodeid;
3092 		lkb->lkb_nodeid = r->res_nodeid;
3095 	/* this is the value returned from do_request() on the master */
3096 	result = ms->m_result;
3100 		/* request would block (be queued) on remote master */
3101 		queue_cast(r, lkb, -EAGAIN);
3102 		confirm_master(r, -EAGAIN);
3103 		unhold_lkb(lkb); /* undoes create_lkb() */
3108 		/* request was queued or granted on remote master */
3109 		receive_flags_reply(lkb, ms);
3110 		lkb->lkb_remid = ms->m_lkid;
3111 		if (is_altmode(lkb))
3112 			munge_altmode(lkb, ms);
3114 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3116 			grant_lock_pc(r, lkb, ms);
3117 			queue_cast(r, lkb, 0);
3119 		confirm_master(r, result);
3124 		/* find_rsb failed to find rsb or rsb wasn't master */
3125 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3126 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		/* forget the stale master; a new lookup will find the real one */
3128 		lkb->lkb_nodeid = -1;
3130 		if (is_overlap(lkb)) {
3131 			/* we'll ignore error in cancel/unlock reply */
3132 			queue_cast_overlap(r, lkb);
3133 			unhold_lkb(lkb); /* undoes create_lkb() */
3135 			_request_lock(r, lkb);
3139 		log_error(ls, "receive_request_reply %x error %d",
3140 			  lkb->lkb_id, result);
	/* resolve an unlock/cancel that overlapped the in-flight request */
3143 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3144 		log_debug(ls, "receive_request_reply %x result %d unlock",
3145 			  lkb->lkb_id, result);
3146 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3147 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3148 		send_unlock(r, lkb);
3149 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3150 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3151 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3152 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3153 		send_cancel(r, lkb);
3155 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3156 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
/* Core of the convert-reply handling, shared with the stub-reply path used
   during recovery: dispatch on the master's do_convert() result --
   -EAGAIN (would block), -EINPROGRESS (queued for conversion), 0 (granted)
   or an unexpected error.  NOTE(review): the switch/case/break lines are
   elided in this view; case boundaries inferred from the original file. */
3164 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3165 				    struct dlm_message *ms)
3167 	/* this is the value returned from do_convert() on the master */
3168 	switch (ms->m_result) {
3170 		/* convert would block (be queued) on remote master */
3171 		queue_cast(r, lkb, -EAGAIN);
3175 		/* convert was queued on remote master */
3176 		receive_flags_reply(lkb, ms);
3177 		if (is_demoted(lkb))
3178 			munge_demoted(lkb, ms);
3180 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3184 		/* convert was granted on remote master */
3185 		receive_flags_reply(lkb, ms);
3186 		if (is_demoted(lkb))
3187 			munge_demoted(lkb, ms);
3188 		grant_lock_pc(r, lkb, ms);
3189 		queue_cast(r, lkb, 0);
3193 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3194 			  lkb->lkb_id, ms->m_result);
/* Remove the lkb from the waiters list, then apply the convert reply.
   Callable with a stub message from recovery (see the waiters_mutex note). */
3198 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3200 	struct dlm_rsb *r = lkb->lkb_resource;
3206 	/* stub reply can happen with waiters_mutex held */
3207 	error = remove_from_waiters_ms(lkb, ms);
3211 	__receive_convert_reply(r, lkb, ms);
/* Message entry point for DLM_MSG_CONVERT_REPLY: locate our process-copy
   lkb by the remid the master echoed back, then process the reply. */
3217 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3219 	struct dlm_lkb *lkb;
3222 	error = find_lkb(ls, ms->m_remid, &lkb);
3224 		log_error(ls, "receive_convert_reply no lkb");
3227 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3229 	_receive_convert_reply(lkb, ms);
/* Apply an unlock reply (possibly a recovery stub): on -DLM_EUNLOCK remove
   the lock and deliver the unlock cast; anything else is logged as an
   error.  NOTE(review): switch/case lines are elided in this view. */
3233 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3235 	struct dlm_rsb *r = lkb->lkb_resource;
3241 	/* stub reply can happen with waiters_mutex held */
3242 	error = remove_from_waiters_ms(lkb, ms);
3246 	/* this is the value returned from do_unlock() on the master */
3248 	switch (ms->m_result) {
3250 		receive_flags_reply(lkb, ms);
3251 		remove_lock_pc(r, lkb);
3252 		queue_cast(r, lkb, -DLM_EUNLOCK);
3257 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3258 			  lkb->lkb_id, ms->m_result);
/* Message entry point for DLM_MSG_UNLOCK_REPLY: find the process-copy lkb
   and process the unlock reply. */
3265 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3267 	struct dlm_lkb *lkb;
3270 	error = find_lkb(ls, ms->m_remid, &lkb);
3272 		log_error(ls, "receive_unlock_reply no lkb");
3275 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3277 	_receive_unlock_reply(lkb, ms);
/* Apply a cancel reply (possibly a recovery stub): on -DLM_ECANCEL revert
   the lock to its pre-request state and deliver the cancel cast; anything
   else is logged.  NOTE(review): switch/case lines are elided here. */
3281 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3283 	struct dlm_rsb *r = lkb->lkb_resource;
3289 	/* stub reply can happen with waiters_mutex held */
3290 	error = remove_from_waiters_ms(lkb, ms);
3294 	/* this is the value returned from do_cancel() on the master */
3296 	switch (ms->m_result) {
3298 		receive_flags_reply(lkb, ms);
3299 		revert_lock_pc(r, lkb);
3301 			queue_cast(r, lkb, -DLM_ECANCEL);
3306 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3307 			  lkb->lkb_id, ms->m_result);
/* Message entry point for DLM_MSG_CANCEL_REPLY: find the process-copy lkb
   and process the cancel reply. */
3314 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3316 	struct dlm_lkb *lkb;
3319 	error = find_lkb(ls, ms->m_remid, &lkb);
3321 		log_error(ls, "receive_cancel_reply no lkb");
3324 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3326 	_receive_cancel_reply(lkb, ms);
/* Handler for DLM_MSG_LOOKUP_REPLY from the directory node: record the
   master nodeid it found (or note we are the master ourselves), then either
   complete an overlapped unlock/cancel or resend the original request to
   the now-known master.  Finally kick any other lkbs that were queued
   waiting on this same lookup (process_lookup_list).  NOTE(review): the
   lkb is found by ms->m_lkid (our own id echoed back), not m_remid,
   because no remote lkb exists yet.  Several error/goto lines are elided
   in this view. */
3330 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3332 	struct dlm_lkb *lkb;
3334 	int error, ret_nodeid;
3336 	error = find_lkb(ls, ms->m_lkid, &lkb);
3338 		log_error(ls, "receive_lookup_reply no lkb");
3342 	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
3343 	   FIXME: will a non-zero error ever be returned? */
3345 	r = lkb->lkb_resource;
3349 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3353 	ret_nodeid = ms->m_nodeid;
3354 	if (ret_nodeid == dlm_our_nodeid()) {
3357 		r->res_first_lkid = 0;
3359 		/* set_master() will copy res_nodeid to lkb_nodeid */
3360 		r->res_nodeid = ret_nodeid;
3363 	if (is_overlap(lkb)) {
3364 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3365 			  lkb->lkb_id, lkb->lkb_flags);
3366 		queue_cast_overlap(r, lkb);
3367 		unhold_lkb(lkb); /* undoes create_lkb() */
3371 	_request_lock(r, lkb);
3375 		process_lookup_list(r);
/* Top-level dispatch for every incoming DLM message.  Finds the lockspace
   from the header, defers messages into the requestqueue while recovery is
   running (locking stopped), takes the recovery lock for normal processing,
   and fans out on m_type to the receive_xxxx() handlers above.
   NOTE(review): several goto/return/break and a few case labels (GRANT,
   BAST, PURGE, lock-recovery retry) are elided in this view; the dispatch
   structure below is partial. */
3382 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3384 	struct dlm_message *ms = (struct dlm_message *) hd;
3391 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3393 		log_print("drop message %d from %d for unknown lockspace %d",
3394 			  ms->m_type, nodeid, hd->h_lockspace);
3398 	/* recovery may have just ended leaving a bunch of backed-up requests
3399 	   in the requestqueue; wait while dlm_recoverd clears them */
3402 		dlm_wait_requestqueue(ls);
3404 	/* recovery may have just started while there were a bunch of
3405 	   in-flight requests -- save them in requestqueue to be processed
3406 	   after recovery.  we can't let dlm_recvd block on the recovery
3407 	   lock.  if dlm_recoverd is calling this function to clear the
3408 	   requestqueue, it needs to be interrupted (-EINTR) if another
3409 	   recovery operation is starting. */
3412 	if (dlm_locking_stopped(ls)) {
3417 		error = dlm_add_requestqueue(ls, nodeid, hd);
3418 		if (error == -EAGAIN)
3426 	if (dlm_lock_recovery_try(ls))
3431 	switch (ms->m_type) {
3433 	/* messages sent to a master node */
3435 	case DLM_MSG_REQUEST:
3436 		receive_request(ls, ms);
3439 	case DLM_MSG_CONVERT:
3440 		receive_convert(ls, ms);
3443 	case DLM_MSG_UNLOCK:
3444 		receive_unlock(ls, ms);
3447 	case DLM_MSG_CANCEL:
3448 		receive_cancel(ls, ms);
3451 	/* messages sent from a master node (replies to above) */
3453 	case DLM_MSG_REQUEST_REPLY:
3454 		receive_request_reply(ls, ms);
3457 	case DLM_MSG_CONVERT_REPLY:
3458 		receive_convert_reply(ls, ms);
3461 	case DLM_MSG_UNLOCK_REPLY:
3462 		receive_unlock_reply(ls, ms);
3465 	case DLM_MSG_CANCEL_REPLY:
3466 		receive_cancel_reply(ls, ms);
3469 	/* messages sent from a master node (only two types of async msg) */
3472 		receive_grant(ls, ms);
3476 		receive_bast(ls, ms);
3479 	/* messages sent to a dir node */
3481 	case DLM_MSG_LOOKUP:
3482 		receive_lookup(ls, ms);
3485 	case DLM_MSG_REMOVE:
3486 		receive_remove(ls, ms);
3489 	/* messages sent from a dir node (remove has no reply) */
3491 	case DLM_MSG_LOOKUP_REPLY:
3492 		receive_lookup_reply(ls, ms);
3495 	/* other messages */
3498 		receive_purge(ls, ms);
3502 		log_error(ls, "unknown message type %d", ms->m_type);
3505 	dlm_unlock_recovery(ls);
3507 	dlm_put_lockspace(ls);
/* Recovery handling for an lkb waiting on a convert reply from a dead
   master.  Middle conversions (PR<->CW) get a faked -EINPROGRESS stub
   reply and have their granted mode reset so recover_conversion can sort
   them out; up-conversions are simply flagged for resend. */
3517 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3519 	if (middle_conversion(lkb)) {
		/* fake a "queued" reply from the dead master */
3521 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3522 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3523 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3524 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3526 		/* Same special case as in receive_rcom_lock_args() */
3527 		lkb->lkb_grmode = DLM_LOCK_IV;
3528 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3531 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3532 		lkb->lkb_flags |= DLM_IFL_RESEND;
3535 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3536 	   conversions are async; there's no reply from the remote master */
3539 /* A waiting lkb needs recovery if the master node has failed, or
3540    the master node is changing (only when no directory is used) */
3542 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
	/* master died during recovery */
3544 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
	/* with a directory, only a dead master forces recovery */
3547 	if (!dlm_no_directory(ls))
	/* no directory: master may move if dir nodeid no longer matches */
3550 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3556 /* Recovery for locks that are waiting for replies from nodes that are now
3557    gone.  We can just complete unlocks and cancels by faking a reply from the
3558    dead node.  Requests and up-conversions we flag to be resent after
3559    recovery.  Down-conversions can just be completed with a fake reply like
3560    unlocks.  Conversions between PR and CW need special attention. */
3562 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3564 	struct dlm_lkb *lkb, *safe;
3566 	mutex_lock(&ls->ls_waiters_mutex);
3568 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3569 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3570 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3572 		/* all outstanding lookups, regardless of destination  will be
3573 		   resent after recovery is done */
3575 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3576 			lkb->lkb_flags |= DLM_IFL_RESEND;
3580 		if (!waiter_needs_recovery(ls, lkb))
3583 		switch (lkb->lkb_wait_type) {
3585 		case DLM_MSG_REQUEST:
3586 			lkb->lkb_flags |= DLM_IFL_RESEND;
3589 		case DLM_MSG_CONVERT:
3590 			recover_convert_waiter(ls, lkb);
3593 		case DLM_MSG_UNLOCK:
			/* fake a successful unlock reply from the dead node */
3595 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3596 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3597 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3598 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3602 		case DLM_MSG_CANCEL:
			/* fake a successful cancel reply from the dead node */
3604 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3605 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3606 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3607 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3612 			log_error(ls, "invalid lkb wait_type %d",
3613 				  lkb->lkb_wait_type);
3617 	mutex_unlock(&ls->ls_waiters_mutex);
/* Return the first waiter flagged DLM_IFL_RESEND, or NULL if none.
   NOTE(review): the hold/return lines inside the loop are elided; the
   caller (dlm_recover_waiters_post) treats the result as a held ref. */
3620 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3622 	struct dlm_lkb *lkb;
3625 	mutex_lock(&ls->ls_waiters_mutex);
3626 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3627 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3633 	mutex_unlock(&ls->ls_waiters_mutex);
3640 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3641    master or dir-node for r.  Processing the lkb may result in it being placed
3644 /* We do this after normal locking has been enabled and any saved messages
3645    (in requestqueue) have been processed.  We should be confident that at
3646    this point we won't get or process a reply to any of these waiting
3647    operations.  But, new ops may be coming in on the rsbs/locks here from
3648    userspace or remotely. */
3650 /* there may have been an overlap unlock/cancel prior to recovery or after
3651    recovery.  if before, the lkb may still have a positive wait_count; if after, the
3652    overlap flag would just have been set and nothing new sent.  we can be
3653    confident here that any replies to either the initial op or overlap ops
3654    prior to recovery have been received. */
3656 int dlm_recover_waiters_post(struct dlm_ls *ls)
3658 	struct dlm_lkb *lkb;
3660 	int error = 0, mstype, err, oc, ou;
3663 		if (dlm_locking_stopped(ls)) {
3664 			log_debug(ls, "recover_waiters_post aborted");
3669 		lkb = find_resend_waiter(ls);
3673 		r = lkb->lkb_resource;
		/* snapshot op type and overlap state before wiping wait state */
3677 		mstype = lkb->lkb_wait_type;
3678 		oc = is_overlap_cancel(lkb);
3679 		ou = is_overlap_unlock(lkb);
3682 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3683 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3685 		/* At this point we assume that we won't get a reply to any
3686 		   previous op or overlap op on this lock.  First, do a big
3687 		   remove_from_waiters() for all previous ops. */
3689 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
3690 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3691 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3692 		lkb->lkb_wait_type = 0;
3693 		lkb->lkb_wait_count = 0;
3694 		mutex_lock(&ls->ls_waiters_mutex);
3695 		list_del_init(&lkb->lkb_wait_reply);
3696 		mutex_unlock(&ls->ls_waiters_mutex);
3697 		unhold_lkb(lkb); /* for waiters list */
3700 			/* do an unlock or cancel instead of resending */
3702 			case DLM_MSG_LOOKUP:
3703 			case DLM_MSG_REQUEST:
3704 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3706 				unhold_lkb(lkb); /* undoes create_lkb() */
3708 			case DLM_MSG_CONVERT:
3710 					queue_cast(r, lkb, -DLM_ECANCEL);
3712 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3713 					_unlock_lock(r, lkb);
			/* no overlap: resend the original operation */
3721 			case DLM_MSG_LOOKUP:
3722 			case DLM_MSG_REQUEST:
3723 				_request_lock(r, lkb);
3725 					confirm_master(r, 0);
3727 			case DLM_MSG_CONVERT:
3728 				_convert_lock(r, lkb);
3736 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
3737 				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
/* Walk one rsb queue and free every lkb the test callback accepts,
   marking the rsb LOCKS_PURGED so dlm_grant_after_purge() revisits it.
   NOTE(review): the del_lkb/unhold lines between the test and the put are
   elided in this view. */
3746 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3747 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3749 	struct dlm_ls *ls = r->res_ls;
3750 	struct dlm_lkb *lkb, *safe;
3752 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3753 		if (test(ls, lkb)) {
3754 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3756 			/* this put should free the lkb */
3757 			if (!dlm_put_lkb(lkb))
3758 				log_error(ls, "purged lkb not released");
/* purge_queue() predicate: master copies whose owning node is gone. */
3763 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3765 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
/* purge_queue() predicate: all master copies, regardless of node state. */
3768 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3770 	return is_master_copy(lkb);
/* Drop master-copy locks held by removed nodes from all three rsb queues. */
3773 static void purge_dead_locks(struct dlm_rsb *r)
3775 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3776 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3777 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
/* Drop every master-copy lock on the rsb (used when we stop mastering it). */
3780 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3782 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3783 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3784 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3787 /* Get rid of locks held by nodes that are gone. */
3789 int dlm_purge_locks(struct dlm_ls *ls)
3793 	log_debug(ls, "dlm_purge_locks");
3795 	down_write(&ls->ls_root_sem);
3796 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		/* NOTE(review): hold/lock_rsb lines around this call are
		   elided in this view */
3800 		purge_dead_locks(r);
3806 	up_write(&ls->ls_root_sem);
/* Scan one hash bucket for an rsb flagged LOCKS_PURGED; clear the flag and
   return it (NULL when the bucket has none).  NOTE(review): the hold_rsb
   and r_ret assignment lines are elided in this view. */
3811 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3813 	struct dlm_rsb *r, *r_ret = NULL;
3815 	read_lock(&ls->ls_rsbtbl[bucket].lock);
3816 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3817 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
3820 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3824 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
/* After dead locks were purged, re-run grant processing on every rsb that
   was flagged, bucket by bucket, so waiting locks blocked by the purged
   ones can now be granted.  NOTE(review): the loop structure and bucket
   increment lines are elided in this view. */
3828 void dlm_grant_after_purge(struct dlm_ls *ls)
3834 		r = find_purged_rsb(ls, bucket);
3836 			if (bucket == ls->ls_rsbtbl_size - 1)
3843 		grant_pending_locks(r);
3844 		confirm_master(r, 0);
/* Find an lkb on one queue by its owning nodeid + remote lkid pair.
   NOTE(review): the return-lkb and return-NULL lines are elided here. */
3852 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3855 	struct dlm_lkb *lkb;
3857 	list_for_each_entry(lkb, head, lkb_statequeue) {
3858 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
/* Search all three rsb queues (grant, convert, wait) for an lkb matching
   the given nodeid/remid. */
3864 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3867 	struct dlm_lkb *lkb;
3869 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3872 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3875 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
/* Populate a freshly created master-copy lkb from the rcom_lock payload a
   lock holder sent during recovery: identity, flags, modes, lvb and ast
   bits.  Middle (PR<->CW) conversions get their granted mode reset to IV
   so recover_conversion can resolve them once all locks are rebuilt. */
3881 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3882 				  struct dlm_rsb *r, struct dlm_rcom *rc)
3884 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3887 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3888 	lkb->lkb_ownpid = rl->rl_ownpid;
	/* the sender's lkid becomes our remid for this master copy */
3889 	lkb->lkb_remid = rl->rl_lkid;
3890 	lkb->lkb_exflags = rl->rl_exflags;
	/* only the low 16 internal-flag bits travel over the wire */
3891 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3892 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3893 	lkb->lkb_lvbseq = rl->rl_lvbseq;
3894 	lkb->lkb_rqmode = rl->rl_rqmode;
3895 	lkb->lkb_grmode = rl->rl_grmode;
3896 	/* don't set lkb_status because add_lkb wants to itself */
	/* only presence of the holder's asts matters on the master side */
3898 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3899 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3901 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3902 		lkb->lkb_lvbptr = allocate_lvb(ls);
3903 		if (!lkb->lkb_lvbptr)
		/* lvb bytes trail the fixed rcom_lock struct in the buffer */
3905 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3906 			 sizeof(struct rcom_lock);
3907 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3910 	/* Conversions between PR and CW (middle modes) need special handling.
3911 	   The real granted mode of these converting locks cannot be determined
3912 	   until all locks have been rebuilt on the rsb (recover_conversion) */
3914 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3915 		rl->rl_status = DLM_LKSTS_CONVERT;
3916 		lkb->lkb_grmode = DLM_LOCK_IV;
3917 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
3923 /* This lkb may have been recovered in a previous aborted recovery so we need
3924    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3925    If so we just send back a standard reply.  If not, we create a new lkb with
3926    the given values and send back our lkid.  We send back our lkid by sending
3927    back the rcom_lock struct we got but with the remid field filled in. */
3929 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3931 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3933 	struct dlm_lkb *lkb;
	/* parent/child lock hierarchies are not supported here */
3936 	if (rl->rl_parent_lkid) {
3937 		error = -EOPNOTSUPP;
3941 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	/* dedupe against a prior aborted recovery attempt */
3947 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3953 	error = create_lkb(ls, &lkb);
3957 	error = receive_rcom_lock_args(ls, lkb, r, rc);
3964 	add_lkb(r, lkb, rl->rl_status);
3968 	/* this is the new value returned to the lock holder for
3969 	   saving in its process-copy lkb */
3970 	rl->rl_remid = lkb->lkb_id;
3977 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3978 	rl->rl_result = error;
/* Process the new master's reply to the rcom lock we sent it during
   recovery: on success store the master's lkid as our remid; -EBADR means
   the master wasn't ready yet so resend; -EEXIST means it already had our
   lock from an earlier attempt.  Finally ack dlm_recover_locks() which is
   counting replies.  NOTE(review): switch/case lines around the error
   handling are elided in this view. */
3982 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3984 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3986 	struct dlm_lkb *lkb;
3989 	error = find_lkb(ls, rl->rl_lkid, &lkb);
3991 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3995 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3997 	error = rl->rl_result;
3999 	r = lkb->lkb_resource;
4005 		/* There's a chance the new master received our lock before
4006 		   dlm_recover_master_reply(), this wouldn't happen if we did
4007 		   a barrier between recover_masters and recover_locks. */
4008 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4009 			  (unsigned long)r, r->res_name);
4010 		dlm_send_rcom_lock(r, lkb);
4013 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4016 		lkb->lkb_remid = rl->rl_remid;
4019 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4020 			  error, lkb->lkb_id);
4023 	/* an ack for dlm_recover_locks() which waits for replies from
4024 	   all the locks it sends to new masters */
4025 	dlm_recovered_lock(r);
/* Userspace entry point for a new lock request (via the misc device):
   create an lkb, optionally allocate a user lvb, attach the dlm_user_args
   as the ast parameter, run request_lock(), and track the lkb on the
   owning process's lock list.  ua is freed by free_lkb() once attached.
   NOTE(review): error-goto paths are elided in this view. */
4034 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4035 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4036 		     uint32_t parent_lkid)
4038 	struct dlm_lkb *lkb;
4039 	struct dlm_args args;
4042 	dlm_lock_recovery(ls);
4044 	error = create_lkb(ls, &lkb);
4050 	if (flags & DLM_LKF_VALBLK) {
4051 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4052 		if (!ua->lksb.sb_lvbptr) {
4060 	/* After ua is attached to lkb it will be freed by free_lkb().
4061 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4062 	   lock and that lkb_astparam is the dlm_user_args structure. */
4064 	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
4065 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4066 	lkb->lkb_flags |= DLM_IFL_USER;
4067 	ua->old_mode = DLM_LOCK_IV;
4074 	error = request_lock(ls, lkb, name, namelen, &args);
4090 	/* add this new lkb to the per-process list of locks */
4091 	spin_lock(&ua->proc->locks_spin);
4093 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4094 	spin_unlock(&ua->proc->locks_spin);
4096 	dlm_unlock_recovery(ls);
/* Userspace entry point for converting an existing lock: locate the lkb,
   refresh the persistent dlm_user_args from the per-call ua_tmp (the user
   may change ast params or add an lvb on convert), then run convert_lock().
   NOTE(review): error-goto paths are elided in this view. */
4100 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4101 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4103 	struct dlm_lkb *lkb;
4104 	struct dlm_args args;
4105 	struct dlm_user_args *ua;
4108 	dlm_lock_recovery(ls);
4110 	error = find_lkb(ls, lkid, &lkb);
4114 	/* user can change the params on its lock when it converts it, or
4115 	   add an lvb that didn't exist before */
4117 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4119 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4120 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4121 		if (!ua->lksb.sb_lvbptr) {
4126 	if (lvb_in && ua->lksb.sb_lvbptr)
4127 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	/* carry the per-call callback params into the persistent ua */
4129 	ua->castparam = ua_tmp->castparam;
4130 	ua->castaddr = ua_tmp->castaddr;
4131 	ua->bastparam = ua_tmp->bastparam;
4132 	ua->bastaddr = ua_tmp->bastaddr;
4133 	ua->user_lksb = ua_tmp->user_lksb;
	/* remember the pre-convert mode for the user's completion ast */
4134 	ua->old_mode = lkb->lkb_grmode;
4136 	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4137 			      ua, DLM_FAKE_USER_AST, &args);
4141 	error = convert_lock(ls, lkb, &args);
	/* queued/blocked conversions are success from the caller's view */
4143 	if (error == -EINPROGRESS || error == -EAGAIN)
4148 	dlm_unlock_recovery(ls);
/* Userspace entry point for unlocking: find the lkb, update the lvb and
   callback params, run unlock_lock(), and move the lkb onto the process's
   "unlocking" list until the ast is delivered.  NOTE(review): error-goto
   paths are elided in this view. */
4153 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4154 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4156 	struct dlm_lkb *lkb;
4157 	struct dlm_args args;
4158 	struct dlm_user_args *ua;
4161 	dlm_lock_recovery(ls);
4163 	error = find_lkb(ls, lkid, &lkb);
4167 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4169 	if (lvb_in && ua->lksb.sb_lvbptr)
4170 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4171 	ua->castparam = ua_tmp->castparam;
4172 	ua->user_lksb = ua_tmp->user_lksb;
4174 	error = set_unlock_args(flags, ua, &args);
4178 	error = unlock_lock(ls, lkb, &args);
	/* -DLM_EUNLOCK is the normal success return for an unlock */
4180 	if (error == -DLM_EUNLOCK)
4182 	/* from validate_unlock_args() */
4183 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4188 	spin_lock(&ua->proc->locks_spin);
4189 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4190 	if (!list_empty(&lkb->lkb_ownqueue))
4191 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4192 	spin_unlock(&ua->proc->locks_spin);
4196 	dlm_unlock_recovery(ls);
/* Userspace entry point for canceling an in-progress request/convert:
   find the lkb, refresh callback params, run cancel_lock().
   NOTE(review): error-goto paths are elided in this view. */
4201 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4202 		    uint32_t flags, uint32_t lkid)
4204 	struct dlm_lkb *lkb;
4205 	struct dlm_args args;
4206 	struct dlm_user_args *ua;
4209 	dlm_lock_recovery(ls);
4211 	error = find_lkb(ls, lkid, &lkb);
4215 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4216 	ua->castparam = ua_tmp->castparam;
4217 	ua->user_lksb = ua_tmp->user_lksb;
4219 	error = set_unlock_args(flags, ua, &args);
4223 	error = cancel_lock(ls, lkb, &args);
	/* -DLM_ECANCEL is the normal success return for a cancel */
4225 	if (error == -DLM_ECANCEL)
4227 	/* from validate_unlock_args() */
4228 	if (error == -EBUSY)
4233 	dlm_unlock_recovery(ls);
4238 /* lkb's that are removed from the waiters list by revert are just left on the
4239    orphans list with the granted orphan locks, to be freed by purge */
4241 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4243 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4244 	struct dlm_args args;
	/* park the lock on the lockspace orphan list before canceling */
4248 	mutex_lock(&ls->ls_orphans_mutex);
4249 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4250 	mutex_unlock(&ls->ls_orphans_mutex);
4252 	set_unlock_args(0, ua, &args);
4254 	error = cancel_lock(ls, lkb, &args);
4255 	if (error == -DLM_ECANCEL)
4260 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4261    Regardless of what rsb queue the lock is on, it's removed and freed. */
4263 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4265 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4266 	struct dlm_args args;
4269 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4271 	error = unlock_lock(ls, lkb, &args);
4272 	if (error == -DLM_EUNLOCK)
4277 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4278    (which does lock_rsb) due to deadlock with receiving a message that does
4279    lock_rsb followed by dlm_user_add_ast() */
4281 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4282 				     struct dlm_user_proc *proc)
4284 	struct dlm_lkb *lkb = NULL;
4286 	mutex_lock(&ls->ls_clear_proc_locks);
4287 	if (list_empty(&proc->locks))
4290 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4291 	list_del_init(&lkb->lkb_ownqueue);
	/* persistent locks become orphans; others are simply dead */
4293 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4294 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4296 		lkb->lkb_flags |= DLM_IFL_DEAD;
4298 	mutex_unlock(&ls->ls_clear_proc_locks);
4302 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4303    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4304    which we clear here. */
4306 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4307    list, and no more device_writes should add lkb's to proc->locks list; so we
4308    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4309    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4312 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4314 	struct dlm_lkb *lkb, *safe;
4316 	dlm_lock_recovery(ls);
	/* drain proc->locks one lkb at a time (see del_proc_lock above) */
4319 		lkb = del_proc_lock(ls, proc);
4322 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4323 			orphan_proc_lock(ls, lkb);
4325 			unlock_proc_lock(ls, lkb);
4327 		/* this removes the reference for the proc->locks list
4328 		   added by dlm_user_request, it may result in the lkb
4334 	mutex_lock(&ls->ls_clear_proc_locks);
4336 	/* in-progress unlocks */
4337 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4338 		list_del_init(&lkb->lkb_ownqueue);
4339 		lkb->lkb_flags |= DLM_IFL_DEAD;
	/* drop undelivered asts queued for this (closing) process */
4343 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4344 		list_del(&lkb->lkb_astqueue);
4348 	mutex_unlock(&ls->ls_clear_proc_locks);
4349 	dlm_unlock_recovery(ls);
/* Purge-by-request variant of dlm_clear_proc_locks: force-unlock every
   lock the process still owns, abandon its in-progress unlocks, and drop
   its queued asts.  Uses the spinlocks because the process is still alive
   (unlike the close path).  NOTE(review): loop structure lines are elided
   in this view. */
4352 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4354 	struct dlm_lkb *lkb, *safe;
4358 		spin_lock(&proc->locks_spin);
4359 		if (!list_empty(&proc->locks)) {
4360 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4362 			list_del_init(&lkb->lkb_ownqueue);
4364 		spin_unlock(&proc->locks_spin);
4369 		lkb->lkb_flags |= DLM_IFL_DEAD;
4370 		unlock_proc_lock(ls, lkb);
4371 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4374 	spin_lock(&proc->locks_spin);
4375 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4376 		list_del_init(&lkb->lkb_ownqueue);
4377 		lkb->lkb_flags |= DLM_IFL_DEAD;
4380 	spin_unlock(&proc->locks_spin);
4382 	spin_lock(&proc->asts_spin);
4383 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4384 		list_del(&lkb->lkb_astqueue);
4387 	spin_unlock(&proc->asts_spin);
4390 /* pid of 0 means purge all orphans */
4392 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4394 	struct dlm_lkb *lkb, *safe;
4396 	mutex_lock(&ls->ls_orphans_mutex);
4397 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		/* with a nonzero pid, only that process's orphans go */
4398 		if (pid && lkb->lkb_ownpid != pid)
4400 		unlock_proc_lock(ls, lkb);
4401 		list_del_init(&lkb->lkb_ownqueue);
4404 	mutex_unlock(&ls->ls_orphans_mutex);
/* Send a DLM_MSG_PURGE to a remote node asking it to purge orphans for
   the given nodeid/pid (handled there by receive_purge/do_purge).
   NOTE(review): the m_pid assignment line is elided in this view. */
4407 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4409 	struct dlm_message *ms;
4410 	struct dlm_mhandle *mh;
4413 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4414 				DLM_MSG_PURGE, &ms, &mh);
4417 	ms->m_nodeid = nodeid;
4420 	return send_message(mh, ms);
4423 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4424 int nodeid, int pid)
4428 if (nodeid != dlm_our_nodeid()) {
4429 error = send_purge(ls, nodeid, pid);
4431 dlm_lock_recovery(ls);
4432 if (pid == current->pid)
4433 purge_proc_locks(ls, proc);
4435 do_purge(ls, nodeid, pid);
4436 dlm_unlock_recovery(ls);