1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
/*
 * File-wide callback sequence counter.  dlm_add_cb() pre-increments it and
 * uses the new value as the seq of the callback being queued, so a handed-out
 * seq is non-zero; the callback arrays in this file treat seq == 0 as
 * "slot is empty".
 */
static uint64_t dlm_cb_seq;
/* Held by dlm_add_cb() around the increment of dlm_cb_seq. */
static DEFINE_SPINLOCK(dlm_cb_seq_spin);
21 static void dlm_dump_lkb_callbacks(struct dlm_lkb
*lkb
)
25 log_print("last_bast %x %llu flags %x mode %d sb %d %x",
27 (unsigned long long)lkb
->lkb_last_bast
.seq
,
28 lkb
->lkb_last_bast
.flags
,
29 lkb
->lkb_last_bast
.mode
,
30 lkb
->lkb_last_bast
.sb_status
,
31 lkb
->lkb_last_bast
.sb_flags
);
33 log_print("last_cast %x %llu flags %x mode %d sb %d %x",
35 (unsigned long long)lkb
->lkb_last_cast
.seq
,
36 lkb
->lkb_last_cast
.flags
,
37 lkb
->lkb_last_cast
.mode
,
38 lkb
->lkb_last_cast
.sb_status
,
39 lkb
->lkb_last_cast
.sb_flags
);
41 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
42 log_print("cb %x %llu flags %x mode %d sb %d %x",
44 (unsigned long long)lkb
->lkb_callbacks
[i
].seq
,
45 lkb
->lkb_callbacks
[i
].flags
,
46 lkb
->lkb_callbacks
[i
].mode
,
47 lkb
->lkb_callbacks
[i
].sb_status
,
48 lkb
->lkb_callbacks
[i
].sb_flags
);
52 int dlm_add_lkb_callback(struct dlm_lkb
*lkb
, uint32_t flags
, int mode
,
53 int status
, uint32_t sbflags
, uint64_t seq
)
55 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
60 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
61 if (lkb
->lkb_callbacks
[i
].seq
)
65 * Suppress some redundant basts here, do more on removal.
66 * Don't even add a bast if the callback just before it
67 * is a bast for the same mode or a more restrictive mode.
68 * (the addional > PR check is needed for PR/CW inversion)
71 if ((i
> 0) && (flags
& DLM_CB_BAST
) &&
72 (lkb
->lkb_callbacks
[i
-1].flags
& DLM_CB_BAST
)) {
74 prev_seq
= lkb
->lkb_callbacks
[i
-1].seq
;
75 prev_mode
= lkb
->lkb_callbacks
[i
-1].mode
;
77 if ((prev_mode
== mode
) ||
78 (prev_mode
> mode
&& prev_mode
> DLM_LOCK_PR
)) {
80 log_debug(ls
, "skip %x add bast %llu mode %d "
81 "for bast %llu mode %d",
83 (unsigned long long)seq
,
85 (unsigned long long)prev_seq
,
92 lkb
->lkb_callbacks
[i
].seq
= seq
;
93 lkb
->lkb_callbacks
[i
].flags
= flags
;
94 lkb
->lkb_callbacks
[i
].mode
= mode
;
95 lkb
->lkb_callbacks
[i
].sb_status
= status
;
96 lkb
->lkb_callbacks
[i
].sb_flags
= (sbflags
& 0x000000FF);
101 if (i
== DLM_CALLBACKS_SIZE
) {
102 log_error(ls
, "no callbacks %x %llu flags %x mode %d sb %d %x",
103 lkb
->lkb_id
, (unsigned long long)seq
,
104 flags
, mode
, status
, sbflags
);
105 dlm_dump_lkb_callbacks(lkb
);
113 int dlm_rem_lkb_callback(struct dlm_ls
*ls
, struct dlm_lkb
*lkb
,
114 struct dlm_callback
*cb
, int *resid
)
120 if (!lkb
->lkb_callbacks
[0].seq
) {
125 /* oldest undelivered cb is callbacks[0] */
127 memcpy(cb
, &lkb
->lkb_callbacks
[0], sizeof(struct dlm_callback
));
128 memset(&lkb
->lkb_callbacks
[0], 0, sizeof(struct dlm_callback
));
130 /* shift others down */
132 for (i
= 1; i
< DLM_CALLBACKS_SIZE
; i
++) {
133 if (!lkb
->lkb_callbacks
[i
].seq
)
135 memcpy(&lkb
->lkb_callbacks
[i
-1], &lkb
->lkb_callbacks
[i
],
136 sizeof(struct dlm_callback
));
137 memset(&lkb
->lkb_callbacks
[i
], 0, sizeof(struct dlm_callback
));
141 /* if cb is a bast, it should be skipped if the blocking mode is
142 compatible with the last granted mode */
144 if ((cb
->flags
& DLM_CB_BAST
) && lkb
->lkb_last_cast
.seq
) {
145 if (dlm_modes_compat(cb
->mode
, lkb
->lkb_last_cast
.mode
)) {
146 cb
->flags
|= DLM_CB_SKIP
;
148 log_debug(ls
, "skip %x bast %llu mode %d "
149 "for cast %llu mode %d",
151 (unsigned long long)cb
->seq
,
153 (unsigned long long)lkb
->lkb_last_cast
.seq
,
154 lkb
->lkb_last_cast
.mode
);
160 if (cb
->flags
& DLM_CB_CAST
) {
161 memcpy(&lkb
->lkb_last_cast
, cb
, sizeof(struct dlm_callback
));
162 lkb
->lkb_last_cast_time
= ktime_get();
165 if (cb
->flags
& DLM_CB_BAST
) {
166 memcpy(&lkb
->lkb_last_bast
, cb
, sizeof(struct dlm_callback
));
167 lkb
->lkb_last_bast_time
= ktime_get();
174 void dlm_add_cb(struct dlm_lkb
*lkb
, uint32_t flags
, int mode
, int status
,
177 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
178 uint64_t new_seq
, prev_seq
;
181 spin_lock(&dlm_cb_seq_spin
);
182 new_seq
= ++dlm_cb_seq
;
183 spin_unlock(&dlm_cb_seq_spin
);
185 if (lkb
->lkb_flags
& DLM_IFL_USER
) {
186 dlm_user_add_ast(lkb
, flags
, mode
, status
, sbflags
, new_seq
);
190 mutex_lock(&lkb
->lkb_cb_mutex
);
191 prev_seq
= lkb
->lkb_callbacks
[0].seq
;
193 rv
= dlm_add_lkb_callback(lkb
, flags
, mode
, status
, sbflags
, new_seq
);
198 kref_get(&lkb
->lkb_ref
);
200 if (test_bit(LSFL_CB_DELAY
, &ls
->ls_flags
)) {
201 mutex_lock(&ls
->ls_cb_mutex
);
202 list_add(&lkb
->lkb_cb_list
, &ls
->ls_cb_delay
);
203 mutex_unlock(&ls
->ls_cb_mutex
);
205 queue_work(ls
->ls_callback_wq
, &lkb
->lkb_cb_work
);
209 mutex_unlock(&lkb
->lkb_cb_mutex
);
212 void dlm_callback_work(struct work_struct
*work
)
214 struct dlm_lkb
*lkb
= container_of(work
, struct dlm_lkb
, lkb_cb_work
);
215 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
216 void (*castfn
) (void *astparam
);
217 void (*bastfn
) (void *astparam
, int mode
);
218 struct dlm_callback callbacks
[DLM_CALLBACKS_SIZE
];
221 memset(&callbacks
, 0, sizeof(callbacks
));
223 mutex_lock(&lkb
->lkb_cb_mutex
);
224 if (!lkb
->lkb_callbacks
[0].seq
) {
225 /* no callback work exists, shouldn't happen */
226 log_error(ls
, "dlm_callback_work %x no work", lkb
->lkb_id
);
228 dlm_dump_lkb_callbacks(lkb
);
231 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
232 rv
= dlm_rem_lkb_callback(ls
, lkb
, &callbacks
[i
], &resid
);
238 /* cbs remain, loop should have removed all, shouldn't happen */
239 log_error(ls
, "dlm_callback_work %x resid %d", lkb
->lkb_id
,
242 dlm_dump_lkb_callbacks(lkb
);
244 mutex_unlock(&lkb
->lkb_cb_mutex
);
246 castfn
= lkb
->lkb_astfn
;
247 bastfn
= lkb
->lkb_bastfn
;
249 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
250 if (!callbacks
[i
].seq
)
252 if (callbacks
[i
].flags
& DLM_CB_SKIP
) {
254 } else if (callbacks
[i
].flags
& DLM_CB_BAST
) {
255 bastfn(lkb
->lkb_astparam
, callbacks
[i
].mode
);
256 } else if (callbacks
[i
].flags
& DLM_CB_CAST
) {
257 lkb
->lkb_lksb
->sb_status
= callbacks
[i
].sb_status
;
258 lkb
->lkb_lksb
->sb_flags
= callbacks
[i
].sb_flags
;
259 castfn(lkb
->lkb_astparam
);
263 /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
267 int dlm_callback_start(struct dlm_ls
*ls
)
269 ls
->ls_callback_wq
= alloc_workqueue("dlm_callback",
274 if (!ls
->ls_callback_wq
) {
275 log_print("can't start dlm_callback workqueue");
281 void dlm_callback_stop(struct dlm_ls
*ls
)
283 if (ls
->ls_callback_wq
)
284 destroy_workqueue(ls
->ls_callback_wq
);
287 void dlm_callback_suspend(struct dlm_ls
*ls
)
289 set_bit(LSFL_CB_DELAY
, &ls
->ls_flags
);
291 if (ls
->ls_callback_wq
)
292 flush_workqueue(ls
->ls_callback_wq
);
295 void dlm_callback_resume(struct dlm_ls
*ls
)
297 struct dlm_lkb
*lkb
, *safe
;
300 clear_bit(LSFL_CB_DELAY
, &ls
->ls_flags
);
302 if (!ls
->ls_callback_wq
)
305 mutex_lock(&ls
->ls_cb_mutex
);
306 list_for_each_entry_safe(lkb
, safe
, &ls
->ls_cb_delay
, lkb_cb_list
) {
307 list_del_init(&lkb
->lkb_cb_list
);
308 queue_work(ls
->ls_callback_wq
, &lkb
->lkb_cb_work
);
311 mutex_unlock(&ls
->ls_cb_mutex
);
314 log_debug(ls
, "dlm_callback_resume %d", count
);