aio_fork: Fix CID 1273291 Uninitialized scalar variable
[Samba.git] / ctdb / server / ctdb_recover.c
blobdb88f060d1c10d8bb567f0a1273bd880634e35c5
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
31 int
32 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
34 struct ctdb_vnn_map_wire *map;
35 size_t len;
37 CHECK_CONTROL_DATA_SIZE(0);
39 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
40 map = talloc_size(outdata, len);
41 CTDB_NO_MEMORY(ctdb, map);
43 map->generation = ctdb->vnn_map->generation;
44 map->size = ctdb->vnn_map->size;
45 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
47 outdata->dsize = len;
48 outdata->dptr = (uint8_t *)map;
50 return 0;
53 int
54 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
56 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
57 int i;
59 for(i=1; i<=NUM_DB_PRIORITIES; i++) {
60 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
61 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
62 return -1;
66 talloc_free(ctdb->vnn_map);
68 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
69 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
71 ctdb->vnn_map->generation = map->generation;
72 ctdb->vnn_map->size = map->size;
73 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
74 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
76 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
78 return 0;
81 int
82 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
84 uint32_t i, len;
85 struct ctdb_db_context *ctdb_db;
86 struct ctdb_dbid_map *dbid_map;
88 CHECK_CONTROL_DATA_SIZE(0);
90 len = 0;
91 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
92 len++;
96 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
97 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
98 if (!outdata->dptr) {
99 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
100 exit(1);
103 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
104 dbid_map->num = len;
105 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
106 dbid_map->dbs[i].dbid = ctdb_db->db_id;
107 if (ctdb_db->persistent != 0) {
108 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
110 if (ctdb_db->readonly != 0) {
111 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
113 if (ctdb_db->sticky != 0) {
114 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
118 return 0;
121 int
122 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
124 uint32_t i, num_nodes;
125 struct ctdb_node_map *node_map;
127 CHECK_CONTROL_DATA_SIZE(0);
129 num_nodes = ctdb->num_nodes;
131 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
132 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
133 if (!outdata->dptr) {
134 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
135 exit(1);
138 node_map = (struct ctdb_node_map *)outdata->dptr;
139 node_map->num = num_nodes;
140 for (i=0; i<num_nodes; i++) {
141 if (parse_ip(ctdb->nodes[i]->address.address,
142 NULL, /* TODO: pass in the correct interface here*/
144 &node_map->nodes[i].addr) == 0)
146 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
149 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
150 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
153 return 0;
157 get an old style ipv4-only nodemap
159 int
160 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
162 uint32_t i, num_nodes;
163 struct ctdb_node_mapv4 *node_map;
165 CHECK_CONTROL_DATA_SIZE(0);
167 num_nodes = ctdb->num_nodes;
169 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
170 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
171 if (!outdata->dptr) {
172 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
173 exit(1);
176 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
177 node_map->num = num_nodes;
178 for (i=0; i<num_nodes; i++) {
179 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
180 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
181 return -1;
184 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
185 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
188 return 0;
/*
 * Timed-event callback scheduled by ctdb_control_reload_nodes_file().
 * Re-reads the nodes file and rebuilds ctdb->nodes: node objects whose
 * address is unchanged are kept (preserving their live connections),
 * new or changed nodes are added and connected, and finally the
 * recovery daemon is told to reload its own copy of the nodes file.
 * add_node/connect_node failure is fatal for the daemon.
 */
191 static void
192 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
193 struct timeval t, void *private_data)
195 int i, num_nodes;
196 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
197 TALLOC_CTX *tmp_ctx;
198 struct ctdb_node **nodes;
200 tmp_ctx = talloc_new(ctdb);
202 /* steal the old nodes file for a while */
203 talloc_steal(tmp_ctx, ctdb->nodes);
204 nodes = ctdb->nodes;
205 ctdb->nodes = NULL;
206 num_nodes = ctdb->num_nodes;
207 ctdb->num_nodes = 0;
209 /* load the new nodes file */
210 ctdb_load_nodes_file(ctdb);
212 for (i=0; i<ctdb->num_nodes; i++) {
213 /* keep any identical pre-existing nodes and connections */
214 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
/* same address at same slot: discard freshly loaded node, keep old one */
215 talloc_free(ctdb->nodes[i]);
216 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
217 continue;
220 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
221 continue;
224 /* any new or different nodes must be added */
225 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
226 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
227 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
229 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
230 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
231 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
235 /* tell the recovery daemon to reaload the nodes file too */
236 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* frees the old nodes array (and any unclaimed old node objects) */
238 talloc_free(tmp_ctx);
239 return;
243 reload the nodes file after a short delay (so that we can send the response
244 back first
246 int
247 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
249 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
251 return 0;
/*
 * Shared state between ctdb_control_pull_db() and its tdb traverse
 * callback traverse_pulldb().
 */
255 a traverse function for pulling all relevent records from pulldb
257 struct pulldb_data {
258 struct ctdb_context *ctdb;
259 struct ctdb_db_context *ctdb_db;
260 struct ctdb_marshall_buffer *pulldata; /* marshalled reply blob being built */
261 uint32_t len; /* bytes of pulldata currently in use */
262 uint32_t allocated_len; /* bytes currently allocated for pulldata */
263 bool failed; /* set by the traverse callback on marshalling failure */
/*
 * tdb_traverse_read() callback for ctdb_control_pull_db(): marshal one
 * record and append it to params->pulldata, growing the buffer with a
 * preallocation slack (tunable pulldb_preallocation_size) to limit
 * realloc churn.  Allocation failure during growth is fatal for the
 * daemon.  Returns 0 to continue the traverse, -1 to abort it.
 */
266 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
268 struct pulldb_data *params = (struct pulldb_data *)p;
269 struct ctdb_rec_data *rec;
270 struct ctdb_context *ctdb = params->ctdb;
271 struct ctdb_db_context *ctdb_db = params->ctdb_db;
273 /* add the record to the blob */
274 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
275 if (rec == NULL) {
276 params->failed = true;
277 return -1;
279 if (params->len + rec->length >= params->allocated_len) {
/* grow with extra slack so we do not realloc on every record */
280 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
281 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
283 if (params->pulldata == NULL) {
284 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
285 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
287 params->pulldata->count++;
/* append the marshalled record at the current end of the blob */
288 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
289 params->len += rec->length;
291 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
292 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
295 talloc_free(rec);
297 return 0;
301 pull a bunch of records from a ltdb, filtering by lmaster
/*
 * CTDB_CONTROL_PULL_DB handler: marshal every record of the requested
 * database into a ctdb_marshall_buffer on outdata.  Only legal while
 * the database's priority is frozen; takes the all-chains mark lock
 * around the read traverse.  Returns 0 on success, -1 on error.
 */
303 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
305 struct ctdb_control_pulldb *pull;
306 struct ctdb_db_context *ctdb_db;
307 struct pulldb_data params;
308 struct ctdb_marshall_buffer *reply;
310 pull = (struct ctdb_control_pulldb *)indata.dptr;
312 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
313 if (!ctdb_db) {
314 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
315 return -1;
318 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
319 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
320 return -1;
323 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
324 CTDB_NO_MEMORY(ctdb, reply);
326 reply->db_id = pull->db_id;
328 params.ctdb = ctdb;
329 params.ctdb_db = ctdb_db;
330 params.pulldata = reply;
/* start with just the marshall header; records are appended after it */
331 params.len = offsetof(struct ctdb_marshall_buffer, data);
332 params.allocated_len = params.len;
333 params.failed = false;
335 if (ctdb_db->unhealthy_reason) {
336 /* this is just a warning, as the tdb should be empty anyway */
337 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
338 ctdb_db->db_name, ctdb_db->unhealthy_reason));
341 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
342 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
343 return -1;
346 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
347 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
348 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
349 talloc_free(params.pulldata);
350 return -1;
353 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
/* hand the (possibly reallocated) blob to the caller */
355 outdata->dptr = (uint8_t *)params.pulldata;
356 outdata->dsize = params.len;
358 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
359 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
361 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
362 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
366 return 0;
370 push a bunch of records into a ltdb, filtering by rsn
/*
 * CTDB_CONTROL_PUSH_DB handler: store every record of the marshalled
 * buffer into the local tdb (read-only delegation flags are stripped,
 * since recovery implicitly revokes them).  Only legal while the
 * database's priority is frozen.  For read-only capable databases the
 * tracking db is wiped and any in-flight revoke children are freed.
 * Returns 0 on success, -1 on error.
 */
372 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
374 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
375 struct ctdb_db_context *ctdb_db;
376 int i, ret;
377 struct ctdb_rec_data *rec;
379 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
380 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
381 return -1;
384 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
385 if (!ctdb_db) {
386 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
387 return -1;
390 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
391 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
392 return -1;
395 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
396 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
397 return -1;
400 rec = (struct ctdb_rec_data *)&reply->data[0];
402 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
403 reply->count, reply->db_id));
405 for (i=0;i<reply->count;i++) {
406 TDB_DATA key, data;
407 struct ctdb_ltdb_header *hdr;
/* wire layout of each record: [key bytes][data bytes], data starts with ltdb header */
409 key.dptr = &rec->data[0];
410 key.dsize = rec->keylen;
411 data.dptr = &rec->data[key.dsize];
412 data.dsize = rec->datalen;
414 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
415 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
416 goto failed;
418 hdr = (struct ctdb_ltdb_header *)data.dptr;
419 /* strip off any read only record flags. All readonly records
420 are revoked implicitely by a recovery
422 hdr->flags &= ~CTDB_REC_RO_FLAGS;
424 data.dptr += sizeof(*hdr);
425 data.dsize -= sizeof(*hdr);
427 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
428 if (ret != 0) {
429 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
430 goto failed;
/* advance to the next marshalled record */
433 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
436 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
437 reply->count, reply->db_id));
439 if (ctdb_db->readonly) {
440 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
441 ctdb_db->db_id));
442 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
443 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
444 ctdb_db->readonly = false;
445 tdb_close(ctdb_db->rottdb);
446 ctdb_db->rottdb = NULL;
447 ctdb_db->readonly = false;
/* freeing each revoke child removes it from the list via its destructor,
   so this loop terminates — NOTE(review): relies on that destructor behavior */
449 while (ctdb_db->revokechild_active != NULL) {
450 talloc_free(ctdb_db->revokechild_active);
454 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
455 return 0;
457 failed:
458 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
459 return -1;
/*
 * Async state for CTDB_CONTROL_SET_RECMODE: a child process attempts
 * ctdb_recovery_lock() and reports the result back over a pipe.
 */
462 struct ctdb_set_recmode_state {
463 struct ctdb_context *ctdb;
464 struct ctdb_req_control *c; /* control to reply to when done */
465 uint32_t recmode; /* requested recovery mode */
466 int fd[2]; /* pipe: child writes result byte, parent reads fd[0] */
467 struct timed_event *te; /* timeout in case the child hangs on the lock */
468 struct fd_event *fde; /* read event on fd[0] */
469 pid_t child; /* pid of the lock-test child */
470 struct timeval start_time; /* for reclock latency accounting */
474 called if our set_recmode child times out. this would happen if
475 ctdb_recovery_lock() would block.
477 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
478 struct timeval t, void *private_data)
480 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
481 struct ctdb_set_recmode_state);
483 /* we consider this a success, not a failure, as we failed to
484 set the recovery lock which is what we wanted. This can be
485 caused by the cluster filesystem being very slow to
486 arbitrate locks immediately after a node failure.
488 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
489 state->ctdb->recovery_mode = state->recmode;
490 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
491 talloc_free(state);
495 /* when we free the recmode state we must kill any child process.
497 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
499 double l = timeval_elapsed(&state->start_time);
501 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
503 if (state->fd[0] != -1) {
504 state->fd[0] = -1;
506 if (state->fd[1] != -1) {
507 state->fd[1] = -1;
509 ctdb_kill(state->ctdb, state->child, SIGKILL);
510 return 0;
513 /* this is called when the client process has completed ctdb_recovery_lock()
514 and has written data back to us through the pipe.
516 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
517 uint16_t flags, void *private_data)
519 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
520 struct ctdb_set_recmode_state);
521 char c = 0;
522 int ret;
524 /* we got a response from our child process so we can abort the
525 timeout.
527 talloc_free(state->te);
528 state->te = NULL;
531 /* If, as expected, the child was unable to take the recovery
532 * lock then it will have written 0 into the pipe, so
533 * continue. However, any other value (e.g. 1) indicates that
534 * it was able to take the recovery lock when it should have
535 * been held by the recovery daemon on the recovery master.
537 ret = sys_read(state->fd[0], &c, 1);
538 if (ret != 1 || c != 0) {
539 const char *msg = \
540 "Took recovery lock from daemon - probably a cluster filesystem lock coherence problem";
541 ctdb_request_control_reply(
542 state->ctdb, state->c, NULL, -1,
543 msg);
544 talloc_free(state);
545 ctdb_die(state->ctdb, msg);
548 state->ctdb->recovery_mode = state->recmode;
550 /* release any deferred attach calls from clients */
551 if (state->recmode == CTDB_RECOVERY_NORMAL) {
552 ctdb_process_deferred_attach(state->ctdb);
555 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
556 talloc_free(state);
557 return;
560 static void
561 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
562 struct timeval t, void *private_data)
564 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
566 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
567 talloc_free(ctdb->release_ips_ctx);
568 ctdb->release_ips_ctx = NULL;
570 ctdb_release_all_ips(ctdb);
574 * Set up an event to drop all public ips if we remain in recovery for too
575 * long
577 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
579 if (ctdb->release_ips_ctx != NULL) {
580 talloc_free(ctdb->release_ips_ctx);
582 ctdb->release_ips_ctx = talloc_new(ctdb);
583 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
585 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
586 return 0;
590 set the recovery mode
/*
 * CTDB_CONTROL_SET_RECMODE handler.  Entering recovery arms the
 * deferred drop-all-ips timer; leaving recovery thaws the databases and
 * (when a recovery lock file is configured) forks a child that verifies
 * the daemon CANNOT take the recovery lock — it should be held by the
 * recovery master.  In that case the reply is sent asynchronously from
 * set_recmode_handler()/ctdb_set_recmode_timeout().
 */
592 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
593 struct ctdb_req_control *c,
594 TDB_DATA indata, bool *async_reply,
595 const char **errormsg)
597 uint32_t recmode = *(uint32_t *)indata.dptr;
598 int i, ret;
599 struct ctdb_set_recmode_state *state;
600 pid_t parent = getpid();
602 /* if we enter recovery but stay in recovery for too long
603 we will eventually drop all our ip addresses
605 if (recmode == CTDB_RECOVERY_NORMAL) {
606 talloc_free(ctdb->release_ips_ctx);
607 ctdb->release_ips_ctx = NULL;
608 } else {
609 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
610 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
614 if (recmode != ctdb->recovery_mode) {
615 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
616 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
/* only the ACTIVE -> NORMAL transition needs the special handling below */
619 if (recmode != CTDB_RECOVERY_NORMAL ||
620 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
621 ctdb->recovery_mode = recmode;
622 return 0;
625 /* some special handling when ending recovery mode */
627 /* force the databases to thaw */
628 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
629 if (ctdb->freeze_handles[i] != NULL) {
630 ctdb_control_thaw(ctdb, i, false);
634 state = talloc(ctdb, struct ctdb_set_recmode_state);
635 CTDB_NO_MEMORY(ctdb, state);
637 state->start_time = timeval_current();
638 state->fd[0] = -1;
639 state->fd[1] = -1;
641 /* release any deferred attach calls from clients */
642 if (recmode == CTDB_RECOVERY_NORMAL) {
643 ctdb_process_deferred_attach(ctdb);
646 if (ctdb->recovery_lock_file == NULL) {
647 /* Not using recovery lock file */
648 ctdb->recovery_mode = recmode;
649 return 0;
652 /* For the rest of what needs to be done, we need to do this in
653 a child process since
654 1, the call to ctdb_recovery_lock() can block if the cluster
655 filesystem is in the process of recovery.
657 ret = pipe(state->fd);
658 if (ret != 0) {
659 talloc_free(state);
660 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
661 return -1;
664 state->child = ctdb_fork(ctdb);
665 if (state->child == (pid_t)-1) {
666 close(state->fd[0]);
667 close(state->fd[1]);
668 talloc_free(state);
669 return -1;
672 if (state->child == 0) {
/* --- child: try to take the lock, write result byte to the pipe --- */
673 char cc = 0;
674 close(state->fd[0]);
676 ctdb_set_process_name("ctdb_recmode");
677 debug_extra = talloc_asprintf(NULL, "set_recmode:");
678 /* Daemon should not be able to get the recover lock,
679 * as it should be held by the recovery master */
680 if (ctdb_recovery_lock(ctdb)) {
681 DEBUG(DEBUG_ERR,
682 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
683 ctdb->recovery_lock_file));
684 ctdb_recovery_unlock(ctdb);
685 cc = 1;
688 sys_write(state->fd[1], &cc, 1);
689 /* make sure we die when our parent dies */
690 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
691 sleep(5);
692 sys_write(state->fd[1], &cc, 1);
694 _exit(0);
/* --- parent: wait for the child's answer or the 5s timeout --- */
696 close(state->fd[1]);
697 set_close_on_exec(state->fd[0]);
699 state->fd[1] = -1;
701 talloc_set_destructor(state, set_recmode_destructor);
703 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
705 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
706 ctdb_set_recmode_timeout, state);
708 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
709 EVENT_FD_READ,
710 set_recmode_handler,
711 (void *)state);
713 if (state->fde == NULL) {
714 talloc_free(state);
715 return -1;
717 tevent_fd_set_auto_close(state->fde);
719 state->ctdb = ctdb;
720 state->recmode = recmode;
721 state->c = talloc_steal(state, c);
723 *async_reply = true;
725 return 0;
729 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
731 return ctdb->recovery_lock_fd != -1;
735 try and get the recovery lock in shared storage - should only work
736 on the recovery master recovery daemon. Anywhere else is a bug
738 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
740 struct flock lock;
742 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
743 O_RDWR|O_CREAT, 0600);
744 if (ctdb->recovery_lock_fd == -1) {
745 DEBUG(DEBUG_ERR,
746 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
747 ctdb->recovery_lock_file, strerror(errno)));
748 return false;
751 set_close_on_exec(ctdb->recovery_lock_fd);
753 lock.l_type = F_WRLCK;
754 lock.l_whence = SEEK_SET;
755 lock.l_start = 0;
756 lock.l_len = 1;
757 lock.l_pid = 0;
759 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
760 int saved_errno = errno;
761 close(ctdb->recovery_lock_fd);
762 ctdb->recovery_lock_fd = -1;
763 /* Fail silently on these errors, since they indicate
764 * lock contention, but log an error for any other
765 * failure. */
766 if (saved_errno != EACCES &&
767 saved_errno != EAGAIN) {
768 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
769 "recovery lock on '%s' - (%s)\n",
770 ctdb->recovery_lock_file,
771 strerror(saved_errno)));
773 return false;
776 return true;
779 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
781 if (ctdb->recovery_lock_fd != -1) {
782 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
783 close(ctdb->recovery_lock_fd);
784 ctdb->recovery_lock_fd = -1;
789 delete a record as part of the vacuum process
790 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
791 use non-blocking locks
793 return 0 if the record was successfully deleted (i.e. it does not exist
794 when the function returns)
795 or !0 is the record still exists in the tdb after returning.
797 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
799 TDB_DATA key, data, data2;
800 struct ctdb_ltdb_header *hdr, *hdr2;
802 /* these are really internal tdb functions - but we need them here for
803 non-blocking lock of the freelist */
804 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
805 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* 'data' here is only the ltdb header the lmaster sent, not record payload */
808 key.dsize = rec->keylen;
809 key.dptr = &rec->data[0];
810 data.dsize = rec->datalen;
811 data.dptr = &rec->data[rec->keylen];
813 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
814 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
815 return -1;
818 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
819 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
820 return -1;
823 hdr = (struct ctdb_ltdb_header *)data.dptr;
825 /* use a non-blocking lock */
826 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
827 return -1;
830 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
831 if (data2.dptr == NULL) {
/* already gone - success */
832 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
833 return 0;
/* record too small to even hold a header: treat as corrupt and delete it */
836 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
837 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
838 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
839 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
841 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
842 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
844 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
845 free(data2.dptr);
846 return 0;
849 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* local copy is newer than what the lmaster asked us to delete: keep it */
851 if (hdr2->rsn > hdr->rsn) {
852 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
853 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
854 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
855 free(data2.dptr);
856 return -1;
859 /* do not allow deleting record that have readonly flags set. */
860 if (hdr->flags & CTDB_REC_RO_FLAGS) {
861 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
862 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
863 free(data2.dptr);
864 return -1;
866 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
867 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
868 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
869 free(data2.dptr);
870 return -1;
873 if (hdr2->dmaster == ctdb->pnn) {
874 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
875 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
876 free(data2.dptr);
877 return -1;
/* need the freelist lock (-1) as well as the chain lock before deleting */
880 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
881 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
882 free(data2.dptr);
883 return -1;
886 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
887 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
888 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
889 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
890 free(data2.dptr);
891 return -1;
894 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
895 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
896 free(data2.dptr);
897 return 0;
/* carries the pending control across an async event-script run so the
 * callback can reply to it when the script finishes */
902 struct recovery_callback_state {
903 struct ctdb_req_control *c;
908 called when the 'recovered' event script has finished
/*
 * Re-enables monitoring, bumps the recovery statistics, bans this node
 * if the script timed out, replies to the pending END_RECOVERY control,
 * records the finish time and (on first recovery) advances the runstate
 * to STARTUP.
 */
910 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
912 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
914 ctdb_enable_monitoring(ctdb);
915 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
917 if (status != 0) {
918 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
919 if (status == -ETIME) {
/* script timed out: ban ourselves rather than limp along */
920 ctdb_ban_self(ctdb);
924 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
925 talloc_free(state);
927 gettimeofday(&ctdb->last_recovery_finished, NULL);
929 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
930 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
935 recovery has finished
/*
 * CTDB_CONTROL_END_RECOVERY handler: flush pending trans3 commits,
 * disable monitoring and run the "recovered" event script; the reply is
 * sent asynchronously from ctdb_end_recovery_callback().  Returns -1 if
 * the event script could not even be started.
 */
937 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
938 struct ctdb_req_control *c,
939 bool *async_reply)
941 int ret;
942 struct recovery_callback_state *state;
944 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
946 ctdb_persistent_finish_trans3_commits(ctdb);
948 state = talloc(ctdb, struct recovery_callback_state);
949 CTDB_NO_MEMORY(ctdb, state);
951 state->c = c;
953 ctdb_disable_monitoring(ctdb);
955 ret = ctdb_event_script_callback(ctdb, state,
956 ctdb_end_recovery_callback,
957 state,
958 CTDB_EVENT_RECOVERED, "%s", "");
960 if (ret != 0) {
961 ctdb_enable_monitoring(ctdb);
963 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
964 talloc_free(state);
965 return -1;
968 /* tell the control that we will be reply asynchronously */
/* only steal c once the script is actually running */
969 state->c = talloc_steal(state, c);
970 *async_reply = true;
971 return 0;
975 called when the 'startrecovery' event script has finished
977 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
979 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
981 if (status != 0) {
982 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
985 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
986 talloc_free(state);
990 run the startrecovery eventscript
/*
 * CTDB_CONTROL_START_RECOVERY handler: record the recovery start time,
 * disable monitoring and run the "startrecovery" event script; the
 * reply is sent asynchronously from ctdb_start_recovery_callback().
 */
992 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
993 struct ctdb_req_control *c,
994 bool *async_reply)
996 int ret;
997 struct recovery_callback_state *state;
999 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1000 gettimeofday(&ctdb->last_recovery_started, NULL);
1002 state = talloc(ctdb, struct recovery_callback_state);
1003 CTDB_NO_MEMORY(ctdb, state);
1005 state->c = talloc_steal(state, c);
1007 ctdb_disable_monitoring(ctdb);
1009 ret = ctdb_event_script_callback(ctdb, state,
1010 ctdb_start_recovery_callback,
1011 state,
1012 CTDB_EVENT_START_RECOVERY,
1013 "%s", "");
1015 if (ret != 0) {
1016 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1017 talloc_free(state);
1018 return -1;
1021 /* tell the control that we will be reply asynchronously */
1022 *async_reply = true;
1023 return 0;
1027 try to delete all these records as part of the vacuuming process
1028 and return the records we failed to delete
/*
 * CTDB_CONTROL_TRY_DELETE_RECORDS handler: attempt to delete each
 * record in the marshalled input via delete_tdb_record(); records that
 * could NOT be deleted are marshalled into outdata so the lmaster knows
 * it must not purge them.  Returns 0 on success, -1 on bad input or
 * allocation failure.
 */
1030 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1032 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1033 struct ctdb_db_context *ctdb_db;
1034 int i;
1035 struct ctdb_rec_data *rec;
1036 struct ctdb_marshall_buffer *records;
1038 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1039 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1040 return -1;
1043 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1044 if (!ctdb_db) {
1045 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1046 return -1;
1050 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1051 reply->count, reply->db_id));
1054 /* create a blob to send back the records we couldnt delete */
1055 records = (struct ctdb_marshall_buffer *)
1056 talloc_zero_size(outdata,
1057 offsetof(struct ctdb_marshall_buffer, data));
1058 if (records == NULL) {
1059 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1060 return -1;
1062 records->db_id = ctdb_db->db_id;
1065 rec = (struct ctdb_rec_data *)&reply->data[0];
1066 for (i=0;i<reply->count;i++) {
1067 TDB_DATA key, data;
1069 key.dptr = &rec->data[0];
1070 key.dsize = rec->keylen;
1071 data.dptr = &rec->data[key.dsize];
1072 data.dsize = rec->datalen;
1074 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1075 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1076 return -1;
1079 /* If we cant delete the record we must add it to the reply
1080 so the lmaster knows it may not purge this record
1082 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1083 size_t old_size;
1084 struct ctdb_ltdb_header *hdr;
1086 hdr = (struct ctdb_ltdb_header *)data.dptr;
1087 data.dptr += sizeof(*hdr);
1088 data.dsize -= sizeof(*hdr);
1090 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
/* append the whole marshalled input record to the failure blob */
1092 old_size = talloc_get_size(records);
1093 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1094 if (records == NULL) {
1095 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1096 return -1;
1098 records->count++;
1099 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next marshalled record */
1102 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1106 *outdata = ctdb_marshall_finish(records);
1108 return 0;
1112 * Store a record as part of the vacuum process:
1113 * This is called from the RECEIVE_RECORD control which
1114 * the lmaster uses to send the current empty copy
1115 * to all nodes for storing, before it lets the other
1116 * nodes delete the records in the second phase with
1117 * the TRY_DELETE_RECORDS control.
1119 * Only store if we are not lmaster or dmaster, and our
1120 * rsn is <= the provided rsn. Use non-blocking locks.
1122 * return 0 if the record was successfully stored.
1123 * return !0 if the record still exists in the tdb after returning.
1125 static int store_tdb_record(struct ctdb_context *ctdb,
1126 struct ctdb_db_context *ctdb_db,
1127 struct ctdb_rec_data *rec)
1129 TDB_DATA key, data, data2;
1130 struct ctdb_ltdb_header *hdr, *hdr2;
1131 int ret;
1133 key.dsize = rec->keylen;
1134 key.dptr = &rec->data[0];
1135 data.dsize = rec->datalen;
1136 data.dptr = &rec->data[rec->keylen];
1138 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1139 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1140 "where we are lmaster\n"));
1141 return -1;
1144 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1145 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1146 return -1;
1149 hdr = (struct ctdb_ltdb_header *)data.dptr;
1151 /* use a non-blocking lock */
1152 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1153 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1154 return -1;
1157 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1158 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1159 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1160 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1161 ret = -1;
1162 goto done;
1164 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1165 ret = 0;
1166 goto done;
1169 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1171 if (hdr2->rsn > hdr->rsn) {
1172 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1173 "rsn=%llu - called with rsn=%llu\n",
1174 (unsigned long long)hdr2->rsn,
1175 (unsigned long long)hdr->rsn));
1176 ret = -1;
1177 goto done;
1180 /* do not allow vacuuming of records that have readonly flags set. */
1181 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1182 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1183 "flags set\n"));
1184 ret = -1;
1185 goto done;
1187 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1188 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1189 "flags set\n"));
1190 ret = -1;
1191 goto done;
1194 if (hdr2->dmaster == ctdb->pnn) {
1195 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1196 "where we are the dmaster\n"));
1197 ret = -1;
1198 goto done;
1201 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1202 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1203 ret = -1;
1204 goto done;
1207 ret = 0;
1209 done:
1210 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1211 free(data2.dptr);
1212 return ret;
1218 * Try to store all these records as part of the vacuuming process
1219 * and return the records we failed to store.
1221 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1222 TDB_DATA indata, TDB_DATA *outdata)
1224 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1225 struct ctdb_db_context *ctdb_db;
1226 int i;
1227 struct ctdb_rec_data *rec;
1228 struct ctdb_marshall_buffer *records;
1230 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1231 DEBUG(DEBUG_ERR,
1232 (__location__ " invalid data in receive_records\n"));
1233 return -1;
1236 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1237 if (!ctdb_db) {
1238 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1239 reply->db_id));
1240 return -1;
1243 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1244 "dbid 0x%x\n", reply->count, reply->db_id));
1246 /* create a blob to send back the records we could not store */
1247 records = (struct ctdb_marshall_buffer *)
1248 talloc_zero_size(outdata,
1249 offsetof(struct ctdb_marshall_buffer, data));
1250 if (records == NULL) {
1251 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1252 return -1;
1254 records->db_id = ctdb_db->db_id;
1256 rec = (struct ctdb_rec_data *)&reply->data[0];
1257 for (i=0; i<reply->count; i++) {
1258 TDB_DATA key, data;
1260 key.dptr = &rec->data[0];
1261 key.dsize = rec->keylen;
1262 data.dptr = &rec->data[key.dsize];
1263 data.dsize = rec->datalen;
1265 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1266 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1267 "in indata\n"));
1268 return -1;
1272 * If we can not store the record we must add it to the reply
1273 * so the lmaster knows it may not purge this record.
1275 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1276 size_t old_size;
1277 struct ctdb_ltdb_header *hdr;
1279 hdr = (struct ctdb_ltdb_header *)data.dptr;
1280 data.dptr += sizeof(*hdr);
1281 data.dsize -= sizeof(*hdr);
1283 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1284 "record with hash 0x%08x in vacuum "
1285 "via RECEIVE_RECORDS\n",
1286 ctdb_hash(&key)));
1288 old_size = talloc_get_size(records);
1289 records = talloc_realloc_size(outdata, records,
1290 old_size + rec->length);
1291 if (records == NULL) {
1292 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1293 "expand\n"));
1294 return -1;
1296 records->count++;
1297 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1300 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1303 *outdata = ctdb_marshall_finish(records);
1305 return 0;
1310 report capabilities
1312 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1314 uint32_t *capabilities = NULL;
1316 capabilities = talloc(outdata, uint32_t);
1317 CTDB_NO_MEMORY(ctdb, capabilities);
1318 *capabilities = ctdb->capabilities;
1320 outdata->dsize = sizeof(uint32_t);
1321 outdata->dptr = (uint8_t *)capabilities;
1323 return 0;
1326 /* The recovery daemon will ping us at regular intervals.
1327 If we havent been pinged for a while we assume the recovery
1328 daemon is inoperable and we restart.
1330 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1332 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1333 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1335 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1337 if (*count < ctdb->tunable.recd_ping_failcount) {
1338 (*count)++;
1339 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1340 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1341 ctdb_recd_ping_timeout, ctdb);
1342 return;
1345 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1347 ctdb_stop_recoverd(ctdb);
1348 ctdb_start_recoverd(ctdb);
1351 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1353 talloc_free(ctdb->recd_ping_count);
1355 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1356 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1358 if (ctdb->tunable.recd_ping_timeout != 0) {
1359 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1360 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1361 ctdb_recd_ping_timeout, ctdb);
1364 return 0;
1369 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1371 uint32_t new_recmaster;
1373 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1374 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1376 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1377 DEBUG(DEBUG_NOTICE,
1378 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1381 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1382 DEBUG(DEBUG_NOTICE,
1383 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1386 ctdb->recovery_master = new_recmaster;
1387 return 0;
1391 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1393 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1394 ctdb_disable_monitoring(ctdb);
1395 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1397 return 0;
1400 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1402 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1403 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1405 return 0;