/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"

static int ls_count;
static struct mutex ls_lock;
static struct list_head lslist;
static spinlock_t lslist_lock;
static struct task_struct *scand_task;
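
/* ls_count tracks how many lockspaces exist, so the shared daemon threads
   can be started for the first lockspace and stopped after the last one
   is released; ls_lock serializes those transitions.  lslist is the list
   of all lockspaces, protected by lslist_lock. */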

static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}
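
/* The attribute table below exposes each lockspace under
   /sys/kernel/dlm/<name> (dlm_kset is added to kernel_kobj further down).
   The userspace daemon, dlm_controld, drives the lockspace through these
   files: writing 0/1 to "control" stops/starts it, and writing to
   "event_done" reports the result of a join or leave and wakes the
   waiter in do_uevent(). */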

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr = {.name = "recover_status", .mode = S_IRUGO},
        .show = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}
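
/* The kobject release callback frees the lockspace itself; once
   kobject_init_and_add() has succeeded in new_lockspace(), teardown
   happens via kobject_put() rather than a direct kfree() (see do_unreg
   there and the final put in release_lockspace()). */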

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
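
/* dlm_scand is a background kthread that periodically walks every
   lockspace, scavenging unused rsbs and checking lock timeouts; the
   interval comes from the configurable ci_scan_secs. */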

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list) {
                        if (dlm_lock_recovery_try(ls)) {
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_unlock_recovery(ls);
                        }
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
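
/* The dlm_find_lockspace_*() lookups below take a counted reference on
   the lockspace (ls_count, under lslist_lock) which the caller drops
   with dlm_put_lockspace(); remove_lockspace() waits for that count to
   reach zero before unlinking the lockspace from lslist. */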

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}

static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        WARN_ON(ls->ls_create_count != 0);
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}
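
/* The astd, scand and lowcomms daemons are shared by all lockspaces;
   dlm_new_lockspace() starts them when the first lockspace is created
   and dlm_release_lockspace() stops them after the last one is gone. */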

static int threads_start(void)
{
        int error;

        /* Thread which processes lock requests for all lockspaces */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
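
/* Create a new lockspace, or take another reference on an existing one
   with the same name.  lvblen must be a nonzero multiple of 8, and
   -EUNATCH is returned when no userspace control daemon (dlm_controld)
   is detected, since nothing could then complete the join. */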

static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                module_put(THIS_MODULE);
                return -EUNATCH;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                module_put(THIS_MODULE);
                error = 1; /* not an error, return 0 */
                break;
        }
        spin_unlock(&lslist_lock);

        if (error < 0)
                goto out;
        if (error)
                goto ret_zero;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        if (flags & DLM_LSFL_FS)
                ls->ls_allocation = GFP_NOFS;
        else
                ls->ls_allocation = GFP_KERNEL;

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.ci_lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_delist;
        }

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_stop;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_stop;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");
 ret_zero:
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_stop:
        dlm_recoverd_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
        else if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}
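
/*
 * Illustrative only (not part of this file): a rough sketch of how an
 * in-kernel caller might join and later leave a lockspace, assuming
 * dlm_controld is running.  The name, flags and lvblen values here are
 * hypothetical; lvblen just has to be a nonzero multiple of 8.
 *
 *      void *ls;
 *      int error;
 *
 *      error = dlm_new_lockspace("example", strlen("example"), &ls, 0, 32);
 *      if (error)
 *              return error;
 *      ...
 *      dlm_release_lockspace(ls, 0);   (fails with -EBUSY if LKBs remain)
 */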

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i, busy, rv;

        busy = lockspace_busy(ls);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy > force)
                        rv = -EBUSY;
                else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                dlm_free_lvb(lkb->lkb_lvbptr);

                        dlm_free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_debug(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        else if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        return error;
}
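
/* Called when the userspace control daemon goes away: any lockspace left
   RUNNING can no longer be managed, so stop them all.  lslist_lock is
   dropped before each dlm_ls_stop() call, so the scan restarts from the
   top of the list afterwards. */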

void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;

 restart:
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
                        continue;
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);
}