v2.6.22.24-op1
[linux-2.6.22.y-op.git] / fs / dlm / lock.c
blobd8d6e729f96b669b5a6ed16bfb92c776cfc4744c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
5 **
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
13 /* Central locking logic has four stages:
15 dlm_lock()
16 dlm_unlock()
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 * Lock compatibilty matrix - thanks Steve
92 * UN = Unlocked state. Not really a state, used as a flag
93 * PD = Padding. Used to make the matrix a nice power of two in size
94 * Other states are the same as the VMS DLM.
95 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
98 static const int __dlm_compat_matrix[8][8] = {
99 /* UN NL CR CW PR PW EX PD */
100 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
101 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
102 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
103 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
104 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
105 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
106 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
107 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
111 * This defines the direction of transfer of LVB data.
112 * Granted mode is the row; requested mode is the column.
113 * Usage: matrix[grmode+1][rqmode+1]
114 * 1 = LVB is returned to the caller
115 * 0 = LVB is written to the resource
116 * -1 = nothing happens to the LVB
119 const int dlm_lvb_operations[8][8] = {
120 /* UN NL CR CW PR PW EX PD*/
121 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
122 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
123 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
124 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
125 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
126 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
127 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
128 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
131 #define modes_compat(gr, rq) \
132 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
134 int dlm_modes_compat(int mode1, int mode2)
136 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 * Compatibility matrix for conversions with QUECVT set.
141 * Granted mode is the row; requested mode is the column.
142 * Usage: matrix[grmode+1][rqmode+1]
145 static const int __quecvt_compat_matrix[8][8] = {
146 /* UN NL CR CW PR PW EX PD */
147 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
148 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
149 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
150 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
151 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
152 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
153 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
154 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
157 void dlm_print_lkb(struct dlm_lkb *lkb)
159 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
160 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
161 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
162 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
163 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 void dlm_print_rsb(struct dlm_rsb *r)
168 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
169 r->res_nodeid, r->res_flags, r->res_first_lkid,
170 r->res_recover_locks_count, r->res_name);
173 void dlm_dump_rsb(struct dlm_rsb *r)
175 struct dlm_lkb *lkb;
177 dlm_print_rsb(r);
179 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
180 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
181 printk(KERN_ERR "rsb lookup list\n");
182 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
183 dlm_print_lkb(lkb);
184 printk(KERN_ERR "rsb grant queue:\n");
185 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
186 dlm_print_lkb(lkb);
187 printk(KERN_ERR "rsb convert queue:\n");
188 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
189 dlm_print_lkb(lkb);
190 printk(KERN_ERR "rsb wait queue:\n");
191 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
192 dlm_print_lkb(lkb);
195 /* Threads cannot use the lockspace while it's being recovered */
197 static inline void lock_recovery(struct dlm_ls *ls)
199 down_read(&ls->ls_in_recovery);
202 static inline void unlock_recovery(struct dlm_ls *ls)
204 up_read(&ls->ls_in_recovery);
207 static inline int lock_recovery_try(struct dlm_ls *ls)
209 return down_read_trylock(&ls->ls_in_recovery);
212 static inline int can_be_queued(struct dlm_lkb *lkb)
214 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 static inline int force_blocking_asts(struct dlm_lkb *lkb)
219 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 static inline int is_demoted(struct dlm_lkb *lkb)
224 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 static inline int is_altmode(struct dlm_lkb *lkb)
229 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 static inline int is_granted(struct dlm_lkb *lkb)
234 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 static inline int is_remote(struct dlm_rsb *r)
239 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
240 return !!r->res_nodeid;
243 static inline int is_process_copy(struct dlm_lkb *lkb)
245 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 static inline int is_master_copy(struct dlm_lkb *lkb)
250 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
251 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
252 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 static inline int middle_conversion(struct dlm_lkb *lkb)
257 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
258 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
259 return 1;
260 return 0;
263 static inline int down_conversion(struct dlm_lkb *lkb)
265 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
270 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
275 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 static inline int is_overlap(struct dlm_lkb *lkb)
280 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
281 DLM_IFL_OVERLAP_CANCEL));
284 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
286 if (is_master_copy(lkb))
287 return;
289 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
291 lkb->lkb_lksb->sb_status = rv;
292 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
294 dlm_add_ast(lkb, AST_COMP);
297 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
299 queue_cast(r, lkb,
300 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
303 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
305 if (is_master_copy(lkb))
306 send_bast(r, lkb, rqmode);
307 else {
308 lkb->lkb_bastmode = rqmode;
309 dlm_add_ast(lkb, AST_BAST);
314 * Basic operations on rsb's and lkb's
317 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
319 struct dlm_rsb *r;
321 r = allocate_rsb(ls, len);
322 if (!r)
323 return NULL;
325 r->res_ls = ls;
326 r->res_length = len;
327 memcpy(r->res_name, name, len);
328 mutex_init(&r->res_mutex);
330 INIT_LIST_HEAD(&r->res_lookup);
331 INIT_LIST_HEAD(&r->res_grantqueue);
332 INIT_LIST_HEAD(&r->res_convertqueue);
333 INIT_LIST_HEAD(&r->res_waitqueue);
334 INIT_LIST_HEAD(&r->res_root_list);
335 INIT_LIST_HEAD(&r->res_recover_list);
337 return r;
340 static int search_rsb_list(struct list_head *head, char *name, int len,
341 unsigned int flags, struct dlm_rsb **r_ret)
343 struct dlm_rsb *r;
344 int error = 0;
346 list_for_each_entry(r, head, res_hashchain) {
347 if (len == r->res_length && !memcmp(name, r->res_name, len))
348 goto found;
350 return -EBADR;
352 found:
353 if (r->res_nodeid && (flags & R_MASTER))
354 error = -ENOTBLK;
355 *r_ret = r;
356 return error;
359 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
360 unsigned int flags, struct dlm_rsb **r_ret)
362 struct dlm_rsb *r;
363 int error;
365 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
366 if (!error) {
367 kref_get(&r->res_ref);
368 goto out;
370 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
371 if (error)
372 goto out;
374 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
376 if (dlm_no_directory(ls))
377 goto out;
379 if (r->res_nodeid == -1) {
380 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
381 r->res_first_lkid = 0;
382 } else if (r->res_nodeid > 0) {
383 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
384 r->res_first_lkid = 0;
385 } else {
386 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
387 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
389 out:
390 *r_ret = r;
391 return error;
394 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
395 unsigned int flags, struct dlm_rsb **r_ret)
397 int error;
398 write_lock(&ls->ls_rsbtbl[b].lock);
399 error = _search_rsb(ls, name, len, b, flags, r_ret);
400 write_unlock(&ls->ls_rsbtbl[b].lock);
401 return error;
405 * Find rsb in rsbtbl and potentially create/add one
407 * Delaying the release of rsb's has a similar benefit to applications keeping
408 * NL locks on an rsb, but without the guarantee that the cached master value
409 * will still be valid when the rsb is reused. Apps aren't always smart enough
410 * to keep NL locks on an rsb that they may lock again shortly; this can lead
411 * to excessive master lookups and removals if we don't delay the release.
413 * Searching for an rsb means looking through both the normal list and toss
414 * list. When found on the toss list the rsb is moved to the normal list with
415 * ref count of 1; when found on normal list the ref count is incremented.
418 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
419 unsigned int flags, struct dlm_rsb **r_ret)
421 struct dlm_rsb *r, *tmp;
422 uint32_t hash, bucket;
423 int error = 0;
425 if (dlm_no_directory(ls))
426 flags |= R_CREATE;
428 hash = jhash(name, namelen, 0);
429 bucket = hash & (ls->ls_rsbtbl_size - 1);
431 error = search_rsb(ls, name, namelen, bucket, flags, &r);
432 if (!error)
433 goto out;
435 if (error == -EBADR && !(flags & R_CREATE))
436 goto out;
438 /* the rsb was found but wasn't a master copy */
439 if (error == -ENOTBLK)
440 goto out;
442 error = -ENOMEM;
443 r = create_rsb(ls, name, namelen);
444 if (!r)
445 goto out;
447 r->res_hash = hash;
448 r->res_bucket = bucket;
449 r->res_nodeid = -1;
450 kref_init(&r->res_ref);
452 /* With no directory, the master can be set immediately */
453 if (dlm_no_directory(ls)) {
454 int nodeid = dlm_dir_nodeid(r);
455 if (nodeid == dlm_our_nodeid())
456 nodeid = 0;
457 r->res_nodeid = nodeid;
460 write_lock(&ls->ls_rsbtbl[bucket].lock);
461 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
462 if (!error) {
463 write_unlock(&ls->ls_rsbtbl[bucket].lock);
464 free_rsb(r);
465 r = tmp;
466 goto out;
468 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
469 write_unlock(&ls->ls_rsbtbl[bucket].lock);
470 error = 0;
471 out:
472 *r_ret = r;
473 return error;
476 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
477 unsigned int flags, struct dlm_rsb **r_ret)
479 return find_rsb(ls, name, namelen, flags, r_ret);
482 /* This is only called to add a reference when the code already holds
483 a valid reference to the rsb, so there's no need for locking. */
485 static inline void hold_rsb(struct dlm_rsb *r)
487 kref_get(&r->res_ref);
490 void dlm_hold_rsb(struct dlm_rsb *r)
492 hold_rsb(r);
495 static void toss_rsb(struct kref *kref)
497 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
498 struct dlm_ls *ls = r->res_ls;
500 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
501 kref_init(&r->res_ref);
502 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
503 r->res_toss_time = jiffies;
504 if (r->res_lvbptr) {
505 free_lvb(r->res_lvbptr);
506 r->res_lvbptr = NULL;
510 /* When all references to the rsb are gone it's transfered to
511 the tossed list for later disposal. */
513 static void put_rsb(struct dlm_rsb *r)
515 struct dlm_ls *ls = r->res_ls;
516 uint32_t bucket = r->res_bucket;
518 write_lock(&ls->ls_rsbtbl[bucket].lock);
519 kref_put(&r->res_ref, toss_rsb);
520 write_unlock(&ls->ls_rsbtbl[bucket].lock);
523 void dlm_put_rsb(struct dlm_rsb *r)
525 put_rsb(r);
528 /* See comment for unhold_lkb */
530 static void unhold_rsb(struct dlm_rsb *r)
532 int rv;
533 rv = kref_put(&r->res_ref, toss_rsb);
534 DLM_ASSERT(!rv, dlm_dump_rsb(r););
537 static void kill_rsb(struct kref *kref)
539 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
541 /* All work is done after the return from kref_put() so we
542 can release the write_lock before the remove and free. */
544 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
545 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
546 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
547 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
548 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
549 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
552 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
553 The rsb must exist as long as any lkb's for it do. */
555 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
557 hold_rsb(r);
558 lkb->lkb_resource = r;
561 static void detach_lkb(struct dlm_lkb *lkb)
563 if (lkb->lkb_resource) {
564 put_rsb(lkb->lkb_resource);
565 lkb->lkb_resource = NULL;
569 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
571 struct dlm_lkb *lkb, *tmp;
572 uint32_t lkid = 0;
573 uint16_t bucket;
575 lkb = allocate_lkb(ls);
576 if (!lkb)
577 return -ENOMEM;
579 lkb->lkb_nodeid = -1;
580 lkb->lkb_grmode = DLM_LOCK_IV;
581 kref_init(&lkb->lkb_ref);
582 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
585 get_random_bytes(&bucket, sizeof(bucket));
586 bucket &= (ls->ls_lkbtbl_size - 1);
588 write_lock(&ls->ls_lkbtbl[bucket].lock);
590 /* counter can roll over so we must verify lkid is not in use */
592 while (lkid == 0) {
593 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
595 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
596 lkb_idtbl_list) {
597 if (tmp->lkb_id != lkid)
598 continue;
599 lkid = 0;
600 break;
604 lkb->lkb_id = lkid;
605 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
606 write_unlock(&ls->ls_lkbtbl[bucket].lock);
608 *lkb_ret = lkb;
609 return 0;
612 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
614 struct dlm_lkb *lkb;
615 uint16_t bucket = (lkid >> 16);
617 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
618 if (lkb->lkb_id == lkid)
619 return lkb;
621 return NULL;
624 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
626 struct dlm_lkb *lkb;
627 uint16_t bucket = (lkid >> 16);
629 if (bucket >= ls->ls_lkbtbl_size)
630 return -EBADSLT;
632 read_lock(&ls->ls_lkbtbl[bucket].lock);
633 lkb = __find_lkb(ls, lkid);
634 if (lkb)
635 kref_get(&lkb->lkb_ref);
636 read_unlock(&ls->ls_lkbtbl[bucket].lock);
638 *lkb_ret = lkb;
639 return lkb ? 0 : -ENOENT;
642 static void kill_lkb(struct kref *kref)
644 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
646 /* All work is done after the return from kref_put() so we
647 can release the write_lock before the detach_lkb */
649 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
652 /* __put_lkb() is used when an lkb may not have an rsb attached to
653 it so we need to provide the lockspace explicitly */
655 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
657 uint16_t bucket = (lkb->lkb_id >> 16);
659 write_lock(&ls->ls_lkbtbl[bucket].lock);
660 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
661 list_del(&lkb->lkb_idtbl_list);
662 write_unlock(&ls->ls_lkbtbl[bucket].lock);
664 detach_lkb(lkb);
666 /* for local/process lkbs, lvbptr points to caller's lksb */
667 if (lkb->lkb_lvbptr && is_master_copy(lkb))
668 free_lvb(lkb->lkb_lvbptr);
669 free_lkb(lkb);
670 return 1;
671 } else {
672 write_unlock(&ls->ls_lkbtbl[bucket].lock);
673 return 0;
677 int dlm_put_lkb(struct dlm_lkb *lkb)
679 struct dlm_ls *ls;
681 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
682 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
684 ls = lkb->lkb_resource->res_ls;
685 return __put_lkb(ls, lkb);
688 /* This is only called to add a reference when the code already holds
689 a valid reference to the lkb, so there's no need for locking. */
691 static inline void hold_lkb(struct dlm_lkb *lkb)
693 kref_get(&lkb->lkb_ref);
696 /* This is called when we need to remove a reference and are certain
697 it's not the last ref. e.g. del_lkb is always called between a
698 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
699 put_lkb would work fine, but would involve unnecessary locking */
701 static inline void unhold_lkb(struct dlm_lkb *lkb)
703 int rv;
704 rv = kref_put(&lkb->lkb_ref, kill_lkb);
705 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
708 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
709 int mode)
711 struct dlm_lkb *lkb = NULL;
713 list_for_each_entry(lkb, head, lkb_statequeue)
714 if (lkb->lkb_rqmode < mode)
715 break;
717 if (!lkb)
718 list_add_tail(new, head);
719 else
720 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
723 /* add/remove lkb to rsb's grant/convert/wait queue */
725 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
727 kref_get(&lkb->lkb_ref);
729 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
731 lkb->lkb_status = status;
733 switch (status) {
734 case DLM_LKSTS_WAITING:
735 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
736 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
737 else
738 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
739 break;
740 case DLM_LKSTS_GRANTED:
741 /* convention says granted locks kept in order of grmode */
742 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
743 lkb->lkb_grmode);
744 break;
745 case DLM_LKSTS_CONVERT:
746 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
747 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
748 else
749 list_add_tail(&lkb->lkb_statequeue,
750 &r->res_convertqueue);
751 break;
752 default:
753 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
757 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
759 lkb->lkb_status = 0;
760 list_del(&lkb->lkb_statequeue);
761 unhold_lkb(lkb);
764 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
766 hold_lkb(lkb);
767 del_lkb(r, lkb);
768 add_lkb(r, lkb, sts);
769 unhold_lkb(lkb);
772 static int msg_reply_type(int mstype)
774 switch (mstype) {
775 case DLM_MSG_REQUEST:
776 return DLM_MSG_REQUEST_REPLY;
777 case DLM_MSG_CONVERT:
778 return DLM_MSG_CONVERT_REPLY;
779 case DLM_MSG_UNLOCK:
780 return DLM_MSG_UNLOCK_REPLY;
781 case DLM_MSG_CANCEL:
782 return DLM_MSG_CANCEL_REPLY;
783 case DLM_MSG_LOOKUP:
784 return DLM_MSG_LOOKUP_REPLY;
786 return -1;
789 /* add/remove lkb from global waiters list of lkb's waiting for
790 a reply from a remote node */
792 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
794 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
795 int error = 0;
797 mutex_lock(&ls->ls_waiters_mutex);
799 if (is_overlap_unlock(lkb) ||
800 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
801 error = -EINVAL;
802 goto out;
805 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
806 switch (mstype) {
807 case DLM_MSG_UNLOCK:
808 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
809 break;
810 case DLM_MSG_CANCEL:
811 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
812 break;
813 default:
814 error = -EBUSY;
815 goto out;
817 lkb->lkb_wait_count++;
818 hold_lkb(lkb);
820 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
821 lkb->lkb_id, lkb->lkb_wait_type, mstype,
822 lkb->lkb_wait_count, lkb->lkb_flags);
823 goto out;
826 DLM_ASSERT(!lkb->lkb_wait_count,
827 dlm_print_lkb(lkb);
828 printk("wait_count %d\n", lkb->lkb_wait_count););
830 lkb->lkb_wait_count++;
831 lkb->lkb_wait_type = mstype;
832 hold_lkb(lkb);
833 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
834 out:
835 if (error)
836 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
837 lkb->lkb_id, error, lkb->lkb_flags, mstype,
838 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
839 mutex_unlock(&ls->ls_waiters_mutex);
840 return error;
843 /* We clear the RESEND flag because we might be taking an lkb off the waiters
844 list as part of process_requestqueue (e.g. a lookup that has an optimized
845 request reply on the requestqueue) between dlm_recover_waiters_pre() which
846 set RESEND and dlm_recover_waiters_post() */
848 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
850 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
851 int overlap_done = 0;
853 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
854 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
855 overlap_done = 1;
856 goto out_del;
859 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
861 overlap_done = 1;
862 goto out_del;
865 /* N.B. type of reply may not always correspond to type of original
866 msg due to lookup->request optimization, verify others? */
868 if (lkb->lkb_wait_type) {
869 lkb->lkb_wait_type = 0;
870 goto out_del;
873 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
874 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
875 return -1;
877 out_del:
878 /* the force-unlock/cancel has completed and we haven't recvd a reply
879 to the op that was in progress prior to the unlock/cancel; we
880 give up on any reply to the earlier op. FIXME: not sure when/how
881 this would happen */
883 if (overlap_done && lkb->lkb_wait_type) {
884 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
885 lkb->lkb_id, mstype, lkb->lkb_wait_type);
886 lkb->lkb_wait_count--;
887 lkb->lkb_wait_type = 0;
890 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
892 lkb->lkb_flags &= ~DLM_IFL_RESEND;
893 lkb->lkb_wait_count--;
894 if (!lkb->lkb_wait_count)
895 list_del_init(&lkb->lkb_wait_reply);
896 unhold_lkb(lkb);
897 return 0;
900 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
902 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
903 int error;
905 mutex_lock(&ls->ls_waiters_mutex);
906 error = _remove_from_waiters(lkb, mstype);
907 mutex_unlock(&ls->ls_waiters_mutex);
908 return error;
911 /* Handles situations where we might be processing a "fake" or "stub" reply in
912 which we can't try to take waiters_mutex again. */
914 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
916 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
917 int error;
919 if (ms != &ls->ls_stub_ms)
920 mutex_lock(&ls->ls_waiters_mutex);
921 error = _remove_from_waiters(lkb, ms->m_type);
922 if (ms != &ls->ls_stub_ms)
923 mutex_unlock(&ls->ls_waiters_mutex);
924 return error;
927 static void dir_remove(struct dlm_rsb *r)
929 int to_nodeid;
931 if (dlm_no_directory(r->res_ls))
932 return;
934 to_nodeid = dlm_dir_nodeid(r);
935 if (to_nodeid != dlm_our_nodeid())
936 send_remove(r);
937 else
938 dlm_dir_remove_entry(r->res_ls, to_nodeid,
939 r->res_name, r->res_length);
942 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
943 found since they are in order of newest to oldest? */
945 static int shrink_bucket(struct dlm_ls *ls, int b)
947 struct dlm_rsb *r;
948 int count = 0, found;
950 for (;;) {
951 found = 0;
952 write_lock(&ls->ls_rsbtbl[b].lock);
953 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
954 res_hashchain) {
955 if (!time_after_eq(jiffies, r->res_toss_time +
956 dlm_config.ci_toss_secs * HZ))
957 continue;
958 found = 1;
959 break;
962 if (!found) {
963 write_unlock(&ls->ls_rsbtbl[b].lock);
964 break;
967 if (kref_put(&r->res_ref, kill_rsb)) {
968 list_del(&r->res_hashchain);
969 write_unlock(&ls->ls_rsbtbl[b].lock);
971 if (is_master(r))
972 dir_remove(r);
973 free_rsb(r);
974 count++;
975 } else {
976 write_unlock(&ls->ls_rsbtbl[b].lock);
977 log_error(ls, "tossed rsb in use %s", r->res_name);
981 return count;
984 void dlm_scan_rsbs(struct dlm_ls *ls)
986 int i;
988 if (dlm_locking_stopped(ls))
989 return;
991 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
992 shrink_bucket(ls, i);
993 cond_resched();
997 /* lkb is master or local copy */
999 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1001 int b, len = r->res_ls->ls_lvblen;
1003 /* b=1 lvb returned to caller
1004 b=0 lvb written to rsb or invalidated
1005 b=-1 do nothing */
1007 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1009 if (b == 1) {
1010 if (!lkb->lkb_lvbptr)
1011 return;
1013 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1014 return;
1016 if (!r->res_lvbptr)
1017 return;
1019 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1020 lkb->lkb_lvbseq = r->res_lvbseq;
1022 } else if (b == 0) {
1023 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1024 rsb_set_flag(r, RSB_VALNOTVALID);
1025 return;
1028 if (!lkb->lkb_lvbptr)
1029 return;
1031 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1032 return;
1034 if (!r->res_lvbptr)
1035 r->res_lvbptr = allocate_lvb(r->res_ls);
1037 if (!r->res_lvbptr)
1038 return;
1040 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1041 r->res_lvbseq++;
1042 lkb->lkb_lvbseq = r->res_lvbseq;
1043 rsb_clear_flag(r, RSB_VALNOTVALID);
1046 if (rsb_flag(r, RSB_VALNOTVALID))
1047 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1050 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1052 if (lkb->lkb_grmode < DLM_LOCK_PW)
1053 return;
1055 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1056 rsb_set_flag(r, RSB_VALNOTVALID);
1057 return;
1060 if (!lkb->lkb_lvbptr)
1061 return;
1063 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1064 return;
1066 if (!r->res_lvbptr)
1067 r->res_lvbptr = allocate_lvb(r->res_ls);
1069 if (!r->res_lvbptr)
1070 return;
1072 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1073 r->res_lvbseq++;
1074 rsb_clear_flag(r, RSB_VALNOTVALID);
1077 /* lkb is process copy (pc) */
1079 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1080 struct dlm_message *ms)
1082 int b;
1084 if (!lkb->lkb_lvbptr)
1085 return;
1087 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1088 return;
1090 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1091 if (b == 1) {
1092 int len = receive_extralen(ms);
1093 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1094 lkb->lkb_lvbseq = ms->m_lvbseq;
1098 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1099 remove_lock -- used for unlock, removes lkb from granted
1100 revert_lock -- used for cancel, moves lkb from convert to granted
1101 grant_lock -- used for request and convert, adds lkb to granted or
1102 moves lkb from convert or waiting to granted
1104 Each of these is used for master or local copy lkb's. There is
1105 also a _pc() variation used to make the corresponding change on
1106 a process copy (pc) lkb. */
1108 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1110 del_lkb(r, lkb);
1111 lkb->lkb_grmode = DLM_LOCK_IV;
1112 /* this unhold undoes the original ref from create_lkb()
1113 so this leads to the lkb being freed */
1114 unhold_lkb(lkb);
1117 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1119 set_lvb_unlock(r, lkb);
1120 _remove_lock(r, lkb);
1123 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1125 _remove_lock(r, lkb);
1128 /* returns: 0 did nothing
1129 1 moved lock to granted
1130 -1 removed lock */
1132 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1134 int rv = 0;
1136 lkb->lkb_rqmode = DLM_LOCK_IV;
1138 switch (lkb->lkb_status) {
1139 case DLM_LKSTS_GRANTED:
1140 break;
1141 case DLM_LKSTS_CONVERT:
1142 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1143 rv = 1;
1144 break;
1145 case DLM_LKSTS_WAITING:
1146 del_lkb(r, lkb);
1147 lkb->lkb_grmode = DLM_LOCK_IV;
1148 /* this unhold undoes the original ref from create_lkb()
1149 so this leads to the lkb being freed */
1150 unhold_lkb(lkb);
1151 rv = -1;
1152 break;
1153 default:
1154 log_print("invalid status for revert %d", lkb->lkb_status);
1156 return rv;
1159 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1161 return revert_lock(r, lkb);
1164 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1166 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1167 lkb->lkb_grmode = lkb->lkb_rqmode;
1168 if (lkb->lkb_status)
1169 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1170 else
1171 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1174 lkb->lkb_rqmode = DLM_LOCK_IV;
1177 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1179 set_lvb_lock(r, lkb);
1180 _grant_lock(r, lkb);
1181 lkb->lkb_highbast = 0;
1184 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1185 struct dlm_message *ms)
1187 set_lvb_lock_pc(r, lkb, ms);
1188 _grant_lock(r, lkb);
1191 /* called by grant_pending_locks() which means an async grant message must
1192 be sent to the requesting node in addition to granting the lock if the
1193 lkb belongs to a remote node. */
1195 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1197 grant_lock(r, lkb);
1198 if (is_master_copy(lkb))
1199 send_grant(r, lkb);
1200 else
1201 queue_cast(r, lkb, 0);
1204 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1205 change the granted/requested modes. We're munging things accordingly in
1206 the process copy.
1207 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1208 conversion deadlock
1209 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1210 compatible with other granted locks */
1212 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1214 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1215 log_print("munge_demoted %x invalid reply type %d",
1216 lkb->lkb_id, ms->m_type);
1217 return;
1220 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1221 log_print("munge_demoted %x invalid modes gr %d rq %d",
1222 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1223 return;
1226 lkb->lkb_grmode = DLM_LOCK_NL;
1229 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1231 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1232 ms->m_type != DLM_MSG_GRANT) {
1233 log_print("munge_altmode %x invalid reply type %d",
1234 lkb->lkb_id, ms->m_type);
1235 return;
1238 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1239 lkb->lkb_rqmode = DLM_LOCK_PR;
1240 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1241 lkb->lkb_rqmode = DLM_LOCK_CW;
1242 else {
1243 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1244 dlm_print_lkb(lkb);
1248 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1250 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1251 lkb_statequeue);
1252 if (lkb->lkb_id == first->lkb_id)
1253 return 1;
1255 return 0;
1258 /* Check if the given lkb conflicts with another lkb on the queue. */
1260 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1262 struct dlm_lkb *this;
1264 list_for_each_entry(this, head, lkb_statequeue) {
1265 if (this == lkb)
1266 continue;
1267 if (!modes_compat(this, lkb))
1268 return 1;
1270 return 0;
1274 * "A conversion deadlock arises with a pair of lock requests in the converting
1275 * queue for one resource. The granted mode of each lock blocks the requested
1276 * mode of the other lock."
1278 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1279 * convert queue from being granted, then demote lkb (set grmode to NL).
1280 * This second form requires that we check for conv-deadlk even when
1281 * now == 0 in _can_be_granted().
1283 * Example:
1284 * Granted Queue: empty
1285 * Convert Queue: NL->EX (first lock)
1286 * PR->EX (second lock)
1288 * The first lock can't be granted because of the granted mode of the second
1289 * lock and the second lock can't be granted because it's not first in the
1290 * list. We demote the granted mode of the second lock (the lkb passed to this
1291 * function).
1293 * After the resolution, the "grant pending" function needs to go back and try
1294 * to grant locks on the convert queue again since the first lock can now be
1295 * granted.
1298 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1300 struct dlm_lkb *this, *first = NULL, *self = NULL;
1302 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1303 if (!first)
1304 first = this;
1305 if (this == lkb) {
1306 self = lkb;
1307 continue;
1310 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1311 return 1;
1314 /* if lkb is on the convert queue and is preventing the first
1315 from being granted, then there's deadlock and we demote lkb.
1316 multiple converting locks may need to do this before the first
1317 converting lock can be granted. */
1319 if (self && self != first) {
1320 if (!modes_compat(lkb, first) &&
1321 !queue_conflict(&rsb->res_grantqueue, first))
1322 return 1;
1325 return 0;
1329 * Return 1 if the lock can be granted, 0 otherwise.
1330 * Also detect and resolve conversion deadlocks.
1332 * lkb is the lock to be granted
1334 * now is 1 if the function is being called in the context of the
1335 * immediate request, it is 0 if called later, after the lock has been
1336 * queued.
1338 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1341 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1343 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1346 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1347 * a new request for a NL mode lock being blocked.
1349 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1350 * request, then it would be granted. In essence, the use of this flag
1351 * tells the Lock Manager to expedite theis request by not considering
1352 * what may be in the CONVERTING or WAITING queues... As of this
1353 * writing, the EXPEDITE flag can be used only with new requests for NL
1354 * mode locks. This flag is not valid for conversion requests.
1356 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1357 * conversion or used with a non-NL requested mode. We also know an
1358 * EXPEDITE request is always granted immediately, so now must always
1359 * be 1. The full condition to grant an expedite request: (now &&
1360 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1361 * therefore be shortened to just checking the flag.
1364 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1365 return 1;
1368 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1369 * added to the remaining conditions.
1372 if (queue_conflict(&r->res_grantqueue, lkb))
1373 goto out;
1376 * 6-3: By default, a conversion request is immediately granted if the
1377 * requested mode is compatible with the modes of all other granted
1378 * locks
1381 if (queue_conflict(&r->res_convertqueue, lkb))
1382 goto out;
1385 * 6-5: But the default algorithm for deciding whether to grant or
1386 * queue conversion requests does not by itself guarantee that such
1387 * requests are serviced on a "first come first serve" basis. This, in
1388 * turn, can lead to a phenomenon known as "indefinate postponement".
1390 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1391 * the system service employed to request a lock conversion. This flag
1392 * forces certain conversion requests to be queued, even if they are
1393 * compatible with the granted modes of other locks on the same
1394 * resource. Thus, the use of this flag results in conversion requests
1395 * being ordered on a "first come first servce" basis.
1397 * DCT: This condition is all about new conversions being able to occur
1398 * "in place" while the lock remains on the granted queue (assuming
1399 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1400 * doesn't _have_ to go onto the convert queue where it's processed in
1401 * order. The "now" variable is necessary to distinguish converts
1402 * being received and processed for the first time now, because once a
1403 * convert is moved to the conversion queue the condition below applies
1404 * requiring fifo granting.
1407 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1408 return 1;
1411 * The NOORDER flag is set to avoid the standard vms rules on grant
1412 * order.
1415 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1416 return 1;
1419 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1420 * granted until all other conversion requests ahead of it are granted
1421 * and/or canceled.
1424 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1425 return 1;
1428 * 6-4: By default, a new request is immediately granted only if all
1429 * three of the following conditions are satisfied when the request is
1430 * issued:
1431 * - The queue of ungranted conversion requests for the resource is
1432 * empty.
1433 * - The queue of ungranted new requests for the resource is empty.
1434 * - The mode of the new request is compatible with the most
1435 * restrictive mode of all granted locks on the resource.
1438 if (now && !conv && list_empty(&r->res_convertqueue) &&
1439 list_empty(&r->res_waitqueue))
1440 return 1;
1443 * 6-4: Once a lock request is in the queue of ungranted new requests,
1444 * it cannot be granted until the queue of ungranted conversion
1445 * requests is empty, all ungranted new requests ahead of it are
1446 * granted and/or canceled, and it is compatible with the granted mode
1447 * of the most restrictive lock granted on the resource.
1450 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1451 first_in_list(lkb, &r->res_waitqueue))
1452 return 1;
1454 out:
1456 * The following, enabled by CONVDEADLK, departs from VMS.
1459 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1460 conversion_deadlock_detect(r, lkb)) {
1461 lkb->lkb_grmode = DLM_LOCK_NL;
1462 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1465 return 0;
1469 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1470 * simple way to provide a big optimization to applications that can use them.
1473 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1475 uint32_t flags = lkb->lkb_exflags;
1476 int rv;
1477 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1479 rv = _can_be_granted(r, lkb, now);
1480 if (rv)
1481 goto out;
1483 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1484 goto out;
1486 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1487 alt = DLM_LOCK_PR;
1488 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1489 alt = DLM_LOCK_CW;
1491 if (alt) {
1492 lkb->lkb_rqmode = alt;
1493 rv = _can_be_granted(r, lkb, now);
1494 if (rv)
1495 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1496 else
1497 lkb->lkb_rqmode = rqmode;
1499 out:
1500 return rv;
1503 static int grant_pending_convert(struct dlm_rsb *r, int high)
1505 struct dlm_lkb *lkb, *s;
1506 int hi, demoted, quit, grant_restart, demote_restart;
1508 quit = 0;
1509 restart:
1510 grant_restart = 0;
1511 demote_restart = 0;
1512 hi = DLM_LOCK_IV;
1514 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1515 demoted = is_demoted(lkb);
1516 if (can_be_granted(r, lkb, 0)) {
1517 grant_lock_pending(r, lkb);
1518 grant_restart = 1;
1519 } else {
1520 hi = max_t(int, lkb->lkb_rqmode, hi);
1521 if (!demoted && is_demoted(lkb))
1522 demote_restart = 1;
1526 if (grant_restart)
1527 goto restart;
1528 if (demote_restart && !quit) {
1529 quit = 1;
1530 goto restart;
1533 return max_t(int, high, hi);
1536 static int grant_pending_wait(struct dlm_rsb *r, int high)
1538 struct dlm_lkb *lkb, *s;
1540 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1541 if (can_be_granted(r, lkb, 0))
1542 grant_lock_pending(r, lkb);
1543 else
1544 high = max_t(int, lkb->lkb_rqmode, high);
1547 return high;
1550 static void grant_pending_locks(struct dlm_rsb *r)
1552 struct dlm_lkb *lkb, *s;
1553 int high = DLM_LOCK_IV;
1555 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1557 high = grant_pending_convert(r, high);
1558 high = grant_pending_wait(r, high);
1560 if (high == DLM_LOCK_IV)
1561 return;
1564 * If there are locks left on the wait/convert queue then send blocking
1565 * ASTs to granted locks based on the largest requested mode (high)
1566 * found above. FIXME: highbast < high comparison not valid for PR/CW.
1569 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1570 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1571 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1572 queue_bast(r, lkb, high);
1573 lkb->lkb_highbast = high;
1578 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1579 struct dlm_lkb *lkb)
1581 struct dlm_lkb *gr;
1583 list_for_each_entry(gr, head, lkb_statequeue) {
1584 if (gr->lkb_bastaddr &&
1585 gr->lkb_highbast < lkb->lkb_rqmode &&
1586 !modes_compat(gr, lkb)) {
1587 queue_bast(r, gr, lkb->lkb_rqmode);
1588 gr->lkb_highbast = lkb->lkb_rqmode;
1593 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1595 send_bast_queue(r, &r->res_grantqueue, lkb);
1598 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1600 send_bast_queue(r, &r->res_grantqueue, lkb);
1601 send_bast_queue(r, &r->res_convertqueue, lkb);
1604 /* set_master(r, lkb) -- set the master nodeid of a resource
1606 The purpose of this function is to set the nodeid field in the given
1607 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1608 known, it can just be copied to the lkb and the function will return
1609 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1610 before it can be copied to the lkb.
1612 When the rsb nodeid is being looked up remotely, the initial lkb
1613 causing the lookup is kept on the ls_waiters list waiting for the
1614 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1615 on the rsb's res_lookup list until the master is verified.
1617 Return values:
1618 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1619 1: the rsb master is not available and the lkb has been placed on
1620 a wait queue
1623 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1625 struct dlm_ls *ls = r->res_ls;
1626 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1628 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1629 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1630 r->res_first_lkid = lkb->lkb_id;
1631 lkb->lkb_nodeid = r->res_nodeid;
1632 return 0;
1635 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1636 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1637 return 1;
1640 if (r->res_nodeid == 0) {
1641 lkb->lkb_nodeid = 0;
1642 return 0;
1645 if (r->res_nodeid > 0) {
1646 lkb->lkb_nodeid = r->res_nodeid;
1647 return 0;
1650 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1652 dir_nodeid = dlm_dir_nodeid(r);
1654 if (dir_nodeid != our_nodeid) {
1655 r->res_first_lkid = lkb->lkb_id;
1656 send_lookup(r, lkb);
1657 return 1;
1660 for (;;) {
1661 /* It's possible for dlm_scand to remove an old rsb for
1662 this same resource from the toss list, us to create
1663 a new one, look up the master locally, and find it
1664 already exists just before dlm_scand does the
1665 dir_remove() on the previous rsb. */
1667 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1668 r->res_length, &ret_nodeid);
1669 if (!error)
1670 break;
1671 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1672 schedule();
1675 if (ret_nodeid == our_nodeid) {
1676 r->res_first_lkid = 0;
1677 r->res_nodeid = 0;
1678 lkb->lkb_nodeid = 0;
1679 } else {
1680 r->res_first_lkid = lkb->lkb_id;
1681 r->res_nodeid = ret_nodeid;
1682 lkb->lkb_nodeid = ret_nodeid;
1684 return 0;
1687 static void process_lookup_list(struct dlm_rsb *r)
1689 struct dlm_lkb *lkb, *safe;
1691 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1692 list_del_init(&lkb->lkb_rsb_lookup);
1693 _request_lock(r, lkb);
1694 schedule();
1698 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1700 static void confirm_master(struct dlm_rsb *r, int error)
1702 struct dlm_lkb *lkb;
1704 if (!r->res_first_lkid)
1705 return;
1707 switch (error) {
1708 case 0:
1709 case -EINPROGRESS:
1710 r->res_first_lkid = 0;
1711 process_lookup_list(r);
1712 break;
1714 case -EAGAIN:
1715 /* the remote master didn't queue our NOQUEUE request;
1716 make a waiting lkb the first_lkid */
1718 r->res_first_lkid = 0;
1720 if (!list_empty(&r->res_lookup)) {
1721 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1722 lkb_rsb_lookup);
1723 list_del_init(&lkb->lkb_rsb_lookup);
1724 r->res_first_lkid = lkb->lkb_id;
1725 _request_lock(r, lkb);
1726 } else
1727 r->res_nodeid = -1;
1728 break;
1730 default:
1731 log_error(r->res_ls, "confirm_master unknown error %d", error);
1735 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1736 int namelen, uint32_t parent_lkid, void *ast,
1737 void *astarg, void *bast, struct dlm_args *args)
1739 int rv = -EINVAL;
1741 /* check for invalid arg usage */
1743 if (mode < 0 || mode > DLM_LOCK_EX)
1744 goto out;
1746 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1747 goto out;
1749 if (flags & DLM_LKF_CANCEL)
1750 goto out;
1752 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1753 goto out;
1755 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1756 goto out;
1758 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1759 goto out;
1761 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1762 goto out;
1764 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1765 goto out;
1767 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1768 goto out;
1770 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1771 goto out;
1773 if (!ast || !lksb)
1774 goto out;
1776 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1777 goto out;
1779 /* parent/child locks not yet supported */
1780 if (parent_lkid)
1781 goto out;
1783 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1784 goto out;
1786 /* these args will be copied to the lkb in validate_lock_args,
1787 it cannot be done now because when converting locks, fields in
1788 an active lkb cannot be modified before locking the rsb */
1790 args->flags = flags;
1791 args->astaddr = ast;
1792 args->astparam = (long) astarg;
1793 args->bastaddr = bast;
1794 args->mode = mode;
1795 args->lksb = lksb;
1796 rv = 0;
1797 out:
1798 return rv;
1801 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1803 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1804 DLM_LKF_FORCEUNLOCK))
1805 return -EINVAL;
1807 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1808 return -EINVAL;
1810 args->flags = flags;
1811 args->astparam = (long) astarg;
1812 return 0;
1815 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1816 struct dlm_args *args)
1818 int rv = -EINVAL;
1820 if (args->flags & DLM_LKF_CONVERT) {
1821 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1822 goto out;
1824 if (args->flags & DLM_LKF_QUECVT &&
1825 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1826 goto out;
1828 rv = -EBUSY;
1829 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1830 goto out;
1832 if (lkb->lkb_wait_type)
1833 goto out;
1835 if (is_overlap(lkb))
1836 goto out;
1839 lkb->lkb_exflags = args->flags;
1840 lkb->lkb_sbflags = 0;
1841 lkb->lkb_astaddr = args->astaddr;
1842 lkb->lkb_astparam = args->astparam;
1843 lkb->lkb_bastaddr = args->bastaddr;
1844 lkb->lkb_rqmode = args->mode;
1845 lkb->lkb_lksb = args->lksb;
1846 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1847 lkb->lkb_ownpid = (int) current->pid;
1848 rv = 0;
1849 out:
1850 return rv;
1853 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1854 for success */
1856 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1857 because there may be a lookup in progress and it's valid to do
1858 cancel/unlockf on it */
1860 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1862 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1863 int rv = -EINVAL;
1865 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1866 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1867 dlm_print_lkb(lkb);
1868 goto out;
1871 /* an lkb may still exist even though the lock is EOL'ed due to a
1872 cancel, unlock or failed noqueue request; an app can't use these
1873 locks; return same error as if the lkid had not been found at all */
1875 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1876 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1877 rv = -ENOENT;
1878 goto out;
1881 /* an lkb may be waiting for an rsb lookup to complete where the
1882 lookup was initiated by another lock */
1884 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1885 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1886 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1887 list_del_init(&lkb->lkb_rsb_lookup);
1888 queue_cast(lkb->lkb_resource, lkb,
1889 args->flags & DLM_LKF_CANCEL ?
1890 -DLM_ECANCEL : -DLM_EUNLOCK);
1891 unhold_lkb(lkb); /* undoes create_lkb() */
1892 rv = -EBUSY;
1893 goto out;
1897 /* cancel not allowed with another cancel/unlock in progress */
1899 if (args->flags & DLM_LKF_CANCEL) {
1900 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1901 goto out;
1903 if (is_overlap(lkb))
1904 goto out;
1906 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1907 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1908 rv = -EBUSY;
1909 goto out;
1912 switch (lkb->lkb_wait_type) {
1913 case DLM_MSG_LOOKUP:
1914 case DLM_MSG_REQUEST:
1915 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1916 rv = -EBUSY;
1917 goto out;
1918 case DLM_MSG_UNLOCK:
1919 case DLM_MSG_CANCEL:
1920 goto out;
1922 /* add_to_waiters() will set OVERLAP_CANCEL */
1923 goto out_ok;
1926 /* do we need to allow a force-unlock if there's a normal unlock
1927 already in progress? in what conditions could the normal unlock
1928 fail such that we'd want to send a force-unlock to be sure? */
1930 if (args->flags & DLM_LKF_FORCEUNLOCK) {
1931 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1932 goto out;
1934 if (is_overlap_unlock(lkb))
1935 goto out;
1937 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1938 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1939 rv = -EBUSY;
1940 goto out;
1943 switch (lkb->lkb_wait_type) {
1944 case DLM_MSG_LOOKUP:
1945 case DLM_MSG_REQUEST:
1946 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1947 rv = -EBUSY;
1948 goto out;
1949 case DLM_MSG_UNLOCK:
1950 goto out;
1952 /* add_to_waiters() will set OVERLAP_UNLOCK */
1953 goto out_ok;
1956 /* normal unlock not allowed if there's any op in progress */
1957 rv = -EBUSY;
1958 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1959 goto out;
1961 out_ok:
1962 /* an overlapping op shouldn't blow away exflags from other op */
1963 lkb->lkb_exflags |= args->flags;
1964 lkb->lkb_sbflags = 0;
1965 lkb->lkb_astparam = args->astparam;
1966 rv = 0;
1967 out:
1968 if (rv)
1969 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1970 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1971 args->flags, lkb->lkb_wait_type,
1972 lkb->lkb_resource->res_name);
1973 return rv;
1977 * Four stage 4 varieties:
1978 * do_request(), do_convert(), do_unlock(), do_cancel()
1979 * These are called on the master node for the given lock and
1980 * from the central locking logic.
1983 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1985 int error = 0;
1987 if (can_be_granted(r, lkb, 1)) {
1988 grant_lock(r, lkb);
1989 queue_cast(r, lkb, 0);
1990 goto out;
1993 if (can_be_queued(lkb)) {
1994 error = -EINPROGRESS;
1995 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1996 send_blocking_asts(r, lkb);
1997 goto out;
2000 error = -EAGAIN;
2001 if (force_blocking_asts(lkb))
2002 send_blocking_asts_all(r, lkb);
2003 queue_cast(r, lkb, -EAGAIN);
2005 out:
2006 return error;
2009 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2011 int error = 0;
2013 /* changing an existing lock may allow others to be granted */
2015 if (can_be_granted(r, lkb, 1)) {
2016 grant_lock(r, lkb);
2017 queue_cast(r, lkb, 0);
2018 grant_pending_locks(r);
2019 goto out;
2022 /* is_demoted() means the can_be_granted() above set the grmode
2023 to NL, and left us on the granted queue. This auto-demotion
2024 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2025 now grantable. We have to try to grant other converting locks
2026 before we try again to grant this one. */
2028 if (is_demoted(lkb)) {
2029 grant_pending_convert(r, DLM_LOCK_IV);
2030 if (_can_be_granted(r, lkb, 1)) {
2031 grant_lock(r, lkb);
2032 queue_cast(r, lkb, 0);
2033 grant_pending_locks(r);
2034 goto out;
2036 /* else fall through and move to convert queue */
2039 if (can_be_queued(lkb)) {
2040 error = -EINPROGRESS;
2041 del_lkb(r, lkb);
2042 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2043 send_blocking_asts(r, lkb);
2044 goto out;
2047 error = -EAGAIN;
2048 if (force_blocking_asts(lkb))
2049 send_blocking_asts_all(r, lkb);
2050 queue_cast(r, lkb, -EAGAIN);
2052 out:
2053 return error;
2056 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2058 remove_lock(r, lkb);
2059 queue_cast(r, lkb, -DLM_EUNLOCK);
2060 grant_pending_locks(r);
2061 return -DLM_EUNLOCK;
2064 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2066 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2068 int error;
2070 error = revert_lock(r, lkb);
2071 if (error) {
2072 queue_cast(r, lkb, -DLM_ECANCEL);
2073 grant_pending_locks(r);
2074 return -DLM_ECANCEL;
2076 return 0;
2080 * Four stage 3 varieties:
2081 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2084 /* add a new lkb to a possibly new rsb, called by requesting process */
2086 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2088 int error;
2090 /* set_master: sets lkb nodeid from r */
2092 error = set_master(r, lkb);
2093 if (error < 0)
2094 goto out;
2095 if (error) {
2096 error = 0;
2097 goto out;
2100 if (is_remote(r))
2101 /* receive_request() calls do_request() on remote node */
2102 error = send_request(r, lkb);
2103 else
2104 error = do_request(r, lkb);
2105 out:
2106 return error;
2109 /* change some property of an existing lkb, e.g. mode */
2111 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2113 int error;
2115 if (is_remote(r))
2116 /* receive_convert() calls do_convert() on remote node */
2117 error = send_convert(r, lkb);
2118 else
2119 error = do_convert(r, lkb);
2121 return error;
2124 /* remove an existing lkb from the granted queue */
2126 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2128 int error;
2130 if (is_remote(r))
2131 /* receive_unlock() calls do_unlock() on remote node */
2132 error = send_unlock(r, lkb);
2133 else
2134 error = do_unlock(r, lkb);
2136 return error;
2139 /* remove an existing lkb from the convert or wait queue */
2141 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2143 int error;
2145 if (is_remote(r))
2146 /* receive_cancel() calls do_cancel() on remote node */
2147 error = send_cancel(r, lkb);
2148 else
2149 error = do_cancel(r, lkb);
2151 return error;
2155 * Four stage 2 varieties:
2156 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2159 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2160 int len, struct dlm_args *args)
2162 struct dlm_rsb *r;
2163 int error;
2165 error = validate_lock_args(ls, lkb, args);
2166 if (error)
2167 goto out;
2169 error = find_rsb(ls, name, len, R_CREATE, &r);
2170 if (error)
2171 goto out;
2173 lock_rsb(r);
2175 attach_lkb(r, lkb);
2176 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2178 error = _request_lock(r, lkb);
2180 unlock_rsb(r);
2181 put_rsb(r);
2183 out:
2184 return error;
2187 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2188 struct dlm_args *args)
2190 struct dlm_rsb *r;
2191 int error;
2193 r = lkb->lkb_resource;
2195 hold_rsb(r);
2196 lock_rsb(r);
2198 error = validate_lock_args(ls, lkb, args);
2199 if (error)
2200 goto out;
2202 error = _convert_lock(r, lkb);
2203 out:
2204 unlock_rsb(r);
2205 put_rsb(r);
2206 return error;
2209 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2210 struct dlm_args *args)
2212 struct dlm_rsb *r;
2213 int error;
2215 r = lkb->lkb_resource;
2217 hold_rsb(r);
2218 lock_rsb(r);
2220 error = validate_unlock_args(lkb, args);
2221 if (error)
2222 goto out;
2224 error = _unlock_lock(r, lkb);
2225 out:
2226 unlock_rsb(r);
2227 put_rsb(r);
2228 return error;
2231 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2232 struct dlm_args *args)
2234 struct dlm_rsb *r;
2235 int error;
2237 r = lkb->lkb_resource;
2239 hold_rsb(r);
2240 lock_rsb(r);
2242 error = validate_unlock_args(lkb, args);
2243 if (error)
2244 goto out;
2246 error = _cancel_lock(r, lkb);
2247 out:
2248 unlock_rsb(r);
2249 put_rsb(r);
2250 return error;
2254 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2257 int dlm_lock(dlm_lockspace_t *lockspace,
2258 int mode,
2259 struct dlm_lksb *lksb,
2260 uint32_t flags,
2261 void *name,
2262 unsigned int namelen,
2263 uint32_t parent_lkid,
2264 void (*ast) (void *astarg),
2265 void *astarg,
2266 void (*bast) (void *astarg, int mode))
2268 struct dlm_ls *ls;
2269 struct dlm_lkb *lkb;
2270 struct dlm_args args;
2271 int error, convert = flags & DLM_LKF_CONVERT;
2273 ls = dlm_find_lockspace_local(lockspace);
2274 if (!ls)
2275 return -EINVAL;
2277 lock_recovery(ls);
2279 if (convert)
2280 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2281 else
2282 error = create_lkb(ls, &lkb);
2284 if (error)
2285 goto out;
2287 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2288 astarg, bast, &args);
2289 if (error)
2290 goto out_put;
2292 if (convert)
2293 error = convert_lock(ls, lkb, &args);
2294 else
2295 error = request_lock(ls, lkb, name, namelen, &args);
2297 if (error == -EINPROGRESS)
2298 error = 0;
2299 out_put:
2300 if (convert || error)
2301 __put_lkb(ls, lkb);
2302 if (error == -EAGAIN)
2303 error = 0;
2304 out:
2305 unlock_recovery(ls);
2306 dlm_put_lockspace(ls);
2307 return error;
2310 int dlm_unlock(dlm_lockspace_t *lockspace,
2311 uint32_t lkid,
2312 uint32_t flags,
2313 struct dlm_lksb *lksb,
2314 void *astarg)
2316 struct dlm_ls *ls;
2317 struct dlm_lkb *lkb;
2318 struct dlm_args args;
2319 int error;
2321 ls = dlm_find_lockspace_local(lockspace);
2322 if (!ls)
2323 return -EINVAL;
2325 lock_recovery(ls);
2327 error = find_lkb(ls, lkid, &lkb);
2328 if (error)
2329 goto out;
2331 error = set_unlock_args(flags, astarg, &args);
2332 if (error)
2333 goto out_put;
2335 if (flags & DLM_LKF_CANCEL)
2336 error = cancel_lock(ls, lkb, &args);
2337 else
2338 error = unlock_lock(ls, lkb, &args);
2340 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2341 error = 0;
2342 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2343 error = 0;
2344 out_put:
2345 dlm_put_lkb(lkb);
2346 out:
2347 unlock_recovery(ls);
2348 dlm_put_lockspace(ls);
2349 return error;
2353 * send/receive routines for remote operations and replies
2355 * send_args
2356 * send_common
2357 * send_request receive_request
2358 * send_convert receive_convert
2359 * send_unlock receive_unlock
2360 * send_cancel receive_cancel
2361 * send_grant receive_grant
2362 * send_bast receive_bast
2363 * send_lookup receive_lookup
2364 * send_remove receive_remove
2366 * send_common_reply
2367 * receive_request_reply send_request_reply
2368 * receive_convert_reply send_convert_reply
2369 * receive_unlock_reply send_unlock_reply
2370 * receive_cancel_reply send_cancel_reply
2371 * receive_lookup_reply send_lookup_reply
2374 static int _create_message(struct dlm_ls *ls, int mb_len,
2375 int to_nodeid, int mstype,
2376 struct dlm_message **ms_ret,
2377 struct dlm_mhandle **mh_ret)
2379 struct dlm_message *ms;
2380 struct dlm_mhandle *mh;
2381 char *mb;
2383 /* get_buffer gives us a message handle (mh) that we need to
2384 pass into lowcomms_commit and a message buffer (mb) that we
2385 write our data into */
2387 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2388 if (!mh)
2389 return -ENOBUFS;
2391 memset(mb, 0, mb_len);
2393 ms = (struct dlm_message *) mb;
2395 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2396 ms->m_header.h_lockspace = ls->ls_global_id;
2397 ms->m_header.h_nodeid = dlm_our_nodeid();
2398 ms->m_header.h_length = mb_len;
2399 ms->m_header.h_cmd = DLM_MSG;
2401 ms->m_type = mstype;
2403 *mh_ret = mh;
2404 *ms_ret = ms;
2405 return 0;
2408 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2409 int to_nodeid, int mstype,
2410 struct dlm_message **ms_ret,
2411 struct dlm_mhandle **mh_ret)
2413 int mb_len = sizeof(struct dlm_message);
2415 switch (mstype) {
2416 case DLM_MSG_REQUEST:
2417 case DLM_MSG_LOOKUP:
2418 case DLM_MSG_REMOVE:
2419 mb_len += r->res_length;
2420 break;
2421 case DLM_MSG_CONVERT:
2422 case DLM_MSG_UNLOCK:
2423 case DLM_MSG_REQUEST_REPLY:
2424 case DLM_MSG_CONVERT_REPLY:
2425 case DLM_MSG_GRANT:
2426 if (lkb && lkb->lkb_lvbptr)
2427 mb_len += r->res_ls->ls_lvblen;
2428 break;
2431 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2432 ms_ret, mh_ret);
2435 /* further lowcomms enhancements or alternate implementations may make
2436 the return value from this function useful at some point */
2438 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2440 dlm_message_out(ms);
2441 dlm_lowcomms_commit_buffer(mh);
2442 return 0;
2445 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2446 struct dlm_message *ms)
2448 ms->m_nodeid = lkb->lkb_nodeid;
2449 ms->m_pid = lkb->lkb_ownpid;
2450 ms->m_lkid = lkb->lkb_id;
2451 ms->m_remid = lkb->lkb_remid;
2452 ms->m_exflags = lkb->lkb_exflags;
2453 ms->m_sbflags = lkb->lkb_sbflags;
2454 ms->m_flags = lkb->lkb_flags;
2455 ms->m_lvbseq = lkb->lkb_lvbseq;
2456 ms->m_status = lkb->lkb_status;
2457 ms->m_grmode = lkb->lkb_grmode;
2458 ms->m_rqmode = lkb->lkb_rqmode;
2459 ms->m_hash = r->res_hash;
2461 /* m_result and m_bastmode are set from function args,
2462 not from lkb fields */
2464 if (lkb->lkb_bastaddr)
2465 ms->m_asts |= AST_BAST;
2466 if (lkb->lkb_astaddr)
2467 ms->m_asts |= AST_COMP;
2469 /* compare with switch in create_message; send_remove() doesn't
2470 use send_args() */
2472 switch (ms->m_type) {
2473 case DLM_MSG_REQUEST:
2474 case DLM_MSG_LOOKUP:
2475 memcpy(ms->m_extra, r->res_name, r->res_length);
2476 break;
2477 case DLM_MSG_CONVERT:
2478 case DLM_MSG_UNLOCK:
2479 case DLM_MSG_REQUEST_REPLY:
2480 case DLM_MSG_CONVERT_REPLY:
2481 case DLM_MSG_GRANT:
2482 if (!lkb->lkb_lvbptr)
2483 break;
2484 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2485 break;
2489 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2491 struct dlm_message *ms;
2492 struct dlm_mhandle *mh;
2493 int to_nodeid, error;
2495 error = add_to_waiters(lkb, mstype);
2496 if (error)
2497 return error;
2499 to_nodeid = r->res_nodeid;
2501 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2502 if (error)
2503 goto fail;
2505 send_args(r, lkb, ms);
2507 error = send_message(mh, ms);
2508 if (error)
2509 goto fail;
2510 return 0;
2512 fail:
2513 remove_from_waiters(lkb, msg_reply_type(mstype));
2514 return error;
2517 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2519 return send_common(r, lkb, DLM_MSG_REQUEST);
2522 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2524 int error;
2526 error = send_common(r, lkb, DLM_MSG_CONVERT);
2528 /* down conversions go without a reply from the master */
2529 if (!error && down_conversion(lkb)) {
2530 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2531 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2532 r->res_ls->ls_stub_ms.m_result = 0;
2533 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2534 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2537 return error;
2540 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2541 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2542 that the master is still correct. */
2544 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2546 return send_common(r, lkb, DLM_MSG_UNLOCK);
2549 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2551 return send_common(r, lkb, DLM_MSG_CANCEL);
2554 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2556 struct dlm_message *ms;
2557 struct dlm_mhandle *mh;
2558 int to_nodeid, error;
2560 to_nodeid = lkb->lkb_nodeid;
2562 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2563 if (error)
2564 goto out;
2566 send_args(r, lkb, ms);
2568 ms->m_result = 0;
2570 error = send_message(mh, ms);
2571 out:
2572 return error;
2575 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2577 struct dlm_message *ms;
2578 struct dlm_mhandle *mh;
2579 int to_nodeid, error;
2581 to_nodeid = lkb->lkb_nodeid;
2583 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2584 if (error)
2585 goto out;
2587 send_args(r, lkb, ms);
2589 ms->m_bastmode = mode;
2591 error = send_message(mh, ms);
2592 out:
2593 return error;
2596 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2598 struct dlm_message *ms;
2599 struct dlm_mhandle *mh;
2600 int to_nodeid, error;
2602 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2603 if (error)
2604 return error;
2606 to_nodeid = dlm_dir_nodeid(r);
2608 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2609 if (error)
2610 goto fail;
2612 send_args(r, lkb, ms);
2614 error = send_message(mh, ms);
2615 if (error)
2616 goto fail;
2617 return 0;
2619 fail:
2620 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2621 return error;
2624 static int send_remove(struct dlm_rsb *r)
2626 struct dlm_message *ms;
2627 struct dlm_mhandle *mh;
2628 int to_nodeid, error;
2630 to_nodeid = dlm_dir_nodeid(r);
2632 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2633 if (error)
2634 goto out;
2636 memcpy(ms->m_extra, r->res_name, r->res_length);
2637 ms->m_hash = r->res_hash;
2639 error = send_message(mh, ms);
2640 out:
2641 return error;
2644 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2645 int mstype, int rv)
2647 struct dlm_message *ms;
2648 struct dlm_mhandle *mh;
2649 int to_nodeid, error;
2651 to_nodeid = lkb->lkb_nodeid;
2653 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2654 if (error)
2655 goto out;
2657 send_args(r, lkb, ms);
2659 ms->m_result = rv;
2661 error = send_message(mh, ms);
2662 out:
2663 return error;
2666 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2668 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2671 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2673 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2676 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2678 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2681 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2683 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2686 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2687 int ret_nodeid, int rv)
2689 struct dlm_rsb *r = &ls->ls_stub_rsb;
2690 struct dlm_message *ms;
2691 struct dlm_mhandle *mh;
2692 int error, nodeid = ms_in->m_header.h_nodeid;
2694 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2695 if (error)
2696 goto out;
2698 ms->m_lkid = ms_in->m_lkid;
2699 ms->m_result = rv;
2700 ms->m_nodeid = ret_nodeid;
2702 error = send_message(mh, ms);
2703 out:
2704 return error;
2707 /* which args we save from a received message depends heavily on the type
2708 of message, unlike the send side where we can safely send everything about
2709 the lkb for any type of message */
2711 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2713 lkb->lkb_exflags = ms->m_exflags;
2714 lkb->lkb_sbflags = ms->m_sbflags;
2715 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2716 (ms->m_flags & 0x0000FFFF);
2719 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2721 lkb->lkb_sbflags = ms->m_sbflags;
2722 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2723 (ms->m_flags & 0x0000FFFF);
2726 static int receive_extralen(struct dlm_message *ms)
2728 return (ms->m_header.h_length - sizeof(struct dlm_message));
2731 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2732 struct dlm_message *ms)
2734 int len;
2736 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2737 if (!lkb->lkb_lvbptr)
2738 lkb->lkb_lvbptr = allocate_lvb(ls);
2739 if (!lkb->lkb_lvbptr)
2740 return -ENOMEM;
2741 len = receive_extralen(ms);
2742 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2744 return 0;
2747 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2748 struct dlm_message *ms)
2750 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2751 lkb->lkb_ownpid = ms->m_pid;
2752 lkb->lkb_remid = ms->m_lkid;
2753 lkb->lkb_grmode = DLM_LOCK_IV;
2754 lkb->lkb_rqmode = ms->m_rqmode;
2755 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2756 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2758 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2760 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2761 /* lkb was just created so there won't be an lvb yet */
2762 lkb->lkb_lvbptr = allocate_lvb(ls);
2763 if (!lkb->lkb_lvbptr)
2764 return -ENOMEM;
2767 return 0;
2770 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2771 struct dlm_message *ms)
2773 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2774 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2775 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2776 lkb->lkb_id, lkb->lkb_remid);
2777 return -EINVAL;
2780 if (!is_master_copy(lkb))
2781 return -EINVAL;
2783 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2784 return -EBUSY;
2786 if (receive_lvb(ls, lkb, ms))
2787 return -ENOMEM;
2789 lkb->lkb_rqmode = ms->m_rqmode;
2790 lkb->lkb_lvbseq = ms->m_lvbseq;
2792 return 0;
2795 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2796 struct dlm_message *ms)
2798 if (!is_master_copy(lkb))
2799 return -EINVAL;
2800 if (receive_lvb(ls, lkb, ms))
2801 return -ENOMEM;
2802 return 0;
2805 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2806 uses to send a reply and that the remote end uses to process the reply. */
2808 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2810 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2811 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2812 lkb->lkb_remid = ms->m_lkid;
2815 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2817 struct dlm_lkb *lkb;
2818 struct dlm_rsb *r;
2819 int error, namelen;
2821 error = create_lkb(ls, &lkb);
2822 if (error)
2823 goto fail;
2825 receive_flags(lkb, ms);
2826 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2827 error = receive_request_args(ls, lkb, ms);
2828 if (error) {
2829 __put_lkb(ls, lkb);
2830 goto fail;
2833 namelen = receive_extralen(ms);
2835 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2836 if (error) {
2837 __put_lkb(ls, lkb);
2838 goto fail;
2841 lock_rsb(r);
2843 attach_lkb(r, lkb);
2844 error = do_request(r, lkb);
2845 send_request_reply(r, lkb, error);
2847 unlock_rsb(r);
2848 put_rsb(r);
2850 if (error == -EINPROGRESS)
2851 error = 0;
2852 if (error)
2853 dlm_put_lkb(lkb);
2854 return;
2856 fail:
2857 setup_stub_lkb(ls, ms);
2858 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2861 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2863 struct dlm_lkb *lkb;
2864 struct dlm_rsb *r;
2865 int error, reply = 1;
2867 error = find_lkb(ls, ms->m_remid, &lkb);
2868 if (error)
2869 goto fail;
2871 r = lkb->lkb_resource;
2873 hold_rsb(r);
2874 lock_rsb(r);
2876 receive_flags(lkb, ms);
2877 error = receive_convert_args(ls, lkb, ms);
2878 if (error)
2879 goto out;
2880 reply = !down_conversion(lkb);
2882 error = do_convert(r, lkb);
2883 out:
2884 if (reply)
2885 send_convert_reply(r, lkb, error);
2887 unlock_rsb(r);
2888 put_rsb(r);
2889 dlm_put_lkb(lkb);
2890 return;
2892 fail:
2893 setup_stub_lkb(ls, ms);
2894 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2897 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2899 struct dlm_lkb *lkb;
2900 struct dlm_rsb *r;
2901 int error;
2903 error = find_lkb(ls, ms->m_remid, &lkb);
2904 if (error)
2905 goto fail;
2907 r = lkb->lkb_resource;
2909 hold_rsb(r);
2910 lock_rsb(r);
2912 receive_flags(lkb, ms);
2913 error = receive_unlock_args(ls, lkb, ms);
2914 if (error)
2915 goto out;
2917 error = do_unlock(r, lkb);
2918 out:
2919 send_unlock_reply(r, lkb, error);
2921 unlock_rsb(r);
2922 put_rsb(r);
2923 dlm_put_lkb(lkb);
2924 return;
2926 fail:
2927 setup_stub_lkb(ls, ms);
2928 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2931 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2933 struct dlm_lkb *lkb;
2934 struct dlm_rsb *r;
2935 int error;
2937 error = find_lkb(ls, ms->m_remid, &lkb);
2938 if (error)
2939 goto fail;
2941 receive_flags(lkb, ms);
2943 r = lkb->lkb_resource;
2945 hold_rsb(r);
2946 lock_rsb(r);
2948 error = do_cancel(r, lkb);
2949 send_cancel_reply(r, lkb, error);
2951 unlock_rsb(r);
2952 put_rsb(r);
2953 dlm_put_lkb(lkb);
2954 return;
2956 fail:
2957 setup_stub_lkb(ls, ms);
2958 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2961 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2963 struct dlm_lkb *lkb;
2964 struct dlm_rsb *r;
2965 int error;
2967 error = find_lkb(ls, ms->m_remid, &lkb);
2968 if (error) {
2969 log_error(ls, "receive_grant no lkb");
2970 return;
2972 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2974 r = lkb->lkb_resource;
2976 hold_rsb(r);
2977 lock_rsb(r);
2979 receive_flags_reply(lkb, ms);
2980 if (is_altmode(lkb))
2981 munge_altmode(lkb, ms);
2982 grant_lock_pc(r, lkb, ms);
2983 queue_cast(r, lkb, 0);
2985 unlock_rsb(r);
2986 put_rsb(r);
2987 dlm_put_lkb(lkb);
2990 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2992 struct dlm_lkb *lkb;
2993 struct dlm_rsb *r;
2994 int error;
2996 error = find_lkb(ls, ms->m_remid, &lkb);
2997 if (error) {
2998 log_error(ls, "receive_bast no lkb");
2999 return;
3001 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3003 r = lkb->lkb_resource;
3005 hold_rsb(r);
3006 lock_rsb(r);
3008 queue_bast(r, lkb, ms->m_bastmode);
3010 unlock_rsb(r);
3011 put_rsb(r);
3012 dlm_put_lkb(lkb);
3015 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3017 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3019 from_nodeid = ms->m_header.h_nodeid;
3020 our_nodeid = dlm_our_nodeid();
3022 len = receive_extralen(ms);
3024 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3025 if (dir_nodeid != our_nodeid) {
3026 log_error(ls, "lookup dir_nodeid %d from %d",
3027 dir_nodeid, from_nodeid);
3028 error = -EINVAL;
3029 ret_nodeid = -1;
3030 goto out;
3033 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3035 /* Optimization: we're master so treat lookup as a request */
3036 if (!error && ret_nodeid == our_nodeid) {
3037 receive_request(ls, ms);
3038 return;
3040 out:
3041 send_lookup_reply(ls, ms, ret_nodeid, error);
3044 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3046 int len, dir_nodeid, from_nodeid;
3048 from_nodeid = ms->m_header.h_nodeid;
3050 len = receive_extralen(ms);
3052 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3053 if (dir_nodeid != dlm_our_nodeid()) {
3054 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3055 dir_nodeid, from_nodeid);
3056 return;
3059 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3062 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3064 do_purge(ls, ms->m_nodeid, ms->m_pid);
3067 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3069 struct dlm_lkb *lkb;
3070 struct dlm_rsb *r;
3071 int error, mstype, result;
3073 error = find_lkb(ls, ms->m_remid, &lkb);
3074 if (error) {
3075 log_error(ls, "receive_request_reply no lkb");
3076 return;
3078 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3080 r = lkb->lkb_resource;
3081 hold_rsb(r);
3082 lock_rsb(r);
3084 mstype = lkb->lkb_wait_type;
3085 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3086 if (error)
3087 goto out;
3089 /* Optimization: the dir node was also the master, so it took our
3090 lookup as a request and sent request reply instead of lookup reply */
3091 if (mstype == DLM_MSG_LOOKUP) {
3092 r->res_nodeid = ms->m_header.h_nodeid;
3093 lkb->lkb_nodeid = r->res_nodeid;
3096 /* this is the value returned from do_request() on the master */
3097 result = ms->m_result;
3099 switch (result) {
3100 case -EAGAIN:
3101 /* request would block (be queued) on remote master */
3102 queue_cast(r, lkb, -EAGAIN);
3103 confirm_master(r, -EAGAIN);
3104 unhold_lkb(lkb); /* undoes create_lkb() */
3105 break;
3107 case -EINPROGRESS:
3108 case 0:
3109 /* request was queued or granted on remote master */
3110 receive_flags_reply(lkb, ms);
3111 lkb->lkb_remid = ms->m_lkid;
3112 if (is_altmode(lkb))
3113 munge_altmode(lkb, ms);
3114 if (result)
3115 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3116 else {
3117 grant_lock_pc(r, lkb, ms);
3118 queue_cast(r, lkb, 0);
3120 confirm_master(r, result);
3121 break;
3123 case -EBADR:
3124 case -ENOTBLK:
3125 /* find_rsb failed to find rsb or rsb wasn't master */
3126 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3127 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3128 r->res_nodeid = -1;
3129 lkb->lkb_nodeid = -1;
3131 if (is_overlap(lkb)) {
3132 /* we'll ignore error in cancel/unlock reply */
3133 queue_cast_overlap(r, lkb);
3134 unhold_lkb(lkb); /* undoes create_lkb() */
3135 } else
3136 _request_lock(r, lkb);
3137 break;
3139 default:
3140 log_error(ls, "receive_request_reply %x error %d",
3141 lkb->lkb_id, result);
3144 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3145 log_debug(ls, "receive_request_reply %x result %d unlock",
3146 lkb->lkb_id, result);
3147 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3148 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3149 send_unlock(r, lkb);
3150 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3151 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3152 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3153 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3154 send_cancel(r, lkb);
3155 } else {
3156 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3157 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3159 out:
3160 unlock_rsb(r);
3161 put_rsb(r);
3162 dlm_put_lkb(lkb);
3165 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166 struct dlm_message *ms)
3168 /* this is the value returned from do_convert() on the master */
3169 switch (ms->m_result) {
3170 case -EAGAIN:
3171 /* convert would block (be queued) on remote master */
3172 queue_cast(r, lkb, -EAGAIN);
3173 break;
3175 case -EINPROGRESS:
3176 /* convert was queued on remote master */
3177 receive_flags_reply(lkb, ms);
3178 if (is_demoted(lkb))
3179 munge_demoted(lkb, ms);
3180 del_lkb(r, lkb);
3181 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3182 break;
3184 case 0:
3185 /* convert was granted on remote master */
3186 receive_flags_reply(lkb, ms);
3187 if (is_demoted(lkb))
3188 munge_demoted(lkb, ms);
3189 grant_lock_pc(r, lkb, ms);
3190 queue_cast(r, lkb, 0);
3191 break;
3193 default:
3194 log_error(r->res_ls, "receive_convert_reply %x error %d",
3195 lkb->lkb_id, ms->m_result);
3199 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3201 struct dlm_rsb *r = lkb->lkb_resource;
3202 int error;
3204 hold_rsb(r);
3205 lock_rsb(r);
3207 /* stub reply can happen with waiters_mutex held */
3208 error = remove_from_waiters_ms(lkb, ms);
3209 if (error)
3210 goto out;
3212 __receive_convert_reply(r, lkb, ms);
3213 out:
3214 unlock_rsb(r);
3215 put_rsb(r);
3218 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3220 struct dlm_lkb *lkb;
3221 int error;
3223 error = find_lkb(ls, ms->m_remid, &lkb);
3224 if (error) {
3225 log_error(ls, "receive_convert_reply no lkb");
3226 return;
3228 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3230 _receive_convert_reply(lkb, ms);
3231 dlm_put_lkb(lkb);
3234 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3236 struct dlm_rsb *r = lkb->lkb_resource;
3237 int error;
3239 hold_rsb(r);
3240 lock_rsb(r);
3242 /* stub reply can happen with waiters_mutex held */
3243 error = remove_from_waiters_ms(lkb, ms);
3244 if (error)
3245 goto out;
3247 /* this is the value returned from do_unlock() on the master */
3249 switch (ms->m_result) {
3250 case -DLM_EUNLOCK:
3251 receive_flags_reply(lkb, ms);
3252 remove_lock_pc(r, lkb);
3253 queue_cast(r, lkb, -DLM_EUNLOCK);
3254 break;
3255 case -ENOENT:
3256 break;
3257 default:
3258 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3259 lkb->lkb_id, ms->m_result);
3261 out:
3262 unlock_rsb(r);
3263 put_rsb(r);
3266 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3268 struct dlm_lkb *lkb;
3269 int error;
3271 error = find_lkb(ls, ms->m_remid, &lkb);
3272 if (error) {
3273 log_error(ls, "receive_unlock_reply no lkb");
3274 return;
3276 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3278 _receive_unlock_reply(lkb, ms);
3279 dlm_put_lkb(lkb);
3282 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3284 struct dlm_rsb *r = lkb->lkb_resource;
3285 int error;
3287 hold_rsb(r);
3288 lock_rsb(r);
3290 /* stub reply can happen with waiters_mutex held */
3291 error = remove_from_waiters_ms(lkb, ms);
3292 if (error)
3293 goto out;
3295 /* this is the value returned from do_cancel() on the master */
3297 switch (ms->m_result) {
3298 case -DLM_ECANCEL:
3299 receive_flags_reply(lkb, ms);
3300 revert_lock_pc(r, lkb);
3301 if (ms->m_result)
3302 queue_cast(r, lkb, -DLM_ECANCEL);
3303 break;
3304 case 0:
3305 break;
3306 default:
3307 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3308 lkb->lkb_id, ms->m_result);
3310 out:
3311 unlock_rsb(r);
3312 put_rsb(r);
3315 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3317 struct dlm_lkb *lkb;
3318 int error;
3320 error = find_lkb(ls, ms->m_remid, &lkb);
3321 if (error) {
3322 log_error(ls, "receive_cancel_reply no lkb");
3323 return;
3325 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3327 _receive_cancel_reply(lkb, ms);
3328 dlm_put_lkb(lkb);
3331 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3333 struct dlm_lkb *lkb;
3334 struct dlm_rsb *r;
3335 int error, ret_nodeid;
3337 error = find_lkb(ls, ms->m_lkid, &lkb);
3338 if (error) {
3339 log_error(ls, "receive_lookup_reply no lkb");
3340 return;
3343 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3344 FIXME: will a non-zero error ever be returned? */
3346 r = lkb->lkb_resource;
3347 hold_rsb(r);
3348 lock_rsb(r);
3350 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3351 if (error)
3352 goto out;
3354 ret_nodeid = ms->m_nodeid;
3355 if (ret_nodeid == dlm_our_nodeid()) {
3356 r->res_nodeid = 0;
3357 ret_nodeid = 0;
3358 r->res_first_lkid = 0;
3359 } else {
3360 /* set_master() will copy res_nodeid to lkb_nodeid */
3361 r->res_nodeid = ret_nodeid;
3364 if (is_overlap(lkb)) {
3365 log_debug(ls, "receive_lookup_reply %x unlock %x",
3366 lkb->lkb_id, lkb->lkb_flags);
3367 queue_cast_overlap(r, lkb);
3368 unhold_lkb(lkb); /* undoes create_lkb() */
3369 goto out_list;
3372 _request_lock(r, lkb);
3374 out_list:
3375 if (!ret_nodeid)
3376 process_lookup_list(r);
3377 out:
3378 unlock_rsb(r);
3379 put_rsb(r);
3380 dlm_put_lkb(lkb);
3383 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3385 struct dlm_message *ms = (struct dlm_message *) hd;
3386 struct dlm_ls *ls;
3387 int error = 0;
3389 if (!recovery)
3390 dlm_message_in(ms);
3392 ls = dlm_find_lockspace_global(hd->h_lockspace);
3393 if (!ls) {
3394 log_print("drop message %d from %d for unknown lockspace %d",
3395 ms->m_type, nodeid, hd->h_lockspace);
3396 return -EINVAL;
3399 /* recovery may have just ended leaving a bunch of backed-up requests
3400 in the requestqueue; wait while dlm_recoverd clears them */
3402 if (!recovery)
3403 dlm_wait_requestqueue(ls);
3405 /* recovery may have just started while there were a bunch of
3406 in-flight requests -- save them in requestqueue to be processed
3407 after recovery. we can't let dlm_recvd block on the recovery
3408 lock. if dlm_recoverd is calling this function to clear the
3409 requestqueue, it needs to be interrupted (-EINTR) if another
3410 recovery operation is starting. */
3412 while (1) {
3413 if (dlm_locking_stopped(ls)) {
3414 if (recovery) {
3415 error = -EINTR;
3416 goto out;
3418 error = dlm_add_requestqueue(ls, nodeid, hd);
3419 if (error == -EAGAIN)
3420 continue;
3421 else {
3422 error = -EINTR;
3423 goto out;
3427 if (lock_recovery_try(ls))
3428 break;
3429 schedule();
3432 switch (ms->m_type) {
3434 /* messages sent to a master node */
3436 case DLM_MSG_REQUEST:
3437 receive_request(ls, ms);
3438 break;
3440 case DLM_MSG_CONVERT:
3441 receive_convert(ls, ms);
3442 break;
3444 case DLM_MSG_UNLOCK:
3445 receive_unlock(ls, ms);
3446 break;
3448 case DLM_MSG_CANCEL:
3449 receive_cancel(ls, ms);
3450 break;
3452 /* messages sent from a master node (replies to above) */
3454 case DLM_MSG_REQUEST_REPLY:
3455 receive_request_reply(ls, ms);
3456 break;
3458 case DLM_MSG_CONVERT_REPLY:
3459 receive_convert_reply(ls, ms);
3460 break;
3462 case DLM_MSG_UNLOCK_REPLY:
3463 receive_unlock_reply(ls, ms);
3464 break;
3466 case DLM_MSG_CANCEL_REPLY:
3467 receive_cancel_reply(ls, ms);
3468 break;
3470 /* messages sent from a master node (only two types of async msg) */
3472 case DLM_MSG_GRANT:
3473 receive_grant(ls, ms);
3474 break;
3476 case DLM_MSG_BAST:
3477 receive_bast(ls, ms);
3478 break;
3480 /* messages sent to a dir node */
3482 case DLM_MSG_LOOKUP:
3483 receive_lookup(ls, ms);
3484 break;
3486 case DLM_MSG_REMOVE:
3487 receive_remove(ls, ms);
3488 break;
3490 /* messages sent from a dir node (remove has no reply) */
3492 case DLM_MSG_LOOKUP_REPLY:
3493 receive_lookup_reply(ls, ms);
3494 break;
3496 /* other messages */
3498 case DLM_MSG_PURGE:
3499 receive_purge(ls, ms);
3500 break;
3502 default:
3503 log_error(ls, "unknown message type %d", ms->m_type);
3506 unlock_recovery(ls);
3507 out:
3508 dlm_put_lockspace(ls);
3509 dlm_astd_wake();
3510 return error;
3515 * Recovery related
3518 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3520 if (middle_conversion(lkb)) {
3521 hold_lkb(lkb);
3522 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3523 ls->ls_stub_ms.m_result = -EINPROGRESS;
3524 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3525 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3527 /* Same special case as in receive_rcom_lock_args() */
3528 lkb->lkb_grmode = DLM_LOCK_IV;
3529 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3530 unhold_lkb(lkb);
3532 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3533 lkb->lkb_flags |= DLM_IFL_RESEND;
3536 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3537 conversions are async; there's no reply from the remote master */
3540 /* A waiting lkb needs recovery if the master node has failed, or
3541 the master node is changing (only when no directory is used) */
3543 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3545 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3546 return 1;
3548 if (!dlm_no_directory(ls))
3549 return 0;
3551 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3552 return 1;
3554 return 0;
3557 /* Recovery for locks that are waiting for replies from nodes that are now
3558 gone. We can just complete unlocks and cancels by faking a reply from the
3559 dead node. Requests and up-conversions we flag to be resent after
3560 recovery. Down-conversions can just be completed with a fake reply like
3561 unlocks. Conversions between PR and CW need special attention. */
3563 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3565 struct dlm_lkb *lkb, *safe;
3567 mutex_lock(&ls->ls_waiters_mutex);
3569 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3570 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3571 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3573 /* all outstanding lookups, regardless of destination will be
3574 resent after recovery is done */
3576 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3577 lkb->lkb_flags |= DLM_IFL_RESEND;
3578 continue;
3581 if (!waiter_needs_recovery(ls, lkb))
3582 continue;
3584 switch (lkb->lkb_wait_type) {
3586 case DLM_MSG_REQUEST:
3587 lkb->lkb_flags |= DLM_IFL_RESEND;
3588 break;
3590 case DLM_MSG_CONVERT:
3591 recover_convert_waiter(ls, lkb);
3592 break;
3594 case DLM_MSG_UNLOCK:
3595 hold_lkb(lkb);
3596 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3597 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3598 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3599 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3600 dlm_put_lkb(lkb);
3601 break;
3603 case DLM_MSG_CANCEL:
3604 hold_lkb(lkb);
3605 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3606 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3607 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3608 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3609 dlm_put_lkb(lkb);
3610 break;
3612 default:
3613 log_error(ls, "invalid lkb wait_type %d",
3614 lkb->lkb_wait_type);
3616 schedule();
3618 mutex_unlock(&ls->ls_waiters_mutex);
3621 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3623 struct dlm_lkb *lkb;
3624 int found = 0;
3626 mutex_lock(&ls->ls_waiters_mutex);
3627 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3628 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3629 hold_lkb(lkb);
3630 found = 1;
3631 break;
3634 mutex_unlock(&ls->ls_waiters_mutex);
3636 if (!found)
3637 lkb = NULL;
3638 return lkb;
3641 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3642 master or dir-node for r. Processing the lkb may result in it being placed
3643 back on waiters. */
3645 /* We do this after normal locking has been enabled and any saved messages
3646 (in requestqueue) have been processed. We should be confident that at
3647 this point we won't get or process a reply to any of these waiting
3648 operations. But, new ops may be coming in on the rsbs/locks here from
3649 userspace or remotely. */
3651 /* there may have been an overlap unlock/cancel prior to recovery or after
3652 recovery. if before, the lkb may still have a pos wait_count; if after, the
3653 overlap flag would just have been set and nothing new sent. we can be
3654 confident here than any replies to either the initial op or overlap ops
3655 prior to recovery have been received. */
3657 int dlm_recover_waiters_post(struct dlm_ls *ls)
3659 struct dlm_lkb *lkb;
3660 struct dlm_rsb *r;
3661 int error = 0, mstype, err, oc, ou;
3663 while (1) {
3664 if (dlm_locking_stopped(ls)) {
3665 log_debug(ls, "recover_waiters_post aborted");
3666 error = -EINTR;
3667 break;
3670 lkb = find_resend_waiter(ls);
3671 if (!lkb)
3672 break;
3674 r = lkb->lkb_resource;
3675 hold_rsb(r);
3676 lock_rsb(r);
3678 mstype = lkb->lkb_wait_type;
3679 oc = is_overlap_cancel(lkb);
3680 ou = is_overlap_unlock(lkb);
3681 err = 0;
3683 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3684 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3686 /* At this point we assume that we won't get a reply to any
3687 previous op or overlap op on this lock. First, do a big
3688 remove_from_waiters() for all previous ops. */
3690 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3691 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3692 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3693 lkb->lkb_wait_type = 0;
3694 lkb->lkb_wait_count = 0;
3695 mutex_lock(&ls->ls_waiters_mutex);
3696 list_del_init(&lkb->lkb_wait_reply);
3697 mutex_unlock(&ls->ls_waiters_mutex);
3698 unhold_lkb(lkb); /* for waiters list */
3700 if (oc || ou) {
3701 /* do an unlock or cancel instead of resending */
3702 switch (mstype) {
3703 case DLM_MSG_LOOKUP:
3704 case DLM_MSG_REQUEST:
3705 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3706 -DLM_ECANCEL);
3707 unhold_lkb(lkb); /* undoes create_lkb() */
3708 break;
3709 case DLM_MSG_CONVERT:
3710 if (oc) {
3711 queue_cast(r, lkb, -DLM_ECANCEL);
3712 } else {
3713 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3714 _unlock_lock(r, lkb);
3716 break;
3717 default:
3718 err = 1;
3720 } else {
3721 switch (mstype) {
3722 case DLM_MSG_LOOKUP:
3723 case DLM_MSG_REQUEST:
3724 _request_lock(r, lkb);
3725 if (is_master(r))
3726 confirm_master(r, 0);
3727 break;
3728 case DLM_MSG_CONVERT:
3729 _convert_lock(r, lkb);
3730 break;
3731 default:
3732 err = 1;
3736 if (err)
3737 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3738 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3739 unlock_rsb(r);
3740 put_rsb(r);
3741 dlm_put_lkb(lkb);
3744 return error;
3747 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3748 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3750 struct dlm_ls *ls = r->res_ls;
3751 struct dlm_lkb *lkb, *safe;
3753 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3754 if (test(ls, lkb)) {
3755 rsb_set_flag(r, RSB_LOCKS_PURGED);
3756 del_lkb(r, lkb);
3757 /* this put should free the lkb */
3758 if (!dlm_put_lkb(lkb))
3759 log_error(ls, "purged lkb not released");
3764 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3766 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3769 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3771 return is_master_copy(lkb);
3774 static void purge_dead_locks(struct dlm_rsb *r)
3776 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3777 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3778 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3781 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3783 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3784 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3785 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3788 /* Get rid of locks held by nodes that are gone. */
3790 int dlm_purge_locks(struct dlm_ls *ls)
3792 struct dlm_rsb *r;
3794 log_debug(ls, "dlm_purge_locks");
3796 down_write(&ls->ls_root_sem);
3797 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3798 hold_rsb(r);
3799 lock_rsb(r);
3800 if (is_master(r))
3801 purge_dead_locks(r);
3802 unlock_rsb(r);
3803 unhold_rsb(r);
3805 schedule();
3807 up_write(&ls->ls_root_sem);
3809 return 0;
3812 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3814 struct dlm_rsb *r, *r_ret = NULL;
3816 read_lock(&ls->ls_rsbtbl[bucket].lock);
3817 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3818 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3819 continue;
3820 hold_rsb(r);
3821 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3822 r_ret = r;
3823 break;
3825 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3826 return r_ret;
3829 void dlm_grant_after_purge(struct dlm_ls *ls)
3831 struct dlm_rsb *r;
3832 int bucket = 0;
3834 while (1) {
3835 r = find_purged_rsb(ls, bucket);
3836 if (!r) {
3837 if (bucket == ls->ls_rsbtbl_size - 1)
3838 break;
3839 bucket++;
3840 continue;
3842 lock_rsb(r);
3843 if (is_master(r)) {
3844 grant_pending_locks(r);
3845 confirm_master(r, 0);
3847 unlock_rsb(r);
3848 put_rsb(r);
3849 schedule();
3853 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3854 uint32_t remid)
3856 struct dlm_lkb *lkb;
3858 list_for_each_entry(lkb, head, lkb_statequeue) {
3859 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3860 return lkb;
3862 return NULL;
3865 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3866 uint32_t remid)
3868 struct dlm_lkb *lkb;
3870 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3871 if (lkb)
3872 return lkb;
3873 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3874 if (lkb)
3875 return lkb;
3876 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3877 if (lkb)
3878 return lkb;
3879 return NULL;
3882 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3883 struct dlm_rsb *r, struct dlm_rcom *rc)
3885 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3886 int lvblen;
3888 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3889 lkb->lkb_ownpid = rl->rl_ownpid;
3890 lkb->lkb_remid = rl->rl_lkid;
3891 lkb->lkb_exflags = rl->rl_exflags;
3892 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3893 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3894 lkb->lkb_lvbseq = rl->rl_lvbseq;
3895 lkb->lkb_rqmode = rl->rl_rqmode;
3896 lkb->lkb_grmode = rl->rl_grmode;
3897 /* don't set lkb_status because add_lkb wants to itself */
3899 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3900 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3902 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3903 lkb->lkb_lvbptr = allocate_lvb(ls);
3904 if (!lkb->lkb_lvbptr)
3905 return -ENOMEM;
3906 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3907 sizeof(struct rcom_lock);
3908 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3911 /* Conversions between PR and CW (middle modes) need special handling.
3912 The real granted mode of these converting locks cannot be determined
3913 until all locks have been rebuilt on the rsb (recover_conversion) */
3915 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3916 rl->rl_status = DLM_LKSTS_CONVERT;
3917 lkb->lkb_grmode = DLM_LOCK_IV;
3918 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3921 return 0;
3924 /* This lkb may have been recovered in a previous aborted recovery so we need
3925 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3926 If so we just send back a standard reply. If not, we create a new lkb with
3927 the given values and send back our lkid. We send back our lkid by sending
3928 back the rcom_lock struct we got but with the remid field filled in. */
3930 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3932 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3933 struct dlm_rsb *r;
3934 struct dlm_lkb *lkb;
3935 int error;
3937 if (rl->rl_parent_lkid) {
3938 error = -EOPNOTSUPP;
3939 goto out;
3942 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3943 if (error)
3944 goto out;
3946 lock_rsb(r);
3948 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3949 if (lkb) {
3950 error = -EEXIST;
3951 goto out_remid;
3954 error = create_lkb(ls, &lkb);
3955 if (error)
3956 goto out_unlock;
3958 error = receive_rcom_lock_args(ls, lkb, r, rc);
3959 if (error) {
3960 __put_lkb(ls, lkb);
3961 goto out_unlock;
3964 attach_lkb(r, lkb);
3965 add_lkb(r, lkb, rl->rl_status);
3966 error = 0;
3968 out_remid:
3969 /* this is the new value returned to the lock holder for
3970 saving in its process-copy lkb */
3971 rl->rl_remid = lkb->lkb_id;
3973 out_unlock:
3974 unlock_rsb(r);
3975 put_rsb(r);
3976 out:
3977 if (error)
3978 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3979 rl->rl_result = error;
3980 return error;
3983 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3985 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3986 struct dlm_rsb *r;
3987 struct dlm_lkb *lkb;
3988 int error;
3990 error = find_lkb(ls, rl->rl_lkid, &lkb);
3991 if (error) {
3992 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3993 return error;
3996 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3998 error = rl->rl_result;
4000 r = lkb->lkb_resource;
4001 hold_rsb(r);
4002 lock_rsb(r);
4004 switch (error) {
4005 case -EBADR:
4006 /* There's a chance the new master received our lock before
4007 dlm_recover_master_reply(), this wouldn't happen if we did
4008 a barrier between recover_masters and recover_locks. */
4009 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4010 (unsigned long)r, r->res_name);
4011 dlm_send_rcom_lock(r, lkb);
4012 goto out;
4013 case -EEXIST:
4014 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4015 /* fall through */
4016 case 0:
4017 lkb->lkb_remid = rl->rl_remid;
4018 break;
4019 default:
4020 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4021 error, lkb->lkb_id);
4024 /* an ack for dlm_recover_locks() which waits for replies from
4025 all the locks it sends to new masters */
4026 dlm_recovered_lock(r);
4027 out:
4028 unlock_rsb(r);
4029 put_rsb(r);
4030 dlm_put_lkb(lkb);
4032 return 0;
4035 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4036 int mode, uint32_t flags, void *name, unsigned int namelen,
4037 uint32_t parent_lkid)
4039 struct dlm_lkb *lkb;
4040 struct dlm_args args;
4041 int error;
4043 lock_recovery(ls);
4045 error = create_lkb(ls, &lkb);
4046 if (error) {
4047 kfree(ua);
4048 goto out;
4051 if (flags & DLM_LKF_VALBLK) {
4052 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4053 if (!ua->lksb.sb_lvbptr) {
4054 kfree(ua);
4055 __put_lkb(ls, lkb);
4056 error = -ENOMEM;
4057 goto out;
4061 /* After ua is attached to lkb it will be freed by free_lkb().
4062 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4063 lock and that lkb_astparam is the dlm_user_args structure. */
4065 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
4066 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4067 lkb->lkb_flags |= DLM_IFL_USER;
4068 ua->old_mode = DLM_LOCK_IV;
4070 if (error) {
4071 __put_lkb(ls, lkb);
4072 goto out;
4075 error = request_lock(ls, lkb, name, namelen, &args);
4077 switch (error) {
4078 case 0:
4079 break;
4080 case -EINPROGRESS:
4081 error = 0;
4082 break;
4083 case -EAGAIN:
4084 error = 0;
4085 /* fall through */
4086 default:
4087 __put_lkb(ls, lkb);
4088 goto out;
4091 /* add this new lkb to the per-process list of locks */
4092 spin_lock(&ua->proc->locks_spin);
4093 hold_lkb(lkb);
4094 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4095 spin_unlock(&ua->proc->locks_spin);
4096 out:
4097 unlock_recovery(ls);
4098 return error;
4101 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4102 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4104 struct dlm_lkb *lkb;
4105 struct dlm_args args;
4106 struct dlm_user_args *ua;
4107 int error;
4109 lock_recovery(ls);
4111 error = find_lkb(ls, lkid, &lkb);
4112 if (error)
4113 goto out;
4115 /* user can change the params on its lock when it converts it, or
4116 add an lvb that didn't exist before */
4118 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4120 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4121 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4122 if (!ua->lksb.sb_lvbptr) {
4123 error = -ENOMEM;
4124 goto out_put;
4127 if (lvb_in && ua->lksb.sb_lvbptr)
4128 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4130 ua->castparam = ua_tmp->castparam;
4131 ua->castaddr = ua_tmp->castaddr;
4132 ua->bastparam = ua_tmp->bastparam;
4133 ua->bastaddr = ua_tmp->bastaddr;
4134 ua->user_lksb = ua_tmp->user_lksb;
4135 ua->old_mode = lkb->lkb_grmode;
4137 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4138 ua, DLM_FAKE_USER_AST, &args);
4139 if (error)
4140 goto out_put;
4142 error = convert_lock(ls, lkb, &args);
4144 if (error == -EINPROGRESS || error == -EAGAIN)
4145 error = 0;
4146 out_put:
4147 dlm_put_lkb(lkb);
4148 out:
4149 unlock_recovery(ls);
4150 kfree(ua_tmp);
4151 return error;
4154 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4155 uint32_t flags, uint32_t lkid, char *lvb_in)
4157 struct dlm_lkb *lkb;
4158 struct dlm_args args;
4159 struct dlm_user_args *ua;
4160 int error;
4162 lock_recovery(ls);
4164 error = find_lkb(ls, lkid, &lkb);
4165 if (error)
4166 goto out;
4168 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4170 if (lvb_in && ua->lksb.sb_lvbptr)
4171 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4172 ua->castparam = ua_tmp->castparam;
4173 ua->user_lksb = ua_tmp->user_lksb;
4175 error = set_unlock_args(flags, ua, &args);
4176 if (error)
4177 goto out_put;
4179 error = unlock_lock(ls, lkb, &args);
4181 if (error == -DLM_EUNLOCK)
4182 error = 0;
4183 /* from validate_unlock_args() */
4184 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4185 error = 0;
4186 if (error)
4187 goto out_put;
4189 spin_lock(&ua->proc->locks_spin);
4190 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4191 if (!list_empty(&lkb->lkb_ownqueue))
4192 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4193 spin_unlock(&ua->proc->locks_spin);
4194 out_put:
4195 dlm_put_lkb(lkb);
4196 out:
4197 unlock_recovery(ls);
4198 kfree(ua_tmp);
4199 return error;
4202 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4203 uint32_t flags, uint32_t lkid)
4205 struct dlm_lkb *lkb;
4206 struct dlm_args args;
4207 struct dlm_user_args *ua;
4208 int error;
4210 lock_recovery(ls);
4212 error = find_lkb(ls, lkid, &lkb);
4213 if (error)
4214 goto out;
4216 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4217 ua->castparam = ua_tmp->castparam;
4218 ua->user_lksb = ua_tmp->user_lksb;
4220 error = set_unlock_args(flags, ua, &args);
4221 if (error)
4222 goto out_put;
4224 error = cancel_lock(ls, lkb, &args);
4226 if (error == -DLM_ECANCEL)
4227 error = 0;
4228 /* from validate_unlock_args() */
4229 if (error == -EBUSY)
4230 error = 0;
4231 out_put:
4232 dlm_put_lkb(lkb);
4233 out:
4234 unlock_recovery(ls);
4235 kfree(ua_tmp);
4236 return error;
4239 /* lkb's that are removed from the waiters list by revert are just left on the
4240 orphans list with the granted orphan locks, to be freed by purge */
4242 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4244 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4245 struct dlm_args args;
4246 int error;
4248 hold_lkb(lkb);
4249 mutex_lock(&ls->ls_orphans_mutex);
4250 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4251 mutex_unlock(&ls->ls_orphans_mutex);
4253 set_unlock_args(0, ua, &args);
4255 error = cancel_lock(ls, lkb, &args);
4256 if (error == -DLM_ECANCEL)
4257 error = 0;
4258 return error;
4261 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4262 Regardless of what rsb queue the lock is on, it's removed and freed. */
4264 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4266 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4267 struct dlm_args args;
4268 int error;
4270 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4272 error = unlock_lock(ls, lkb, &args);
4273 if (error == -DLM_EUNLOCK)
4274 error = 0;
4275 return error;
4278 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4279 (which does lock_rsb) due to deadlock with receiving a message that does
4280 lock_rsb followed by dlm_user_add_ast() */
4282 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4283 struct dlm_user_proc *proc)
4285 struct dlm_lkb *lkb = NULL;
4287 mutex_lock(&ls->ls_clear_proc_locks);
4288 if (list_empty(&proc->locks))
4289 goto out;
4291 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4292 list_del_init(&lkb->lkb_ownqueue);
4294 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4295 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4296 else
4297 lkb->lkb_flags |= DLM_IFL_DEAD;
4298 out:
4299 mutex_unlock(&ls->ls_clear_proc_locks);
4300 return lkb;
4303 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4304 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4305 which we clear here. */
4307 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4308 list, and no more device_writes should add lkb's to proc->locks list; so we
4309 shouldn't need to take asts_spin or locks_spin here. this assumes that
4310 device reads/writes/closes are serialized -- FIXME: we may need to serialize
4311 them ourself. */
4313 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4315 struct dlm_lkb *lkb, *safe;
4317 lock_recovery(ls);
4319 while (1) {
4320 lkb = del_proc_lock(ls, proc);
4321 if (!lkb)
4322 break;
4323 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4324 orphan_proc_lock(ls, lkb);
4325 else
4326 unlock_proc_lock(ls, lkb);
4328 /* this removes the reference for the proc->locks list
4329 added by dlm_user_request, it may result in the lkb
4330 being freed */
4332 dlm_put_lkb(lkb);
4335 mutex_lock(&ls->ls_clear_proc_locks);
4337 /* in-progress unlocks */
4338 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4339 list_del_init(&lkb->lkb_ownqueue);
4340 lkb->lkb_flags |= DLM_IFL_DEAD;
4341 dlm_put_lkb(lkb);
4344 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4345 list_del(&lkb->lkb_astqueue);
4346 dlm_put_lkb(lkb);
4349 mutex_unlock(&ls->ls_clear_proc_locks);
4350 unlock_recovery(ls);
4353 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4355 struct dlm_lkb *lkb, *safe;
4357 while (1) {
4358 lkb = NULL;
4359 spin_lock(&proc->locks_spin);
4360 if (!list_empty(&proc->locks)) {
4361 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4362 lkb_ownqueue);
4363 list_del_init(&lkb->lkb_ownqueue);
4365 spin_unlock(&proc->locks_spin);
4367 if (!lkb)
4368 break;
4370 lkb->lkb_flags |= DLM_IFL_DEAD;
4371 unlock_proc_lock(ls, lkb);
4372 dlm_put_lkb(lkb); /* ref from proc->locks list */
4375 spin_lock(&proc->locks_spin);
4376 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4377 list_del_init(&lkb->lkb_ownqueue);
4378 lkb->lkb_flags |= DLM_IFL_DEAD;
4379 dlm_put_lkb(lkb);
4381 spin_unlock(&proc->locks_spin);
4383 spin_lock(&proc->asts_spin);
4384 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4385 list_del(&lkb->lkb_astqueue);
4386 dlm_put_lkb(lkb);
4388 spin_unlock(&proc->asts_spin);
4391 /* pid of 0 means purge all orphans */
4393 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4395 struct dlm_lkb *lkb, *safe;
4397 mutex_lock(&ls->ls_orphans_mutex);
4398 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4399 if (pid && lkb->lkb_ownpid != pid)
4400 continue;
4401 unlock_proc_lock(ls, lkb);
4402 list_del_init(&lkb->lkb_ownqueue);
4403 dlm_put_lkb(lkb);
4405 mutex_unlock(&ls->ls_orphans_mutex);
4408 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4410 struct dlm_message *ms;
4411 struct dlm_mhandle *mh;
4412 int error;
4414 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4415 DLM_MSG_PURGE, &ms, &mh);
4416 if (error)
4417 return error;
4418 ms->m_nodeid = nodeid;
4419 ms->m_pid = pid;
4421 return send_message(mh, ms);
4424 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4425 int nodeid, int pid)
4427 int error = 0;
4429 if (nodeid != dlm_our_nodeid()) {
4430 error = send_purge(ls, nodeid, pid);
4431 } else {
4432 lock_recovery(ls);
4433 if (pid == current->pid)
4434 purge_proc_locks(ls, proc);
4435 else
4436 do_purge(ls, nodeid, pid);
4437 unlock_recovery(ls);
4439 return error;