4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
32 ctdb_control_getvnnmap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
34 struct ctdb_vnn_map_wire
*map
;
37 CHECK_CONTROL_DATA_SIZE(0);
39 len
= offsetof(struct ctdb_vnn_map_wire
, map
) + sizeof(uint32_t)*ctdb
->vnn_map
->size
;
40 map
= talloc_size(outdata
, len
);
41 CTDB_NO_MEMORY(ctdb
, map
);
43 map
->generation
= ctdb
->vnn_map
->generation
;
44 map
->size
= ctdb
->vnn_map
->size
;
45 memcpy(map
->map
, ctdb
->vnn_map
->map
, sizeof(uint32_t)*map
->size
);
48 outdata
->dptr
= (uint8_t *)map
;
54 ctdb_control_setvnnmap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
56 struct ctdb_vnn_map_wire
*map
= (struct ctdb_vnn_map_wire
*)indata
.dptr
;
59 for(i
=1; i
<=NUM_DB_PRIORITIES
; i
++) {
60 if (ctdb
->freeze_mode
[i
] != CTDB_FREEZE_FROZEN
) {
61 DEBUG(DEBUG_ERR
,("Attempt to set vnnmap when not frozen\n"));
66 talloc_free(ctdb
->vnn_map
);
68 ctdb
->vnn_map
= talloc(ctdb
, struct ctdb_vnn_map
);
69 CTDB_NO_MEMORY(ctdb
, ctdb
->vnn_map
);
71 ctdb
->vnn_map
->generation
= map
->generation
;
72 ctdb
->vnn_map
->size
= map
->size
;
73 ctdb
->vnn_map
->map
= talloc_array(ctdb
->vnn_map
, uint32_t, map
->size
);
74 CTDB_NO_MEMORY(ctdb
, ctdb
->vnn_map
->map
);
76 memcpy(ctdb
->vnn_map
->map
, map
->map
, sizeof(uint32_t)*map
->size
);
82 ctdb_control_getdbmap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
85 struct ctdb_db_context
*ctdb_db
;
86 struct ctdb_dbid_map
*dbid_map
;
88 CHECK_CONTROL_DATA_SIZE(0);
91 for(ctdb_db
=ctdb
->db_list
;ctdb_db
;ctdb_db
=ctdb_db
->next
){
96 outdata
->dsize
= offsetof(struct ctdb_dbid_map
, dbs
) + sizeof(dbid_map
->dbs
[0])*len
;
97 outdata
->dptr
= (unsigned char *)talloc_zero_size(outdata
, outdata
->dsize
);
99 DEBUG(DEBUG_ALERT
, (__location__
" Failed to allocate dbmap array\n"));
103 dbid_map
= (struct ctdb_dbid_map
*)outdata
->dptr
;
105 for (i
=0,ctdb_db
=ctdb
->db_list
;ctdb_db
;i
++,ctdb_db
=ctdb_db
->next
){
106 dbid_map
->dbs
[i
].dbid
= ctdb_db
->db_id
;
107 if (ctdb_db
->persistent
!= 0) {
108 dbid_map
->dbs
[i
].flags
|= CTDB_DB_FLAGS_PERSISTENT
;
110 if (ctdb_db
->readonly
!= 0) {
111 dbid_map
->dbs
[i
].flags
|= CTDB_DB_FLAGS_READONLY
;
113 if (ctdb_db
->sticky
!= 0) {
114 dbid_map
->dbs
[i
].flags
|= CTDB_DB_FLAGS_STICKY
;
122 ctdb_control_getnodemap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
124 CHECK_CONTROL_DATA_SIZE(0);
126 outdata
->dptr
= (unsigned char *)ctdb_node_list_to_map(ctdb
->nodes
,
129 if (outdata
->dptr
== NULL
) {
133 outdata
->dsize
= talloc_get_size(outdata
->dptr
);
139 reload the nodes file
142 ctdb_control_reload_nodes_file(struct ctdb_context
*ctdb
, uint32_t opcode
)
146 struct ctdb_node
**nodes
;
148 tmp_ctx
= talloc_new(ctdb
);
150 /* steal the old nodes file for a while */
151 talloc_steal(tmp_ctx
, ctdb
->nodes
);
154 num_nodes
= ctdb
->num_nodes
;
157 /* load the new nodes file */
158 ctdb_load_nodes_file(ctdb
);
160 for (i
=0; i
<ctdb
->num_nodes
; i
++) {
161 /* keep any identical pre-existing nodes and connections */
162 if ((i
< num_nodes
) && ctdb_same_address(&ctdb
->nodes
[i
]->address
, &nodes
[i
]->address
)) {
163 talloc_free(ctdb
->nodes
[i
]);
164 ctdb
->nodes
[i
] = talloc_steal(ctdb
->nodes
, nodes
[i
]);
168 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DELETED
) {
172 /* any new or different nodes must be added */
173 if (ctdb
->methods
->add_node(ctdb
->nodes
[i
]) != 0) {
174 DEBUG(DEBUG_CRIT
, (__location__
" methods->add_node failed at %d\n", i
));
175 ctdb_fatal(ctdb
, "failed to add node. shutting down\n");
177 if (ctdb
->methods
->connect_node(ctdb
->nodes
[i
]) != 0) {
178 DEBUG(DEBUG_CRIT
, (__location__
" methods->add_connect failed at %d\n", i
));
179 ctdb_fatal(ctdb
, "failed to connect to node. shutting down\n");
183 /* tell the recovery daemon to reaload the nodes file too */
184 ctdb_daemon_send_message(ctdb
, ctdb
->pnn
, CTDB_SRVID_RELOAD_NODES
, tdb_null
);
186 talloc_free(tmp_ctx
);
/*
  state shared between ctdb_control_pull_db() and its traverse function,
  which pulls all relevant records from the database into one blob
 */
struct pulldb_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* marshall buffer being built; re-allocated as it grows */
	struct ctdb_marshall_buffer *pulldata;
	/* bytes of pulldata used so far */
	uint32_t len;
	/* bytes currently allocated for pulldata */
	uint32_t allocated_len;
	/* set when a record could not be marshalled */
	bool failed;
};
203 static int traverse_pulldb(struct tdb_context
*tdb
, TDB_DATA key
, TDB_DATA data
, void *p
)
205 struct pulldb_data
*params
= (struct pulldb_data
*)p
;
206 struct ctdb_rec_data
*rec
;
207 struct ctdb_context
*ctdb
= params
->ctdb
;
208 struct ctdb_db_context
*ctdb_db
= params
->ctdb_db
;
210 /* add the record to the blob */
211 rec
= ctdb_marshall_record(params
->pulldata
, 0, key
, NULL
, data
);
213 params
->failed
= true;
216 if (params
->len
+ rec
->length
>= params
->allocated_len
) {
217 params
->allocated_len
= rec
->length
+ params
->len
+ ctdb
->tunable
.pulldb_preallocation_size
;
218 params
->pulldata
= talloc_realloc_size(NULL
, params
->pulldata
, params
->allocated_len
);
220 if (params
->pulldata
== NULL
) {
221 DEBUG(DEBUG_CRIT
,(__location__
" Failed to expand pulldb_data to %u\n", rec
->length
+ params
->len
));
222 ctdb_fatal(params
->ctdb
, "failed to allocate memory for recovery. shutting down\n");
224 params
->pulldata
->count
++;
225 memcpy(params
->len
+(uint8_t *)params
->pulldata
, rec
, rec
->length
);
226 params
->len
+= rec
->length
;
228 if (ctdb
->tunable
.db_record_size_warn
!= 0 && rec
->length
> ctdb
->tunable
.db_record_size_warn
) {
229 DEBUG(DEBUG_ERR
,("Data record in %s is big. Record size is %d bytes\n", ctdb_db
->db_name
, (int)rec
->length
));
238 pull a bunch of records from a ltdb, filtering by lmaster
240 int32_t ctdb_control_pull_db(struct ctdb_context
*ctdb
, TDB_DATA indata
, TDB_DATA
*outdata
)
242 struct ctdb_control_pulldb
*pull
;
243 struct ctdb_db_context
*ctdb_db
;
244 struct pulldb_data params
;
245 struct ctdb_marshall_buffer
*reply
;
247 pull
= (struct ctdb_control_pulldb
*)indata
.dptr
;
249 ctdb_db
= find_ctdb_db(ctdb
, pull
->db_id
);
251 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n", pull
->db_id
));
255 if (ctdb
->freeze_mode
[ctdb_db
->priority
] != CTDB_FREEZE_FROZEN
) {
256 DEBUG(DEBUG_DEBUG
,("rejecting ctdb_control_pull_db when not frozen\n"));
260 reply
= talloc_zero(outdata
, struct ctdb_marshall_buffer
);
261 CTDB_NO_MEMORY(ctdb
, reply
);
263 reply
->db_id
= pull
->db_id
;
266 params
.ctdb_db
= ctdb_db
;
267 params
.pulldata
= reply
;
268 params
.len
= offsetof(struct ctdb_marshall_buffer
, data
);
269 params
.allocated_len
= params
.len
;
270 params
.failed
= false;
272 if (ctdb_db
->unhealthy_reason
) {
273 /* this is just a warning, as the tdb should be empty anyway */
274 DEBUG(DEBUG_WARNING
,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
275 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
278 if (ctdb_lockall_mark_prio(ctdb
, ctdb_db
->priority
) != 0) {
279 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock on entired db - failing\n"));
283 if (tdb_traverse_read(ctdb_db
->ltdb
->tdb
, traverse_pulldb
, ¶ms
) == -1) {
284 DEBUG(DEBUG_ERR
,(__location__
" Failed to get traverse db '%s'\n", ctdb_db
->db_name
));
285 ctdb_lockall_unmark_prio(ctdb
, ctdb_db
->priority
);
286 talloc_free(params
.pulldata
);
290 ctdb_lockall_unmark_prio(ctdb
, ctdb_db
->priority
);
292 outdata
->dptr
= (uint8_t *)params
.pulldata
;
293 outdata
->dsize
= params
.len
;
295 if (ctdb
->tunable
.db_record_count_warn
!= 0 && params
.pulldata
->count
> ctdb
->tunable
.db_record_count_warn
) {
296 DEBUG(DEBUG_ERR
,("Database %s is big. Contains %d records\n", ctdb_db
->db_name
, params
.pulldata
->count
));
298 if (ctdb
->tunable
.db_size_warn
!= 0 && outdata
->dsize
> ctdb
->tunable
.db_size_warn
) {
299 DEBUG(DEBUG_ERR
,("Database %s is big. Contains %d bytes\n", ctdb_db
->db_name
, (int)outdata
->dsize
));
307 push a bunch of records into a ltdb, filtering by rsn
309 int32_t ctdb_control_push_db(struct ctdb_context
*ctdb
, TDB_DATA indata
)
311 struct ctdb_marshall_buffer
*reply
= (struct ctdb_marshall_buffer
*)indata
.dptr
;
312 struct ctdb_db_context
*ctdb_db
;
314 struct ctdb_rec_data
*rec
;
316 if (indata
.dsize
< offsetof(struct ctdb_marshall_buffer
, data
)) {
317 DEBUG(DEBUG_ERR
,(__location__
" invalid data in pulldb reply\n"));
321 ctdb_db
= find_ctdb_db(ctdb
, reply
->db_id
);
323 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n", reply
->db_id
));
327 if (ctdb
->freeze_mode
[ctdb_db
->priority
] != CTDB_FREEZE_FROZEN
) {
328 DEBUG(DEBUG_DEBUG
,("rejecting ctdb_control_push_db when not frozen\n"));
332 if (ctdb_lockall_mark_prio(ctdb
, ctdb_db
->priority
) != 0) {
333 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock on entired db - failing\n"));
337 rec
= (struct ctdb_rec_data
*)&reply
->data
[0];
339 DEBUG(DEBUG_INFO
,("starting push of %u records for dbid 0x%x\n",
340 reply
->count
, reply
->db_id
));
342 for (i
=0;i
<reply
->count
;i
++) {
344 struct ctdb_ltdb_header
*hdr
;
346 key
.dptr
= &rec
->data
[0];
347 key
.dsize
= rec
->keylen
;
348 data
.dptr
= &rec
->data
[key
.dsize
];
349 data
.dsize
= rec
->datalen
;
351 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
352 DEBUG(DEBUG_CRIT
,(__location__
" bad ltdb record\n"));
355 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
356 /* strip off any read only record flags. All readonly records
357 are revoked implicitely by a recovery
359 hdr
->flags
&= ~CTDB_REC_RO_FLAGS
;
361 data
.dptr
+= sizeof(*hdr
);
362 data
.dsize
-= sizeof(*hdr
);
364 ret
= ctdb_ltdb_store(ctdb_db
, key
, hdr
, data
);
366 DEBUG(DEBUG_CRIT
, (__location__
" Unable to store record\n"));
370 rec
= (struct ctdb_rec_data
*)(rec
->length
+ (uint8_t *)rec
);
373 DEBUG(DEBUG_DEBUG
,("finished push of %u records for dbid 0x%x\n",
374 reply
->count
, reply
->db_id
));
376 if (ctdb_db
->readonly
) {
377 DEBUG(DEBUG_CRIT
,("Clearing the tracking database for dbid 0x%x\n",
379 if (tdb_wipe_all(ctdb_db
->rottdb
) != 0) {
380 DEBUG(DEBUG_ERR
,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db
->db_id
));
381 ctdb_db
->readonly
= false;
382 tdb_close(ctdb_db
->rottdb
);
383 ctdb_db
->rottdb
= NULL
;
384 ctdb_db
->readonly
= false;
386 while (ctdb_db
->revokechild_active
!= NULL
) {
387 talloc_free(ctdb_db
->revokechild_active
);
391 ctdb_lockall_unmark_prio(ctdb
, ctdb_db
->priority
);
395 ctdb_lockall_unmark_prio(ctdb
, ctdb_db
->priority
);
/*
  state of one asynchronous SET_RECMODE request while the child process
  probes the recovery lock
 */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;
	/* the control to answer once the child has reported back */
	struct ctdb_req_control *c;
	/* recovery mode to apply on success or timeout */
	uint32_t recmode;
	/* pipe between daemon (reads fd[0]) and child (writes fd[1]) */
	int fd[2];
	/* timeout for the child */
	struct timed_event *te;
	/* read event on fd[0] */
	struct fd_event *fde;
	/* pid of the forked child */
	pid_t child;
	/* when the request started, for reclock latency accounting */
	struct timeval start_time;
};
411 called if our set_recmode child times out. this would happen if
412 ctdb_recovery_lock() would block.
414 static void ctdb_set_recmode_timeout(struct event_context
*ev
, struct timed_event
*te
,
415 struct timeval t
, void *private_data
)
417 struct ctdb_set_recmode_state
*state
= talloc_get_type(private_data
,
418 struct ctdb_set_recmode_state
);
420 /* we consider this a success, not a failure, as we failed to
421 set the recovery lock which is what we wanted. This can be
422 caused by the cluster filesystem being very slow to
423 arbitrate locks immediately after a node failure.
425 DEBUG(DEBUG_ERR
,(__location__
" set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
426 state
->ctdb
->recovery_mode
= state
->recmode
;
427 ctdb_request_control_reply(state
->ctdb
, state
->c
, NULL
, 0, NULL
);
432 /* when we free the recmode state we must kill any child process.
434 static int set_recmode_destructor(struct ctdb_set_recmode_state
*state
)
436 double l
= timeval_elapsed(&state
->start_time
);
438 CTDB_UPDATE_RECLOCK_LATENCY(state
->ctdb
, "daemon reclock", reclock
.ctdbd
, l
);
440 if (state
->fd
[0] != -1) {
443 if (state
->fd
[1] != -1) {
446 ctdb_kill(state
->ctdb
, state
->child
, SIGKILL
);
450 /* this is called when the client process has completed ctdb_recovery_lock()
451 and has written data back to us through the pipe.
453 static void set_recmode_handler(struct event_context
*ev
, struct fd_event
*fde
,
454 uint16_t flags
, void *private_data
)
456 struct ctdb_set_recmode_state
*state
= talloc_get_type(private_data
,
457 struct ctdb_set_recmode_state
);
461 /* we got a response from our child process so we can abort the
464 talloc_free(state
->te
);
468 /* If, as expected, the child was unable to take the recovery
469 * lock then it will have written 0 into the pipe, so
470 * continue. However, any other value (e.g. 1) indicates that
471 * it was able to take the recovery lock when it should have
472 * been held by the recovery daemon on the recovery master.
474 ret
= sys_read(state
->fd
[0], &c
, 1);
475 if (ret
!= 1 || c
!= 0) {
476 ctdb_request_control_reply(
477 state
->ctdb
, state
->c
, NULL
, -1,
478 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
483 state
->ctdb
->recovery_mode
= state
->recmode
;
485 /* release any deferred attach calls from clients */
486 if (state
->recmode
== CTDB_RECOVERY_NORMAL
) {
487 ctdb_process_deferred_attach(state
->ctdb
);
490 ctdb_request_control_reply(state
->ctdb
, state
->c
, NULL
, 0, NULL
);
496 ctdb_drop_all_ips_event(struct event_context
*ev
, struct timed_event
*te
,
497 struct timeval t
, void *private_data
)
499 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
501 DEBUG(DEBUG_ERR
,(__location__
" Been in recovery mode for too long. Dropping all IPS\n"));
502 talloc_free(ctdb
->release_ips_ctx
);
503 ctdb
->release_ips_ctx
= NULL
;
505 ctdb_release_all_ips(ctdb
);
509 * Set up an event to drop all public ips if we remain in recovery for too
512 int ctdb_deferred_drop_all_ips(struct ctdb_context
*ctdb
)
514 if (ctdb
->release_ips_ctx
!= NULL
) {
515 talloc_free(ctdb
->release_ips_ctx
);
517 ctdb
->release_ips_ctx
= talloc_new(ctdb
);
518 CTDB_NO_MEMORY(ctdb
, ctdb
->release_ips_ctx
);
520 event_add_timed(ctdb
->ev
, ctdb
->release_ips_ctx
, timeval_current_ofs(ctdb
->tunable
.recovery_drop_all_ips
, 0), ctdb_drop_all_ips_event
, ctdb
);
525 set the recovery mode
527 int32_t ctdb_control_set_recmode(struct ctdb_context
*ctdb
,
528 struct ctdb_req_control
*c
,
529 TDB_DATA indata
, bool *async_reply
,
530 const char **errormsg
)
532 uint32_t recmode
= *(uint32_t *)indata
.dptr
;
534 struct ctdb_set_recmode_state
*state
;
535 pid_t parent
= getpid();
537 /* if we enter recovery but stay in recovery for too long
538 we will eventually drop all our ip addresses
540 if (recmode
== CTDB_RECOVERY_NORMAL
) {
541 talloc_free(ctdb
->release_ips_ctx
);
542 ctdb
->release_ips_ctx
= NULL
;
544 if (ctdb_deferred_drop_all_ips(ctdb
) != 0) {
545 DEBUG(DEBUG_ERR
,("Failed to set up deferred drop all ips\n"));
549 if (recmode
!= ctdb
->recovery_mode
) {
550 DEBUG(DEBUG_NOTICE
,(__location__
" Recovery mode set to %s\n",
551 recmode
==CTDB_RECOVERY_NORMAL
?"NORMAL":"ACTIVE"));
554 if (recmode
!= CTDB_RECOVERY_NORMAL
||
555 ctdb
->recovery_mode
!= CTDB_RECOVERY_ACTIVE
) {
556 ctdb
->recovery_mode
= recmode
;
560 /* some special handling when ending recovery mode */
562 /* force the databases to thaw */
563 for (i
=1; i
<=NUM_DB_PRIORITIES
; i
++) {
564 if (ctdb
->freeze_handles
[i
] != NULL
) {
565 ctdb_control_thaw(ctdb
, i
, false);
569 state
= talloc(ctdb
, struct ctdb_set_recmode_state
);
570 CTDB_NO_MEMORY(ctdb
, state
);
572 state
->start_time
= timeval_current();
576 /* release any deferred attach calls from clients */
577 if (recmode
== CTDB_RECOVERY_NORMAL
) {
578 ctdb_process_deferred_attach(ctdb
);
581 if (ctdb
->recovery_lock_file
== NULL
) {
582 /* Not using recovery lock file */
583 ctdb
->recovery_mode
= recmode
;
587 /* For the rest of what needs to be done, we need to do this in
588 a child process since
589 1, the call to ctdb_recovery_lock() can block if the cluster
590 filesystem is in the process of recovery.
592 ret
= pipe(state
->fd
);
595 DEBUG(DEBUG_CRIT
,(__location__
" Failed to open pipe for set_recmode child\n"));
599 state
->child
= ctdb_fork(ctdb
);
600 if (state
->child
== (pid_t
)-1) {
607 if (state
->child
== 0) {
611 ctdb_set_process_name("ctdb_recmode");
612 debug_extra
= talloc_asprintf(NULL
, "set_recmode:");
613 /* Daemon should not be able to get the recover lock,
614 * as it should be held by the recovery master */
615 if (ctdb_recovery_lock(ctdb
)) {
617 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
618 ctdb
->recovery_lock_file
));
619 ctdb_recovery_unlock(ctdb
);
623 sys_write(state
->fd
[1], &cc
, 1);
624 /* make sure we die when our parent dies */
625 while (ctdb_kill(ctdb
, parent
, 0) == 0 || errno
!= ESRCH
) {
627 sys_write(state
->fd
[1], &cc
, 1);
632 set_close_on_exec(state
->fd
[0]);
636 talloc_set_destructor(state
, set_recmode_destructor
);
638 DEBUG(DEBUG_DEBUG
, (__location__
" Created PIPE FD:%d for setrecmode\n", state
->fd
[0]));
640 state
->te
= event_add_timed(ctdb
->ev
, state
, timeval_current_ofs(5, 0),
641 ctdb_set_recmode_timeout
, state
);
643 state
->fde
= event_add_fd(ctdb
->ev
, state
, state
->fd
[0],
648 if (state
->fde
== NULL
) {
652 tevent_fd_set_auto_close(state
->fde
);
655 state
->recmode
= recmode
;
656 state
->c
= talloc_steal(state
, c
);
664 bool ctdb_recovery_have_lock(struct ctdb_context
*ctdb
)
666 return ctdb
->recovery_lock_fd
!= -1;
670 try and get the recovery lock in shared storage - should only work
671 on the recovery master recovery daemon. Anywhere else is a bug
673 bool ctdb_recovery_lock(struct ctdb_context
*ctdb
)
677 ctdb
->recovery_lock_fd
= open(ctdb
->recovery_lock_file
,
678 O_RDWR
|O_CREAT
, 0600);
679 if (ctdb
->recovery_lock_fd
== -1) {
681 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
682 ctdb
->recovery_lock_file
, strerror(errno
)));
686 set_close_on_exec(ctdb
->recovery_lock_fd
);
688 lock
.l_type
= F_WRLCK
;
689 lock
.l_whence
= SEEK_SET
;
694 if (fcntl(ctdb
->recovery_lock_fd
, F_SETLK
, &lock
) != 0) {
695 int saved_errno
= errno
;
696 close(ctdb
->recovery_lock_fd
);
697 ctdb
->recovery_lock_fd
= -1;
698 /* Fail silently on these errors, since they indicate
699 * lock contention, but log an error for any other
701 if (saved_errno
!= EACCES
&&
702 saved_errno
!= EAGAIN
) {
703 DEBUG(DEBUG_ERR
,("ctdb_recovery_lock: Failed to get "
704 "recovery lock on '%s' - (%s)\n",
705 ctdb
->recovery_lock_file
,
706 strerror(saved_errno
)));
714 void ctdb_recovery_unlock(struct ctdb_context
*ctdb
)
716 if (ctdb
->recovery_lock_fd
!= -1) {
717 DEBUG(DEBUG_NOTICE
, ("Releasing recovery lock\n"));
718 close(ctdb
->recovery_lock_fd
);
719 ctdb
->recovery_lock_fd
= -1;
724 delete a record as part of the vacuum process
725 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
726 use non-blocking locks
728 return 0 if the record was successfully deleted (i.e. it does not exist
729 when the function returns)
730 or !0 is the record still exists in the tdb after returning.
732 static int delete_tdb_record(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, struct ctdb_rec_data
*rec
)
734 TDB_DATA key
, data
, data2
;
735 struct ctdb_ltdb_header
*hdr
, *hdr2
;
737 /* these are really internal tdb functions - but we need them here for
738 non-blocking lock of the freelist */
739 int tdb_lock_nonblock(struct tdb_context
*tdb
, int list
, int ltype
);
740 int tdb_unlock(struct tdb_context
*tdb
, int list
, int ltype
);
743 key
.dsize
= rec
->keylen
;
744 key
.dptr
= &rec
->data
[0];
745 data
.dsize
= rec
->datalen
;
746 data
.dptr
= &rec
->data
[rec
->keylen
];
748 if (ctdb_lmaster(ctdb
, &key
) == ctdb
->pnn
) {
749 DEBUG(DEBUG_INFO
,(__location__
" Called delete on record where we are lmaster\n"));
753 if (data
.dsize
!= sizeof(struct ctdb_ltdb_header
)) {
754 DEBUG(DEBUG_ERR
,(__location__
" Bad record size\n"));
758 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
760 /* use a non-blocking lock */
761 if (tdb_chainlock_nonblock(ctdb_db
->ltdb
->tdb
, key
) != 0) {
765 data2
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
766 if (data2
.dptr
== NULL
) {
767 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
771 if (data2
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
772 if (tdb_lock_nonblock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
) == 0) {
773 if (tdb_delete(ctdb_db
->ltdb
->tdb
, key
) != 0) {
774 DEBUG(DEBUG_CRIT
,(__location__
" Failed to delete corrupt record\n"));
776 tdb_unlock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
);
777 DEBUG(DEBUG_CRIT
,(__location__
" Deleted corrupt record\n"));
779 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
784 hdr2
= (struct ctdb_ltdb_header
*)data2
.dptr
;
786 if (hdr2
->rsn
> hdr
->rsn
) {
787 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
788 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with rsn=%llu - called with rsn=%llu\n",
789 (unsigned long long)hdr2
->rsn
, (unsigned long long)hdr
->rsn
));
794 /* do not allow deleting record that have readonly flags set. */
795 if (hdr
->flags
& CTDB_REC_RO_FLAGS
) {
796 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
797 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly flags set\n"));
801 if (hdr2
->flags
& CTDB_REC_RO_FLAGS
) {
802 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
803 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly flags set\n"));
808 if (hdr2
->dmaster
== ctdb
->pnn
) {
809 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
810 DEBUG(DEBUG_INFO
,(__location__
" Attempted delete record where we are the dmaster\n"));
815 if (tdb_lock_nonblock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
) != 0) {
816 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
821 if (tdb_delete(ctdb_db
->ltdb
->tdb
, key
) != 0) {
822 tdb_unlock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
);
823 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
824 DEBUG(DEBUG_INFO
,(__location__
" Failed to delete record\n"));
829 tdb_unlock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
);
830 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
/*
  state handed to the recovery event script callbacks
 */
struct recovery_callback_state {
	/* the control to answer once the event script has finished */
	struct ctdb_req_control *c;
};
843 called when the 'recovered' event script has finished
845 static void ctdb_end_recovery_callback(struct ctdb_context
*ctdb
, int status
, void *p
)
847 struct recovery_callback_state
*state
= talloc_get_type(p
, struct recovery_callback_state
);
849 ctdb_enable_monitoring(ctdb
);
850 CTDB_INCREMENT_STAT(ctdb
, num_recoveries
);
853 DEBUG(DEBUG_ERR
,(__location__
" recovered event script failed (status %d)\n", status
));
854 if (status
== -ETIME
) {
859 ctdb_request_control_reply(ctdb
, state
->c
, NULL
, status
, NULL
);
862 gettimeofday(&ctdb
->last_recovery_finished
, NULL
);
864 if (ctdb
->runstate
== CTDB_RUNSTATE_FIRST_RECOVERY
) {
865 ctdb_set_runstate(ctdb
, CTDB_RUNSTATE_STARTUP
);
870 recovery has finished
872 int32_t ctdb_control_end_recovery(struct ctdb_context
*ctdb
,
873 struct ctdb_req_control
*c
,
877 struct recovery_callback_state
*state
;
879 DEBUG(DEBUG_NOTICE
,("Recovery has finished\n"));
881 ctdb_persistent_finish_trans3_commits(ctdb
);
883 state
= talloc(ctdb
, struct recovery_callback_state
);
884 CTDB_NO_MEMORY(ctdb
, state
);
888 ctdb_disable_monitoring(ctdb
);
890 ret
= ctdb_event_script_callback(ctdb
, state
,
891 ctdb_end_recovery_callback
,
893 CTDB_EVENT_RECOVERED
, "%s", "");
896 ctdb_enable_monitoring(ctdb
);
898 DEBUG(DEBUG_ERR
,(__location__
" Failed to end recovery\n"));
903 /* tell the control that we will be reply asynchronously */
904 state
->c
= talloc_steal(state
, c
);
910 called when the 'startrecovery' event script has finished
912 static void ctdb_start_recovery_callback(struct ctdb_context
*ctdb
, int status
, void *p
)
914 struct recovery_callback_state
*state
= talloc_get_type(p
, struct recovery_callback_state
);
917 DEBUG(DEBUG_ERR
,(__location__
" startrecovery event script failed (status %d)\n", status
));
920 ctdb_request_control_reply(ctdb
, state
->c
, NULL
, status
, NULL
);
925 run the startrecovery eventscript
927 int32_t ctdb_control_start_recovery(struct ctdb_context
*ctdb
,
928 struct ctdb_req_control
*c
,
932 struct recovery_callback_state
*state
;
934 DEBUG(DEBUG_NOTICE
,(__location__
" startrecovery eventscript has been invoked\n"));
935 gettimeofday(&ctdb
->last_recovery_started
, NULL
);
937 state
= talloc(ctdb
, struct recovery_callback_state
);
938 CTDB_NO_MEMORY(ctdb
, state
);
940 state
->c
= talloc_steal(state
, c
);
942 ctdb_disable_monitoring(ctdb
);
944 ret
= ctdb_event_script_callback(ctdb
, state
,
945 ctdb_start_recovery_callback
,
947 CTDB_EVENT_START_RECOVERY
,
951 DEBUG(DEBUG_ERR
,(__location__
" Failed to start recovery\n"));
956 /* tell the control that we will be reply asynchronously */
962 try to delete all these records as part of the vacuuming process
963 and return the records we failed to delete
965 int32_t ctdb_control_try_delete_records(struct ctdb_context
*ctdb
, TDB_DATA indata
, TDB_DATA
*outdata
)
967 struct ctdb_marshall_buffer
*reply
= (struct ctdb_marshall_buffer
*)indata
.dptr
;
968 struct ctdb_db_context
*ctdb_db
;
970 struct ctdb_rec_data
*rec
;
971 struct ctdb_marshall_buffer
*records
;
973 if (indata
.dsize
< offsetof(struct ctdb_marshall_buffer
, data
)) {
974 DEBUG(DEBUG_ERR
,(__location__
" invalid data in try_delete_records\n"));
978 ctdb_db
= find_ctdb_db(ctdb
, reply
->db_id
);
980 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n", reply
->db_id
));
985 DEBUG(DEBUG_DEBUG
,("starting try_delete_records of %u records for dbid 0x%x\n",
986 reply
->count
, reply
->db_id
));
989 /* create a blob to send back the records we couldnt delete */
990 records
= (struct ctdb_marshall_buffer
*)
991 talloc_zero_size(outdata
,
992 offsetof(struct ctdb_marshall_buffer
, data
));
993 if (records
== NULL
) {
994 DEBUG(DEBUG_ERR
,(__location__
" Out of memory\n"));
997 records
->db_id
= ctdb_db
->db_id
;
1000 rec
= (struct ctdb_rec_data
*)&reply
->data
[0];
1001 for (i
=0;i
<reply
->count
;i
++) {
1004 key
.dptr
= &rec
->data
[0];
1005 key
.dsize
= rec
->keylen
;
1006 data
.dptr
= &rec
->data
[key
.dsize
];
1007 data
.dsize
= rec
->datalen
;
1009 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1010 DEBUG(DEBUG_CRIT
,(__location__
" bad ltdb record in indata\n"));
1014 /* If we cant delete the record we must add it to the reply
1015 so the lmaster knows it may not purge this record
1017 if (delete_tdb_record(ctdb
, ctdb_db
, rec
) != 0) {
1019 struct ctdb_ltdb_header
*hdr
;
1021 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
1022 data
.dptr
+= sizeof(*hdr
);
1023 data
.dsize
-= sizeof(*hdr
);
1025 DEBUG(DEBUG_INFO
, (__location__
" Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key
)));
1027 old_size
= talloc_get_size(records
);
1028 records
= talloc_realloc_size(outdata
, records
, old_size
+ rec
->length
);
1029 if (records
== NULL
) {
1030 DEBUG(DEBUG_ERR
,(__location__
" Failed to expand\n"));
1034 memcpy(old_size
+(uint8_t *)records
, rec
, rec
->length
);
1037 rec
= (struct ctdb_rec_data
*)(rec
->length
+ (uint8_t *)rec
);
1041 *outdata
= ctdb_marshall_finish(records
);
1047 * Store a record as part of the vacuum process:
1048 * This is called from the RECEIVE_RECORD control which
1049 * the lmaster uses to send the current empty copy
1050 * to all nodes for storing, before it lets the other
1051 * nodes delete the records in the second phase with
1052 * the TRY_DELETE_RECORDS control.
1054 * Only store if we are not lmaster or dmaster, and our
1055 * rsn is <= the provided rsn. Use non-blocking locks.
1057 * return 0 if the record was successfully stored.
1058 * return !0 if the record still exists in the tdb after returning.
1060 static int store_tdb_record(struct ctdb_context
*ctdb
,
1061 struct ctdb_db_context
*ctdb_db
,
1062 struct ctdb_rec_data
*rec
)
1064 TDB_DATA key
, data
, data2
;
1065 struct ctdb_ltdb_header
*hdr
, *hdr2
;
1068 key
.dsize
= rec
->keylen
;
1069 key
.dptr
= &rec
->data
[0];
1070 data
.dsize
= rec
->datalen
;
1071 data
.dptr
= &rec
->data
[rec
->keylen
];
1073 if (ctdb_lmaster(ctdb
, &key
) == ctdb
->pnn
) {
1074 DEBUG(DEBUG_INFO
, (__location__
" Called store_tdb_record "
1075 "where we are lmaster\n"));
1079 if (data
.dsize
!= sizeof(struct ctdb_ltdb_header
)) {
1080 DEBUG(DEBUG_ERR
, (__location__
" Bad record size\n"));
1084 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
1086 /* use a non-blocking lock */
1087 if (tdb_chainlock_nonblock(ctdb_db
->ltdb
->tdb
, key
) != 0) {
1088 DEBUG(DEBUG_INFO
, (__location__
" Failed to lock chain in non-blocking mode\n"));
1092 data2
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
1093 if (data2
.dptr
== NULL
|| data2
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1094 if (tdb_store(ctdb_db
->ltdb
->tdb
, key
, data
, 0) == -1) {
1095 DEBUG(DEBUG_ERR
, (__location__
"Failed to store record\n"));
1099 DEBUG(DEBUG_INFO
, (__location__
" Stored record\n"));
1104 hdr2
= (struct ctdb_ltdb_header
*)data2
.dptr
;
1106 if (hdr2
->rsn
> hdr
->rsn
) {
1107 DEBUG(DEBUG_INFO
, (__location__
" Skipping record with "
1108 "rsn=%llu - called with rsn=%llu\n",
1109 (unsigned long long)hdr2
->rsn
,
1110 (unsigned long long)hdr
->rsn
));
1115 /* do not allow vacuuming of records that have readonly flags set. */
1116 if (hdr
->flags
& CTDB_REC_RO_FLAGS
) {
1117 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly "
1122 if (hdr2
->flags
& CTDB_REC_RO_FLAGS
) {
1123 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly "
1129 if (hdr2
->dmaster
== ctdb
->pnn
) {
1130 DEBUG(DEBUG_INFO
, (__location__
" Attempted to store record "
1131 "where we are the dmaster\n"));
1136 if (tdb_store(ctdb_db
->ltdb
->tdb
, key
, data
, 0) != 0) {
1137 DEBUG(DEBUG_INFO
,(__location__
" Failed to store record\n"));
1145 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
1153 * Try to store all these records as part of the vacuuming process
1154 * and return the records we failed to store.
1156 int32_t ctdb_control_receive_records(struct ctdb_context
*ctdb
,
1157 TDB_DATA indata
, TDB_DATA
*outdata
)
1159 struct ctdb_marshall_buffer
*reply
= (struct ctdb_marshall_buffer
*)indata
.dptr
;
1160 struct ctdb_db_context
*ctdb_db
;
1162 struct ctdb_rec_data
*rec
;
1163 struct ctdb_marshall_buffer
*records
;
1165 if (indata
.dsize
< offsetof(struct ctdb_marshall_buffer
, data
)) {
1167 (__location__
" invalid data in receive_records\n"));
1171 ctdb_db
= find_ctdb_db(ctdb
, reply
->db_id
);
1173 DEBUG(DEBUG_ERR
, (__location__
" Unknown db 0x%08x\n",
1178 DEBUG(DEBUG_DEBUG
, ("starting receive_records of %u records for "
1179 "dbid 0x%x\n", reply
->count
, reply
->db_id
));
1181 /* create a blob to send back the records we could not store */
1182 records
= (struct ctdb_marshall_buffer
*)
1183 talloc_zero_size(outdata
,
1184 offsetof(struct ctdb_marshall_buffer
, data
));
1185 if (records
== NULL
) {
1186 DEBUG(DEBUG_ERR
, (__location__
" Out of memory\n"));
1189 records
->db_id
= ctdb_db
->db_id
;
1191 rec
= (struct ctdb_rec_data
*)&reply
->data
[0];
1192 for (i
=0; i
<reply
->count
; i
++) {
1195 key
.dptr
= &rec
->data
[0];
1196 key
.dsize
= rec
->keylen
;
1197 data
.dptr
= &rec
->data
[key
.dsize
];
1198 data
.dsize
= rec
->datalen
;
1200 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1201 DEBUG(DEBUG_CRIT
, (__location__
" bad ltdb record "
1207 * If we can not store the record we must add it to the reply
1208 * so the lmaster knows it may not purge this record.
1210 if (store_tdb_record(ctdb
, ctdb_db
, rec
) != 0) {
1212 struct ctdb_ltdb_header
*hdr
;
1214 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
1215 data
.dptr
+= sizeof(*hdr
);
1216 data
.dsize
-= sizeof(*hdr
);
1218 DEBUG(DEBUG_INFO
, (__location__
" Failed to store "
1219 "record with hash 0x%08x in vacuum "
1220 "via RECEIVE_RECORDS\n",
1223 old_size
= talloc_get_size(records
);
1224 records
= talloc_realloc_size(outdata
, records
,
1225 old_size
+ rec
->length
);
1226 if (records
== NULL
) {
1227 DEBUG(DEBUG_ERR
, (__location__
" Failed to "
1232 memcpy(old_size
+(uint8_t *)records
, rec
, rec
->length
);
1235 rec
= (struct ctdb_rec_data
*)(rec
->length
+ (uint8_t *)rec
);
1238 *outdata
= ctdb_marshall_finish(records
);
1247 int32_t ctdb_control_get_capabilities(struct ctdb_context
*ctdb
, TDB_DATA
*outdata
)
1249 uint32_t *capabilities
= NULL
;
1251 capabilities
= talloc(outdata
, uint32_t);
1252 CTDB_NO_MEMORY(ctdb
, capabilities
);
1253 *capabilities
= ctdb
->capabilities
;
1255 outdata
->dsize
= sizeof(uint32_t);
1256 outdata
->dptr
= (uint8_t *)capabilities
;
1261 /* The recovery daemon will ping us at regular intervals.
1262 If we havent been pinged for a while we assume the recovery
1263 daemon is inoperable and we restart.
1265 static void ctdb_recd_ping_timeout(struct event_context
*ev
, struct timed_event
*te
, struct timeval t
, void *p
)
1267 struct ctdb_context
*ctdb
= talloc_get_type(p
, struct ctdb_context
);
1268 uint32_t *count
= talloc_get_type(ctdb
->recd_ping_count
, uint32_t);
1270 DEBUG(DEBUG_ERR
, ("Recovery daemon ping timeout. Count : %u\n", *count
));
1272 if (*count
< ctdb
->tunable
.recd_ping_failcount
) {
1274 event_add_timed(ctdb
->ev
, ctdb
->recd_ping_count
,
1275 timeval_current_ofs(ctdb
->tunable
.recd_ping_timeout
, 0),
1276 ctdb_recd_ping_timeout
, ctdb
);
1280 DEBUG(DEBUG_ERR
, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1282 ctdb_stop_recoverd(ctdb
);
1283 ctdb_start_recoverd(ctdb
);
1286 int32_t ctdb_control_recd_ping(struct ctdb_context
*ctdb
)
1288 talloc_free(ctdb
->recd_ping_count
);
1290 ctdb
->recd_ping_count
= talloc_zero(ctdb
, uint32_t);
1291 CTDB_NO_MEMORY(ctdb
, ctdb
->recd_ping_count
);
1293 if (ctdb
->tunable
.recd_ping_timeout
!= 0) {
1294 event_add_timed(ctdb
->ev
, ctdb
->recd_ping_count
,
1295 timeval_current_ofs(ctdb
->tunable
.recd_ping_timeout
, 0),
1296 ctdb_recd_ping_timeout
, ctdb
);
1304 int32_t ctdb_control_set_recmaster(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
)
1306 uint32_t new_recmaster
;
1308 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1309 new_recmaster
= ((uint32_t *)(&indata
.dptr
[0]))[0];
1311 if (ctdb
->pnn
!= new_recmaster
&& ctdb
->recovery_master
== ctdb
->pnn
) {
1313 ("This node (%u) is no longer the recovery master\n", ctdb
->pnn
));
1316 if (ctdb
->pnn
== new_recmaster
&& ctdb
->recovery_master
!= new_recmaster
) {
1318 ("This node (%u) is now the recovery master\n", ctdb
->pnn
));
1321 ctdb
->recovery_master
= new_recmaster
;
1326 int32_t ctdb_control_stop_node(struct ctdb_context
*ctdb
)
1328 DEBUG(DEBUG_NOTICE
, ("Stopping node\n"));
1329 ctdb_disable_monitoring(ctdb
);
1330 ctdb
->nodes
[ctdb
->pnn
]->flags
|= NODE_FLAGS_STOPPED
;
1335 int32_t ctdb_control_continue_node(struct ctdb_context
*ctdb
)
1337 DEBUG(DEBUG_NOTICE
, ("Continue node\n"));
1338 ctdb
->nodes
[ctdb
->pnn
]->flags
&= ~NODE_FLAGS_STOPPED
;