2 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
3 * Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
5 * This copyrighted material is made available to anyone wishing to use,
6 * modify, copy, or redistribute it subject to the terms and conditions
7 * of the GNU General Public License version 2.
12 /* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
15 static void queue_submit(struct gdlm_lock
*lp
)
17 struct gdlm_ls
*ls
= lp
->ls
;
19 spin_lock(&ls
->async_lock
);
20 list_add_tail(&lp
->delay_list
, &ls
->submit
);
21 spin_unlock(&ls
->async_lock
);
22 wake_up(&ls
->thread_wait
);
25 static void process_blocking(struct gdlm_lock
*lp
, int bast_mode
)
27 struct gdlm_ls
*ls
= lp
->ls
;
30 switch (gdlm_make_lmstate(bast_mode
)) {
41 gdlm_assert(0, "unknown bast mode %u", lp
->bast_mode
);
44 ls
->fscb(ls
->fsdata
, cb
, &lp
->lockname
);
47 static void process_complete(struct gdlm_lock
*lp
)
49 struct gdlm_ls
*ls
= lp
->ls
;
50 struct lm_async_cb acb
;
51 s16 prev_mode
= lp
->cur
;
53 memset(&acb
, 0, sizeof(acb
));
55 if (lp
->lksb
.sb_status
== -DLM_ECANCEL
) {
56 log_info("complete dlm cancel %x,%llx flags %lx",
58 (unsigned long long)lp
->lockname
.ln_number
,
62 acb
.lc_ret
|= LM_OUT_CANCELED
;
63 if (lp
->cur
== DLM_LOCK_IV
)
68 if (test_and_clear_bit(LFL_DLM_UNLOCK
, &lp
->flags
)) {
69 if (lp
->lksb
.sb_status
!= -DLM_EUNLOCK
) {
70 log_info("unlock sb_status %d %x,%llx flags %lx",
71 lp
->lksb
.sb_status
, lp
->lockname
.ln_type
,
72 (unsigned long long)lp
->lockname
.ln_number
,
77 lp
->cur
= DLM_LOCK_IV
;
78 lp
->req
= DLM_LOCK_IV
;
81 if (test_and_clear_bit(LFL_UNLOCK_DELETE
, &lp
->flags
)) {
88 if (lp
->lksb
.sb_flags
& DLM_SBF_VALNOTVALID
)
89 memset(lp
->lksb
.sb_lvbptr
, 0, GDLM_LVB_SIZE
);
91 if (lp
->lksb
.sb_flags
& DLM_SBF_ALTMODE
) {
92 if (lp
->req
== DLM_LOCK_PR
)
93 lp
->req
= DLM_LOCK_CW
;
94 else if (lp
->req
== DLM_LOCK_CW
)
95 lp
->req
= DLM_LOCK_PR
;
99 * A canceled lock request. The lock was just taken off the delayed
100 * list and was never even submitted to dlm.
103 if (test_and_clear_bit(LFL_CANCEL
, &lp
->flags
)) {
104 log_info("complete internal cancel %x,%llx",
105 lp
->lockname
.ln_type
,
106 (unsigned long long)lp
->lockname
.ln_number
);
108 acb
.lc_ret
|= LM_OUT_CANCELED
;
116 if (lp
->lksb
.sb_status
) {
117 /* a "normal" error */
118 if ((lp
->lksb
.sb_status
== -EAGAIN
) &&
119 (lp
->lkf
& DLM_LKF_NOQUEUE
)) {
121 if (lp
->cur
== DLM_LOCK_IV
)
122 lp
->lksb
.sb_lkid
= 0;
126 /* this could only happen with cancels I think */
127 log_info("ast sb_status %d %x,%llx flags %lx",
128 lp
->lksb
.sb_status
, lp
->lockname
.ln_type
,
129 (unsigned long long)lp
->lockname
.ln_number
,
135 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
138 if (test_and_clear_bit(LFL_SYNC_LVB
, &lp
->flags
)) {
139 complete(&lp
->ast_wait
);
144 * A lock has been demoted to NL because it initially completed during
145 * BLOCK_LOCKS. Now it must be requested in the originally requested
149 if (test_and_clear_bit(LFL_REREQUEST
, &lp
->flags
)) {
150 gdlm_assert(lp
->req
== DLM_LOCK_NL
, "%x,%llx",
151 lp
->lockname
.ln_type
,
152 (unsigned long long)lp
->lockname
.ln_number
);
153 gdlm_assert(lp
->prev_req
> DLM_LOCK_NL
, "%x,%llx",
154 lp
->lockname
.ln_type
,
155 (unsigned long long)lp
->lockname
.ln_number
);
157 lp
->cur
= DLM_LOCK_NL
;
158 lp
->req
= lp
->prev_req
;
159 lp
->prev_req
= DLM_LOCK_IV
;
160 lp
->lkf
&= ~DLM_LKF_CONVDEADLK
;
162 set_bit(LFL_NOCACHE
, &lp
->flags
);
164 if (test_bit(DFL_BLOCK_LOCKS
, &ls
->flags
) &&
165 !test_bit(LFL_NOBLOCK
, &lp
->flags
))
166 gdlm_queue_delayed(lp
);
173 * A request is granted during dlm recovery. It may be granted
174 * because the locks of a failed node were cleared. In that case,
175 * there may be inconsistent data beneath this lock and we must wait
176 * for recovery to complete to use it. When gfs recovery is done this
177 * granted lock will be converted to NL and then reacquired in this
181 if (test_bit(DFL_BLOCK_LOCKS
, &ls
->flags
) &&
182 !test_bit(LFL_NOBLOCK
, &lp
->flags
) &&
183 lp
->req
!= DLM_LOCK_NL
) {
186 lp
->prev_req
= lp
->req
;
187 lp
->req
= DLM_LOCK_NL
;
188 lp
->lkf
|= DLM_LKF_CONVERT
;
189 lp
->lkf
&= ~DLM_LKF_CONVDEADLK
;
191 log_debug("rereq %x,%llx id %x %d,%d",
192 lp
->lockname
.ln_type
,
193 (unsigned long long)lp
->lockname
.ln_number
,
194 lp
->lksb
.sb_lkid
, lp
->cur
, lp
->req
);
196 set_bit(LFL_REREQUEST
, &lp
->flags
);
202 * DLM demoted the lock to NL before it was granted so GFS must be
203 * told it cannot cache data for this lock.
206 if (lp
->lksb
.sb_flags
& DLM_SBF_DEMOTED
)
207 set_bit(LFL_NOCACHE
, &lp
->flags
);
211 * This is an internal lock_dlm lock
214 if (test_bit(LFL_INLOCK
, &lp
->flags
)) {
215 clear_bit(LFL_NOBLOCK
, &lp
->flags
);
217 complete(&lp
->ast_wait
);
222 * Normal completion of a lock request. Tell GFS it now has the lock.
225 clear_bit(LFL_NOBLOCK
, &lp
->flags
);
228 acb
.lc_name
= lp
->lockname
;
229 acb
.lc_ret
|= gdlm_make_lmstate(lp
->cur
);
231 if (!test_and_clear_bit(LFL_NOCACHE
, &lp
->flags
) &&
232 (lp
->cur
> DLM_LOCK_NL
) && (prev_mode
> DLM_LOCK_NL
))
233 acb
.lc_ret
|= LM_OUT_CACHEABLE
;
235 ls
->fscb(ls
->fsdata
, LM_CB_ASYNC
, &acb
);
238 static inline int no_work(struct gdlm_ls
*ls
, int blocking
)
242 spin_lock(&ls
->async_lock
);
243 ret
= list_empty(&ls
->complete
) && list_empty(&ls
->submit
);
245 ret
= list_empty(&ls
->blocking
);
246 spin_unlock(&ls
->async_lock
);
251 static inline int check_drop(struct gdlm_ls
*ls
)
253 if (!ls
->drop_locks_count
)
256 if (time_after(jiffies
, ls
->drop_time
+ ls
->drop_locks_period
* HZ
)) {
257 ls
->drop_time
= jiffies
;
258 if (ls
->all_locks_count
>= ls
->drop_locks_count
)
264 static int gdlm_thread(void *data
)
266 struct gdlm_ls
*ls
= (struct gdlm_ls
*) data
;
267 struct gdlm_lock
*lp
= NULL
;
269 uint8_t complete
, blocking
, submit
, drop
;
270 DECLARE_WAITQUEUE(wait
, current
);
272 /* Only thread1 is allowed to do blocking callbacks since gfs
273 may wait for a completion callback within a blocking cb. */
275 if (current
== ls
->thread1
)
278 while (!kthread_should_stop()) {
279 set_current_state(TASK_INTERRUPTIBLE
);
280 add_wait_queue(&ls
->thread_wait
, &wait
);
281 if (no_work(ls
, blist
))
283 remove_wait_queue(&ls
->thread_wait
, &wait
);
284 set_current_state(TASK_RUNNING
);
286 complete
= blocking
= submit
= drop
= 0;
288 spin_lock(&ls
->async_lock
);
290 if (blist
&& !list_empty(&ls
->blocking
)) {
291 lp
= list_entry(ls
->blocking
.next
, struct gdlm_lock
,
293 list_del_init(&lp
->blist
);
294 blocking
= lp
->bast_mode
;
296 } else if (!list_empty(&ls
->complete
)) {
297 lp
= list_entry(ls
->complete
.next
, struct gdlm_lock
,
299 list_del_init(&lp
->clist
);
301 } else if (!list_empty(&ls
->submit
)) {
302 lp
= list_entry(ls
->submit
.next
, struct gdlm_lock
,
304 list_del_init(&lp
->delay_list
);
308 drop
= check_drop(ls
);
309 spin_unlock(&ls
->async_lock
);
312 process_complete(lp
);
315 process_blocking(lp
, blocking
);
321 ls
->fscb(ls
->fsdata
, LM_CB_DROPLOCKS
, NULL
);
329 int gdlm_init_threads(struct gdlm_ls
*ls
)
331 struct task_struct
*p
;
334 p
= kthread_run(gdlm_thread
, ls
, "lock_dlm1");
337 log_error("can't start lock_dlm1 thread %d", error
);
342 p
= kthread_run(gdlm_thread
, ls
, "lock_dlm2");
345 log_error("can't start lock_dlm2 thread %d", error
);
346 kthread_stop(ls
->thread1
);
354 void gdlm_release_threads(struct gdlm_ls
*ls
)
356 kthread_stop(ls
->thread1
);
357 kthread_stop(ls
->thread2
);