/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
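/* A minimal sketch of stage 1 from a caller's point of view (the names
   ls, my_ast, my_bast and the resource name are hypothetical; dlm_lock()
   and dlm_unlock() defined below are the real entry points):

	struct dlm_lksb lksb;
	int error;

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "example", 7,
			 0, my_ast, &lksb, my_bast);

   On success the completion status arrives later through my_ast, with
   the result in lksb.sb_status and the lock id in lksb.sb_lkid, which
   is then passed to dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &lksb). */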
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
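/* Reading the matrix: with grmode PR and rqmode EX,
   dlm_lvb_operations[PR+1][EX+1] is 1, so the resource's LVB is copied
   back to the caller's buffer; converting down from EX to NL (row EX,
   column NL) is 0, so the caller's LVB is written into the resource.
   set_lvb_lock() below implements exactly these two cases. */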
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
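/* For example, dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) is 1 (two
   protected-read locks coexist), while
   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) is 0, so an EX request
   conflicts with a granted PR lock. */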
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
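/* For example, a PR->NL conversion is a down conversion (NL < PR in the
   lock mode ordering), while PR->CW and CW->PR are the "middle" cases:
   PR and CW are mutually incompatible in __dlm_compat_matrix, so neither
   direction counts as a simple down conversion.  Down conversions are
   completed without waiting for a reply from the master; see
   send_convert() below. */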
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}
/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}
static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	uint16_t bucket = lkid & 0xFFFF;
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}
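/* An lkid therefore encodes its own hash bucket: the low 16 bits are the
   bucket chosen at random in create_lkb() and the high 16 bits are that
   bucket's counter.  As an example, lkid 0x002a0007 lives in bucket 7
   and was the 0x2a'th id handed out there, so lkid & 0xFFFF above (and
   in find_lkb() below) recovers the bucket directly. */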
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = lkid & 0xFFFF;

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = lkb->lkb_id & 0xFFFF;

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
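/* Mechanically: the new entry is linked in front of the first entry
   whose lkb_rqmode is lower than the given mode; if the walk finds no
   such entry, the net effect is an add at the tail.  add_lkb() below
   relies on this to keep granted locks "in order of grmode". */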
/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_waiters_mutex);
	if (lkb->lkb_wait_type) {
		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
		goto out;
	}
	lkb->lkb_wait_type = mstype;
	kref_get(&lkb->lkb_ref);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	mutex_unlock(&ls->ls_waiters_mutex);
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb)
{
	int error = 0;

	if (!lkb->lkb_wait_type) {
		log_print("remove_from_waiters error");
		error = -EINVAL;
		goto out;
	}
	lkb->lkb_wait_type = 0;
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	list_del(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
 out:
	return error;
}

static int remove_from_waiters(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}
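/* Together with toss_rsb() above, this implements the delayed-release
   policy described before find_rsb(): an unused rsb sits on its bucket's
   toss list until it has aged dlm_config.ci_toss_secs seconds, and only
   then does shrink_bucket() drop the final reference and free it. */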
/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}
/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}
/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
}

static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
}
static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}
static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}
1084 * "A conversion deadlock arises with a pair of lock requests in the converting
1085 * queue for one resource. The granted mode of each lock blocks the requested
1086 * mode of the other lock."
1088 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1089 * convert queue from being granted, then demote lkb (set grmode to NL).
1090 * This second form requires that we check for conv-deadlk even when
1091 * now == 0 in _can_be_granted().
1093 * Example:
1094 * Granted Queue: empty
1095 * Convert Queue: NL->EX (first lock)
1096 * PR->EX (second lock)
1098 * The first lock can't be granted because of the granted mode of the second
1099 * lock and the second lock can't be granted because it's not first in the
1100 * list. We demote the granted mode of the second lock (the lkb passed to this
1101 * function).
1103 * After the resolution, the "grant pending" function needs to go back and try
1104 * to grant locks on the convert queue again since the first lock can now be
1105 * granted.
1108 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1110 struct dlm_lkb *this, *first = NULL, *self = NULL;
1112 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1113 if (!first)
1114 first = this;
1115 if (this == lkb) {
1116 self = lkb;
1117 continue;
1120 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1121 return 1;
1124 /* if lkb is on the convert queue and is preventing the first
1125 from being granted, then there's deadlock and we demote lkb.
1126 multiple converting locks may need to do this before the first
1127 converting lock can be granted. */
1129 if (self && self != first) {
1130 if (!modes_compat(lkb, first) &&
1131 !queue_conflict(&rsb->res_grantqueue, first))
1132 return 1;
1135 return 0;
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}
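/* A worked example of the checks above: a new (now=1, conv=0) PR request
   against a resource with one granted PR lock and empty convert/wait
   queues passes both queue_conflict() checks (PR is compatible with PR)
   and is granted by the first 6-4 rule.  The same request with an EX
   lock already granted fails queue_conflict(&r->res_grantqueue, lkb)
   and falls through to return 0, so do_request() below either queues it
   or rejects it with -EAGAIN. */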
/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}
static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
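/* These checks encode the EXPEDITE rules quoted in _can_be_granted():
   for example, a dlm_lock() call requesting DLM_LOCK_EX with the
   DLM_LKF_EXPEDITE flag set fails here with -EINVAL, because EXPEDITE
   is only valid for new DLM_LOCK_NL requests, never for conversions or
   other modes. */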
static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		goto out;

	if (args->flags & DLM_LKF_FORCEUNLOCK)
		goto out_ok;

	if (args->flags & DLM_LKF_CANCEL &&
	    lkb->lkb_status == DLM_LKSTS_GRANTED)
		goto out;

	if (!(args->flags & DLM_LKF_CANCEL) &&
	    lkb->lkb_status != DLM_LKSTS_GRANTED)
		goto out;

	rv = -EBUSY;
	if (lkb->lkb_wait_type)
		goto out;

 out_ok:
	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;

	rv = 0;
 out:
	return rv;
}
/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	if (can_be_queued(lkb)) {
		if (is_demoted(lkb))
			grant_pending_locks(r);
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}

/* FIXME: if revert_lock() finds that the lkb is granted, we should
   skip the queue_cast(ECANCEL).  It indicates that the request/convert
   completed (and queued a normal ast) just before the cancel; we don't
   want to clobber the sb_result for the normal ast with ECANCEL. */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
	queue_cast(r, lkb, -DLM_ECANCEL);
	grant_pending_locks(r);
	return -DLM_ECANCEL;
}
/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	else
		error = do_request(r, lkb);
 out:
	return error;
}

/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	else
		error = do_convert(r, lkb);

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	else
		error = do_unlock(r, lkb);

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	else
		error = do_cancel(r, lkb);

	return error;
}
/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
/*
 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
/*
 * send/receive routines for remote operations and replies
 *
 * send_args
 * send_common
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * send_common_reply
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = r->res_ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
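/* The buffer sized above is a fixed struct dlm_message header followed
   by a variable m_extra tail: the resource name for REQUEST, LOOKUP and
   REMOVE messages, or the LVB for the lkb-carrying types.  For example,
   a request on a 5-byte resource name allocates
   sizeof(struct dlm_message) + 5 bytes, and send_args() below copies
   the name into ms->m_extra. */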
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastaddr)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astaddr)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	add_to_waiters(lkb, mstype);

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb);
	return error;
}
2199 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2201 return send_common(r, lkb, DLM_MSG_REQUEST);
2204 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2206 int error;
2208 error = send_common(r, lkb, DLM_MSG_CONVERT);
2210 /* down conversions go without a reply from the master */
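	/* (added note) faking the reply through ls_stub_ms below lets the
	   down-conversion complete via the same __receive_convert_reply()
	   path a real reply from the master would take */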
2211 if (!error && down_conversion(lkb)) {
2212 remove_from_waiters(lkb);
2213 r->res_ls->ls_stub_ms.m_result = 0;
2214 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2215 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2218 return error;
2221 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2222 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2223 that the master is still correct. */
2225 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2227 return send_common(r, lkb, DLM_MSG_UNLOCK);
2230 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2232 return send_common(r, lkb, DLM_MSG_CANCEL);
2235 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2237 struct dlm_message *ms;
2238 struct dlm_mhandle *mh;
2239 int to_nodeid, error;
2241 to_nodeid = lkb->lkb_nodeid;
2243 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2244 if (error)
2245 goto out;
2247 send_args(r, lkb, ms);
2249 ms->m_result = 0;
2251 error = send_message(mh, ms);
2252 out:
2253 return error;
2256 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2258 struct dlm_message *ms;
2259 struct dlm_mhandle *mh;
2260 int to_nodeid, error;
2262 to_nodeid = lkb->lkb_nodeid;
2264 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2265 if (error)
2266 goto out;
2268 send_args(r, lkb, ms);
2270 ms->m_bastmode = mode;
2272 error = send_message(mh, ms);
2273 out:
2274 return error;
2277 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2279 struct dlm_message *ms;
2280 struct dlm_mhandle *mh;
2281 int to_nodeid, error;
2283 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2285 to_nodeid = dlm_dir_nodeid(r);
2287 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2288 if (error)
2289 goto fail;
2291 send_args(r, lkb, ms);
2293 error = send_message(mh, ms);
2294 if (error)
2295 goto fail;
2296 return 0;
2298 fail:
2299 remove_from_waiters(lkb);
2300 return error;
2303 static int send_remove(struct dlm_rsb *r)
2305 struct dlm_message *ms;
2306 struct dlm_mhandle *mh;
2307 int to_nodeid, error;
2309 to_nodeid = dlm_dir_nodeid(r);
2311 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2312 if (error)
2313 goto out;
2315 memcpy(ms->m_extra, r->res_name, r->res_length);
2316 ms->m_hash = r->res_hash;
2318 error = send_message(mh, ms);
2319 out:
2320 return error;
2323 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2324 int mstype, int rv)
2326 struct dlm_message *ms;
2327 struct dlm_mhandle *mh;
2328 int to_nodeid, error;
2330 to_nodeid = lkb->lkb_nodeid;
2332 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2333 if (error)
2334 goto out;
2336 send_args(r, lkb, ms);
2338 ms->m_result = rv;
2340 error = send_message(mh, ms);
2341 out:
2342 return error;
2345 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2347 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2350 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2352 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2355 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2357 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2360 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2362 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2365 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2366 int ret_nodeid, int rv)
2368 struct dlm_rsb *r = &ls->ls_stub_rsb;
2369 struct dlm_message *ms;
2370 struct dlm_mhandle *mh;
2371 int error, nodeid = ms_in->m_header.h_nodeid;
2373 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2374 if (error)
2375 goto out;
2377 ms->m_lkid = ms_in->m_lkid;
2378 ms->m_result = rv;
2379 ms->m_nodeid = ret_nodeid;
2381 error = send_message(mh, ms);
2382 out:
2383 return error;
2386 /* which args we save from a received message depends heavily on the type
2387 of message, unlike the send side where we can safely send everything about
2388 the lkb for any type of message */
2390 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2392 lkb->lkb_exflags = ms->m_exflags;
2393 lkb->lkb_sbflags = ms->m_sbflags;
2394 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2395 (ms->m_flags & 0x0000FFFF);
2398 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2400 lkb->lkb_sbflags = ms->m_sbflags;
2401 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2402 (ms->m_flags & 0x0000FFFF);
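/* Note (added): only the low 16 bits of lkb_flags are wire flags; the
   masking above preserves the high, node-local bits (e.g. DLM_IFL_MSTCPY)
   across a received update. */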
2405 static int receive_extralen(struct dlm_message *ms)
2407 return (ms->m_header.h_length - sizeof(struct dlm_message));
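/* Example (added): create_message() set h_length = mb_len, so for a
   DLM_MSG_REQUEST sized as sizeof(struct dlm_message) + res_length the
   subtraction above yields exactly the name length; no explicit length
   field is needed on the wire. */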
2410 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2411 struct dlm_message *ms)
2413 int len;
2415 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2416 if (!lkb->lkb_lvbptr)
2417 lkb->lkb_lvbptr = allocate_lvb(ls);
2418 if (!lkb->lkb_lvbptr)
2419 return -ENOMEM;
2420 len = receive_extralen(ms);
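		/* (added) len is derived from the sender's header; with
		   matching lockspaces it equals ls->ls_lvblen, the size
		   allocate_lvb() gave lkb_lvbptr above */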
2421 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2423 return 0;
2426 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2427 struct dlm_message *ms)
2429 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2430 lkb->lkb_ownpid = ms->m_pid;
2431 lkb->lkb_remid = ms->m_lkid;
2432 lkb->lkb_grmode = DLM_LOCK_IV;
2433 lkb->lkb_rqmode = ms->m_rqmode;
2434 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2435 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
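	/* (added note) on a master copy the real callbacks live on the
	   process copy, so the AST_BAST/AST_COMP bits are only stored here
	   as non-NULL markers that the asts exist */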
2437 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2439 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2440 /* lkb was just created so there won't be an lvb yet */
2441 lkb->lkb_lvbptr = allocate_lvb(ls);
2442 if (!lkb->lkb_lvbptr)
2443 return -ENOMEM;
2446 return 0;
2449 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2450 struct dlm_message *ms)
2452 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2453 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2454 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2455 lkb->lkb_id, lkb->lkb_remid);
2456 return -EINVAL;
2459 if (!is_master_copy(lkb))
2460 return -EINVAL;
2462 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2463 return -EBUSY;
2465 if (receive_lvb(ls, lkb, ms))
2466 return -ENOMEM;
2468 lkb->lkb_rqmode = ms->m_rqmode;
2469 lkb->lkb_lvbseq = ms->m_lvbseq;
2471 return 0;
2474 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2475 struct dlm_message *ms)
2477 if (!is_master_copy(lkb))
2478 return -EINVAL;
2479 if (receive_lvb(ls, lkb, ms))
2480 return -ENOMEM;
2481 return 0;
2484 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2485 uses to send a reply and that the remote end uses to process the reply. */
2487 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2489 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2490 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2491 lkb->lkb_remid = ms->m_lkid;
2494 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2496 struct dlm_lkb *lkb;
2497 struct dlm_rsb *r;
2498 int error, namelen;
2500 error = create_lkb(ls, &lkb);
2501 if (error)
2502 goto fail;
2504 receive_flags(lkb, ms);
2505 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2506 error = receive_request_args(ls, lkb, ms);
2507 if (error) {
2508 __put_lkb(ls, lkb);
2509 goto fail;
2512 namelen = receive_extralen(ms);
2514 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2515 if (error) {
2516 __put_lkb(ls, lkb);
2517 goto fail;
2520 lock_rsb(r);
2522 attach_lkb(r, lkb);
2523 error = do_request(r, lkb);
2524 send_request_reply(r, lkb, error);
2526 unlock_rsb(r);
2527 put_rsb(r);
2529 if (error == -EINPROGRESS)
2530 error = 0;
2531 if (error)
2532 dlm_put_lkb(lkb);
2533 return;
2535 fail:
2536 setup_stub_lkb(ls, ms);
2537 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2540 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2542 struct dlm_lkb *lkb;
2543 struct dlm_rsb *r;
2544 int error, reply = 1;
2546 error = find_lkb(ls, ms->m_remid, &lkb);
2547 if (error)
2548 goto fail;
2550 r = lkb->lkb_resource;
2552 hold_rsb(r);
2553 lock_rsb(r);
2555 receive_flags(lkb, ms);
2556 error = receive_convert_args(ls, lkb, ms);
2557 if (error)
2558 goto out;
2559 reply = !down_conversion(lkb);
2561 error = do_convert(r, lkb);
2562 out:
2563 if (reply)
2564 send_convert_reply(r, lkb, error);
2566 unlock_rsb(r);
2567 put_rsb(r);
2568 dlm_put_lkb(lkb);
2569 return;
2571 fail:
2572 setup_stub_lkb(ls, ms);
2573 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2576 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2578 struct dlm_lkb *lkb;
2579 struct dlm_rsb *r;
2580 int error;
2582 error = find_lkb(ls, ms->m_remid, &lkb);
2583 if (error)
2584 goto fail;
2586 r = lkb->lkb_resource;
2588 hold_rsb(r);
2589 lock_rsb(r);
2591 receive_flags(lkb, ms);
2592 error = receive_unlock_args(ls, lkb, ms);
2593 if (error)
2594 goto out;
2596 error = do_unlock(r, lkb);
2597 out:
2598 send_unlock_reply(r, lkb, error);
2600 unlock_rsb(r);
2601 put_rsb(r);
2602 dlm_put_lkb(lkb);
2603 return;
2605 fail:
2606 setup_stub_lkb(ls, ms);
2607 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2610 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2612 struct dlm_lkb *lkb;
2613 struct dlm_rsb *r;
2614 int error;
2616 error = find_lkb(ls, ms->m_remid, &lkb);
2617 if (error)
2618 goto fail;
2620 receive_flags(lkb, ms);
2622 r = lkb->lkb_resource;
2624 hold_rsb(r);
2625 lock_rsb(r);
2627 error = do_cancel(r, lkb);
2628 send_cancel_reply(r, lkb, error);
2630 unlock_rsb(r);
2631 put_rsb(r);
2632 dlm_put_lkb(lkb);
2633 return;
2635 fail:
2636 setup_stub_lkb(ls, ms);
2637 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2640 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2642 struct dlm_lkb *lkb;
2643 struct dlm_rsb *r;
2644 int error;
2646 error = find_lkb(ls, ms->m_remid, &lkb);
2647 if (error) {
2648 log_error(ls, "receive_grant no lkb");
2649 return;
2651 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2653 r = lkb->lkb_resource;
2655 hold_rsb(r);
2656 lock_rsb(r);
2658 receive_flags_reply(lkb, ms);
2659 grant_lock_pc(r, lkb, ms);
2660 queue_cast(r, lkb, 0);
2662 unlock_rsb(r);
2663 put_rsb(r);
2664 dlm_put_lkb(lkb);
2667 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2669 struct dlm_lkb *lkb;
2670 struct dlm_rsb *r;
2671 int error;
2673 error = find_lkb(ls, ms->m_remid, &lkb);
2674 if (error) {
2675 log_error(ls, "receive_bast no lkb");
2676 return;
2678 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2680 r = lkb->lkb_resource;
2682 hold_rsb(r);
2683 lock_rsb(r);
2685 queue_bast(r, lkb, ms->m_bastmode);
2687 unlock_rsb(r);
2688 put_rsb(r);
2689 dlm_put_lkb(lkb);
2692 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2694 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2696 from_nodeid = ms->m_header.h_nodeid;
2697 our_nodeid = dlm_our_nodeid();
2699 len = receive_extralen(ms);
2701 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2702 if (dir_nodeid != our_nodeid) {
2703 log_error(ls, "lookup dir_nodeid %d from %d",
2704 dir_nodeid, from_nodeid);
2705 error = -EINVAL;
2706 ret_nodeid = -1;
2707 goto out;
2710 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2712 /* Optimization: we're master so treat lookup as a request */
2713 if (!error && ret_nodeid == our_nodeid) {
2714 receive_request(ls, ms);
2715 return;
2717 out:
2718 send_lookup_reply(ls, ms, ret_nodeid, error);
2721 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2723 int len, dir_nodeid, from_nodeid;
2725 from_nodeid = ms->m_header.h_nodeid;
2727 len = receive_extralen(ms);
2729 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2730 if (dir_nodeid != dlm_our_nodeid()) {
2731 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2732 dir_nodeid, from_nodeid);
2733 return;
2736 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2739 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2741 struct dlm_lkb *lkb;
2742 struct dlm_rsb *r;
2743 int error, mstype;
2745 error = find_lkb(ls, ms->m_remid, &lkb);
2746 if (error) {
2747 log_error(ls, "receive_request_reply no lkb");
2748 return;
2750 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2752 mstype = lkb->lkb_wait_type;
2753 error = remove_from_waiters(lkb);
2754 if (error) {
2755 log_error(ls, "receive_request_reply not on waiters");
2756 goto out;
2759 /* this is the value returned from do_request() on the master */
2760 error = ms->m_result;
2762 r = lkb->lkb_resource;
2763 hold_rsb(r);
2764 lock_rsb(r);
2766 /* Optimization: the dir node was also the master, so it took our
2767 lookup as a request and sent request reply instead of lookup reply */
2768 if (mstype == DLM_MSG_LOOKUP) {
2769 r->res_nodeid = ms->m_header.h_nodeid;
2770 lkb->lkb_nodeid = r->res_nodeid;
2773 switch (error) {
2774 case -EAGAIN:
2775 /* request would block (be queued) on remote master;
2776 the unhold undoes the original ref from create_lkb()
2777 so it leads to the lkb being freed */
2778 queue_cast(r, lkb, -EAGAIN);
2779 confirm_master(r, -EAGAIN);
2780 unhold_lkb(lkb);
2781 break;
2783 case -EINPROGRESS:
2784 case 0:
2785 /* request was queued or granted on remote master */
2786 receive_flags_reply(lkb, ms);
2787 lkb->lkb_remid = ms->m_lkid;
2788 if (error)
2789 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2790 else {
2791 grant_lock_pc(r, lkb, ms);
2792 queue_cast(r, lkb, 0);
2794 confirm_master(r, error);
2795 break;
2797 case -EBADR:
2798 case -ENOTBLK:
2799 /* find_rsb failed to find rsb or rsb wasn't master */
2800 r->res_nodeid = -1;
2801 lkb->lkb_nodeid = -1;
2802 _request_lock(r, lkb);
2803 break;
2805 default:
2806 log_error(ls, "receive_request_reply error %d", error);
2809 unlock_rsb(r);
2810 put_rsb(r);
2811 out:
2812 dlm_put_lkb(lkb);
2815 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2816 struct dlm_message *ms)
2818 int error = ms->m_result;
2820 /* this is the value returned from do_convert() on the master */
2822 switch (error) {
2823 case -EAGAIN:
2824 /* convert would block (be queued) on remote master */
2825 queue_cast(r, lkb, -EAGAIN);
2826 break;
2828 case -EINPROGRESS:
2829 /* convert was queued on remote master */
2830 del_lkb(r, lkb);
2831 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2832 break;
2834 case 0:
2835 /* convert was granted on remote master */
2836 receive_flags_reply(lkb, ms);
2837 grant_lock_pc(r, lkb, ms);
2838 queue_cast(r, lkb, 0);
2839 break;
2841 default:
2842 log_error(r->res_ls, "receive_convert_reply error %d", error);
2846 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2848 struct dlm_rsb *r = lkb->lkb_resource;
2850 hold_rsb(r);
2851 lock_rsb(r);
2853 __receive_convert_reply(r, lkb, ms);
2855 unlock_rsb(r);
2856 put_rsb(r);
2859 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2861 struct dlm_lkb *lkb;
2862 int error;
2864 error = find_lkb(ls, ms->m_remid, &lkb);
2865 if (error) {
2866 log_error(ls, "receive_convert_reply no lkb");
2867 return;
2869 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2871 error = remove_from_waiters(lkb);
2872 if (error) {
2873 log_error(ls, "receive_convert_reply not on waiters");
2874 goto out;
2877 _receive_convert_reply(lkb, ms);
2878 out:
2879 dlm_put_lkb(lkb);
2882 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2884 struct dlm_rsb *r = lkb->lkb_resource;
2885 int error = ms->m_result;
2887 hold_rsb(r);
2888 lock_rsb(r);
2890 /* this is the value returned from do_unlock() on the master */
2892 switch (error) {
2893 case -DLM_EUNLOCK:
2894 receive_flags_reply(lkb, ms);
2895 remove_lock_pc(r, lkb);
2896 queue_cast(r, lkb, -DLM_EUNLOCK);
2897 break;
2898 default:
2899 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2902 unlock_rsb(r);
2903 put_rsb(r);
2906 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2908 struct dlm_lkb *lkb;
2909 int error;
2911 error = find_lkb(ls, ms->m_remid, &lkb);
2912 if (error) {
2913 log_error(ls, "receive_unlock_reply no lkb");
2914 return;
2916 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2918 error = remove_from_waiters(lkb);
2919 if (error) {
2920 log_error(ls, "receive_unlock_reply not on waiters");
2921 goto out;
2924 _receive_unlock_reply(lkb, ms);
2925 out:
2926 dlm_put_lkb(lkb);
2929 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2931 struct dlm_rsb *r = lkb->lkb_resource;
2932 int error = ms->m_result;
2934 hold_rsb(r);
2935 lock_rsb(r);
2937 /* this is the value returned from do_cancel() on the master */
2939 switch (error) {
2940 case -DLM_ECANCEL:
2941 receive_flags_reply(lkb, ms);
2942 revert_lock_pc(r, lkb);
2943 queue_cast(r, lkb, -DLM_ECANCEL);
2944 break;
2945 default:
2946 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2949 unlock_rsb(r);
2950 put_rsb(r);
2953 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2955 struct dlm_lkb *lkb;
2956 int error;
2958 error = find_lkb(ls, ms->m_remid, &lkb);
2959 if (error) {
2960 log_error(ls, "receive_cancel_reply no lkb");
2961 return;
2963 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2965 error = remove_from_waiters(lkb);
2966 if (error) {
2967 log_error(ls, "receive_cancel_reply not on waiters");
2968 goto out;
2971 _receive_cancel_reply(lkb, ms);
2972 out:
2973 dlm_put_lkb(lkb);
2976 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2978 struct dlm_lkb *lkb;
2979 struct dlm_rsb *r;
2980 int error, ret_nodeid;
2982 error = find_lkb(ls, ms->m_lkid, &lkb);
2983 if (error) {
2984 log_error(ls, "receive_lookup_reply no lkb");
2985 return;
2988 error = remove_from_waiters(lkb);
2989 if (error) {
2990 log_error(ls, "receive_lookup_reply not on waiters");
2991 goto out;
2994 /* this is the value returned by dlm_dir_lookup on the dir node;
2995 FIXME: will a non-zero error ever be returned? */
2996 error = ms->m_result;
2998 r = lkb->lkb_resource;
2999 hold_rsb(r);
3000 lock_rsb(r);
3002 ret_nodeid = ms->m_nodeid;
3003 if (ret_nodeid == dlm_our_nodeid()) {
3004 r->res_nodeid = 0;
3005 ret_nodeid = 0;
3006 r->res_first_lkid = 0;
3007 } else {
3008 /* set_master() will copy res_nodeid to lkb_nodeid */
3009 r->res_nodeid = ret_nodeid;
3012 _request_lock(r, lkb);
3014 if (!ret_nodeid)
3015 process_lookup_list(r);
3017 unlock_rsb(r);
3018 put_rsb(r);
3019 out:
3020 dlm_put_lkb(lkb);
3023 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3025 struct dlm_message *ms = (struct dlm_message *) hd;
3026 struct dlm_ls *ls;
3027 int error = 0;
3029 if (!recovery)
3030 dlm_message_in(ms);
3032 ls = dlm_find_lockspace_global(hd->h_lockspace);
3033 if (!ls) {
3034 log_print("drop message %d from %d for unknown lockspace %d",
3035 ms->m_type, nodeid, hd->h_lockspace);
3036 return -EINVAL;
3039 /* recovery may have just ended leaving a bunch of backed-up requests
3040 in the requestqueue; wait while dlm_recoverd clears them */
3042 if (!recovery)
3043 dlm_wait_requestqueue(ls);
3045 /* recovery may have just started while there were a bunch of
3046 in-flight requests -- save them in requestqueue to be processed
3047 after recovery. we can't let dlm_recvd block on the recovery
3048 lock. if dlm_recoverd is calling this function to clear the
3049 requestqueue, it needs to be interrupted (-EINTR) if another
3050 recovery operation is starting. */
3052 while (1) {
3053 if (dlm_locking_stopped(ls)) {
3054 if (recovery) {
3055 error = -EINTR;
3056 goto out;
3058 error = dlm_add_requestqueue(ls, nodeid, hd);
3059 if (error == -EAGAIN)
3060 continue;
3061 else {
3062 error = -EINTR;
3063 goto out;
3067 if (lock_recovery_try(ls))
3068 break;
3069 schedule();
3072 switch (ms->m_type) {
3074 /* messages sent to a master node */
3076 case DLM_MSG_REQUEST:
3077 receive_request(ls, ms);
3078 break;
3080 case DLM_MSG_CONVERT:
3081 receive_convert(ls, ms);
3082 break;
3084 case DLM_MSG_UNLOCK:
3085 receive_unlock(ls, ms);
3086 break;
3088 case DLM_MSG_CANCEL:
3089 receive_cancel(ls, ms);
3090 break;
3092 /* messages sent from a master node (replies to above) */
3094 case DLM_MSG_REQUEST_REPLY:
3095 receive_request_reply(ls, ms);
3096 break;
3098 case DLM_MSG_CONVERT_REPLY:
3099 receive_convert_reply(ls, ms);
3100 break;
3102 case DLM_MSG_UNLOCK_REPLY:
3103 receive_unlock_reply(ls, ms);
3104 break;
3106 case DLM_MSG_CANCEL_REPLY:
3107 receive_cancel_reply(ls, ms);
3108 break;
3110 /* messages sent from a master node (only two types of async msg) */
3112 case DLM_MSG_GRANT:
3113 receive_grant(ls, ms);
3114 break;
3116 case DLM_MSG_BAST:
3117 receive_bast(ls, ms);
3118 break;
3120 /* messages sent to a dir node */
3122 case DLM_MSG_LOOKUP:
3123 receive_lookup(ls, ms);
3124 break;
3126 case DLM_MSG_REMOVE:
3127 receive_remove(ls, ms);
3128 break;
3130 /* messages sent from a dir node (remove has no reply) */
3132 case DLM_MSG_LOOKUP_REPLY:
3133 receive_lookup_reply(ls, ms);
3134 break;
3136 default:
3137 log_error(ls, "unknown message type %d", ms->m_type);
3140 unlock_recovery(ls);
3141 out:
3142 dlm_put_lockspace(ls);
3143 dlm_astd_wake();
3144 return error;
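/* Illustrative sketch (added, not part of the original file): the gating
   loop in dlm_receive_message() above reduces to the pattern below --
   park messages on the requestqueue while locking is stopped, retry when
   the queue says -EAGAIN, and never let dlm_recoverd block on the
   recovery lock.  A minimal restatement, assuming the helpers used
   above: */

static int example_recovery_gate(struct dlm_ls *ls, int nodeid,
				 struct dlm_header *hd, int recovery)
{
	for (;;) {
		if (dlm_locking_stopped(ls)) {
			/* dlm_recoverd itself must be interrupted, not
			   blocked, when another recovery starts */
			if (recovery)
				return -EINTR;
			if (dlm_add_requestqueue(ls, nodeid, hd) == -EAGAIN)
				continue;	/* raced, test again */
			return -EINTR;		/* saved for later replay */
		}
		if (lock_recovery_try(ls))
			return 0;		/* safe to process now */
		schedule();
	}
}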
3149 * Recovery related
3152 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3154 if (middle_conversion(lkb)) {
3155 hold_lkb(lkb);
3156 ls->ls_stub_ms.m_result = -EINPROGRESS;
3157 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3158 _remove_from_waiters(lkb);
3159 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3161 /* Same special case as in receive_rcom_lock_args() */
3162 lkb->lkb_grmode = DLM_LOCK_IV;
3163 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3164 unhold_lkb(lkb);
3166 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3167 lkb->lkb_flags |= DLM_IFL_RESEND;
3170 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3171 conversions are async; there's no reply from the remote master */
3174 /* A waiting lkb needs recovery if the master node has failed, or
3175 the master node is changing (only when no directory is used) */
3177 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3179 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3180 return 1;
3182 if (!dlm_no_directory(ls))
3183 return 0;
3185 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3186 return 1;
3188 return 0;
3191 /* Recovery for locks that are waiting for replies from nodes that are now
3192 gone. We can just complete unlocks and cancels by faking a reply from the
3193 dead node. Requests and up-conversions we flag to be resent after
3194 recovery. Down-conversions can just be completed with a fake reply like
3195 unlocks. Conversions between PR and CW need special attention. */
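/* Summary (added) of the action taken per lkb_wait_type below:
 *   DLM_MSG_LOOKUP   flag DLM_IFL_RESEND, resend after recovery
 *   DLM_MSG_REQUEST  flag DLM_IFL_RESEND, resend after recovery
 *   DLM_MSG_CONVERT  fake -EINPROGRESS reply, or flag for resend
 *   DLM_MSG_UNLOCK   fake -DLM_EUNLOCK reply
 *   DLM_MSG_CANCEL   fake -DLM_ECANCEL reply
 */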
3197 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3199 struct dlm_lkb *lkb, *safe;
3201 mutex_lock(&ls->ls_waiters_mutex);
3203 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3204 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3205 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3207 /* all outstanding lookups, regardless of destination, will be
3208 resent after recovery is done */
3210 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3211 lkb->lkb_flags |= DLM_IFL_RESEND;
3212 continue;
3215 if (!waiter_needs_recovery(ls, lkb))
3216 continue;
3218 switch (lkb->lkb_wait_type) {
3220 case DLM_MSG_REQUEST:
3221 lkb->lkb_flags |= DLM_IFL_RESEND;
3222 break;
3224 case DLM_MSG_CONVERT:
3225 recover_convert_waiter(ls, lkb);
3226 break;
3228 case DLM_MSG_UNLOCK:
3229 hold_lkb(lkb);
3230 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3231 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3232 _remove_from_waiters(lkb);
3233 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3234 dlm_put_lkb(lkb);
3235 break;
3237 case DLM_MSG_CANCEL:
3238 hold_lkb(lkb);
3239 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3240 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3241 _remove_from_waiters(lkb);
3242 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3243 dlm_put_lkb(lkb);
3244 break;
3246 default:
3247 log_error(ls, "invalid lkb wait_type %d",
3248 lkb->lkb_wait_type);
3250 schedule();
3252 mutex_unlock(&ls->ls_waiters_mutex);
3255 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3257 struct dlm_lkb *lkb;
3258 int rv = 0;
3260 mutex_lock(&ls->ls_waiters_mutex);
3261 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3262 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3263 rv = lkb->lkb_wait_type;
3264 _remove_from_waiters(lkb);
3265 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3266 break;
3269 mutex_unlock(&ls->ls_waiters_mutex);
3271 if (!rv)
3272 lkb = NULL;
3273 *lkb_ret = lkb;
3274 return rv;
3277 /* Deal with lookups and lkbs marked RESEND from _pre. We may now be the
3278 master or dir-node for r. Processing the lkb may result in it being placed
3279 back on waiters. */
3281 int dlm_recover_waiters_post(struct dlm_ls *ls)
3283 struct dlm_lkb *lkb;
3284 struct dlm_rsb *r;
3285 int error = 0, mstype;
3287 while (1) {
3288 if (dlm_locking_stopped(ls)) {
3289 log_debug(ls, "recover_waiters_post aborted");
3290 error = -EINTR;
3291 break;
3294 mstype = remove_resend_waiter(ls, &lkb);
3295 if (!mstype)
3296 break;
3298 r = lkb->lkb_resource;
3300 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3301 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3303 switch (mstype) {
3305 case DLM_MSG_LOOKUP:
3306 hold_rsb(r);
3307 lock_rsb(r);
3308 _request_lock(r, lkb);
3309 if (is_master(r))
3310 confirm_master(r, 0);
3311 unlock_rsb(r);
3312 put_rsb(r);
3313 break;
3315 case DLM_MSG_REQUEST:
3316 hold_rsb(r);
3317 lock_rsb(r);
3318 _request_lock(r, lkb);
3319 if (is_master(r))
3320 confirm_master(r, 0);
3321 unlock_rsb(r);
3322 put_rsb(r);
3323 break;
3325 case DLM_MSG_CONVERT:
3326 hold_rsb(r);
3327 lock_rsb(r);
3328 _convert_lock(r, lkb);
3329 unlock_rsb(r);
3330 put_rsb(r);
3331 break;
3333 default:
3334 log_error(ls, "recover_waiters_post type %d", mstype);
3338 return error;
3341 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3342 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3344 struct dlm_ls *ls = r->res_ls;
3345 struct dlm_lkb *lkb, *safe;
3347 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3348 if (test(ls, lkb)) {
3349 rsb_set_flag(r, RSB_LOCKS_PURGED);
3350 del_lkb(r, lkb);
3351 /* this put should free the lkb */
3352 if (!dlm_put_lkb(lkb))
3353 log_error(ls, "purged lkb not released");
3358 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3360 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3363 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3365 return is_master_copy(lkb);
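/* Sketch (added, hypothetical): purge_queue() takes the policy as a
   predicate, so a new purge needs only a new test function; for example,
   dropping the master copies held for one specific departed node: */

static int example_purge_one_node_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int gone_nodeid = 3;	/* hypothetical nodeid, illustration only */

	return is_master_copy(lkb) && lkb->lkb_nodeid == gone_nodeid;
}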
3368 static void purge_dead_locks(struct dlm_rsb *r)
3370 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3371 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3372 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3375 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3377 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3378 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3379 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3382 /* Get rid of locks held by nodes that are gone. */
3384 int dlm_purge_locks(struct dlm_ls *ls)
3386 struct dlm_rsb *r;
3388 log_debug(ls, "dlm_purge_locks");
3390 down_write(&ls->ls_root_sem);
3391 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3392 hold_rsb(r);
3393 lock_rsb(r);
3394 if (is_master(r))
3395 purge_dead_locks(r);
3396 unlock_rsb(r);
3397 unhold_rsb(r);
3399 schedule();
3401 up_write(&ls->ls_root_sem);
3403 return 0;
3406 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3408 struct dlm_rsb *r, *r_ret = NULL;
3410 read_lock(&ls->ls_rsbtbl[bucket].lock);
3411 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3412 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3413 continue;
3414 hold_rsb(r);
3415 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3416 r_ret = r;
3417 break;
3419 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3420 return r_ret;
3423 void dlm_grant_after_purge(struct dlm_ls *ls)
3425 struct dlm_rsb *r;
3426 int bucket = 0;
3428 while (1) {
3429 r = find_purged_rsb(ls, bucket);
3430 if (!r) {
3431 if (bucket == ls->ls_rsbtbl_size - 1)
3432 break;
3433 bucket++;
3434 continue;
3436 lock_rsb(r);
3437 if (is_master(r)) {
3438 grant_pending_locks(r);
3439 confirm_master(r, 0);
3441 unlock_rsb(r);
3442 put_rsb(r);
3443 schedule();
3447 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3448 uint32_t remid)
3450 struct dlm_lkb *lkb;
3452 list_for_each_entry(lkb, head, lkb_statequeue) {
3453 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3454 return lkb;
3456 return NULL;
3459 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3460 uint32_t remid)
3462 struct dlm_lkb *lkb;
3464 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3465 if (lkb)
3466 return lkb;
3467 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3468 if (lkb)
3469 return lkb;
3470 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3471 if (lkb)
3472 return lkb;
3473 return NULL;
3476 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3477 struct dlm_rsb *r, struct dlm_rcom *rc)
3479 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3480 int lvblen;
3482 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3483 lkb->lkb_ownpid = rl->rl_ownpid;
3484 lkb->lkb_remid = rl->rl_lkid;
3485 lkb->lkb_exflags = rl->rl_exflags;
3486 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3487 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3488 lkb->lkb_lvbseq = rl->rl_lvbseq;
3489 lkb->lkb_rqmode = rl->rl_rqmode;
3490 lkb->lkb_grmode = rl->rl_grmode;
3491 /* don't set lkb_status because add_lkb wants to set it itself */
3493 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3494 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3496 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3497 lkb->lkb_lvbptr = allocate_lvb(ls);
3498 if (!lkb->lkb_lvbptr)
3499 return -ENOMEM;
3500 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3501 sizeof(struct rcom_lock);
3502 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3505 /* Conversions between PR and CW (middle modes) need special handling.
3506 The real granted mode of these converting locks cannot be determined
3507 until all locks have been rebuilt on the rsb (recover_conversion) */
3509 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3510 rl->rl_status = DLM_LKSTS_CONVERT;
3511 lkb->lkb_grmode = DLM_LOCK_IV;
3512 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3515 return 0;
3518 /* This lkb may have been recovered in a previous aborted recovery so we need
3519 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3520 If so we just send back a standard reply. If not, we create a new lkb with
3521 the given values and send back our lkid. We send back our lkid by sending
3522 back the rcom_lock struct we got but with the remid field filled in. */
3524 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3526 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3527 struct dlm_rsb *r;
3528 struct dlm_lkb *lkb;
3529 int error;
3531 if (rl->rl_parent_lkid) {
3532 error = -EOPNOTSUPP;
3533 goto out;
3536 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3537 if (error)
3538 goto out;
3540 lock_rsb(r);
3542 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3543 if (lkb) {
3544 error = -EEXIST;
3545 goto out_remid;
3548 error = create_lkb(ls, &lkb);
3549 if (error)
3550 goto out_unlock;
3552 error = receive_rcom_lock_args(ls, lkb, r, rc);
3553 if (error) {
3554 __put_lkb(ls, lkb);
3555 goto out_unlock;
3558 attach_lkb(r, lkb);
3559 add_lkb(r, lkb, rl->rl_status);
3560 error = 0;
3562 out_remid:
3563 /* this is the new value returned to the lock holder for
3564 saving in its process-copy lkb */
3565 rl->rl_remid = lkb->lkb_id;
3567 out_unlock:
3568 unlock_rsb(r);
3569 put_rsb(r);
3570 out:
3571 if (error)
3572 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3573 rl->rl_result = error;
3574 return error;
3577 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3579 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3580 struct dlm_rsb *r;
3581 struct dlm_lkb *lkb;
3582 int error;
3584 error = find_lkb(ls, rl->rl_lkid, &lkb);
3585 if (error) {
3586 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3587 return error;
3590 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3592 error = rl->rl_result;
3594 r = lkb->lkb_resource;
3595 hold_rsb(r);
3596 lock_rsb(r);
3598 switch (error) {
3599 case -EBADR:
3600 /* There's a chance the new master received our lock before
3601 dlm_recover_master_reply(); this wouldn't happen if we did
3602 a barrier between recover_masters and recover_locks. */
3603 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3604 (unsigned long)r, r->res_name);
3605 dlm_send_rcom_lock(r, lkb);
3606 goto out;
3607 case -EEXIST:
3608 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3609 /* fall through */
3610 case 0:
3611 lkb->lkb_remid = rl->rl_remid;
3612 break;
3613 default:
3614 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3615 error, lkb->lkb_id);
3618 /* an ack for dlm_recover_locks() which waits for replies from
3619 all the locks it sends to new masters */
3620 dlm_recovered_lock(r);
3621 out:
3622 unlock_rsb(r);
3623 put_rsb(r);
3624 dlm_put_lkb(lkb);
3626 return 0;
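/* Note (added): dlm_recover_master_copy() (new master) and
   dlm_recover_process_copy() (lock holder) form the two ends of the
   lock-rebuild handshake: the holder sends its rcom_lock, the master
   stores an lkb and fills in rl_remid, and the holder saves that remid
   and acks with dlm_recovered_lock(). */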
3629 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3630 int mode, uint32_t flags, void *name, unsigned int namelen,
3631 uint32_t parent_lkid)
3633 struct dlm_lkb *lkb;
3634 struct dlm_args args;
3635 int error;
3637 lock_recovery(ls);
3639 error = create_lkb(ls, &lkb);
3640 if (error) {
3641 kfree(ua);
3642 goto out;
3645 if (flags & DLM_LKF_VALBLK) {
3646 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3647 if (!ua->lksb.sb_lvbptr) {
3648 kfree(ua);
3649 __put_lkb(ls, lkb);
3650 error = -ENOMEM;
3651 goto out;
3655 /* After ua is attached to lkb it will be freed by free_lkb().
3656 When DLM_IFL_USER is set, the dlm knows that this is a userspace
3657 lock and that lkb_astparam is the dlm_user_args structure. */
3659 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3660 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3661 lkb->lkb_flags |= DLM_IFL_USER;
3662 ua->old_mode = DLM_LOCK_IV;
3664 if (error) {
3665 __put_lkb(ls, lkb);
3666 goto out;
3669 error = request_lock(ls, lkb, name, namelen, &args);
3671 switch (error) {
3672 case 0:
3673 break;
3674 case -EINPROGRESS:
3675 error = 0;
3676 break;
3677 case -EAGAIN:
3678 error = 0;
3679 /* fall through */
3680 default:
3681 __put_lkb(ls, lkb);
3682 goto out;
3685 /* add this new lkb to the per-process list of locks */
3686 spin_lock(&ua->proc->locks_spin);
3687 kref_get(&lkb->lkb_ref);
3688 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3689 spin_unlock(&ua->proc->locks_spin);
3690 out:
3691 unlock_recovery(ls);
3692 return error;
3695 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3696 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3698 struct dlm_lkb *lkb;
3699 struct dlm_args args;
3700 struct dlm_user_args *ua;
3701 int error;
3703 lock_recovery(ls);
3705 error = find_lkb(ls, lkid, &lkb);
3706 if (error)
3707 goto out;
3709 /* user can change the params on its lock when it converts it, or
3710 add an lvb that didn't exist before */
3712 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3714 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3715 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3716 if (!ua->lksb.sb_lvbptr) {
3717 error = -ENOMEM;
3718 goto out_put;
3721 if (lvb_in && ua->lksb.sb_lvbptr)
3722 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3724 ua->castparam = ua_tmp->castparam;
3725 ua->castaddr = ua_tmp->castaddr;
3726 ua->bastparam = ua_tmp->bastparam;
3727 ua->bastaddr = ua_tmp->bastaddr;
3728 ua->user_lksb = ua_tmp->user_lksb;
3729 ua->old_mode = lkb->lkb_grmode;
3731 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
3732 ua, DLM_FAKE_USER_AST, &args);
3733 if (error)
3734 goto out_put;
3736 error = convert_lock(ls, lkb, &args);
3738 if (error == -EINPROGRESS || error == -EAGAIN)
3739 error = 0;
3740 out_put:
3741 dlm_put_lkb(lkb);
3742 out:
3743 unlock_recovery(ls);
3744 kfree(ua_tmp);
3745 return error;
3748 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3749 uint32_t flags, uint32_t lkid, char *lvb_in)
3751 struct dlm_lkb *lkb;
3752 struct dlm_args args;
3753 struct dlm_user_args *ua;
3754 int error;
3756 lock_recovery(ls);
3758 error = find_lkb(ls, lkid, &lkb);
3759 if (error)
3760 goto out;
3762 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3764 if (lvb_in && ua->lksb.sb_lvbptr)
3765 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3766 ua->castparam = ua_tmp->castparam;
3767 ua->user_lksb = ua_tmp->user_lksb;
3769 error = set_unlock_args(flags, ua, &args);
3770 if (error)
3771 goto out_put;
3773 error = unlock_lock(ls, lkb, &args);
3775 if (error == -DLM_EUNLOCK)
3776 error = 0;
3777 if (error)
3778 goto out_put;
3780 spin_lock(&ua->proc->locks_spin);
3781 /* dlm_user_add_ast() may have already taken lkb off the proc list */
3782 if (!list_empty(&lkb->lkb_ownqueue))
3783 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3784 spin_unlock(&ua->proc->locks_spin);
3785 out_put:
3786 dlm_put_lkb(lkb);
3787 out:
3788 unlock_recovery(ls);
3789 return error;
3792 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3793 uint32_t flags, uint32_t lkid)
3795 struct dlm_lkb *lkb;
3796 struct dlm_args args;
3797 struct dlm_user_args *ua;
3798 int error;
3800 lock_recovery(ls);
3802 error = find_lkb(ls, lkid, &lkb);
3803 if (error)
3804 goto out;
3806 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3807 ua->castparam = ua_tmp->castparam;
3808 ua->user_lksb = ua_tmp->user_lksb;
3810 error = set_unlock_args(flags, ua, &args);
3811 if (error)
3812 goto out_put;
3814 error = cancel_lock(ls, lkb, &args);
3816 if (error == -DLM_ECANCEL)
3817 error = 0;
3818 if (error)
3819 goto out_put;
3821 /* this lkb was removed from the WAITING queue */
3822 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3823 spin_lock(&ua->proc->locks_spin);
3824 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3825 spin_unlock(&ua->proc->locks_spin);
3827 out_put:
3828 dlm_put_lkb(lkb);
3829 out:
3830 unlock_recovery(ls);
3831 return error;
3834 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3836 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3838 if (ua->lksb.sb_lvbptr)
3839 kfree(ua->lksb.sb_lvbptr);
3840 kfree(ua);
3841 lkb->lkb_astparam = (long)NULL;
3843 /* TODO: propagate to master if needed */
3844 return 0;
3847 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3848 Regardless of what rsb queue the lock is on, it's removed and freed. */
3850 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3852 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3853 struct dlm_args args;
3854 int error;
3856 /* FIXME: we need to handle the case where the lkb is in limbo
3857 while the rsb is being looked up, currently we assert in
3858 _unlock_lock/is_remote because rsb nodeid is -1. */
3860 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3862 error = unlock_lock(ls, lkb, &args);
3863 if (error == -DLM_EUNLOCK)
3864 error = 0;
3865 return error;
3868 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3869 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3870 which we clear here. */
3872 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3873 list, and no more device_writes should add lkbs to proc->locks list; so we
3874 shouldn't need to take asts_spin or locks_spin here. this assumes that
3875 device reads/writes/closes are serialized -- FIXME: we may need to serialize
3876 them ourselves. */
3878 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3880 struct dlm_lkb *lkb, *safe;
3882 lock_recovery(ls);
3883 mutex_lock(&ls->ls_clear_proc_locks);
3885 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3886 list_del_init(&lkb->lkb_ownqueue);
3888 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3889 lkb->lkb_flags |= DLM_IFL_ORPHAN;
3890 orphan_proc_lock(ls, lkb);
3891 } else {
3892 lkb->lkb_flags |= DLM_IFL_DEAD;
3893 unlock_proc_lock(ls, lkb);
3896 /* this removes the reference for the proc->locks list
3897 added by dlm_user_request, it may result in the lkb
3898 being freed */
3900 dlm_put_lkb(lkb);
3903 /* in-progress unlocks */
3904 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3905 list_del_init(&lkb->lkb_ownqueue);
3906 lkb->lkb_flags |= DLM_IFL_DEAD;
3907 dlm_put_lkb(lkb);
3910 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
3911 list_del(&lkb->lkb_astqueue);
3912 dlm_put_lkb(lkb);
3915 mutex_unlock(&ls->ls_clear_proc_locks);
3916 unlock_recovery(ls);