1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
28 static int rcom_response(struct dlm_ls
*ls
)
30 return test_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
33 static int create_rcom(struct dlm_ls
*ls
, int to_nodeid
, int type
, int len
,
34 struct dlm_rcom
**rc_ret
, struct dlm_mhandle
**mh_ret
)
37 struct dlm_mhandle
*mh
;
39 int mb_len
= sizeof(struct dlm_rcom
) + len
;
41 mh
= dlm_lowcomms_get_buffer(to_nodeid
, mb_len
, GFP_NOFS
, &mb
);
43 log_print("create_rcom to %d type %d len %d ENOBUFS",
44 to_nodeid
, type
, len
);
47 memset(mb
, 0, mb_len
);
49 rc
= (struct dlm_rcom
*) mb
;
51 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
52 rc
->rc_header
.h_lockspace
= ls
->ls_global_id
;
53 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
54 rc
->rc_header
.h_length
= mb_len
;
55 rc
->rc_header
.h_cmd
= DLM_RCOM
;
59 spin_lock(&ls
->ls_recover_lock
);
60 rc
->rc_seq
= ls
->ls_recover_seq
;
61 spin_unlock(&ls
->ls_recover_lock
);
68 static void send_rcom(struct dlm_ls
*ls
, struct dlm_mhandle
*mh
,
72 dlm_lowcomms_commit_buffer(mh
);
75 /* When replying to a status request, a node also sends back its
76 configuration values. The requesting node then checks that the remote
77 node is configured the same way as itself. */
79 static void make_config(struct dlm_ls
*ls
, struct rcom_config
*rf
)
81 rf
->rf_lvblen
= cpu_to_le32(ls
->ls_lvblen
);
82 rf
->rf_lsflags
= cpu_to_le32(ls
->ls_exflags
);
85 static int check_config(struct dlm_ls
*ls
, struct dlm_rcom
*rc
, int nodeid
)
87 struct rcom_config
*rf
= (struct rcom_config
*) rc
->rc_buf
;
88 size_t conf_size
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_config
);
90 if ((rc
->rc_header
.h_version
& 0xFFFF0000) != DLM_HEADER_MAJOR
) {
91 log_error(ls
, "version mismatch: %x nodeid %d: %x",
92 DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
, nodeid
,
93 rc
->rc_header
.h_version
);
97 if (rc
->rc_header
.h_length
< conf_size
) {
98 log_error(ls
, "config too short: %d nodeid %d",
99 rc
->rc_header
.h_length
, nodeid
);
103 if (le32_to_cpu(rf
->rf_lvblen
) != ls
->ls_lvblen
||
104 le32_to_cpu(rf
->rf_lsflags
) != ls
->ls_exflags
) {
105 log_error(ls
, "config mismatch: %d,%x nodeid %d: %d,%x",
106 ls
->ls_lvblen
, ls
->ls_exflags
, nodeid
,
107 le32_to_cpu(rf
->rf_lvblen
),
108 le32_to_cpu(rf
->rf_lsflags
));
114 static void allow_sync_reply(struct dlm_ls
*ls
, uint64_t *new_seq
)
116 spin_lock(&ls
->ls_rcom_spin
);
117 *new_seq
= ++ls
->ls_rcom_seq
;
118 set_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
119 spin_unlock(&ls
->ls_rcom_spin
);
122 static void disallow_sync_reply(struct dlm_ls
*ls
)
124 spin_lock(&ls
->ls_rcom_spin
);
125 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
126 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
127 spin_unlock(&ls
->ls_rcom_spin
);
130 int dlm_rcom_status(struct dlm_ls
*ls
, int nodeid
)
133 struct dlm_mhandle
*mh
;
136 ls
->ls_recover_nodeid
= nodeid
;
138 if (nodeid
== dlm_our_nodeid()) {
139 rc
= ls
->ls_recover_buf
;
140 rc
->rc_result
= dlm_recover_status(ls
);
144 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS
, 0, &rc
, &mh
);
148 allow_sync_reply(ls
, &rc
->rc_id
);
149 memset(ls
->ls_recover_buf
, 0, dlm_config
.ci_buffer_size
);
151 send_rcom(ls
, mh
, rc
);
153 error
= dlm_wait_function(ls
, &rcom_response
);
154 disallow_sync_reply(ls
);
158 rc
= ls
->ls_recover_buf
;
160 if (rc
->rc_result
== -ESRCH
) {
161 /* we pretend the remote lockspace exists with 0 status */
162 log_debug(ls
, "remote node %d not ready", nodeid
);
165 error
= check_config(ls
, rc
, nodeid
);
166 /* the caller looks at rc_result for the remote recovery status */
171 static void receive_rcom_status(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
174 struct dlm_mhandle
*mh
;
175 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
177 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS_REPLY
,
178 sizeof(struct rcom_config
), &rc
, &mh
);
181 rc
->rc_id
= rc_in
->rc_id
;
182 rc
->rc_seq_reply
= rc_in
->rc_seq
;
183 rc
->rc_result
= dlm_recover_status(ls
);
184 make_config(ls
, (struct rcom_config
*) rc
->rc_buf
);
186 send_rcom(ls
, mh
, rc
);
189 static void receive_sync_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
191 spin_lock(&ls
->ls_rcom_spin
);
192 if (!test_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
) ||
193 rc_in
->rc_id
!= ls
->ls_rcom_seq
) {
194 log_debug(ls
, "reject reply %d from %d seq %llx expect %llx",
195 rc_in
->rc_type
, rc_in
->rc_header
.h_nodeid
,
196 (unsigned long long)rc_in
->rc_id
,
197 (unsigned long long)ls
->ls_rcom_seq
);
200 memcpy(ls
->ls_recover_buf
, rc_in
, rc_in
->rc_header
.h_length
);
201 set_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
202 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
203 wake_up(&ls
->ls_wait_general
);
205 spin_unlock(&ls
->ls_rcom_spin
);
208 int dlm_rcom_names(struct dlm_ls
*ls
, int nodeid
, char *last_name
, int last_len
)
211 struct dlm_mhandle
*mh
;
213 int max_size
= dlm_config
.ci_buffer_size
- sizeof(struct dlm_rcom
);
215 ls
->ls_recover_nodeid
= nodeid
;
217 if (nodeid
== dlm_our_nodeid()) {
218 ls
->ls_recover_buf
->rc_header
.h_length
=
219 dlm_config
.ci_buffer_size
;
220 dlm_copy_master_names(ls
, last_name
, last_len
,
221 ls
->ls_recover_buf
->rc_buf
,
226 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES
, last_len
, &rc
, &mh
);
229 memcpy(rc
->rc_buf
, last_name
, last_len
);
231 allow_sync_reply(ls
, &rc
->rc_id
);
232 memset(ls
->ls_recover_buf
, 0, dlm_config
.ci_buffer_size
);
234 send_rcom(ls
, mh
, rc
);
236 error
= dlm_wait_function(ls
, &rcom_response
);
237 disallow_sync_reply(ls
);
242 static void receive_rcom_names(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
245 struct dlm_mhandle
*mh
;
246 int error
, inlen
, outlen
, nodeid
;
248 nodeid
= rc_in
->rc_header
.h_nodeid
;
249 inlen
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
250 outlen
= dlm_config
.ci_buffer_size
- sizeof(struct dlm_rcom
);
252 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES_REPLY
, outlen
, &rc
, &mh
);
255 rc
->rc_id
= rc_in
->rc_id
;
256 rc
->rc_seq_reply
= rc_in
->rc_seq
;
258 dlm_copy_master_names(ls
, rc_in
->rc_buf
, inlen
, rc
->rc_buf
, outlen
,
260 send_rcom(ls
, mh
, rc
);
263 int dlm_send_rcom_lookup(struct dlm_rsb
*r
, int dir_nodeid
)
266 struct dlm_mhandle
*mh
;
267 struct dlm_ls
*ls
= r
->res_ls
;
270 error
= create_rcom(ls
, dir_nodeid
, DLM_RCOM_LOOKUP
, r
->res_length
,
274 memcpy(rc
->rc_buf
, r
->res_name
, r
->res_length
);
275 rc
->rc_id
= (unsigned long) r
;
277 send_rcom(ls
, mh
, rc
);
282 static void receive_rcom_lookup(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
285 struct dlm_mhandle
*mh
;
286 int error
, ret_nodeid
, nodeid
= rc_in
->rc_header
.h_nodeid
;
287 int len
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
289 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOOKUP_REPLY
, 0, &rc
, &mh
);
293 error
= dlm_dir_lookup(ls
, nodeid
, rc_in
->rc_buf
, len
, &ret_nodeid
);
296 rc
->rc_result
= ret_nodeid
;
297 rc
->rc_id
= rc_in
->rc_id
;
298 rc
->rc_seq_reply
= rc_in
->rc_seq
;
300 send_rcom(ls
, mh
, rc
);
303 static void receive_rcom_lookup_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
305 dlm_recover_master_reply(ls
, rc_in
);
308 static void pack_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
,
309 struct rcom_lock
*rl
)
311 memset(rl
, 0, sizeof(*rl
));
313 rl
->rl_ownpid
= cpu_to_le32(lkb
->lkb_ownpid
);
314 rl
->rl_lkid
= cpu_to_le32(lkb
->lkb_id
);
315 rl
->rl_exflags
= cpu_to_le32(lkb
->lkb_exflags
);
316 rl
->rl_flags
= cpu_to_le32(lkb
->lkb_flags
);
317 rl
->rl_lvbseq
= cpu_to_le32(lkb
->lkb_lvbseq
);
318 rl
->rl_rqmode
= lkb
->lkb_rqmode
;
319 rl
->rl_grmode
= lkb
->lkb_grmode
;
320 rl
->rl_status
= lkb
->lkb_status
;
321 rl
->rl_wait_type
= cpu_to_le16(lkb
->lkb_wait_type
);
324 rl
->rl_asts
|= AST_BAST
;
326 rl
->rl_asts
|= AST_COMP
;
328 rl
->rl_namelen
= cpu_to_le16(r
->res_length
);
329 memcpy(rl
->rl_name
, r
->res_name
, r
->res_length
);
331 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
332 If so, receive_rcom_lock_args() won't take this copy. */
335 memcpy(rl
->rl_lvb
, lkb
->lkb_lvbptr
, r
->res_ls
->ls_lvblen
);
338 int dlm_send_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
)
340 struct dlm_ls
*ls
= r
->res_ls
;
342 struct dlm_mhandle
*mh
;
343 struct rcom_lock
*rl
;
344 int error
, len
= sizeof(struct rcom_lock
);
347 len
+= ls
->ls_lvblen
;
349 error
= create_rcom(ls
, r
->res_nodeid
, DLM_RCOM_LOCK
, len
, &rc
, &mh
);
353 rl
= (struct rcom_lock
*) rc
->rc_buf
;
354 pack_rcom_lock(r
, lkb
, rl
);
355 rc
->rc_id
= (unsigned long) r
;
357 send_rcom(ls
, mh
, rc
);
362 /* needs at least dlm_rcom + rcom_lock */
363 static void receive_rcom_lock(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
366 struct dlm_mhandle
*mh
;
367 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
369 dlm_recover_master_copy(ls
, rc_in
);
371 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOCK_REPLY
,
372 sizeof(struct rcom_lock
), &rc
, &mh
);
376 /* We send back the same rcom_lock struct we received, but
377 dlm_recover_master_copy() has filled in rl_remid and rl_result */
379 memcpy(rc
->rc_buf
, rc_in
->rc_buf
, sizeof(struct rcom_lock
));
380 rc
->rc_id
= rc_in
->rc_id
;
381 rc
->rc_seq_reply
= rc_in
->rc_seq
;
383 send_rcom(ls
, mh
, rc
);
386 /* If the lockspace doesn't exist then still send a status message
387 back; it's possible that it just doesn't have its global_id yet. */
389 int dlm_send_ls_not_ready(int nodeid
, struct dlm_rcom
*rc_in
)
392 struct rcom_config
*rf
;
393 struct dlm_mhandle
*mh
;
395 int mb_len
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_config
);
397 mh
= dlm_lowcomms_get_buffer(nodeid
, mb_len
, GFP_NOFS
, &mb
);
400 memset(mb
, 0, mb_len
);
402 rc
= (struct dlm_rcom
*) mb
;
404 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
405 rc
->rc_header
.h_lockspace
= rc_in
->rc_header
.h_lockspace
;
406 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
407 rc
->rc_header
.h_length
= mb_len
;
408 rc
->rc_header
.h_cmd
= DLM_RCOM
;
410 rc
->rc_type
= DLM_RCOM_STATUS_REPLY
;
411 rc
->rc_id
= rc_in
->rc_id
;
412 rc
->rc_seq_reply
= rc_in
->rc_seq
;
413 rc
->rc_result
= -ESRCH
;
415 rf
= (struct rcom_config
*) rc
->rc_buf
;
416 rf
->rf_lvblen
= cpu_to_le32(~0U);
419 dlm_lowcomms_commit_buffer(mh
);
424 static int is_old_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc
)
429 switch (rc
->rc_type
) {
430 case DLM_RCOM_STATUS_REPLY
:
431 case DLM_RCOM_NAMES_REPLY
:
432 case DLM_RCOM_LOOKUP_REPLY
:
433 case DLM_RCOM_LOCK_REPLY
:
434 spin_lock(&ls
->ls_recover_lock
);
435 seq
= ls
->ls_recover_seq
;
436 spin_unlock(&ls
->ls_recover_lock
);
437 if (rc
->rc_seq_reply
!= seq
) {
438 log_debug(ls
, "ignoring old reply %x from %d "
439 "seq_reply %llx expect %llx",
440 rc
->rc_type
, rc
->rc_header
.h_nodeid
,
441 (unsigned long long)rc
->rc_seq_reply
,
442 (unsigned long long)seq
);
449 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
450 recovery-only comms are sent through here. */
452 void dlm_receive_rcom(struct dlm_ls
*ls
, struct dlm_rcom
*rc
, int nodeid
)
454 int lock_size
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_lock
);
456 if (dlm_recovery_stopped(ls
) && (rc
->rc_type
!= DLM_RCOM_STATUS
)) {
457 log_debug(ls
, "ignoring recovery message %x from %d",
458 rc
->rc_type
, nodeid
);
462 if (is_old_reply(ls
, rc
))
465 switch (rc
->rc_type
) {
466 case DLM_RCOM_STATUS
:
467 receive_rcom_status(ls
, rc
);
471 receive_rcom_names(ls
, rc
);
474 case DLM_RCOM_LOOKUP
:
475 receive_rcom_lookup(ls
, rc
);
479 if (rc
->rc_header
.h_length
< lock_size
)
481 receive_rcom_lock(ls
, rc
);
484 case DLM_RCOM_STATUS_REPLY
:
485 receive_sync_reply(ls
, rc
);
488 case DLM_RCOM_NAMES_REPLY
:
489 receive_sync_reply(ls
, rc
);
492 case DLM_RCOM_LOOKUP_REPLY
:
493 receive_rcom_lookup_reply(ls
, rc
);
496 case DLM_RCOM_LOCK_REPLY
:
497 if (rc
->rc_header
.h_length
< lock_size
)
499 dlm_recover_process_copy(ls
, rc
);
503 log_error(ls
, "receive_rcom bad type %d", rc
->rc_type
);
508 log_error(ls
, "recovery message %x from %d is too short",
509 rc
->rc_type
, nodeid
);