/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>

#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

enum dlm_mle_type {
	DLM_MLE_BLOCK,
	DLM_MLE_MASTER,
	DLM_MLE_MIGRATION
};

struct dlm_lock_name {
	u8 len;
	u8 name[DLM_LOCKID_NAME_MAX];
};

struct dlm_master_list_entry {
	struct list_head list;
	struct list_head hb_events;
	struct dlm_ctxt *dlm;
	spinlock_t spinlock;
	wait_queue_head_t wq;
	atomic_t woken;
	struct kref mle_refs;
	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	u8 master;
	u8 new_master;
	enum dlm_mle_type type;
	struct o2hb_callback_func mle_hb_up;
	struct o2hb_callback_func mle_hb_down;
	union {
		struct dlm_lock_resource *res;
		struct dlm_lock_name name;
	} u;
};

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node,
			      int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node,
			    int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
				struct dlm_master_list_entry *mle,
				const char *name,
				unsigned int namelen)
{
	struct dlm_lock_resource *res;

	if (dlm != mle->dlm)
		return 0;

	if (mle->type == DLM_MLE_BLOCK ||
	    mle->type == DLM_MLE_MIGRATION) {
		if (namelen != mle->u.name.len ||
		    memcmp(name, mle->u.name.name, namelen) != 0)
			return 0;
	} else {
		res = mle->u.res;
		if (namelen != res->lockname.len ||
		    memcmp(res->lockname.name, name, namelen) != 0)
			return 0;
	}
	return 1;
}

/* Code here is included but defined out as it aids debugging */

void dlm_print_one_mle(struct dlm_master_list_entry *mle)
{
	int i = 0, refs;
	char *type;
	char attached;
	u8 master;
	unsigned int namelen;
	const char *name;
	struct kref *k;

	k = &mle->mle_refs;
	if (mle->type == DLM_MLE_BLOCK)
		type = "BLK";
	else if (mle->type == DLM_MLE_MASTER)
		type = "MAS";
	else
		type = "MIG";
	refs = atomic_read(&k->refcount);
	master = mle->master;
	attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');

	if (mle->type != DLM_MLE_MASTER) {
		namelen = mle->u.name.len;
		name = mle->u.name.name;
	} else {
		namelen = mle->u.res->lockname.len;
		name = mle->u.res->lockname.name;
	}

	mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
	     i, type, refs, master, mle->new_master, attached,
	     namelen, namelen, name);
}

static void dlm_dump_mles(struct dlm_ctxt *dlm)
{
	struct dlm_master_list_entry *mle;
	struct list_head *iter;

	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
	mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
	spin_lock(&dlm->master_lock);
	list_for_each(iter, &dlm->master_list) {
		mle = list_entry(iter, struct dlm_master_list_entry, list);
		dlm_print_one_mle(mle);
	}
	spin_unlock(&dlm->master_lock);
}

int dlm_dump_all_mles(const char __user *data, unsigned int len)
{
	struct list_head *iter;
	struct dlm_ctxt *dlm;

	spin_lock(&dlm_domain_lock);
	list_for_each(iter, &dlm_domains) {
		dlm = list_entry (iter, struct dlm_ctxt, list);
		mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
		dlm_dump_mles(dlm);
	}
	spin_unlock(&dlm_domain_lock);
	return len;
}
EXPORT_SYMBOL_GPL(dlm_dump_all_mles);

static kmem_cache_t *dlm_mle_cache = NULL;

static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
			 enum dlm_mle_type type,
			 struct dlm_ctxt *dlm,
			 struct dlm_lock_resource *res,
			 const char *name,
			 unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res);

int dlm_is_host_down(int errno)
{
	switch (errno) {
		case -EBADF:
		case -ECONNREFUSED:
		case -ENOTCONN:
		case -ECONNRESET:
		case -EPIPE:
		case -EHOSTDOWN:
		case -EHOSTUNREACH:
		case -ETIMEDOUT:
		case -ECONNABORTED:
		case -ENETDOWN:
		case -ENETUNREACH:
		case -ENETRESET:
		case -ESHUTDOWN:
		case -ENOPROTOOPT:
		case -EINVAL:	/* if returned from our tcp code,
				   this means there is no socket */
			return 1;
	}
	return 0;
}
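
/*
 * Illustrative sketch, not part of the original file: how callers are
 * expected to consume dlm_is_host_down().  A negative status from the
 * o2net layer that this function recognizes means "treat the peer as
 * dead and let recovery handle it"; anything else is a real error.
 * The helper name below is hypothetical.
 */
static inline int dlm_example_classify_net_status(int status)
{
	if (status >= 0)
		return 0;		/* message was delivered */
	if (dlm_is_host_down(status))
		return 1;		/* node death: recovery will run */
	return -1;			/* genuine error: caller must handle */
}
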
/*
 * MASTER LIST FUNCTIONS
 */

/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	assert_spin_locked(&dlm->spinlock);

	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}

static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	if (!list_empty(&mle->hb_events))
		list_del_init(&mle->hb_events);
}

static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					    struct dlm_master_list_entry *mle)
{
	spin_lock(&dlm->spinlock);
	__dlm_mle_detach_hb_events(dlm, mle);
	spin_unlock(&dlm->spinlock);
}
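
/*
 * Illustrative sketch, not part of the original file: the mle/heartbeat
 * lifecycle described in the comment above.  An mle is attached under
 * dlm->spinlock at creation time and detached as soon as the master is
 * known.  This hypothetical helper only demonstrates the ordering.
 */
static inline void dlm_example_mle_lifecycle(struct dlm_ctxt *dlm,
					     struct dlm_master_list_entry *mle)
{
	spin_lock(&dlm->spinlock);
	__dlm_mle_attach_hb_events(dlm, mle);	/* done by dlm_init_mle() */
	spin_unlock(&dlm->spinlock);

	/* ... mastery runs; once an "answer" arrives ... */

	dlm_mle_detach_hb_events(dlm, mle);	/* before the final put */
}
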
/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	BUG_ON(!atomic_read(&mle->mle_refs.refcount));

	kref_put(&mle->mle_refs, dlm_mle_release);
}

/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	__dlm_put_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
	kref_get(&mle->mle_refs);
}

static void dlm_init_mle(struct dlm_master_list_entry *mle,
			 enum dlm_mle_type type,
			 struct dlm_ctxt *dlm,
			 struct dlm_lock_resource *res,
			 const char *name,
			 unsigned int namelen)
{
	assert_spin_locked(&dlm->spinlock);

	mle->dlm = dlm;
	mle->type = type;
	INIT_LIST_HEAD(&mle->list);
	INIT_LIST_HEAD(&mle->hb_events);
	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
	spin_lock_init(&mle->spinlock);
	init_waitqueue_head(&mle->wq);
	atomic_set(&mle->woken, 0);
	kref_init(&mle->mle_refs);
	memset(mle->response_map, 0, sizeof(mle->response_map));
	mle->master = O2NM_MAX_NODES;
	mle->new_master = O2NM_MAX_NODES;

	if (mle->type == DLM_MLE_MASTER) {
		BUG_ON(!res);
		mle->u.res = res;
	} else if (mle->type == DLM_MLE_BLOCK) {
		BUG_ON(!name);
		memcpy(mle->u.name.name, name, namelen);
		mle->u.name.len = namelen;
	} else /* DLM_MLE_MIGRATION */ {
		BUG_ON(!name);
		memcpy(mle->u.name.name, name, namelen);
		mle->u.name.len = namelen;
	}

	/* copy off the node_map and register hb callbacks on our copy */
	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
	clear_bit(dlm->node_num, mle->vote_map);
	clear_bit(dlm->node_num, mle->node_map);

	/* attach the mle to the domain node up/down events */
	__dlm_mle_attach_hb_events(dlm, mle);
}

/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen)
{
	struct dlm_master_list_entry *tmpmle;
	struct list_head *iter;

	assert_spin_locked(&dlm->master_lock);

	list_for_each(iter, &dlm->master_list) {
		tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
			continue;
		dlm_get_mle(tmpmle);
		*mle = tmpmle;
		return 1;
	}
	return 0;
}

void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
	struct dlm_master_list_entry *mle;
	struct list_head *iter;

	assert_spin_locked(&dlm->spinlock);

	list_for_each(iter, &dlm->mle_hb_events) {
		mle = list_entry(iter, struct dlm_master_list_entry,
				 hb_events);
		if (node_up)
			dlm_mle_node_up(dlm, mle, NULL, idx);
		else
			dlm_mle_node_down(dlm, mle, NULL, idx);
	}
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (!test_bit(idx, mle->node_map))
		mlog(0, "node %u already removed from nodemap!\n", idx);
	else
		clear_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (test_bit(idx, mle->node_map))
		mlog(0, "node %u already in node map!\n", idx);
	else
		set_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}

int dlm_init_mle_cache(void)
{
	dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
					  sizeof(struct dlm_master_list_entry),
					  0, SLAB_HWCACHE_ALIGN,
					  NULL, NULL);
	if (dlm_mle_cache == NULL)
		return -ENOMEM;
	return 0;
}

void dlm_destroy_mle_cache(void)
{
	if (dlm_mle_cache)
		kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
	struct dlm_master_list_entry *mle;
	struct dlm_ctxt *dlm;

	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
	dlm = mle->dlm;

	if (mle->type != DLM_MLE_MASTER) {
		mlog(0, "calling mle_release for %.*s, type %d\n",
		     mle->u.name.len, mle->u.name.name, mle->type);
	} else {
		mlog(0, "calling mle_release for %.*s, type %d\n",
		     mle->u.res->lockname.len,
		     mle->u.res->lockname.name, mle->type);
	}
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* remove from list if not already */
	if (!list_empty(&mle->list))
		list_del_init(&mle->list);

	/* detach the mle from the domain node up/down events */
	__dlm_mle_detach_hb_events(dlm, mle);

	/* NOTE: kfree under spinlock here.
	 * if this is bad, we can move this to a freelist. */
	kmem_cache_free(dlm_mle_cache, mle);
}

/*
 * LOCK RESOURCE FUNCTIONS
 */

static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 owner)
{
	assert_spin_locked(&res->spinlock);

	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

	if (owner == dlm->node_num)
		atomic_inc(&dlm->local_resources);
	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_inc(&dlm->unknown_resources);
	else
		atomic_inc(&dlm->remote_resources);

	res->owner = owner;
}

void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res, u8 owner)
{
	assert_spin_locked(&res->spinlock);

	if (owner == res->owner)
		return;

	if (res->owner == dlm->node_num)
		atomic_dec(&dlm->local_resources);
	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_dec(&dlm->unknown_resources);
	else
		atomic_dec(&dlm->remote_resources);

	dlm_set_lockres_owner(dlm, res, owner);
}

static void dlm_lockres_release(struct kref *kref)
{
	struct dlm_lock_resource *res;

	res = container_of(kref, struct dlm_lock_resource, refs);

	/* This should not happen -- all lockres' have a name
	 * associated with them at init time. */
	BUG_ON(!res->lockname.name);

	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
	     res->lockname.name);

	/* By the time we're ready to blow this guy away, we shouldn't
	 * be on any lists. */
	BUG_ON(!hlist_unhashed(&res->hash_node));
	BUG_ON(!list_empty(&res->granted));
	BUG_ON(!list_empty(&res->converting));
	BUG_ON(!list_empty(&res->blocked));
	BUG_ON(!list_empty(&res->dirty));
	BUG_ON(!list_empty(&res->recovering));
	BUG_ON(!list_empty(&res->purge));

	kfree(res->lockname.name);

	kfree(res);
}

void dlm_lockres_get(struct dlm_lock_resource *res)
{
	kref_get(&res->refs);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
	kref_put(&res->refs, dlm_lockres_release);
}
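
/*
 * Illustrative sketch, not part of the original file: every successful
 * lookup or allocation hands back a referenced lockres, so each such
 * pointer must eventually be balanced with dlm_lockres_put().  The
 * final put invokes dlm_lockres_release() above.
 */
static inline void dlm_example_lockres_ref_pairing(struct dlm_lock_resource *res)
{
	dlm_lockres_get(res);	/* e.g. handing the pointer to a work item */
	/* ... use res ... */
	dlm_lockres_put(res);	/* drops the reference taken above */
}
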
static void dlm_init_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res,
			     const char *name, unsigned int namelen)
{
	char *qname;

	/* If we memset here, we lose our reference to the kmalloc'd
	 * res->lockname.name, so be sure to init every field
	 * correctly! */

	qname = (char *) res->lockname.name;
	memcpy(qname, name, namelen);

	res->lockname.len = namelen;
	res->lockname.hash = dlm_lockid_hash(name, namelen);

	init_waitqueue_head(&res->wq);
	spin_lock_init(&res->spinlock);
	INIT_HLIST_NODE(&res->hash_node);
	INIT_LIST_HEAD(&res->granted);
	INIT_LIST_HEAD(&res->converting);
	INIT_LIST_HEAD(&res->blocked);
	INIT_LIST_HEAD(&res->dirty);
	INIT_LIST_HEAD(&res->recovering);
	INIT_LIST_HEAD(&res->purge);
	atomic_set(&res->asts_reserved, 0);
	res->migration_pending = 0;

	kref_init(&res->refs);

	/* just for consistency */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	spin_unlock(&res->spinlock);

	res->state = DLM_LOCK_RES_IN_PROGRESS;

	memset(res->lvb, 0, DLM_LVB_LEN);
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
					  const char *name,
					  unsigned int namelen)
{
	struct dlm_lock_resource *res;

	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;

	res->lockname.name = kmalloc(namelen, GFP_KERNEL);
	if (!res->lockname.name) {
		kfree(res);
		return NULL;
	}

	dlm_init_lockres(dlm, res, name, namelen);
	return res;
}

/*
 * lookup a lock resource by name.
 * may already exist in the hashtable.
 * lockid is null terminated
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.  need to wait around for that node
 * to assert_master (or die).
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
						 const char *lockid,
						 int flags)
{
	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *alloc_mle = NULL;
	int blocked = 0;
	int ret, nodenum;
	struct dlm_node_iter iter;
	unsigned int namelen, hash;
	int tries = 0;
	int bit, wait_on_recovery = 0;

	BUG_ON(!lockid);

	namelen = strlen(lockid);
	hash = dlm_lockid_hash(lockid, namelen);

	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
	spin_lock(&dlm->spinlock);
	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
	if (tmpres) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "found in hash!\n");
		if (res)
			dlm_lockres_put(res);
		res = tmpres;
		goto leave;
	}

	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "allocating a new resource\n");
		/* nothing found and we need to allocate one. */
		alloc_mle = (struct dlm_master_list_entry *)
			kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
		if (!alloc_mle)
			goto leave;
		res = dlm_new_lockres(dlm, lockid, namelen);
		if (!res)
			goto leave;
		goto lookup;
	}

	mlog(0, "no lockres found, allocated our own: %p\n", res);

	if (flags & LKM_LOCAL) {
		/* caller knows it's safe to assume it's not mastered elsewhere
		 * DONE!  return right away */
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		__dlm_insert_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		/* lockres still marked IN_PROGRESS */
		goto wake_waiters;
	}

	/* check master list to see if another node has started mastering it */
	spin_lock(&dlm->master_lock);

	/* if we found a block, wait for lock to be mastered by another node */
	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
	if (blocked) {
		if (mle->type == DLM_MLE_MASTER) {
			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
			BUG();
		} else if (mle->type == DLM_MLE_MIGRATION) {
			/* migration is in progress! */
			/* the good news is that we now know the
			 * "current" master (mle->master). */

			spin_unlock(&dlm->master_lock);
			assert_spin_locked(&dlm->spinlock);

			/* set the lockres owner and hash it */
			spin_lock(&res->spinlock);
			dlm_set_lockres_owner(dlm, res, mle->master);
			__dlm_insert_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);

			/* master is known, detach */
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			mle = NULL;
			goto wake_waiters;
		}
	} else {
		/* go ahead and try to master lock on this node */
		mle = alloc_mle;
		/* make sure this does not get freed below */
		alloc_mle = NULL;
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);

		/* still holding the dlm spinlock, check the recovery map
		 * to see if there are any nodes that still need to be
		 * considered.  these will not appear in the mle nodemap
		 * but they might own this lockres.  wait on them. */
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
			     "recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		}
	}

	/* at this point there is either a DLM_MLE_BLOCK or a
	 * DLM_MLE_MASTER on the master list, so it's safe to add the
	 * lockres to the hashtable.  anyone who finds the lock will
	 * still have to wait on the IN_PROGRESS. */

	/* finally add the lockres to its hash bucket */
	__dlm_insert_lockres(dlm, res);
	/* get an extra ref on the mle in case this is a BLOCK
	 * if so, the creator of the BLOCK may try to put the last
	 * ref at this time in the assert master handler, so we
	 * need an extra one to keep from a bad ptr deref. */
	dlm_get_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	while (wait_on_recovery) {
		/* any cluster changes that occurred after dropping the
		 * dlm spinlock would be detectable by a change on the mle,
		 * so we only need to clear out the recovery map once. */
		if (dlm_is_recovery_lock(lockid, namelen)) {
			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
			     "must master $RECOVERY lock now\n", dlm->name);
			if (!dlm_pre_master_reco_lockres(dlm, res))
				wait_on_recovery = 0;
			else {
				mlog(0, "%s: waiting 500ms for heartbeat state "
				     "change\n", dlm->name);
				msleep(500);
			}
			continue;
		}

		dlm_kick_recovery_thread(dlm);
		msleep(100);
		dlm_wait_for_recovery(dlm);

		spin_lock(&dlm->spinlock);
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
			     "recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		} else
			wait_on_recovery = 0;
		spin_unlock(&dlm->spinlock);
	}

	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;

redo_request:
	ret = -EINVAL;
	dlm_node_iter_init(mle->vote_map, &iter);
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = dlm_do_master_request(mle, nodenum);
		if (ret < 0)
			mlog_errno(ret);
		if (mle->master != O2NM_MAX_NODES) {
			/* found a master ! */
			if (mle->master <= nodenum)
				break;
			/* if our master request has not reached the master
			 * yet, keep going until it does.  this is how the
			 * master will know that asserts are needed back to
			 * the lower nodes. */
			mlog(0, "%s:%.*s: requests only up to %u but master "
			     "is %u, keep going\n", dlm->name, namelen,
			     lockid, nodenum, mle->master);
		}
	}

wait:
	/* keep going until the response map includes all nodes */
	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
	if (ret < 0) {
		mlog(0, "%s:%.*s: node map changed, redo the "
		     "master request now, blocked=%d\n",
		     dlm->name, res->lockname.len,
		     res->lockname.name, blocked);
		if (++tries > 20) {
			mlog(ML_ERROR, "%s:%.*s: spinning on "
			     "dlm_wait_for_lock_mastery, blocked=%d\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name, blocked);
			dlm_print_one_lock_resource(res);
			/* dlm_print_one_mle(mle); */
			tries = 0;
		}
		goto redo_request;
	}

	mlog(0, "lockres mastered by %u\n", res->owner);
	/* make sure we never continue without this */
	BUG_ON(res->owner == O2NM_MAX_NODES);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	/* put the extra ref */
	dlm_put_mle(mle);

wake_waiters:
	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

leave:
	/* need to free the unused mle */
	if (alloc_mle)
		kmem_cache_free(dlm_mle_cache, alloc_mle);

	return res;
}

#define DLM_MASTERY_TIMEOUT_MS   5000

static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked)
{
	u8 m;
	int ret, bit;
	int map_changed, voting_done;
	int assert, sleep;

recheck:
	ret = 0;
	assert = 0;

	/* check if another node has already become the owner */
	spin_lock(&res->spinlock);
	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
		     res->lockname.len, res->lockname.name, res->owner);
		spin_unlock(&res->spinlock);
		/* this will cause the master to re-assert across
		 * the whole cluster, freeing up mles */
		ret = dlm_do_master_request(mle, res->owner);
		if (ret < 0) {
			/* give recovery a chance to run */
			mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
			msleep(500);
			goto recheck;
		}
		ret = 0;
		goto leave;
	}
	spin_unlock(&res->spinlock);

	spin_lock(&mle->spinlock);
	m = mle->master;
	map_changed = (memcmp(mle->vote_map, mle->node_map,
			      sizeof(mle->vote_map)) != 0);
	voting_done = (memcmp(mle->vote_map, mle->response_map,
			      sizeof(mle->vote_map)) == 0);

	/* restart if we hit any errors */
	if (map_changed) {
		int b;
		mlog(0, "%s: %.*s: node map changed, restarting\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
		b = (mle->type == DLM_MLE_BLOCK);
		if ((*blocked && !b) || (!*blocked && b)) {
			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
			     dlm->name, res->lockname.len, res->lockname.name,
			     *blocked, b);
			*blocked = b;
		}
		spin_unlock(&mle->spinlock);
		if (ret < 0) {
			mlog_errno(ret);
			goto leave;
		}
		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
		     "rechecking now\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		goto recheck;
	}

	if (m != O2NM_MAX_NODES) {
		/* another node has done an assert!
		 * all done! */
		sleep = 0;
	} else {
		sleep = 1;
		/* have all nodes responded? */
		if (voting_done && !*blocked) {
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (dlm->node_num <= bit) {
				/* my node number is lowest.
				 * now tell other nodes that I am
				 * mastering this. */
				mle->master = dlm->node_num;
				assert = 1;
				sleep = 0;
			}
			/* if voting is done, but we have not received
			 * an assert master yet, we must sleep */
		}
	}

	spin_unlock(&mle->spinlock);

	/* sleep if we haven't finished voting yet */
	if (sleep) {
		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

		if (atomic_read(&mle->mle_refs.refcount) < 2)
			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
			     atomic_read(&mle->mle_refs.refcount),
			     res->lockname.len, res->lockname.name);
		atomic_set(&mle->woken, 0);
		(void)wait_event_timeout(mle->wq,
					 (atomic_read(&mle->woken) == 1),
					 timeo);
		if (res->owner == O2NM_MAX_NODES) {
			mlog(0, "waiting again\n");
			goto recheck;
		}
		mlog(0, "done waiting, master is %u\n", res->owner);
		ret = 0;
		goto leave;
	}

	ret = 0;	/* done */
	if (assert) {
		m = dlm->node_num;
		mlog(0, "about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, m);
		ret = dlm_do_assert_master(dlm, res->lockname.name,
					   res->lockname.len, mle->vote_map, 0);
		if (ret) {
			/* This is a failure in the network path,
			 * not in the response to the assert_master
			 * (any nonzero response is a BUG on this node).
			 * Most likely a socket just got disconnected
			 * due to node death. */
			mlog_errno(ret);
		}
		/* no longer need to restart lock mastery.
		 * all living nodes have been contacted. */
		ret = 0;
	}

	/* set the lockres owner */
	spin_lock(&res->spinlock);
	dlm_change_lockres_owner(dlm, res, m);
	spin_unlock(&res->spinlock);

leave:
	return ret;
}

struct dlm_bitmap_diff_iter
{
	int curnode;
	unsigned long *orig_bm;
	unsigned long *cur_bm;
	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
	NODE_DOWN = -1,
	NODE_NO_CHANGE = 0,
	NODE_UP
};

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
				      unsigned long *orig_bm,
				      unsigned long *cur_bm)
{
	unsigned long p1, p2;
	int i;

	iter->curnode = -1;
	iter->orig_bm = orig_bm;
	iter->cur_bm = cur_bm;

	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
		p1 = *(iter->orig_bm + i);
		p2 = *(iter->cur_bm + i);
		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
	}
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
				     enum dlm_node_state_change *state)
{
	int bit;

	if (iter->curnode >= O2NM_MAX_NODES)
		return -ENOENT;

	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
			    iter->curnode+1);
	if (bit >= O2NM_MAX_NODES) {
		iter->curnode = O2NM_MAX_NODES;
		return -ENOENT;
	}

	/* if it was there in the original then this node died */
	if (test_bit(bit, iter->orig_bm))
		*state = NODE_DOWN;
	else
		*state = NODE_UP;

	iter->curnode = bit;
	return bit;
}
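
/*
 * Illustrative sketch, not part of the original file: walking the diff
 * of two node maps.  With bit 3 set only in the old map and bit 5 set
 * only in the new one, the iterator reports node 3 as NODE_DOWN and
 * node 5 as NODE_UP; unchanged bits are never visited.
 */
static void __attribute__((unused)) dlm_example_diff_walk(void)
{
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	unsigned long old_map[BITS_TO_LONGS(O2NM_MAX_NODES)] = { 0 };
	unsigned long new_map[BITS_TO_LONGS(O2NM_MAX_NODES)] = { 0 };
	int node;

	set_bit(0, old_map);
	set_bit(3, old_map);	/* present before, gone now: NODE_DOWN */
	set_bit(0, new_map);
	set_bit(5, new_map);	/* absent before, present now: NODE_UP */

	dlm_bitmap_diff_iter_init(&bdi, old_map, new_map);
	while ((node = dlm_bitmap_diff_iter_next(&bdi, &sc)) >= 0)
		mlog(0, "node %d went %s\n", node,
		     sc == NODE_UP ? "up" : "down");
}
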
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked)
{
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	int node;
	int ret = 0;

	mlog(0, "something happened such that the "
	     "master process may need to be restarted!\n");

	assert_spin_locked(&mle->spinlock);

	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	while (node >= 0) {
		if (sc == NODE_UP) {
			/* a node came up.  clear any old vote from
			 * the response map and set it in the vote map
			 * then restart the mastery. */
			mlog(ML_NOTICE, "node %d up while restarting\n", node);

			/* redo the master request, but only for the new node */
			mlog(0, "sending request to new node\n");
			clear_bit(node, mle->response_map);
			set_bit(node, mle->vote_map);
		} else {
			mlog(ML_ERROR, "node down! %d\n", node);

			/* if the node wasn't involved in mastery skip it,
			 * but clear it out from the maps so that it will
			 * not affect mastery of this lockres */
			clear_bit(node, mle->response_map);
			clear_bit(node, mle->vote_map);
			if (!test_bit(node, mle->maybe_map))
				goto next;

			/* if we're already blocked on lock mastery, and the
			 * dead node wasn't the expected master, or there is
			 * another node in the maybe_map, keep waiting */
			if (blocked) {
				int lowest = find_next_bit(mle->maybe_map,
							   O2NM_MAX_NODES, 0);

				/* act like it was never there */
				clear_bit(node, mle->maybe_map);

				if (node != lowest)
					goto next;

				mlog(ML_ERROR, "expected master %u died while "
				     "this node was blocked waiting on it!\n",
				     node);
				lowest = find_next_bit(mle->maybe_map,
						       O2NM_MAX_NODES,
						       lowest+1);
				if (lowest < O2NM_MAX_NODES) {
					mlog(0, "still blocked. waiting "
					     "on %u now\n", lowest);
					goto next;
				}

				/* mle is an MLE_BLOCK, but there is now
				 * nothing left to block on.  we need to return
				 * all the way back out and try again with
				 * an MLE_MASTER. dlm_do_local_recovery_cleanup
				 * has already run, so the mle refcount is ok */
				mlog(0, "no longer blocking. we can "
				     "try to master this here\n");
				mle->type = DLM_MLE_MASTER;
				memset(mle->maybe_map, 0,
				       sizeof(mle->maybe_map));
				memset(mle->response_map, 0,
				       sizeof(mle->response_map));
				memcpy(mle->vote_map, mle->node_map,
				       sizeof(mle->node_map));
				mle->u.res = res;
				set_bit(dlm->node_num, mle->maybe_map);

				ret = -EAGAIN;
				goto next;
			}

			clear_bit(node, mle->maybe_map);
			if (node > dlm->node_num)
				goto next;

			mlog(0, "dead node in map!\n");
			/* yuck. go back and re-contact all nodes
			 * in the vote_map, removing this node. */
			memset(mle->response_map, 0,
			       sizeof(mle->response_map));
		}
		ret = -EAGAIN;
next:
		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	}
	return ret;
}

/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 */
static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
{
	struct dlm_ctxt *dlm = mle->dlm;
	struct dlm_master_request request;
	int ret, response=0, resend;

	memset(&request, 0, sizeof(request));
	request.node_idx = dlm->node_num;

	BUG_ON(mle->type == DLM_MLE_MIGRATION);

	if (mle->type != DLM_MLE_MASTER) {
		request.namelen = mle->u.name.len;
		memcpy(request.name, mle->u.name.name, request.namelen);
	} else {
		request.namelen = mle->u.res->lockname.len;
		memcpy(request.name, mle->u.res->lockname.name,
		       request.namelen);
	}

again:
	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
				 sizeof(request), to, &response);
	if (ret < 0)  {
		if (ret == -ESRCH) {
			/* should never happen */
			mlog(ML_ERROR, "TCP stack not ready!\n");
			BUG();
		} else if (ret == -EINVAL) {
			mlog(ML_ERROR, "bad args passed to o2net!\n");
			BUG();
		} else if (ret == -ENOMEM) {
			mlog(ML_ERROR, "out of memory while trying to send "
			     "network message!  retrying\n");
			/* this is totally crude */
			msleep(50);
			goto again;
		} else if (!dlm_is_host_down(ret)) {
			/* not a network error. bad. */
			mlog_errno(ret);
			mlog(ML_ERROR, "unhandled error!");
			BUG();
		}
		/* all other errors should be network errors,
		 * and likely indicate node death */
		mlog(ML_ERROR, "link to %d went down!\n", to);
		goto out;
	}

	ret = 0;
	resend = 0;
	spin_lock(&mle->spinlock);
	switch (response) {
		case DLM_MASTER_RESP_YES:
			set_bit(to, mle->response_map);
			mlog(0, "node %u is the master, response=YES\n", to);
			mle->master = to;
			break;
		case DLM_MASTER_RESP_NO:
			mlog(0, "node %u not master, response=NO\n", to);
			set_bit(to, mle->response_map);
			break;
		case DLM_MASTER_RESP_MAYBE:
			mlog(0, "node %u not master, response=MAYBE\n", to);
			set_bit(to, mle->response_map);
			set_bit(to, mle->maybe_map);
			break;
		case DLM_MASTER_RESP_ERROR:
			mlog(0, "node %u hit an error, resending\n", to);
			resend = 1;
			response = 0;
			break;
		default:
			mlog(ML_ERROR, "bad response! %u\n", response);
			BUG();
	}
	spin_unlock(&mle->spinlock);
	if (resend) {
		/* this is also totally crude */
		msleep(50);
		goto again;
	}

out:
	return ret;
}

1294 /* this is also totally crude */
1304 * locks that can be taken here:
1310 * if possible, TRIM THIS DOWN!!!
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
	u8 response = DLM_MASTER_RESP_MAYBE;
	struct dlm_ctxt *dlm = data;
	struct dlm_lock_resource *res = NULL;
	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
	char *name;
	unsigned int namelen, hash;
	int found, ret;
	int set_maybe;
	int dispatch_assert = 0;

	if (!dlm_grab(dlm))
		return DLM_MASTER_RESP_NO;

	if (!dlm_domain_fully_joined(dlm)) {
		response = DLM_MASTER_RESP_NO;
		mle = NULL;
		goto send_response;
	}

	name = request->name;
	namelen = request->namelen;
	hash = dlm_lockid_hash(name, namelen);

	if (namelen > DLM_LOCKID_NAME_MAX) {
		response = DLM_IVBUFLEN;
		goto send_response;
	}

way_up_top:
	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
	if (res) {
		spin_unlock(&dlm->spinlock);

		/* take care of the easy cases up front */
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			spin_unlock(&res->spinlock);
			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
			     "being recovered\n");
			response = DLM_MASTER_RESP_ERROR;
			if (mle)
				kmem_cache_free(dlm_mle_cache, mle);
			goto send_response;
		}

		if (res->owner == dlm->node_num) {
			spin_unlock(&res->spinlock);
			// mlog(0, "this node is the master\n");
			response = DLM_MASTER_RESP_YES;
			if (mle)
				kmem_cache_free(dlm_mle_cache, mle);

			/* this node is the owner.
			 * there is some extra work that needs to
			 * happen now.  the requesting node has
			 * caused all nodes up to this one to
			 * create mles.  this node now needs to
			 * go back and clean those up. */
			dispatch_assert = 1;
			goto send_response;
		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
			spin_unlock(&res->spinlock);
			// mlog(0, "node %u is the master\n", res->owner);
			response = DLM_MASTER_RESP_NO;
			if (mle)
				kmem_cache_free(dlm_mle_cache, mle);
			goto send_response;
		}

		/* ok, there is no owner.  either this node is
		 * being blocked, or it is actively trying to
		 * master this lock. */
		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
			mlog(ML_ERROR, "lock with no owner should be "
			     "in-progress!\n");
			BUG();
		}

		// mlog(0, "lockres is in progress...\n");
		spin_lock(&dlm->master_lock);
		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
		if (!found) {
			mlog(ML_ERROR, "no mle found for this lock!\n");
			BUG();
		}
		set_maybe = 1;
		spin_lock(&tmpmle->spinlock);
		if (tmpmle->type == DLM_MLE_BLOCK) {
			// mlog(0, "this node is waiting for "
			// "lockres to be mastered\n");
			response = DLM_MASTER_RESP_NO;
		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
			mlog(0, "node %u is master, but trying to migrate to "
			     "node %u.\n", tmpmle->master, tmpmle->new_master);
			if (tmpmle->master == dlm->node_num) {
				response = DLM_MASTER_RESP_YES;
				mlog(ML_ERROR, "no owner on lockres, but this "
				     "node is trying to migrate it to %u?!\n",
				     tmpmle->new_master);
				BUG();
			} else {
				/* the real master can respond on its own */
				response = DLM_MASTER_RESP_NO;
			}
		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			set_maybe = 0;
			if (tmpmle->master == dlm->node_num) {
				response = DLM_MASTER_RESP_YES;
				/* this node will be the owner.
				 * go back and clean the mles on any
				 * other nodes */
				dispatch_assert = 1;
			} else
				response = DLM_MASTER_RESP_NO;
		} else {
			// mlog(0, "this node is attempting to "
			// "master lockres\n");
			response = DLM_MASTER_RESP_MAYBE;
		}
		if (set_maybe)
			set_bit(request->node_idx, tmpmle->maybe_map);
		spin_unlock(&tmpmle->spinlock);

		spin_unlock(&dlm->master_lock);
		spin_unlock(&res->spinlock);

		/* keep the mle attached to heartbeat events */
		dlm_put_mle(tmpmle);
		if (mle)
			kmem_cache_free(dlm_mle_cache, mle);
		goto send_response;
	}

	/*
	 * lockres doesn't exist on this node
	 * if there is an MLE_BLOCK, return NO
	 * if there is an MLE_MASTER, return MAYBE
	 * otherwise, add an MLE_BLOCK, return NO
	 */
	spin_lock(&dlm->master_lock);
	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
	if (!found) {
		/* this lockid has never been seen on this node yet */
		// mlog(0, "no mle found\n");
		if (!mle) {
			spin_unlock(&dlm->master_lock);
			spin_unlock(&dlm->spinlock);

			mle = (struct dlm_master_list_entry *)
				kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
			if (!mle) {
				response = DLM_MASTER_RESP_ERROR;
				mlog_errno(-ENOMEM);
				goto send_response;
			}
			spin_lock(&dlm->spinlock);
			dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
				     name, namelen);
			spin_unlock(&dlm->spinlock);
			goto way_up_top;
		}

		// mlog(0, "this is second time thru, already allocated, "
		// "add the block.\n");
		set_bit(request->node_idx, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);
		response = DLM_MASTER_RESP_NO;
	} else {
		// mlog(0, "mle was found\n");
		set_maybe = 1;
		spin_lock(&tmpmle->spinlock);
		if (tmpmle->master == dlm->node_num) {
			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
			BUG();
		}
		if (tmpmle->type == DLM_MLE_BLOCK)
			response = DLM_MASTER_RESP_NO;
		else if (tmpmle->type == DLM_MLE_MIGRATION) {
			mlog(0, "migration mle was found (%u->%u)\n",
			     tmpmle->master, tmpmle->new_master);
			/* real master can respond on its own */
			response = DLM_MASTER_RESP_NO;
		} else
			response = DLM_MASTER_RESP_MAYBE;
		if (set_maybe)
			set_bit(request->node_idx, tmpmle->maybe_map);
		spin_unlock(&tmpmle->spinlock);
	}
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (found) {
		/* keep the mle attached to heartbeat events */
		dlm_put_mle(tmpmle);
	}
send_response:

	if (dispatch_assert) {
		if (response != DLM_MASTER_RESP_YES)
			mlog(ML_ERROR, "invalid response %d\n", response);
		if (!res) {
			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
			BUG();
		}
		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
		     dlm->node_num, res->lockname.len, res->lockname.name);
		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
						 DLM_ASSERT_MASTER_MLE_CLEANUP);
		if (ret < 0) {
			mlog(ML_ERROR, "failed to dispatch assert master work\n");
			response = DLM_MASTER_RESP_ERROR;
		}
	}

	dlm_put(dlm);
	return response;
}

/*
 * DLM_ASSERT_MASTER_MSG
 */

/*
 * NOTE: this can be used for debugging
 * can periodically run all locks owned by this node
 * and re-assert across the cluster...
 */
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags)
{
	struct dlm_assert_master assert;
	int to, tmpret;
	struct dlm_node_iter iter;
	int ret = 0;
	int reassert;

	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
again:
	reassert = 0;

	/* note that if this nodemap is empty, it returns 0 */
	dlm_node_iter_init(nodemap, &iter);
	while ((to = dlm_node_iter_next(&iter)) >= 0) {
		int r = 0;
		mlog(0, "sending assert master to %d (%.*s)\n", to,
		     namelen, lockname);
		memset(&assert, 0, sizeof(assert));
		assert.node_idx = dlm->node_num;
		assert.namelen = namelen;
		memcpy(assert.name, lockname, namelen);
		assert.flags = cpu_to_be32(flags);

		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
					    &assert, sizeof(assert), to, &r);
		if (tmpret < 0) {
			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
			if (!dlm_is_host_down(tmpret)) {
				mlog(ML_ERROR, "unhandled error!\n");
				BUG();
			}
			/* a node died.  finish out the rest of the nodes. */
			mlog(ML_ERROR, "link to %d went down!\n", to);
			/* any nonzero status return will do */
			ret = tmpret;
		} else if (r < 0) {
			/* ok, something horribly messed.  kill thyself. */
			mlog(ML_ERROR,"during assert master of %.*s to %u, "
			     "got %d.\n", namelen, lockname, to, r);
			dlm_dump_lock_resources(dlm);
			BUG();
		} else if (r == EAGAIN) {
			mlog(0, "%.*s: node %u create mles on other "
			     "nodes and requests a re-assert\n",
			     namelen, lockname, to);
			reassert = 1;
		}
	}

	if (reassert)
		goto again;

	return ret;
}

/*
 * locks that can be taken here:
 * dlm->spinlock
 * res->spinlock
 * mle->spinlock
 * dlm->master_list
 *
 * if possible, TRIM THIS DOWN!!!
 */
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	char *name;
	unsigned int namelen, hash;
	u32 flags;
	int master_request = 0;
	int ret = 0;

	if (!dlm_grab(dlm))
		return 0;

	name = assert->name;
	namelen = assert->namelen;
	hash = dlm_lockid_hash(name, namelen);
	flags = be32_to_cpu(assert->flags);

	if (namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length!");
		goto done;
	}

	spin_lock(&dlm->spinlock);

	if (flags)
		mlog(0, "assert_master with flags: %u\n", flags);

	/* find the MLE */
	spin_lock(&dlm->master_lock);
	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
		/* not an error, could be master just re-asserting */
		mlog(0, "just got an assert_master from %u, but no "
		     "MLE for it! (%.*s)\n", assert->node_idx,
		     namelen, name);
	} else {
		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
		if (bit >= O2NM_MAX_NODES) {
			/* not necessarily an error, though less likely.
			 * could be master just re-asserting. */
			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
			     "is asserting! (%.*s)\n", assert->node_idx,
			     namelen, name);
		} else if (bit != assert->node_idx) {
			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
				mlog(0, "master %u was found, %u should "
				     "back off\n", assert->node_idx, bit);
			} else {
				/* with the fix for bug 569, a higher node
				 * number winning the mastery will respond
				 * YES to mastery requests, but this node
				 * had no way of knowing.  let it pass. */
				mlog(ML_ERROR, "%u is the lowest node, "
				     "%u is asserting. (%.*s)  %u must "
				     "have begun after %u won.\n", bit,
				     assert->node_idx, namelen, name, bit,
				     assert->node_idx);
			}
		}
	}
	spin_unlock(&dlm->master_lock);

	/* ok everything checks out with the MLE
	 * now check to see if there is a lockres */
	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING)  {
			mlog(ML_ERROR, "%u asserting but %.*s is "
			     "RECOVERING!\n", assert->node_idx, namelen, name);
			goto kill;
		}
		if (!mle) {
			if (res->owner != assert->node_idx) {
				mlog(ML_ERROR, "assert_master from "
				     "%u, but current owner is "
				     "%u! (%.*s)\n",
				     assert->node_idx, res->owner,
				     namelen, name);
				goto kill;
			}
		} else if (mle->type != DLM_MLE_MIGRATION) {
			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
				/* owner is just re-asserting */
				if (res->owner == assert->node_idx) {
					mlog(0, "owner %u re-asserting on "
					     "lock %.*s\n", assert->node_idx,
					     namelen, name);
					goto ok;
				}
				mlog(ML_ERROR, "got assert_master from "
				     "node %u, but %u is the owner! "
				     "(%.*s)\n", assert->node_idx,
				     res->owner, namelen, name);
				goto kill;
			}
			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
				mlog(ML_ERROR, "got assert from %u, but lock "
				     "with no owner should be "
				     "in-progress! (%.*s)\n",
				     assert->node_idx, namelen, name);
				goto kill;
			}
		} else /* mle->type == DLM_MLE_MIGRATION */ {
			/* should only be getting an assert from new master */
			if (assert->node_idx != mle->new_master) {
				mlog(ML_ERROR, "got assert from %u, but "
				     "new master is %u, and old master "
				     "was %u (%.*s)\n",
				     assert->node_idx, mle->new_master,
				     mle->master, namelen, name);
				goto kill;
			}
		}
ok:
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	// mlog(0, "woo!  got an assert_master from node %u!\n",
	// 	     assert->node_idx);
	if (mle) {
		int extra_ref = 0;
		int nn = -1;

		spin_lock(&mle->spinlock);
		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
			extra_ref = 1;
		else {
			/* MASTER mle: if any bits set in the response map
			 * then the calling node needs to re-assert to clear
			 * up nodes that this node contacted */
			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
						    nn+1)) < O2NM_MAX_NODES) {
				if (nn != dlm->node_num && nn != assert->node_idx)
					master_request = 1;
			}
		}
		mle->master = assert->node_idx;
		atomic_set(&mle->woken, 1);
		wake_up(&mle->wq);
		spin_unlock(&mle->spinlock);

		if (mle->type == DLM_MLE_MIGRATION && res) {
			mlog(0, "finishing off migration of lockres %.*s, "
			     "from %u to %u\n",
			     res->lockname.len, res->lockname.name,
			     dlm->node_num, mle->new_master);
			spin_lock(&res->spinlock);
			res->state &= ~DLM_LOCK_RES_MIGRATING;
			dlm_change_lockres_owner(dlm, res, mle->new_master);
			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
			spin_unlock(&res->spinlock);
		}
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);

		if (extra_ref) {
			/* the assert master message now balances the extra
			 * ref given by the master / migration request message.
			 * if this is the last put, it will be removed
			 * from the list. */
			dlm_put_mle(mle);
		}
	}

done:
	ret = 0;
	if (res)
		dlm_lockres_put(res);
	dlm_put(dlm);
	if (master_request) {
		mlog(0, "need to tell master to reassert\n");
		ret = EAGAIN;  // positive. negative would shoot down the node.
	}
	return ret;

kill:
	/* kill the caller! */
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	dlm_lockres_put(res);
	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
	     "and killing the other node now!  This node is OK and can continue.\n");
	dlm_dump_lock_resources(dlm);
	dlm_put(dlm);
	return -EINVAL;
}

int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res,
			       int ignore_higher, u8 request_from, u32 flags)
{
	struct dlm_work_item *item;
	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
	if (!item)
		return -ENOMEM;

	/* queue up work for dlm_assert_master_worker */
	dlm_grab(dlm);  /* get an extra ref for the work item */
	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
	item->u.am.lockres = res; /* already have a ref */
	/* can optionally ignore node numbers higher than this node */
	item->u.am.ignore_higher = ignore_higher;
	item->u.am.request_from = request_from;
	item->u.am.flags = flags;

	if (ignore_higher)
		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
		     res->lockname.name);

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	schedule_work(&dlm->dispatched_work);
	return 0;
}

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm = data;
	int ret = 0;
	struct dlm_lock_resource *res;
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	int ignore_higher;
	int bit;
	u8 request_from;
	u32 flags;

	dlm = item->dlm;
	res = item->u.am.lockres;
	ignore_higher = item->u.am.ignore_higher;
	request_from = item->u.am.request_from;
	flags = item->u.am.flags;

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
	spin_unlock(&dlm->spinlock);

	clear_bit(dlm->node_num, nodemap);
	if (ignore_higher) {
		/* if is this just to clear up mles for nodes below
		 * this node, do not send the message to the original
		 * caller or any node number higher than this */
		clear_bit(request_from, nodemap);
		bit = dlm->node_num;
		while (1) {
			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
					    bit+1);
			if (bit >= O2NM_MAX_NODES)
				break;
			clear_bit(bit, nodemap);
		}
	}

	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	mlog(0, "worker about to master %.*s here, this=%u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len,
				   nodemap, flags);
	if (ret < 0) {
		/* no need to restart, we are done */
		mlog_errno(ret);
	}

	dlm_lockres_put(res);

	mlog(0, "finished with dlm_assert_master_worker\n");
}

/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
 * We cannot wait for node recovery to complete to begin mastering this
 * lockres because this lockres is used to kick off recovery! ;-)
 * So, do a pre-check on all living nodes to see if any of those nodes
 * think that $RECOVERY is currently mastered by a dead node.  If so,
 * we wait a short time to allow that node to get notified by its own
 * heartbeat stack, then check again.  All $RECOVERY lock resources
 * mastered by dead nodes are purged when the heartbeat callback is
 * fired, so we can know for sure that it is safe to continue once
 * the node returns a live node or no node.  */
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	struct dlm_node_iter iter;
	int nodenum;
	int ret = 0;
	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;

	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* do not send to self */
		if (nodenum == dlm->node_num)
			continue;
		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
		if (ret < 0) {
			mlog_errno(ret);
			if (!dlm_is_host_down(ret))
				BUG();
			/* host is down, so answer for that node would be
			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
		}

		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* check to see if this master is in the recovery map */
			spin_lock(&dlm->spinlock);
			if (test_bit(master, dlm->recovery_map)) {
				mlog(ML_NOTICE, "%s: node %u has not seen "
				     "node %u go down yet, and thinks the "
				     "dead node is mastering the recovery "
				     "lock.  must wait.\n", dlm->name,
				     nodenum, master);
				ret = -EAGAIN;
			}
			spin_unlock(&dlm->spinlock);
			mlog(0, "%s: reco lock master is %u\n", dlm->name,
			     master);
			break;
		}
	}
	return ret;
}

/*
 * DLM_MIGRATE_LOCKRES
 */

int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			u8 target)
{
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *oldmle = NULL;
	struct dlm_migratable_lockres *mres = NULL;
	int ret = -EINVAL;
	const char *name;
	unsigned int namelen;
	int mle_added = 0;
	struct list_head *queue, *iter;
	int i;
	struct dlm_lock *lock;
	int empty = 1;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = res->lockname.name;
	namelen = res->lockname.len;

	mlog(0, "migrating %.*s to %u\n", namelen, name, target);

	/*
	 * ensure this lockres is a proper candidate for migration
	 */
	spin_lock(&res->spinlock);
	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "cannot migrate lockres with unknown owner!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	if (res->owner != dlm->node_num) {
		mlog(0, "cannot migrate lockres this node doesn't own!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	mlog(0, "checking queues...\n");
	queue = &res->granted;
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			lock = list_entry (iter, struct dlm_lock, list);
			empty = 0;
			if (lock->ml.node == dlm->node_num) {
				mlog(0, "found a lock owned by this node "
				     "still on the %s queue!  will not "
				     "migrate this lockres\n",
				     i==0 ? "granted" :
				     (i==1 ? "converting" : "blocked"));
				spin_unlock(&res->spinlock);
				ret = -ENOTEMPTY;
				goto leave;
			}
		}
		queue++;
	}
	mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
	spin_unlock(&res->spinlock);

	/* no work to do */
	if (empty) {
		mlog(0, "no locks were found on this lockres! done!\n");
		ret = 0;
		goto leave;
	}

	/*
	 * preallocate up front
	 * if this fails, abort
	 */

	ret = -ENOMEM;
	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
	if (!mres) {
		mlog_errno(ret);
		goto leave;
	}

	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
								GFP_KERNEL);
	if (!mle) {
		mlog_errno(ret);
		goto leave;
	}
	ret = 0;

	/*
	 * find a node to migrate the lockres to
	 */

	mlog(0, "picking a migration node\n");
	spin_lock(&dlm->spinlock);
	/* pick a new node */
	if (!test_bit(target, dlm->domain_map) ||
	    target >= O2NM_MAX_NODES) {
		target = dlm_pick_migration_target(dlm, res);
	}
	mlog(0, "node %u chosen for migration\n", target);

	if (target >= O2NM_MAX_NODES ||
	    !test_bit(target, dlm->domain_map)) {
		/* target chosen is not alive */
		ret = -EINVAL;
	}

	if (ret) {
		spin_unlock(&dlm->spinlock);
		goto fail;
	}

	mlog(0, "continuing with target = %u\n", target);

	/*
	 * clear any existing master requests and
	 * add the migration mle to the list
	 */
	spin_lock(&dlm->master_lock);
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
				    namelen, target, dlm->node_num);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (ret == -EEXIST) {
		mlog(0, "another process is already migrating it\n");
		goto fail;
	}
	mle_added = 1;

	/*
	 * set the MIGRATING flag and flush asts
	 * if we fail after this we need to re-dirty the lockres
	 */
	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
		     "the target went down.\n", res->lockname.len,
		     res->lockname.name, target);
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
		ret = -EINVAL;
	}

fail:
	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (ret < 0) {
		if (mle_added) {
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
		} else if (mle) {
			kmem_cache_free(dlm_mle_cache, mle);
		}
		goto leave;
	}

	/*
	 * at this point, we have a migration target, an mle
	 * in the master list, and the MIGRATING flag set on
	 * the lockres
	 */

	/* get an extra reference on the mle.
	 * otherwise the assert_master from the new
	 * master will destroy this.
	 * also, make sure that all callers of dlm_get_mle
	 * take both dlm->spinlock and dlm->master_lock */
	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	dlm_get_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	/* notify new node and send all lock state */
	/* call send_one_lockres with migration flag.
	 * this serves as notice to the target node that a
	 * migration is starting. */
	ret = dlm_send_one_lockres(dlm, res, mres, target,
				   DLM_MRES_MIGRATION);

	if (ret < 0) {
		mlog(0, "migration to node %u failed with %d\n",
		     target, ret);
		/* migration failed, detach and clean up mle */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);
		dlm_put_mle(mle);
		goto leave;
	}

	/* at this point, the target sends a message to all nodes,
	 * (using dlm_do_migrate_request).  this node is skipped since
	 * we had to put an mle in the list to begin the process.  this
	 * node now waits for target to do an assert master.  this node
	 * will be the last one notified, ensuring that the migration
	 * is complete everywhere.  if the target dies while this is
	 * going on, some nodes could potentially see the target as the
	 * master, so it is important that my recovery finds the migration
	 * mle and sets the master to UNKNOWN. */

	/* wait for new node to assert master */
	while (1) {
		ret = wait_event_interruptible_timeout(mle->wq,
					(atomic_read(&mle->woken) == 1),
					msecs_to_jiffies(5000));

		if (ret >= 0) {
			if (atomic_read(&mle->woken) == 1 ||
			    res->owner == target)
				break;

			mlog(0, "timed out during migration\n");
			/* avoid hang during shutdown when migrating lockres
			 * to a node which also goes down */
			if (dlm_is_node_dead(dlm, target)) {
				mlog(0, "%s:%.*s: expected migration target %u "
				     "is no longer up.  restarting.\n",
				     dlm->name, res->lockname.len,
				     res->lockname.name, target);
				ret = -ERESTARTSYS;
			}
		}
		if (ret == -ERESTARTSYS) {
			/* migration failed, detach and clean up mle */
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			dlm_put_mle(mle);
			goto leave;
		}
		/* TODO: if node died: stop, clean up, return error */
	}

	/* all done, set the owner, clear the flag */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, target);
	res->state &= ~DLM_LOCK_RES_MIGRATING;
	dlm_remove_nonlocal_locks(dlm, res);
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	ret = 0;

	dlm_lockres_calc_usage(dlm, res);

leave:
	/* re-dirty the lockres if we failed */
	if (ret < 0)
		dlm_kick_thread(dlm, res);

	if (mres)
		free_page((unsigned long)mres);

	dlm_put(dlm);

	mlog(0, "returning %d\n", ret);
	return ret;
}
EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
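
/*
 * Illustrative sketch, not part of the original file: a caller that has
 * no preferred migration target.  Passing a node number that is out of
 * range (or not in the domain map) forces the "pick a new node" path in
 * dlm_migrate_lockres() above, which falls back to
 * dlm_pick_migration_target().
 */
static inline int dlm_example_migrate_anywhere(struct dlm_ctxt *dlm,
					       struct dlm_lock_resource *res)
{
	/* an out-of-range target means "no preference" */
	return dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
}
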
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
	int ret;
	spin_lock(&dlm->ast_lock);
	spin_lock(&lock->spinlock);
	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
	spin_unlock(&lock->spinlock);
	spin_unlock(&dlm->ast_lock);
	return ret;
}

static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     u8 mig_target)
{
	int can_proceed;
	spin_lock(&res->spinlock);
	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
	spin_unlock(&res->spinlock);

	/* target has died, so make the caller break out of the
	 * wait_event, but caller must recheck the domain_map */
	spin_lock(&dlm->spinlock);
	if (!test_bit(mig_target, dlm->domain_map))
		can_proceed = 1;
	spin_unlock(&dlm->spinlock);
	return can_proceed;
}

int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	int ret;
	spin_lock(&res->spinlock);
	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
	spin_unlock(&res->spinlock);
	return ret;
}

static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 target)
{
	int ret = 0;

	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num,
	     target);
	/* need to set MIGRATING flag on lockres.  this is done by
	 * ensuring that all asts have been flushed for this lockres. */
	spin_lock(&res->spinlock);
	BUG_ON(res->migration_pending);
	res->migration_pending = 1;
	/* strategy is to reserve an extra ast then release
	 * it below, letting the release do all of the work */
	__dlm_lockres_reserve_ast(res);
	spin_unlock(&res->spinlock);

	/* now flush all the pending asts.. hang out for a bit */
	dlm_kick_thread(dlm, res);
	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
	dlm_lockres_release_ast(dlm, res);

	mlog(0, "about to wait on migration_wq, dirty=%s\n",
	     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
	/* if the extra ref we just put was the final one, this
	 * will pass thru immediately.  otherwise, we need to wait
	 * for the last ast to finish. */
again:
	ret = wait_event_interruptible_timeout(dlm->migration_wq,
		   dlm_migration_can_proceed(dlm, res, target),
		   msecs_to_jiffies(1000));
	if (ret < 0) {
		mlog(0, "woken again: migrating? %s, dead? %s\n",
		     res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		     test_bit(target, dlm->domain_map) ? "no":"yes");
	} else {
		mlog(0, "all is well: migrating? %s, dead? %s\n",
		     res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		     test_bit(target, dlm->domain_map) ? "no":"yes");
	}
	if (!dlm_migration_can_proceed(dlm, res, target)) {
		mlog(0, "trying again...\n");
		goto again;
	}

	/* did the target go down or die? */
	spin_lock(&dlm->spinlock);
	if (!test_bit(target, dlm->domain_map)) {
		mlog(ML_ERROR, "aha. migration target %u just went down\n",
		     target);
		ret = -EHOSTDOWN;
	}
	spin_unlock(&dlm->spinlock);

	/*
	 * at this point:
	 *
	 *   o the DLM_LOCK_RES_MIGRATING flag is set
	 *   o there are no pending asts on this lockres
	 *   o all processes trying to reserve an ast on this
	 *     lockres must wait for the MIGRATING flag to clear
	 */
	return ret;
}

/* last step in the migration process.
 * original master calls this to free all of the dlm_lock
 * structures that used to be for other nodes. */
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res)
{
	struct list_head *iter, *iter2;
	struct list_head *queue = &res->granted;
	int i;
	struct dlm_lock *lock;

	assert_spin_locked(&res->spinlock);

	BUG_ON(res->owner == dlm->node_num);

	for (i=0; i<3; i++) {
		list_for_each_safe(iter, iter2, queue) {
			lock = list_entry (iter, struct dlm_lock, list);
			if (lock->ml.node != dlm->node_num) {
				mlog(0, "putting lock for node %u\n",
				     lock->ml.node);
				/* be extra careful */
				BUG_ON(!list_empty(&lock->ast_list));
				BUG_ON(!list_empty(&lock->bast_list));
				BUG_ON(lock->ast_pending);
				BUG_ON(lock->bast_pending);
				list_del_init(&lock->list);
				dlm_lock_put(lock);
			}
		}
		queue++;
	}
}
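
/* The queue++ walk above (and in dlm_pick_migration_target() below)
 * only works because the three lock queues are adjacent list_head
 * members of struct dlm_lock_resource, declared in this order in
 * dlmcommon.h.  Sketched here, defined out, for reference: */
#if 0
struct dlm_lock_resource {
	/* ... */
	struct list_head granted;	/* queue[0]              */
	struct list_head converting;	/* queue[1], via queue++ */
	struct list_head blocked;	/* queue[2]              */
	/* ... */
};
#endif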
/* for now this is not too intelligent.  we will
 * need stats to make this do the right thing.
 * this just finds the first lock on one of the
 * queues and uses that node as the target. */
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue = &res->granted;
	struct list_head *iter;
	struct dlm_lock *lock;
	int nodenum;

	assert_spin_locked(&dlm->spinlock);

	spin_lock(&res->spinlock);
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			/* up to the caller to make sure this node
			 * is alive */
			lock = list_entry (iter, struct dlm_lock, list);
			if (lock->ml.node != dlm->node_num) {
				spin_unlock(&res->spinlock);
				return lock->ml.node;
			}
		}
		queue++;
	}
	spin_unlock(&res->spinlock);
	mlog(0, "have not found a suitable target yet! checking domain map\n");

	/* ok now we're getting desperate.  pick anyone alive. */
	nodenum = -1;
	while (1) {
		nodenum = find_next_bit(dlm->domain_map,
					O2NM_MAX_NODES, nodenum+1);
		mlog(0, "found %d in domain map\n", nodenum);
		if (nodenum >= O2NM_MAX_NODES)
			break;
		if (nodenum != dlm->node_num) {
			mlog(0, "picking %d\n", nodenum);
			return nodenum;
		}
	}

	mlog(0, "giving up.  no master to migrate to\n");
	return DLM_LOCK_RES_OWNER_UNKNOWN;
}
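
/* The fallback above is the standard bitmap-scan idiom: keep calling
 * find_next_bit() with an advancing offset until it returns a value
 * greater than or equal to the bitmap size.  The same loop, reduced
 * to its essentials and defined out: */
#if 0
	int nodenum = -1;

	while ((nodenum = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
					nodenum + 1)) < O2NM_MAX_NODES) {
		if (nodenum != dlm->node_num)
			return nodenum;		/* first live peer wins */
	}
	return DLM_LOCK_RES_OWNER_UNKNOWN;	/* nobody left to ask */
#endif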
/* this is called by the new master once all lockres
 * data has been received */
static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 master, u8 new_master,
				  struct dlm_node_iter *iter)
{
	struct dlm_migrate_request migrate;
	int ret, status = 0;
	int nodenum;

	memset(&migrate, 0, sizeof(migrate));
	migrate.namelen = res->lockname.len;
	memcpy(migrate.name, res->lockname.name, migrate.namelen);
	migrate.new_master = new_master;
	migrate.master = master;

	ret = 0;

	/* send message to all nodes, except the master and myself */
	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
		if (nodenum == master ||
		    nodenum == new_master)
			continue;

		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
					 &migrate, sizeof(migrate), nodenum,
					 &status);
		if (ret < 0)
			mlog_errno(ret);
		else if (status < 0) {
			mlog(0, "migrate request (node %u) returned %d!\n",
			     nodenum, status);
			ret = status;
		}
	}

	if (ret < 0)
		mlog_errno(ret);

	mlog(0, "returning ret=%d\n", ret);
	return ret;
}
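
/* The request above travels in a fixed-size struct dlm_migrate_request.
 * Inferring only from the fields used here (master, new_master,
 * namelen, name), its shape is roughly the defined-out sketch below;
 * padding is omitted, so consult dlmcommon.h for the exact layout. */
#if 0
struct dlm_migrate_request {
	u8 master;
	u8 new_master;
	u8 namelen;
	/* padding omitted; see dlmcommon.h */
	u8 name[O2NM_MAX_NAME_LEN];
};
#endif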
/* if there is an existing mle for this lockres, we now know who the master is.
 * (the one who sent us *this* message) we can clear it up right away.
 * since the process that put the mle on the list still has a reference to it,
 * we can unhash it now, set the master and wake the process.  as a result,
 * we will have no mle in the list to start with.  now we can add an mle for
 * the migration and this should be the only one found for those scanning the
 * master list. */
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_lock_resource *res = NULL;
	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
	const char *name;
	unsigned int namelen, hash;
	int ret = 0;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = migrate->name;
	namelen = migrate->namelen;
	hash = dlm_lockid_hash(name, namelen);

	/* preallocate.. if this fails, abort */
	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
							 GFP_KERNEL);
	if (!mle) {
		ret = -ENOMEM;
		goto leave;
	}

	/* check for pre-existing lock */
	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
	spin_lock(&dlm->master_lock);

	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			/* if all is working ok, this can only mean that we got
			 * a migrate request from a node that we now see as
			 * dead.  what can we do here?  drop it to the floor? */
			spin_unlock(&res->spinlock);
			mlog(ML_ERROR, "Got a migrate request, but the "
			     "lockres is marked as recovering!");
			kmem_cache_free(dlm_mle_cache, mle);
			ret = -EINVAL; /* need a better solution */
			goto unlock;
		}
		res->state |= DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
	}

	/* ignore status.  only nonzero status would BUG. */
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
				    name, namelen,
				    migrate->new_master,
				    migrate->master);

unlock:
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (res)
		dlm_lockres_put(res);
leave:
	dlm_put(dlm);
	return ret;
}
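
/* Note the ordering in the handler above: the mle is allocated before
 * any spinlock is taken ("preallocate.. if this fails, abort") because
 * kmem_cache_alloc(..., GFP_KERNEL) may sleep, and sleeping is not
 * permitted once dlm->spinlock is held.  The shape of the pattern,
 * defined out: */
#if 0
	mle = kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); /* may sleep */
	if (!mle)
		return -ENOMEM;

	spin_lock(&dlm->spinlock);	/* atomic context starts here */
	/* ... use (or free) mle; no blocking allocations past this point */
	spin_unlock(&dlm->spinlock);
#endif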
/* must be holding dlm->spinlock and dlm->master_lock
 * when adding a migration mle, we can clear any other mles
 * in the master list because we know with certainty that
 * the master is "master".  so we remove any old mle from
 * the list after setting its master field, and then add
 * the new migration mle.  this way we can hold with the rule
 * of having only one mle for a given lock name at all times. */
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master)
{
	int found;
	int ret = 0;

	*oldmle = NULL;

	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* caller is responsible for any ref taken here on oldmle */
	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
	if (found) {
		struct dlm_master_list_entry *tmp = *oldmle;
		spin_lock(&tmp->spinlock);
		if (tmp->type == DLM_MLE_MIGRATION) {
			if (master == dlm->node_num) {
				/* ah another process raced me to it */
				mlog(0, "tried to migrate %.*s, but some "
				     "process beat me to it\n",
				     namelen, name);
				ret = -EEXIST;
			} else {
				/* bad.  2 NODES are trying to migrate! */
				mlog(ML_ERROR, "migration error  mle: "
				     "master=%u new_master=%u // request: "
				     "master=%u new_master=%u // "
				     "lockres=%.*s\n",
				     tmp->master, tmp->new_master,
				     master, new_master,
				     namelen, name);
				BUG();
			}
		} else {
			/* this is essentially what assert_master does */
			tmp->master = master;
			atomic_set(&tmp->woken, 1);
			wake_up(&tmp->wq);
			/* remove it from the list so that only one
			 * mle will be found */
			list_del_init(&tmp->list);
		}
		spin_unlock(&tmp->spinlock);
	}

	/* now add a migration mle to the tail of the list */
	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
	mle->new_master = new_master;
	mle->master = master;
	/* do this for consistency with other mle types */
	set_bit(new_master, mle->maybe_map);
	list_add(&mle->list, &dlm->master_list);

	return ret;
}
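
/* When the old mle is retired above, the woken flag is set before the
 * waiter is poked, so that by the time the wait loop in
 * dlm_migrate_lockres() re-tests (atomic_read(&mle->woken) == 1) the
 * condition already holds.  Done in the other order, the waiter could
 * test, see 0, and go back to sleep.  The waker-side shape, defined out: */
#if 0
	atomic_set(&mle->woken, 1);	/* publish the condition first... */
	wake_up(&mle->wq);		/* ...then wake the waiter */
#endif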
void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct list_head *iter, *iter2;
	struct dlm_master_list_entry *mle;
	struct dlm_lock_resource *res;
	unsigned int hash;

	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
top:
	assert_spin_locked(&dlm->spinlock);

	/* clean the master list */
	spin_lock(&dlm->master_lock);
	list_for_each_safe(iter, iter2, &dlm->master_list) {
		mle = list_entry(iter, struct dlm_master_list_entry, list);

		BUG_ON(mle->type != DLM_MLE_BLOCK &&
		       mle->type != DLM_MLE_MASTER &&
		       mle->type != DLM_MLE_MIGRATION);

		/* MASTER mles are initiated locally.  the waiting
		 * process will notice the node map change
		 * shortly.  let that happen as normal. */
		if (mle->type == DLM_MLE_MASTER)
			continue;

		/* BLOCK mles are initiated by other nodes.
		 * need to clean up if the dead node would have
		 * been the master. */
		if (mle->type == DLM_MLE_BLOCK) {
			int bit;

			spin_lock(&mle->spinlock);
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (bit != dead_node) {
				mlog(0, "mle found, but dead node %u would "
				     "not have been master\n", dead_node);
				spin_unlock(&mle->spinlock);
			} else {
				/* must drop the refcount by one since the
				 * assert_master will never arrive.  this
				 * may result in the mle being unlinked and
				 * freed, but there may still be a process
				 * waiting in the dlmlock path which is fine. */
				mlog(ML_ERROR, "node %u was expected master\n",
				     dead_node);
				atomic_set(&mle->woken, 1);
				spin_unlock(&mle->spinlock);
				wake_up(&mle->wq);
				/* do not need events any longer, so detach
				 * from heartbeat */
				__dlm_mle_detach_hb_events(dlm, mle);
				__dlm_put_mle(mle);
			}
			continue;
		}

		/* everything else is a MIGRATION mle */

		/* the rule for MIGRATION mles is that the master
		 * becomes UNKNOWN if *either* the original or
		 * the new master dies.  all UNKNOWN lockreses
		 * are sent to whichever node becomes the recovery
		 * master.  the new master is responsible for
		 * determining if there is still a master for
		 * this lockres, or if he needs to take over
		 * mastery.  either way, this node should expect
		 * another message to resolve this. */
		if (mle->master != dead_node &&
		    mle->new_master != dead_node)
			continue;

		/* if we have reached this point, this mle needs to
		 * be removed from the list and freed. */

		/* remove from the list early.  NOTE: unlinking
		 * list_head while in list_for_each_safe */
		spin_lock(&mle->spinlock);
		list_del_init(&mle->list);
		atomic_set(&mle->woken, 1);
		spin_unlock(&mle->spinlock);
		wake_up(&mle->wq);

		mlog(0, "node %u died during migration from "
		     "%u to %u!\n", dead_node,
		     mle->master, mle->new_master);
		/* if there is a lockres associated with this
		 * mle, find it and set its owner to UNKNOWN */
		hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
		res = __dlm_lookup_lockres(dlm, mle->u.name.name,
					   mle->u.name.len, hash);
		if (res) {
			/* unfortunately if we hit this rare case, our
			 * lock ordering is messed.  we need to drop
			 * the master lock so that we can take the
			 * lockres lock, meaning that we will have to
			 * restart from the head of list. */
			spin_unlock(&dlm->master_lock);

			/* move lockres onto recovery list */
			spin_lock(&res->spinlock);
			dlm_set_lockres_owner(dlm, res,
					      DLM_LOCK_RES_OWNER_UNKNOWN);
			dlm_move_lockres_to_recovery_list(dlm, res);
			spin_unlock(&res->spinlock);
			dlm_lockres_put(res);

			/* about to get rid of mle, detach from heartbeat */
			__dlm_mle_detach_hb_events(dlm, mle);

			/* dump the mle */
			spin_lock(&dlm->master_lock);
			__dlm_put_mle(mle);
			spin_unlock(&dlm->master_lock);

			/* restart */
			goto top;
		}

		/* this may be the last reference */
		__dlm_put_mle(mle);
	}
	spin_unlock(&dlm->master_lock);
}
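
/* The "goto top" above is the usual answer when a lock must be dropped
 * mid-walk: once dlm->master_lock is released to take res->spinlock,
 * the master list may have changed underneath us, so the only safe move
 * is to reacquire the lock and rescan from the head.  The skeleton of
 * the pattern, defined out (must_drop_lock is a placeholder condition): */
#if 0
top:
	spin_lock(&dlm->master_lock);
	list_for_each_safe(iter, iter2, &dlm->master_list) {
		if (must_drop_lock) {
			spin_unlock(&dlm->master_lock);
			/* ... do the work that needed the other lock ... */
			goto top;	/* list may have changed: rescan */
		}
	}
	spin_unlock(&dlm->master_lock);
#endif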
int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			 u8 old_master)
{
	struct dlm_node_iter iter;
	int ret = 0;

	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	clear_bit(old_master, iter.node_map);
	clear_bit(dlm->node_num, iter.node_map);
	spin_unlock(&dlm->spinlock);

	mlog(0, "now time to do a migrate request to other nodes\n");
	ret = dlm_do_migrate_request(dlm, res, old_master,
				     dlm->node_num, &iter);
	if (ret < 0) {
		mlog_errno(ret);
		goto leave;
	}

	mlog(0, "doing assert master of %.*s to all except the original node\n",
	     res->lockname.len, res->lockname.name);
	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len, iter.node_map,
				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
	if (ret < 0) {
		/* no longer need to retry.  all living nodes contacted. */
		mlog_errno(ret);
		ret = 0;
	}

	memset(iter.node_map, 0, sizeof(iter.node_map));
	set_bit(old_master, iter.node_map);
	mlog(0, "doing assert master of %.*s back to %u\n",
	     res->lockname.len, res->lockname.name, old_master);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len, iter.node_map,
				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
	if (ret < 0) {
		mlog(0, "assert master to original master failed "
		     "with %d.\n", ret);
		/* the only nonzero status here would be because of
		 * a dead original node.  we're done. */
		ret = 0;
	}

	/* all done, set the owner, clear the flag */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, dlm->node_num);
	res->state &= ~DLM_LOCK_RES_MIGRATING;
	spin_unlock(&res->spinlock);
	/* re-dirty it on the new master */
	dlm_kick_thread(dlm, res);
	wake_up(&res->wq);
leave:
	return ret;
}
/*
 * LOCKRES AST REFCOUNT
 * this is integral to migration
 */

/* for future intent to call an ast, reserve one ahead of time.
 * this should be called only after waiting on the lockres
 * with dlm_wait_on_lockres, and while still holding the
 * spinlock after the call. */
void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	if (res->state & DLM_LOCK_RES_MIGRATING) {
		__dlm_print_one_lock_resource(res);
	}
	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);

	atomic_inc(&res->asts_reserved);
}
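
/* Reservations come in reserve/release pairs: the reserve must happen
 * under res->spinlock while MIGRATING is still clear (enforced by the
 * BUG_ON above), while the matching release may happen much later.
 * Typical caller shape, defined out: */
#if 0
	spin_lock(&res->spinlock);
	__dlm_lockres_reserve_ast(res);	/* intent to fire an ast later */
	spin_unlock(&res->spinlock);

	/* ... queue and eventually deliver (or abandon) the ast/bast ... */

	dlm_lockres_release_ast(dlm, res);	/* used or unused, drop it */
#endif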
/*
 * used to drop the reserved ast, either because it went unused,
 * or because the ast/bast was actually called.
 *
 * also, if there is a pending migration on this lockres,
 * and this was the last pending ast on the lockres,
 * atomically set the MIGRATING flag before we drop the lock.
 * this is how we ensure that migration can proceed with no
 * asts in progress.  note that it is ok if the state of the
 * queues is such that a lock should be granted in the future
 * or that a bast should be fired, because the new master will
 * shuffle the lists on this lockres as soon as it is migrated.
 */
void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res)
{
	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
		return;

	if (!res->migration_pending) {
		spin_unlock(&res->spinlock);
		return;
	}

	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
	res->migration_pending = 0;
	res->state |= DLM_LOCK_RES_MIGRATING;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);
	wake_up(&dlm->migration_wq);
}
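
/* atomic_dec_and_lock() is what makes the release above safe: it takes
 * res->spinlock only on the 1->0 transition, so the common case stays
 * lock-free and the final releaser is guaranteed to be alone in the
 * critical section that turns migration_pending into
 * DLM_LOCK_RES_MIGRATING.  The general shape, defined out: */
#if 0
	if (!atomic_dec_and_lock(&counter, &lock))
		return;		/* not the last reference: nothing to do */
	/* last reference: do the end-of-life work while holding the lock */
	spin_unlock(&lock);
#endif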