/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/smp_lock.h>
#include <linux/crc32.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include <cluster/heartbeat.h>
#include <cluster/nodemanager.h>
#include <cluster/tcp.h>

#include <dlm/dlmapi.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"

#include "alloc.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "vote.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
};
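
/* A mask waiter completes once the lockres flags, masked by mw_mask,
 * equal mw_goal -- see lockres_set_flags() and lockres_add_mask_waiter()
 * below. */
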
static void ocfs2_inode_ast_func(void *opaque);
static void ocfs2_inode_bast_func(void *opaque,
				  int level);
static void ocfs2_super_ast_func(void *opaque);
static void ocfs2_super_bast_func(void *opaque,
				  int level);
static void ocfs2_rename_ast_func(void *opaque);
static void ocfs2_rename_bast_func(void *opaque,
				   int level);

/* so far, all locks have gotten along with the same unlock ast */
static void ocfs2_unlock_ast_func(void *opaque,
				  enum dlm_status status);

static int ocfs2_do_unblock_meta(struct inode *inode,
				 int *requeue);
static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
			      int *requeue);
static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
			      int *requeue);
static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
				    int *requeue);
static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
				  int *requeue);
typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      int *requeue,
				      ocfs2_convert_worker_t *worker);

struct ocfs2_lock_res_ops {
	void (*ast)(void *);
	void (*bast)(void *, int);
	void (*unlock_ast)(void *, enum dlm_status);
	int  (*unblock)(struct ocfs2_lock_res *, int *);
};

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.ast		= ocfs2_inode_ast_func,
	.bast		= ocfs2_inode_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_inode_lock,
};

static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.ast		= ocfs2_inode_ast_func,
	.bast		= ocfs2_inode_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_meta,
};

static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				      int blocking);

static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.ast		= ocfs2_inode_ast_func,
	.bast		= ocfs2_inode_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_data,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.ast		= ocfs2_super_ast_func,
	.bast		= ocfs2_super_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_osb_lock,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.ast		= ocfs2_rename_ast_func,
	.bast		= ocfs2_rename_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_osb_lock,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW;
}

static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_SUPER;
}

static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_RENAME;
}

static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_super_lock(lockres)
	       && !ocfs2_is_rename_lock(lockres));

	return (struct ocfs2_super *) lockres->l_priv;
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);

#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
	     "resource %s: %s\n", dlm_errname(_stat), _func,	\
	     _lockres->l_name, dlm_errmsg(_stat));		\
} while (0)

static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
						  struct ocfs2_lock_res *lockres,
						  int new_level);

static char *ocfs2_lock_type_strings[] = {
	[OCFS2_LOCK_TYPE_META] = "Meta",
	[OCFS2_LOCK_TYPE_DATA] = "Data",
	[OCFS2_LOCK_TYPE_SUPER] = "Super",
	[OCFS2_LOCK_TYPE_RENAME] = "Rename",
	/* Need to differentiate from [R]ename.. serializing writes is the
	 * important job it does, anyway. */
	[OCFS2_LOCK_TYPE_RW] = "Write/Read",
};

static char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
{
	mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
	return ocfs2_lock_type_strings[type];
}

static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}
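
/* For illustration (derived from the format string above): a lock on
 * block 0x1234 with generation 0x5678 gets a name made of one lock
 * type character (e.g. 'M' for a meta lock), the OCFS2_LOCK_ID_PAD
 * string, 16 hex digits of block number and 8 hex digits of
 * generation:
 *
 *	M<pad>000000000000123400005678
 */
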
static spinlock_t ocfs2_dlm_tracking_lock = SPIN_LOCK_UNLOCKED;

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       u64 blkno,
				       u32 generation,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	ocfs2_build_lock_name(type, blkno, generation, res->l_name);

	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
	case OCFS2_LOCK_TYPE_RW:
		ops = &ocfs2_inode_rw_lops;
		break;
	case OCFS2_LOCK_TYPE_META:
		ops = &ocfs2_inode_meta_lops;
		break;
	case OCFS2_LOCK_TYPE_DATA:
		ops = &ocfs2_inode_data_lops;
		break;
	default:
		mlog_bug_on_msg(1, "type: %d\n", type);
		ops = NULL; /* thanks, gcc */
		break;
	};

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type,
				   OCFS2_I(inode)->ip_blkno,
				   inode->i_generation, ops, inode);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   OCFS2_SUPER_BLOCK_BLKNO, 0,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0,
				   &ocfs2_rename_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case LKM_EXMODE:
		lockres->l_ex_holders++;
		break;
	case LKM_PRMODE:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case LKM_EXMODE:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case LKM_PRMODE:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = LKM_EXMODE;

	if (level == LKM_EXMODE)
		new_level = LKM_NLMODE;
	else if (level == LKM_PRMODE)
		new_level = LKM_PRMODE;
	return new_level;
}
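
/* Recap of the mapping above: given that a node holds `level', the
 * highest mode any other node can be granted at the same time is:
 *
 *	holder at LKM_EXMODE -> others at most LKM_NLMODE
 *	holder at LKM_PRMODE -> others at most LKM_PRMODE
 *	holder at LKM_NLMODE -> others at most LKM_EXMODE
 */
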
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct list_head *pos, *tmp;
	struct ocfs2_mask_waiter *mw;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}

static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}

static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > LKM_NLMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL))
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static void ocfs2_inode_ast_func(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct inode *inode;
	struct dlm_lockstatus *lksb;
	unsigned long flags;

	mlog_entry_void();

	inode = ocfs2_lock_res_inode(lockres);

	mlog(0, "AST fired for inode %llu, l_action = %u, type = %s\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno, lockres->l_action,
	     ocfs2_lock_type_string(lockres->l_type));

	BUG_ON(!ocfs2_is_inode_lock(lockres));

	spin_lock_irqsave(&lockres->l_lock, flags);

	lksb = &(lockres->l_lksb);
	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "ocfs2_inode_ast_func: lksb status value of %u "
		     "on inode %llu\n", lksb->status,
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		mlog_exit_void();
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* data and rw locking ignores refresh flag for now. */
	if (lockres->l_type != OCFS2_LOCK_TYPE_META)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	wake_up(&lockres->l_event);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;

	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking. this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
				    struct ocfs2_lock_res *lockres,
				    int level)
{
	int needs_downconvert;
	unsigned long flags;

	mlog_entry_void();

	BUG_ON(level <= LKM_NLMODE);

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ocfs2_kick_vote_thread(osb);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

static void ocfs2_inode_bast_func(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct inode *inode;
	struct ocfs2_super *osb;

	mlog_entry_void();

	BUG_ON(!ocfs2_is_inode_lock(lockres));

	inode = ocfs2_lock_res_inode(lockres);
	osb = OCFS2_SB(inode->i_sb);

	mlog(0, "BAST fired for inode %llu, blocking %d, level %d type %s\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno, level,
	     lockres->l_level, ocfs2_lock_type_string(lockres->l_type));

	ocfs2_generic_bast_func(osb, lockres, level);

	mlog_exit_void();
}

static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
				   int ignore_refresh)
{
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		BUG();
	}

	if (ignore_refresh)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
}

static void ocfs2_super_ast_func(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;

	mlog_entry_void();
	mlog(0, "Superblock AST fired\n");

	BUG_ON(!ocfs2_is_super_lock(lockres));
	ocfs2_generic_ast_func(lockres, 0);

	mlog_exit_void();
}

static void ocfs2_super_bast_func(void *opaque,
				  int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb;

	mlog_entry_void();
	mlog(0, "Superblock BAST fired\n");

	BUG_ON(!ocfs2_is_super_lock(lockres));
	osb = ocfs2_lock_res_super(lockres);
	ocfs2_generic_bast_func(osb, lockres, level);

	mlog_exit_void();
}

static void ocfs2_rename_ast_func(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;

	mlog_entry_void();
	mlog(0, "Rename AST fired\n");

	BUG_ON(!ocfs2_is_rename_lock(lockres));

	ocfs2_generic_ast_func(lockres, 1);

	mlog_exit_void();
}

static void ocfs2_rename_bast_func(void *opaque,
				   int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb;

	mlog_entry_void();
	mlog(0, "Rename BAST fired\n");

	BUG_ON(!ocfs2_is_rename_lock(lockres));

	osb = ocfs2_lock_res_super(lockres);
	ocfs2_generic_bast_func(osb, lockres, level);

	mlog_exit_void();
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags)
{
	int ret = 0;
	enum dlm_status status;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = dlmlock(osb->dlm,
			 level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 lockres->l_ops->ast,
			 lockres,
			 lockres->l_ops->bast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, successful return from dlmlock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
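
/* Example: if another node has us blocked wanting EX
 * (l_blocking == LKM_EXMODE), the compatible level is NL, so only a
 * wanted level of LKM_NLMODE may continue; a blocked PR request
 * (l_blocking == LKM_PRMODE) still lets PR and NL requests through. */
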
static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT|LKM_VALBLK,
				 lockres->l_name,
				 lockres->l_ops->ast,
				 lockres,
				 lockres->l_ops->bast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successful return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}
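
/* Summary of the retry loop above: a busy lock adds a BUSY mask
 * waiter, a missing DLM lock is created at NL and retried, a lock
 * blocked on an incompatible downconvert adds a BLOCKED mask waiter,
 * and only an upconvert actually calls into dlmlock(); everything
 * else loops back to "again" until the holder count can be taken. */
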
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_vote_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}

static int ocfs2_create_new_inode_lock(struct inode *inode,
				       struct ocfs2_lock_res *lockres)
{
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_inode_lock(inode,
					  &OCFS2_I(inode)->ip_rw_lockres);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_inode_lock(inode,
					  &OCFS2_I(inode)->ip_meta_lockres);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_inode_lock(inode,
					  &OCFS2_I(inode)->ip_data_lockres);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}

int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

int ocfs2_data_lock_full(struct inode *inode,
			 int write,
			 int arg_flags)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	/* We'll allow faking a readonly data lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
		if (write)
			status = -EROFS;
		goto out;
	}

	lockres = &OCFS2_I(inode)->ip_data_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
				    0, arg_flags);
	if (status < 0 && status != -EAGAIN)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

/* see ocfs2_meta_lock_with_page() */
int ocfs2_data_lock_with_page(struct inode *inode,
			      int write,
			      struct page *page)
{
	int ret;

	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		unlock_page(page);
		if (ocfs2_data_lock(inode, write) == 0)
			ocfs2_data_unlock(inode, write);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres)
{
	int kick = 0;

	mlog_entry_void();

	/* If we know that another node is waiting on our lock, kick
	 * the vote thread pre-emptively when we reach a release
	 * condition. */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
		switch(lockres->l_blocking) {
		case LKM_EXMODE:
			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
				kick = 1;
			break;
		case LKM_PRMODE:
			if (!lockres->l_ex_holders)
				kick = 1;
			break;
		default:
			BUG();
		}
	}

	if (kick)
		ocfs2_kick_vote_thread(osb);

	mlog_exit_void();
}

void ocfs2_data_unlock(struct inode *inode,
		       int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;

	mlog_entry_void();

	mlog(0, "inode %llu drop %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - 34)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)

/* LVB only has room for 64 bits of time here so we pack it for
 * now. */
static u64 ocfs2_pack_timespec(struct timespec *spec)
{
	u64 res;
	u64 sec = spec->tv_sec;
	u32 nsec = spec->tv_nsec;

	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);

	return res;
}
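
/* Worked example of the packing above: OCFS2_SEC_SHIFT is 30, so
 * tv_sec lands in bits 63..30 (34 bits) and tv_nsec -- always below
 * 10^9 and therefore below 2^30 -- in bits 29..0:
 *
 *	pack({ .tv_sec = 1, .tv_nsec = 5 }) == (1ULL << 30) | 5
 *
 * ocfs2_unpack_timespec() below reverses this exactly as long as
 * tv_sec fits in OCFS2_SEC_BITS bits. */
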
/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	lvb->lvb_version   = cpu_to_be32(OCFS2_LVB_VERSION);
	lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));

	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}

static void ocfs2_unpack_timespec(struct timespec *spec,
				  u64 packed_time)
{
	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
}

static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}

static inline int ocfs2_meta_lvb_is_trustable(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	if (be32_to_cpu(lvb->lvb_version) == OCFS2_LVB_VERSION)
		return 1;
	return 0;
}

/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 *   0 means no refresh needed.
 *
 *   > 0 means you need to refresh this and you MUST call
 *   ocfs2_complete_lock_res_refresh afterwards. */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}

/* If status is non zero, I'll mark it as not being in refresh
 * anymore, but i won't clear the needs refresh flag. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
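
	mlog_exit_void();
}

/* Putting the two helpers above together, the refresh protocol used
 * by callers (see ocfs2_meta_lock_update() and ocfs2_super_lock()
 * below) looks like:
 *
 *	status = ocfs2_should_refresh_lock_res(lockres);
 *	if (status) {
 *		status = ...re-read state from the LVB or disk...;
 *		ocfs2_complete_lock_res_refresh(lockres, status);
 *	}
 *
 * so a failed refresh leaves OCFS2_LOCK_NEEDS_REFRESH set for the
 * next locker to retry. */
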
/* may or may not return a bh if it went to disk. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres;
	struct ocfs2_dinode *fe;

	mlog_entry_void();

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	lockres = &oi->ip_meta_lockres;

	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}

static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (passed_bh) {
		/* Ok, the update went to disk for us, use the
		 * returned bh. */
		*ret_bh = passed_bh;
		get_bh(*ret_bh);

		return 0;
	}

	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
				  OCFS2_I(inode)->ip_blkno,
				  ret_bh,
				  OCFS2_BH_CACHED,
				  inode);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/*
 * returns < 0 error if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
 */
int ocfs2_meta_lock_full(struct inode *inode,
			 struct ocfs2_journal_handle *handle,
			 struct buffer_head **ret_bh,
			 int ex,
			 int arg_flags)
{
	int status, level, dlm_flags, acquired;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	struct buffer_head *local_bh = NULL;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu, take %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	status = 0;
	acquired = 0;
	/* We'll allow faking a readonly metadata lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}

	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	lockres = &OCFS2_I(inode)->ip_meta_lockres;
	level = ex ? LKM_EXMODE : LKM_PRMODE;
	dlm_flags = 0;
	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
		dlm_flags |= LKM_NOQUEUE;

	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
	if (status < 0) {
		if (status != -EAGAIN && status != -EIOCBRETRY)
			mlog_errno(status);
		goto bail;
	}

	/* Notify the error cleanup path to drop the cluster lock. */
	acquired = 1;

	/* We wait twice because a node may have died while we were in
	 * the lower dlm layers. The second time though, we've
	 * committed to owning this lock so we don't allow signals to
	 * abort the operation. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_meta_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_meta_lock_update(inode, &local_bh);
	if (status < 0) {
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}

	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

	if (handle) {
		status = ocfs2_handle_add_lock(handle, inode);
		if (status < 0)
			mlog_errno(status);
	}

bail:
	if (status < 0) {
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_meta_unlock(inode, ex);
	}

	if (local_bh)
		brelse(local_bh);

	mlog_exit(status);
	return status;
}

/*
 * This is working around a lock inversion between tasks acquiring DLM locks
 * while holding a page lock and the vote thread which blocks dlm lock acquiry
 * while acquiring page locks.
 *
 * ** These _with_page variants are only intended to be called from aop
 * methods that hold page locks and return a very specific *positive* error
 * code that aop methods pass up to the VFS -- test for errors with != 0. **
 *
 * The DLM is called such that it returns -EAGAIN if it would have blocked
 * waiting for the vote thread.  In that case we unlock our page so the vote
 * thread can make progress.  Once we've done this we have to return
 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
 * into the VFS who will then immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so that
 * the lock has a great chance of being cached on this node by the time the VFS
 * calls back to retry the aop.   This has a potential to livelock as nodes
 * ping locks back and forth, but that's a risk we're willing to take to avoid
 * the lock inversion simply.
 */
int ocfs2_meta_lock_with_page(struct inode *inode,
			      struct ocfs2_journal_handle *handle,
			      struct buffer_head **ret_bh,
			      int ex,
			      struct page *page)
{
	int ret;

	ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
				   OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		unlock_page(page);
		if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
			ocfs2_meta_unlock(inode, ex);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

void ocfs2_meta_unlock(struct inode *inode,
		       int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;

	mlog_entry_void();

	mlog(0, "inode %llu drop %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status) {
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}

void ocfs2_super_unlock(struct ocfs2_super *osb,
			int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;

	ocfs2_cluster_unlock(osb, lockres, level);
}

int ocfs2_rename_lock(struct ocfs2_super *osb)
{
	int status;
	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

void ocfs2_rename_unlock(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

	ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
}

/* Reference counting of the dlm debug structure. We want this because
 * open references on the debug inodes can live on after a mount, so
 * we can't rely on the ocfs2_super to always exist. */
static void ocfs2_dlm_debug_free(struct kref *kref)
{
	struct ocfs2_dlm_debug *dlm_debug;

	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);

	kfree(dlm_debug);
}

void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
{
	if (dlm_debug)
		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
}

static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
{
	kref_get(&debug->d_refcnt);
}

*ocfs2_new_dlm_debug(void)
1831 struct ocfs2_dlm_debug
*dlm_debug
;
1833 dlm_debug
= kmalloc(sizeof(struct ocfs2_dlm_debug
), GFP_KERNEL
);
1835 mlog_errno(-ENOMEM
);
1839 kref_init(&dlm_debug
->d_refcnt
);
1840 INIT_LIST_HEAD(&dlm_debug
->d_lockres_tracking
);
1841 dlm_debug
->d_locking_state
= NULL
;
/* Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;
	struct ocfs2_lock_res p_iter_res;
	struct ocfs2_lock_res p_tmp_res;
};

static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}

static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}

static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}

static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter = v;
	struct ocfs2_lock_res *dummy = &priv->p_iter_res;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(iter, priv);
	list_del_init(&dummy->l_debug_list);
	if (iter) {
		list_add(&dummy->l_debug_list, &iter->l_debug_list);
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}

/* So that debugfs.ocfs2 can determine which format is being used */
#define OCFS2_DLM_DEBUG_STR_VERSION 1
static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
{
	int i;
	char *lvb;
	struct ocfs2_lock_res *lockres = v;

	if (!lockres)
		return -EINVAL;

	seq_printf(m, "0x%x\t"
		   "%.*s\t"
		   "%d\t"
		   "0x%lx\t"
		   "0x%x\t"
		   "0x%x\t"
		   "%u\t"
		   "%u\t"
		   "%d\t"
		   "%d\t",
		   OCFS2_DLM_DEBUG_STR_VERSION,
		   OCFS2_LOCK_ID_MAX_LEN, lockres->l_name,
		   lockres->l_level,
		   lockres->l_flags,
		   lockres->l_action,
		   lockres->l_unlock_action,
		   lockres->l_ro_holders,
		   lockres->l_ex_holders,
		   lockres->l_requested,
		   lockres->l_blocking);

	/* Dump the raw LVB */
	lvb = lockres->l_lksb.lvb;
	for(i = 0; i < DLM_LVB_LEN; i++)
		seq_printf(m, "0x%x\t", lvb[i]);

	/* End the line */
	seq_printf(m, "\n");
	return 0;
}
static struct seq_operations ocfs2_dlm_seq_ops = {
	.start =	ocfs2_dlm_seq_start,
	.stop =		ocfs2_dlm_seq_stop,
	.next =		ocfs2_dlm_seq_next,
	.show =		ocfs2_dlm_seq_show,
};

static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
{
	struct seq_file *seq = (struct seq_file *) file->private_data;
	struct ocfs2_dlm_seq_priv *priv = seq->private;
	struct ocfs2_lock_res *res = &priv->p_iter_res;

	ocfs2_remove_lockres_tracking(res);
	ocfs2_put_dlm_debug(priv->p_dlm_debug);
	return seq_release_private(inode, file);
}

static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
{
	int ret;
	struct ocfs2_dlm_seq_priv *priv;
	struct seq_file *seq;
	struct ocfs2_super *osb;

	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
	if (!priv) {
		ret = -ENOMEM;
		mlog_errno(ret);
		goto out;
	}
	osb = (struct ocfs2_super *) inode->u.generic_ip;
	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
	priv->p_dlm_debug = osb->osb_dlm_debug;
	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);

	ret = seq_open(file, &ocfs2_dlm_seq_ops);
	if (ret) {
		kfree(priv);
		goto out;
	}

	seq = (struct seq_file *) file->private_data;
	seq->private = priv;

	ocfs2_add_lockres_tracking(&priv->p_iter_res,
				   priv->p_dlm_debug);

out:
	return ret;
}

static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =		ocfs2_dlm_debug_open,
	.release =	ocfs2_dlm_debug_release,
	.read =		seq_read,
	.llseek =	seq_lseek,
};

static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
{
	int ret = 0;
	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;

	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
							 S_IFREG|S_IRUSR,
							 osb->osb_debug_root,
							 osb,
							 &ocfs2_dlm_debug_fops);
	if (!dlm_debug->d_locking_state) {
		ret = -EINVAL;
		mlog(ML_ERROR,
		     "Unable to create locking state debugfs file.\n");
		goto out;
	}

	ocfs2_get_dlm_debug(dlm_debug);
out:
	return ret;
}

static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
{
	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;

	if (dlm_debug) {
		debugfs_remove(dlm_debug->d_locking_state);
		ocfs2_put_dlm_debug(dlm_debug);
	}
}

*osb
)
2063 struct dlm_ctxt
*dlm
;
2067 status
= ocfs2_dlm_init_debug(osb
);
2073 /* launch vote thread */
2074 osb
->vote_task
= kthread_run(ocfs2_vote_thread
, osb
, "ocfs2vote-%d",
2076 if (IS_ERR(osb
->vote_task
)) {
2077 status
= PTR_ERR(osb
->vote_task
);
2078 osb
->vote_task
= NULL
;
2083 /* used by the dlm code to make message headers unique, each
2084 * node in this domain must agree on this. */
2085 dlm_key
= crc32_le(0, osb
->uuid_str
, strlen(osb
->uuid_str
));
2087 /* for now, uuid == domain */
2088 dlm
= dlm_register_domain(osb
->uuid_str
, dlm_key
);
2090 status
= PTR_ERR(dlm
);
2095 ocfs2_super_lock_res_init(&osb
->osb_super_lockres
, osb
);
2096 ocfs2_rename_lock_res_init(&osb
->osb_rename_lockres
, osb
);
2098 dlm_register_eviction_cb(dlm
, &osb
->osb_eviction_cb
);
2105 ocfs2_dlm_shutdown_debug(osb
);
2107 kthread_stop(osb
->vote_task
);
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}

static void ocfs2_unlock_ast_func(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}

typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

struct drop_lock_cb {
	ocfs2_pre_drop_cb_t	*drop_func;
	void			*drop_data;
};

*osb
,
2202 struct ocfs2_lock_res
*lockres
,
2203 struct drop_lock_cb
*dcb
)
2205 enum dlm_status status
;
2206 unsigned long flags
;
2208 /* We didn't get anywhere near actually using this lockres. */
2209 if (!(lockres
->l_flags
& OCFS2_LOCK_INITIALIZED
))
2212 spin_lock_irqsave(&lockres
->l_lock
, flags
);
2214 mlog_bug_on_msg(!(lockres
->l_flags
& OCFS2_LOCK_FREEING
),
2215 "lockres %s, flags 0x%lx\n",
2216 lockres
->l_name
, lockres
->l_flags
);
2218 while (lockres
->l_flags
& OCFS2_LOCK_BUSY
) {
2219 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2220 "%u, unlock_action = %u\n",
2221 lockres
->l_name
, lockres
->l_flags
, lockres
->l_action
,
2222 lockres
->l_unlock_action
);
2224 spin_unlock_irqrestore(&lockres
->l_lock
, flags
);
2226 /* XXX: Today we just wait on any busy
2227 * locks... Perhaps we need to cancel converts in the
2229 ocfs2_wait_on_busy_lock(lockres
);
2231 spin_lock_irqsave(&lockres
->l_lock
, flags
);
2235 dcb
->drop_func(lockres
, dcb
->drop_data
);
2237 if (lockres
->l_flags
& OCFS2_LOCK_BUSY
)
2238 mlog(ML_ERROR
, "destroying busy lock: \"%s\"\n",
2240 if (lockres
->l_flags
& OCFS2_LOCK_BLOCKED
)
2241 mlog(0, "destroying blocked lock: \"%s\"\n", lockres
->l_name
);
2243 if (!(lockres
->l_flags
& OCFS2_LOCK_ATTACHED
)) {
2244 spin_unlock_irqrestore(&lockres
->l_lock
, flags
);
2248 lockres_clear_flags(lockres
, OCFS2_LOCK_ATTACHED
);
2250 /* make sure we never get here while waiting for an ast to
2252 BUG_ON(lockres
->l_action
!= OCFS2_AST_INVALID
);
2254 /* is this necessary? */
2255 lockres_or_flags(lockres
, OCFS2_LOCK_BUSY
);
2256 lockres
->l_unlock_action
= OCFS2_UNLOCK_DROP_LOCK
;
2257 spin_unlock_irqrestore(&lockres
->l_lock
, flags
);
2259 mlog(0, "lock %s\n", lockres
->l_name
);
2261 status
= dlmunlock(osb
->dlm
, &lockres
->l_lksb
, LKM_VALBLK
,
2262 lockres
->l_ops
->unlock_ast
, lockres
);
2263 if (status
!= DLM_NORMAL
) {
2264 ocfs2_log_dlm_error("dlmunlock", status
, lockres
);
2265 mlog(ML_ERROR
, "lockres flags: %lu\n", lockres
->l_flags
);
2266 dlm_print_one_lock(lockres
->l_lksb
.lockid
);
2269 mlog(0, "lock %s, successfull return from dlmunlock\n",
2272 ocfs2_wait_on_busy_lock(lockres
);
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the vote thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	int status;

	mlog_entry_void();

	ocfs2_mark_lockres_freeing(&osb->osb_super_lockres);

	status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL);
	if (status < 0)
		mlog_errno(status);

	ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres);

	status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
}

static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
{
	struct inode *inode = data;

	/* the metadata lock requires a bit more work as we have an
	 * LVB to worry about. */
	if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
	    lockres->l_level == LKM_EXMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
		__ocfs2_stuff_meta_lvb(inode);
}

int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;
	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };

	mlog_entry_void();

	/* No need to call ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_data_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_meta_lockres,
			      &meta_dcb);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	mlog_exit(status);
	return status;
}

static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}

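/* A downconvert is an asynchronous dlmlock() call with LKM_CONVERT
 * set; completion is signalled via the ast. LKM_VALBLK is added when
 * the caller needs the lock value block written back, i.e. a meta
 * lock dropping from EX. */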
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb)
{
	int ret, dlm_flags = LKM_CONVERT;
	enum dlm_status status;

	mlog_entry_void();

	if (lvb)
		dlm_flags |= LKM_VALBLK;

	status = dlmlock(osb->dlm,
			 new_level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 lockres->l_ops->ast,
			 lockres,
			 lockres->l_ops->bast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}
	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}

/* returns 1 when the caller should unlock and call dlmunlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}

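/* Ask the DLM to abort an in-flight convert via dlmunlock() with
 * LKM_CANCEL. The unlock ast (keyed off OCFS2_UNLOCK_CANCEL_CONVERT,
 * set up by ocfs2_prepare_cancel_convert above) clears out the ast
 * action and the busy flag once the cancel completes. */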
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int ret;
	enum dlm_status status;

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	ret = 0;
	status = dlmunlock(osb->dlm,
			   &lockres->l_lksb,
			   LKM_CANCEL,
			   lockres->l_ops->unlock_ast,
			   lockres);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmunlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 0);
	}

	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);

	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
						  struct ocfs2_lock_res *lockres,
						  int new_level)
{
	int ret;

	mlog_entry_void();

	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		ret = 0;
		mlog(0, "lockres %s currently being refreshed -- backing "
		     "off!\n", lockres->l_name);
	} else if (new_level == LKM_PRMODE)
		ret = !lockres->l_ex_holders &&
			ocfs2_inode_fully_checkpointed(inode);
	else /* Must be NLMODE we're converting to. */
		ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
			ocfs2_inode_fully_checkpointed(inode);

	mlog_exit(ret);
	return ret;
}

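/* Meta-specific unblock logic. On top of the usual cancel-convert or
 * downconvert choices, a meta lock may only be downconverted once the
 * inode is fully checkpointed; until then we kick off a checkpoint
 * and ask to be requeued. */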
static int ocfs2_do_unblock_meta(struct inode *inode,
				 int *requeue)
{
	int new_level;
	int set_lvb = 0;
	int ret = 0;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
	unsigned long flags;

	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
	     lockres->l_blocking);

	BUG_ON(lockres->l_level != LKM_EXMODE &&
	       lockres->l_level != LKM_PRMODE);

	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		*requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
	     lockres->l_level, lockres->l_blocking, new_level);

	if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/* If the lock hasn't been refreshed yet (rare), then
		 * our memory inode values are old and we skip
		 * stuffing the lvb. There's no need to actually clear
		 * out the lvb here as its value is still valid. */
		if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
			if (set_lvb)
				__ocfs2_stuff_meta_lvb(inode);
		} else
			mlog(0, "lockres %s: downconverting stale lock!\n",
			     lockres->l_name);

		mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
		     "l_blocking=%d, new_level=%d\n",
		     lockres->l_level, lockres->l_blocking, new_level);

		ocfs2_prepare_downconvert(lockres, new_level);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = ocfs2_downconvert_lock(osb, lockres, new_level,
					     set_lvb);
		goto leave;
	}

	if (!ocfs2_inode_fully_checkpointed(inode))
		ocfs2_start_checkpoint(osb);

	*requeue = 1;
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = 0;
leave:
	mlog_exit(ret);
	return ret;
}

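/* Core unblock state machine shared by the per-type handlers:
 *
 * 1) lockres busy: try to cancel the pending convert and requeue.
 * 2) incompatible holders remain: requeue and wait for them to drop.
 * 3) otherwise run the optional worker (which may sleep), and if the
 *    blocked level didn't change underneath us, downconvert to the
 *    highest level compatible with what's being requested. */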
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      int *requeue,
				      ocfs2_convert_worker_t *worker)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		*requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		*requeue = 1;
		ret = 0;
		goto leave;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		*requeue = 1;
		ret = 0;
		goto leave;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	worker(lockres, blocking);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	*requeue = 0;
	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
leave:
	mlog_exit(ret);
	return ret;
}

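/* Worker for the inode data lock, run right before a downconvert and
 * without l_lock held: write back dirty pages so other nodes read
 * current data. When an EX request is blocking us the page cache is
 * tossed as well, since our cached pages can't be trusted once
 * another node holds the lock exclusive. */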
static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				      int blocking)
{
	struct inode *inode;
	struct address_space *mapping;

	mlog_entry_void();

	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == LKM_EXMODE) {
		truncate_inode_pages(mapping, 0);
		unmap_mapping_range(mapping, 0, 0, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case. */
		filemap_fdatawait(mapping);
	}

	mlog_exit_void();
}

int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
		       int *requeue)
{
	int status;
	struct inode *inode;
	struct ocfs2_super *osb;

	mlog_entry_void();

	inode = ocfs2_lock_res_inode(lockres);
	osb = OCFS2_SB(inode->i_sb);

	mlog(0, "unblock inode %llu\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	status = ocfs2_generic_unblock_lock(osb,
					    lockres,
					    requeue,
					    ocfs2_data_convert_worker);
	if (status < 0)
		mlog_errno(status);

	mlog(0, "inode %llu, requeue = %d\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);

	mlog_exit(status);
	return status;
}

static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
				    int *requeue)
{
	int status;
	struct inode *inode;

	mlog_entry_void();

	mlog(0, "Unblock lockres %s\n", lockres->l_name);

	inode = ocfs2_lock_res_inode(lockres);

	status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
					    lockres,
					    requeue,
					    NULL);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
		       int *requeue)
{
	int status;
	struct inode *inode;

	mlog_entry_void();

	inode = ocfs2_lock_res_inode(lockres);

	mlog(0, "unblock inode %llu\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	status = ocfs2_do_unblock_meta(inode, requeue);
	if (status < 0)
		mlog_errno(status);

	mlog(0, "inode %llu, requeue = %d\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);

	mlog_exit(status);
	return status;
}

/* Generic unblock function for any lockres whose private data is an
 * ocfs2_super pointer. */
static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
				  int *requeue)
{
	int status;
	struct ocfs2_super *osb;

	mlog_entry_void();

	mlog(0, "Unblock lockres %s\n", lockres->l_name);

	osb = ocfs2_lock_res_super(lockres);

	status = ocfs2_generic_unblock_lock(osb,
					    lockres,
					    requeue,
					    NULL);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

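/* Entry point for the vote thread: called for each lockres taken off
 * osb->blocked_lock_list. Runs the per-type unblock handler and
 * either clears OCFS2_LOCK_QUEUED or puts the lockres back on the
 * list when the handler asked for a requeue. */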
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	int requeue = 0;
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);
	BUG_ON(!lockres->l_ops->unblock);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = lockres->l_ops->unblock(lockres, &requeue);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog_exit_void();
}

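/* Put a lockres back on the vote thread's blocked list for a later
 * downconvert attempt. Caller must hold l_lock; the list itself is
 * protected by vote_task_lock. */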
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	spin_lock(&osb->vote_task_lock);
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->vote_task_lock);

	mlog_exit_void();
}

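/* Callers are expected to pass their own location; presumably this is
 * wrapped by a macro supplying __PRETTY_FUNCTION__ and __LINE__
 * (e.g. an mlog_meta_lvb() helper in dlmglue.h -- an assumption, as
 * the header isn't visible here). */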
/* This aids in debugging situations where a bad LVB might be involved. */
void ocfs2_dump_meta_lvb_info(u64 level,
			      const char *function,
			      unsigned int line,
			      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u\n",
	     be32_to_cpu(lvb->lvb_version), be32_to_cpu(lvb->lvb_iclusters));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed));
}