/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "dir.h"
#include "config.h"
#include "ast.h"
#include "memory.h"
#include "rcom.h"
#include "lock.h"
#include "lowcomms.h"
#include "member.h"
#include "recover.h"

/*
 * Recovery waiting routines: these functions wait for a particular reply from
 * a remote node, or for the remote node to report a certain status.  They need
 * to abort if the lockspace is stopped indicating a node has failed (perhaps
 * the one being waited for).
 */

/*
 * Wait until given function returns non-zero or lockspace is stopped
 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
 * function thinks it could have completed the waited-on task, they should wake
 * up ls_wait_general to get an immediate response rather than waiting for the
 * timeout.  This uses a timeout so it can check periodically if the wait
 * should abort due to node failure (which doesn't cause a wake_up).
 * This should only be called by the dlm_recoverd thread.
 */

int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
	int error = 0;
	int rv;

	while (1) {
		rv = wait_event_timeout(ls->ls_wait_general,
					testfn(ls) || dlm_recovery_stopped(ls),
					dlm_config.ci_recover_timer * HZ);
		if (rv)
			break;
	}

	if (dlm_recovery_stopped(ls)) {
		log_debug(ls, "dlm_wait_function aborted");
		error = -EINTR;
	}
	return error;
}

/*
 * An efficient way for all nodes to wait for all others to have a certain
 * status.  The node with the lowest nodeid polls all the others for their
 * status (wait_status_all) and all the others poll the node with the low id
 * for its accumulated result (wait_status_low).  When all nodes have set
 * status flag X, then status flag X_ALL will be set on the low nodeid.
 */

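/*
 * Worked example (illustrative only): with members 1, 2 and 3, where 1 is
 * ls_low_nodeid, each node sets DLM_RS_DIR for itself when its directory
 * rebuild is done.  Node 1 polls 2 and 3 via wait_status_all(ls, DLM_RS_DIR, 0)
 * and, once both report the flag, sets DLM_RS_DIR_ALL.  Nodes 2 and 3 learn
 * that everyone has finished by polling node 1 via
 * wait_status_low(ls, DLM_RS_DIR_ALL, 0); see wait_status() below.
 */
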
uint32_t dlm_recover_status(struct dlm_ls *ls)
{
	uint32_t status;

	spin_lock(&ls->ls_recover_lock);
	status = ls->ls_recover_status;
	spin_unlock(&ls->ls_recover_lock);
	return status;
}

static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
{
	ls->ls_recover_status |= status;
}

void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
	spin_lock(&ls->ls_recover_lock);
	_set_recover_status(ls, status);
	spin_unlock(&ls->ls_recover_lock);
}

static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
			   int save_slots)
{
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct dlm_member *memb;
	int error = 0, delay;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		delay = 0;
		for (;;) {
			if (dlm_recovery_stopped(ls)) {
				error = -EINTR;
				goto out;
			}

			error = dlm_rcom_status(ls, memb->nodeid, 0);
			if (error)
				goto out;

			if (save_slots)
				dlm_slot_save(ls, rc, memb);

			if (rc->rc_result & wait_status)
				break;
			if (delay < 1000)
				delay += 20;
			msleep(delay);
		}
	}
 out:
	return error;
}

static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
			   uint32_t status_flags)
{
	struct dlm_rcom *rc = ls->ls_recover_buf;
	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;

	for (;;) {
		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			goto out;
		}

		error = dlm_rcom_status(ls, nodeid, status_flags);
		if (error)
			break;

		if (rc->rc_result & wait_status)
			break;
		if (delay < 1000)
			delay += 20;
		msleep(delay);
	}
 out:
	return error;
}

static int wait_status(struct dlm_ls *ls, uint32_t status)
{
	uint32_t status_all = status << 1;
	int error;

	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
		error = wait_status_all(ls, status, 0);
		if (!error)
			dlm_set_recover_status(ls, status_all);
	} else
		error = wait_status_low(ls, status_all, 0);

	return error;
}

int dlm_recover_members_wait(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int num_slots, slots_size;
	int error, rv;
	uint32_t gen;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		memb->slot = -1;
		memb->generation = 0;
	}

	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
		error = wait_status_all(ls, DLM_RS_NODES, 1);
		if (error)
			goto out;

		/* slots array is sparse, slots_size may be > num_slots */

		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
		if (!rv) {
			spin_lock(&ls->ls_recover_lock);
			_set_recover_status(ls, DLM_RS_NODES_ALL);
			ls->ls_num_slots = num_slots;
			ls->ls_slots_size = slots_size;
			ls->ls_slots = slots;
			ls->ls_generation = gen;
			spin_unlock(&ls->ls_recover_lock);
		} else {
			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
		}
	} else {
		error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
		if (error)
			goto out;

		dlm_slots_copy_in(ls);
	}
 out:
	return error;
}

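/*
 * Note on slots (a sketch based on the calls above): the low nodeid assigns a
 * slot and generation number for each member with dlm_slots_assign() and
 * publishes them in its rcom status replies; the other nodes ask for that
 * data with the DLM_RSF_NEED_SLOTS flag and install it locally with
 * dlm_slots_copy_in().
 */
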
int dlm_recover_directory_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_DIR);
}

int dlm_recover_locks_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_LOCKS);
}

int dlm_recover_done_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_DONE);
}

/*
 * The recover_list contains all the rsb's for which we've requested the new
 * master nodeid.  As replies are returned from the resource directories the
 * rsb's are removed from the list.  When the list is empty we're done.
 *
 * The recover_list is later similarly used for all rsb's for which we've sent
 * new lkb's and need to receive new corresponding lkid's.
 *
 * We use the address of the rsb struct as a simple local identifier for the
 * rsb so we can match an rcom reply with the rsb it was sent for.
 */

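/*
 * The recover_idr_* helpers below play the same role for master lookups, but
 * hand out a small integer id via an idr instead of using the rsb address, so
 * a lookup reply can be matched back to its rsb with
 * recover_idr_find(ls, rc->rc_id).
 */
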
static int recover_list_empty(struct dlm_ls *ls)
{
	int empty;

	spin_lock(&ls->ls_recover_list_lock);
	empty = list_empty(&ls->ls_recover_list);
	spin_unlock(&ls->ls_recover_list_lock);

	return empty;
}

static void recover_list_add(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_list_lock);
	if (list_empty(&r->res_recover_list)) {
		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
		ls->ls_recover_list_count++;
		dlm_hold_rsb(r);
	}
	spin_unlock(&ls->ls_recover_list_lock);
}

static void recover_list_del(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_list_lock);
	list_del_init(&r->res_recover_list);
	ls->ls_recover_list_count--;
	spin_unlock(&ls->ls_recover_list_lock);

	dlm_put_rsb(r);
}

static void recover_list_clear(struct dlm_ls *ls)
{
	struct dlm_rsb *r, *s;

	spin_lock(&ls->ls_recover_list_lock);
	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
		list_del_init(&r->res_recover_list);
		r->res_recover_locks_count = 0;
		dlm_put_rsb(r);
		ls->ls_recover_list_count--;
	}

	if (ls->ls_recover_list_count != 0) {
		log_error(ls, "warning: recover_list_count %d",
			  ls->ls_recover_list_count);
		ls->ls_recover_list_count = 0;
	}
	spin_unlock(&ls->ls_recover_list_lock);
}

static int recover_idr_empty(struct dlm_ls *ls)
{
	int empty = 1;

	spin_lock(&ls->ls_recover_idr_lock);
	if (ls->ls_recover_list_count)
		empty = 0;
	spin_unlock(&ls->ls_recover_idr_lock);

	return empty;
}

static int recover_idr_add(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	int rv;

	idr_preload(GFP_NOFS);
	spin_lock(&ls->ls_recover_idr_lock);
	if (r->res_id) {
		rv = -1;
		goto out_unlock;
	}
	rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
	if (rv < 0)
		goto out_unlock;

	r->res_id = rv;
	ls->ls_recover_list_count++;
	dlm_hold_rsb(r);
	rv = 0;
out_unlock:
	spin_unlock(&ls->ls_recover_idr_lock);
	idr_preload_end();
	return rv;
}

static void recover_idr_del(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_idr_lock);
	idr_remove(&ls->ls_recover_idr, r->res_id);
	r->res_id = 0;
	ls->ls_recover_list_count--;
	spin_unlock(&ls->ls_recover_idr_lock);

	dlm_put_rsb(r);
}

static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
	struct dlm_rsb *r;

	spin_lock(&ls->ls_recover_idr_lock);
	r = idr_find(&ls->ls_recover_idr, (int)id);
	spin_unlock(&ls->ls_recover_idr_lock);
	return r;
}

static void recover_idr_clear(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int id;

	spin_lock(&ls->ls_recover_idr_lock);

	idr_for_each_entry(&ls->ls_recover_idr, r, id) {
		idr_remove(&ls->ls_recover_idr, id);
		r->res_id = 0;
		r->res_recover_locks_count = 0;
		ls->ls_recover_list_count--;

		dlm_put_rsb(r);
	}

	if (ls->ls_recover_list_count != 0) {
		log_error(ls, "warning: recover_list_count %d",
			  ls->ls_recover_list_count);
		ls->ls_recover_list_count = 0;
	}
	spin_unlock(&ls->ls_recover_idr_lock);
}

/* Master recovery: find new master node for rsb's that were
   mastered on nodes that have been removed.

   dlm_recover_masters
   recover_master
   dlm_send_rcom_lookup            ->  receive_rcom_lookup
                                       dlm_master_lookup
   receive_rcom_lookup_reply       <-
   dlm_recover_master_reply
   set_new_master
   set_master_lkbs
   set_lock_master
*/

/*
 * Set the lock master for all LKBs in a lock queue
 * If we are the new master of the rsb, we may have received new
 * MSTCPY locks from other nodes already which we need to ignore
 * when setting the new nodeid.
 */

static void set_lock_master(struct list_head *queue, int nodeid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, queue, lkb_statequeue) {
		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
			lkb->lkb_nodeid = nodeid;
			lkb->lkb_remid = 0;
		}
	}
}

static void set_master_lkbs(struct dlm_rsb *r)
{
	set_lock_master(&r->res_grantqueue, r->res_nodeid);
	set_lock_master(&r->res_convertqueue, r->res_nodeid);
	set_lock_master(&r->res_waitqueue, r->res_nodeid);
}

/*
 * Propagate the new master nodeid to locks
 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
 * rsb's to consider.
 */

static void set_new_master(struct dlm_rsb *r)
{
	set_master_lkbs(r);
	rsb_set_flag(r, RSB_NEW_MASTER);
	rsb_set_flag(r, RSB_NEW_MASTER2);
}

/*
 * We do async lookups on rsb's that need new masters.  The rsb's
 * waiting for a lookup reply are kept on the recover_list.
 *
 * Another node recovering the master may have sent us a rcom lookup,
 * and our dlm_master_lookup() set it as the new master, along with
 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
 * equals our_nodeid below).
 */

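/*
 * In the normal (directory) case the flow for one rsb is roughly:
 * recover_master() adds the rsb to the recover idr and sends an rcom lookup
 * to the directory node; dlm_recover_master_reply() later matches the reply
 * via rc_id, records the new master, and removes the rsb again, waking
 * ls_wait_general when the last reply is in.
 */
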
static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
	struct dlm_ls *ls = r->res_ls;
	int our_nodeid, dir_nodeid;
	int is_removed = 0;
	int error;

	if (is_master(r))
		return 0;

	is_removed = dlm_is_removed(ls, r->res_nodeid);

	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
		return 0;

	our_nodeid = dlm_our_nodeid();
	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid == our_nodeid) {
		if (is_removed) {
			r->res_master_nodeid = our_nodeid;
			r->res_nodeid = 0;
		}

		/* set master of lkbs to ourself when is_removed, or to
		   another new master which we set along with NEW_MASTER
		   in dlm_master_lookup */
		set_new_master(r);
		error = 0;
	} else {
		recover_idr_add(r);
		error = dlm_send_rcom_lookup(r, dir_nodeid);
	}

	(*count)++;
	return error;
}

/*
 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
 * This is necessary because recovery can be started, aborted and restarted,
 * causing the master nodeid to briefly change during the aborted recovery, and
 * change back to the original value in the second recovery.  The MSTCPY locks
 * may or may not have been purged during the aborted recovery.  Another node
 * with an outstanding request in the waiters list and a request reply saved in
 * the requestqueue cannot know whether it should ignore the reply and resend
 * the request, or accept the reply and complete the request.  It must do the
 * former if the remote node purged MSTCPY locks, and it must do the latter if
 * the remote node did not.  This is solved by always purging MSTCPY locks, in
 * which case the request reply is always ignored and the request resent.
 */

static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
	int dir_nodeid = dlm_dir_nodeid(r);
	int new_master = dir_nodeid;

	if (dir_nodeid == dlm_our_nodeid())
		new_master = 0;

	dlm_purge_mstcpy_locks(r);
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = new_master;
	set_new_master(r);
	(*count)++;
	return 0;
}

/*
 * Go through local root resources and for each rsb which has a master which
 * has departed, get the new master nodeid from the directory.  The dir will
 * assign mastery to the first node to look up the new master.  That means
 * we'll discover in this lookup if we're the new master of any rsb's.
 *
 * We fire off all the dir lookup requests individually and asynchronously to
 * the correct dir node.
 */

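/*
 * Note: when the lockspace runs without a resource directory
 * (dlm_no_directory()), there is nothing to look up; recover_master_static()
 * above is used instead of recover_master(), remastering each rsb on its
 * dir_nodeid without sending any rcom lookups.
 */
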
int dlm_recover_masters(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	unsigned int total = 0;
	unsigned int count = 0;
	int nodir = dlm_no_directory(ls);
	int error;

	log_debug(ls, "dlm_recover_masters");

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		if (dlm_recovery_stopped(ls)) {
			up_read(&ls->ls_root_sem);
			error = -EINTR;
			goto out;
		}

		lock_rsb(r);
		if (nodir)
			error = recover_master_static(r, &count);
		else
			error = recover_master(r, &count);
		unlock_rsb(r);
		cond_resched();
		total++;

		if (error) {
			up_read(&ls->ls_root_sem);
			goto out;
		}
	}
	up_read(&ls->ls_root_sem);

	log_debug(ls, "dlm_recover_masters %u of %u", count, total);

	error = dlm_wait_function(ls, &recover_idr_empty);
 out:
	if (error)
		recover_idr_clear(ls);
	return error;
}

int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_rsb *r;
	int ret_nodeid, new_master;

	r = recover_idr_find(ls, rc->rc_id);
	if (!r) {
		log_error(ls, "dlm_recover_master_reply no id %llx",
			  (unsigned long long)rc->rc_id);
		goto out;
	}

	ret_nodeid = rc->rc_result;

	if (ret_nodeid == dlm_our_nodeid())
		new_master = 0;
	else
		new_master = ret_nodeid;

	lock_rsb(r);
	r->res_master_nodeid = ret_nodeid;
	r->res_nodeid = new_master;
	set_new_master(r);
	unlock_rsb(r);
	recover_idr_del(r);

	if (recover_idr_empty(ls))
		wake_up(&ls->ls_wait_general);
 out:
	return 0;
}

/* Lock recovery: rebuild the process-copy locks we hold on a
   remastered rsb on the new rsb master.

   dlm_recover_locks
   recover_locks
   recover_locks_queue
   dlm_send_rcom_lock              ->  receive_rcom_lock
                                       dlm_recover_master_copy
   receive_rcom_lock_reply         <-
   dlm_recover_process_copy
*/

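/*
 * The bookkeeping for this exchange is below: recover_locks_queue() bumps
 * res_recover_locks_count for every lkb sent with dlm_send_rcom_lock(), and
 * dlm_recovered_lock() decrements it for each reply; when it reaches zero
 * the rsb is done and is removed from the recover_list.
 */
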
/*
 * keep a count of the number of lkb's we send to the new master; when we get
 * an equal number of replies then recovery for the rsb is done
 */

static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
{
	struct dlm_lkb *lkb;
	int error = 0;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		error = dlm_send_rcom_lock(r, lkb);
		if (error)
			break;
		r->res_recover_locks_count++;
	}

	return error;
}

static int recover_locks(struct dlm_rsb *r)
{
	int error = 0;

	lock_rsb(r);

	DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););

	error = recover_locks_queue(r, &r->res_grantqueue);
	if (error)
		goto out;
	error = recover_locks_queue(r, &r->res_convertqueue);
	if (error)
		goto out;
	error = recover_locks_queue(r, &r->res_waitqueue);
	if (error)
		goto out;

	if (r->res_recover_locks_count)
		recover_list_add(r);
	else
		rsb_clear_flag(r, RSB_NEW_MASTER);
 out:
	unlock_rsb(r);
	return error;
}

int dlm_recover_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int error, count = 0;

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		if (is_master(r)) {
			rsb_clear_flag(r, RSB_NEW_MASTER);
			continue;
		}

		if (!rsb_flag(r, RSB_NEW_MASTER))
			continue;

		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			up_read(&ls->ls_root_sem);
			goto out;
		}

		error = recover_locks(r);
		if (error) {
			up_read(&ls->ls_root_sem);
			goto out;
		}

		count += r->res_recover_locks_count;
	}
	up_read(&ls->ls_root_sem);

	log_debug(ls, "dlm_recover_locks %d out", count);

	error = dlm_wait_function(ls, &recover_list_empty);
 out:
	if (error)
		recover_list_clear(ls);
	return error;
}

void dlm_recovered_lock(struct dlm_rsb *r)
{
	DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););

	r->res_recover_locks_count--;
	if (!r->res_recover_locks_count) {
		rsb_clear_flag(r, RSB_NEW_MASTER);
		recover_list_del(r);
	}

	if (recover_list_empty(r->res_ls))
		wake_up(&r->res_ls->ls_wait_general);
}

/*
 * The lvb needs to be recovered on all master rsb's.  This includes setting
 * the VALNOTVALID flag if necessary, and determining the correct lvb contents
 * based on the lvb's of the locks held on the rsb.
 *
 * RSB_VALNOTVALID is set in two cases:
 *
 * 1. we are master, but not new, and we purged an EX/PW lock held by a
 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
 *
 * 2. we are a new master, and there are only NL/CR locks left.
 * (We could probably improve this by only invalidating in this way when
 * the previous master left uncleanly.  VMS docs mention that.)
 *
 * The LVB contents are only considered for changing when this is a new master
 * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
 * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
 * from the lkb with the largest lvb sequence number.
 */

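/*
 * Illustrative example: if this node just became master (NEW_MASTER2) and the
 * surviving lkbs on the rsb hold only NL/CR with DLM_LKF_VALBLK, no lock mode
 * guarantees the lvb is current, so RSB_VALNOTVALID is set and the lvb is
 * taken from the lkb with the highest lkb_lvbseq.  If any surviving lkb holds
 * a mode above CR (PW/EX), its copy of the lvb is authoritative and is copied
 * into the rsb unchanged.
 */
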
static void recover_lvb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *high_lkb = NULL;
	uint32_t high_seq = 0;
	int lock_lvb_exists = 0;
	int big_lock_exists = 0;
	int lvblen = r->res_ls->ls_lvblen;

	if (!rsb_flag(r, RSB_NEW_MASTER2) &&
	    rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
		/* case 1 above */
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!rsb_flag(r, RSB_NEW_MASTER2))
		return;

	/* we are the new master, so figure out if VALNOTVALID should
	   be set, and set the rsb lvb from the best lkb available. */

	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			continue;

		lock_lvb_exists = 1;

		if (lkb->lkb_grmode > DLM_LOCK_CR) {
			big_lock_exists = 1;
			goto setflag;
		}

		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
			high_lkb = lkb;
			high_seq = lkb->lkb_lvbseq;
		}
	}

	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			continue;

		lock_lvb_exists = 1;

		if (lkb->lkb_grmode > DLM_LOCK_CR) {
			big_lock_exists = 1;
			goto setflag;
		}

		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
			high_lkb = lkb;
			high_seq = lkb->lkb_lvbseq;
		}
	}

 setflag:
	if (!lock_lvb_exists)
		goto out;

	/* lvb is invalidated if only NL/CR locks remain */
	if (!big_lock_exists)
		rsb_set_flag(r, RSB_VALNOTVALID);

	if (!r->res_lvbptr) {
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
		if (!r->res_lvbptr)
			goto out;
	}

	if (big_lock_exists) {
		r->res_lvbseq = lkb->lkb_lvbseq;
		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
	} else if (high_lkb) {
		r->res_lvbseq = high_lkb->lkb_lvbseq;
		memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
	} else {
		memset(r->res_lvbptr, 0, lvblen);
	}
 out:
	return;
}

/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
   converting PR->CW or CW->PR need to have their lkb_grmode set. */

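/*
 * Sketch of the situation handled here: while locks are being rebuilt, a
 * converting lkb can arrive with its granted mode recorded as DLM_LOCK_IV
 * because the real granted mode could not be determined remotely.  The loop
 * below fixes that up: if some granted lkb on the rsb shows the rsb was being
 * used at PR or CW, that mode is reused; otherwise the lkb's requested mode
 * is used as its granted mode.
 */
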
static void recover_conversion(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb;
	int grmode = -1;

	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_grmode == DLM_LOCK_PR ||
		    lkb->lkb_grmode == DLM_LOCK_CW) {
			grmode = lkb->lkb_grmode;
			break;
		}
	}

	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
		if (lkb->lkb_grmode != DLM_LOCK_IV)
			continue;
		if (grmode == -1) {
			log_debug(ls, "recover_conversion %x set gr to rq %d",
				  lkb->lkb_id, lkb->lkb_rqmode);
			lkb->lkb_grmode = lkb->lkb_rqmode;
		} else {
			log_debug(ls, "recover_conversion %x set gr %d",
				  lkb->lkb_id, grmode);
			lkb->lkb_grmode = grmode;
		}
	}
}

/* We've become the new master for this rsb and waiting/converting locks may
   need to be granted in dlm_recover_grant() due to locks that may have
   existed from a removed node. */

static void recover_grant(struct dlm_rsb *r)
{
	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
		rsb_set_flag(r, RSB_RECOVER_GRANT);
}

void dlm_recover_rsbs(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	unsigned int count = 0;

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		lock_rsb(r);
		if (is_master(r)) {
			if (rsb_flag(r, RSB_RECOVER_CONVERT))
				recover_conversion(r);

			/* recover lvb before granting locks so the updated
			   lvb/VALNOTVALID is presented in the completion */
			recover_lvb(r);

			if (rsb_flag(r, RSB_NEW_MASTER2))
				recover_grant(r);
			count++;
		} else {
			rsb_clear_flag(r, RSB_VALNOTVALID);
		}
		rsb_clear_flag(r, RSB_RECOVER_CONVERT);
		rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
		rsb_clear_flag(r, RSB_NEW_MASTER2);
		unlock_rsb(r);
	}
	up_read(&ls->ls_root_sem);

	if (count)
		log_debug(ls, "dlm_recover_rsbs %d done", count);
}

/* Create a single list of all root rsb's to be used during recovery */

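/*
 * The recovery passes above (dlm_recover_masters, dlm_recover_locks,
 * dlm_recover_rsbs) walk this list under down_read(&ls->ls_root_sem);
 * the list itself is only built and torn down under down_write() in the
 * functions below.
 */
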
int dlm_create_root_list(struct dlm_ls *ls)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i, error = 0;

	down_write(&ls->ls_root_sem);
	if (!list_empty(&ls->ls_root_list)) {
		log_error(ls, "root list not empty");
		error = -EINVAL;
		goto out;
	}

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			list_add(&r->res_root_list, &ls->ls_root_list);
			dlm_hold_rsb(r);
		}

		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
			log_error(ls, "dlm_create_root_list toss not empty");
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
 out:
	up_write(&ls->ls_root_sem);
	return error;
}

void dlm_release_root_list(struct dlm_ls *ls)
{
	struct dlm_rsb *r, *safe;

	down_write(&ls->ls_root_sem);
	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
		list_del_init(&r->res_root_list);
		dlm_put_rsb(r);
	}
	up_write(&ls->ls_root_sem);
}

void dlm_clear_toss(struct dlm_ls *ls)
{
	struct rb_node *n, *next;
	struct dlm_rsb *r;
	unsigned int count = 0;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
			next = rb_next(n);
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(r);
			count++;
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}

	if (count)
		log_debug(ls, "dlm_clear_toss %u done", count);
}