ctdb/server/ctdb_recover.c
/*
   ctdb recovery code

   Copyright (C) Andrew Tridgell  2007
   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "tdb.h"
#include "system/time.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "../include/ctdb_private.h"
#include "lib/util/dlinklist.h"
#include "lib/tdb_wrap/tdb_wrap.h"
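
/*
  return the current vnn map (generation number and array of node pnns)
 */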
int
ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_vnn_map_wire *map;
	size_t len;

	CHECK_CONTROL_DATA_SIZE(0);

	len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
	map = talloc_size(outdata, len);
	CTDB_NO_MEMORY(ctdb, map);

	map->generation = ctdb->vnn_map->generation;
	map->size = ctdb->vnn_map->size;
	memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);

	outdata->dsize = len;
	outdata->dptr  = (uint8_t *)map;

	return 0;
}
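
/*
  install a new vnn map; only allowed while all database priorities are frozen
 */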
int
ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
	int i;

	for(i=1; i<=NUM_DB_PRIORITIES; i++) {
		if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
			DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
			return -1;
		}
	}

	talloc_free(ctdb->vnn_map);

	ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
	CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);

	ctdb->vnn_map->generation = map->generation;
	ctdb->vnn_map->size       = map->size;
	ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
	CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);

	memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);

	return 0;
}
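
/*
  return the list of currently attached databases and their flags
 */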
int
ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	uint32_t i, len;
	struct ctdb_db_context *ctdb_db;
	struct ctdb_dbid_map *dbid_map;

	CHECK_CONTROL_DATA_SIZE(0);

	len = 0;
	for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
		len++;
	}

	outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
	outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
	if (!outdata->dptr) {
		DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
		exit(1);
	}

	dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
	dbid_map->num = len;
	for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
		dbid_map->dbs[i].dbid = ctdb_db->db_id;
		if (ctdb_db->persistent != 0) {
			dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
		}
		if (ctdb_db->readonly != 0) {
			dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
		}
		if (ctdb_db->sticky != 0) {
			dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
		}
	}

	return 0;
}
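
/*
  return the current node map (pnn, address and flags for every configured node)
 */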
int
ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	uint32_t i, num_nodes;
	struct ctdb_node_map *node_map;

	CHECK_CONTROL_DATA_SIZE(0);

	num_nodes = ctdb->num_nodes;

	outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
	outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
	if (!outdata->dptr) {
		DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
		exit(1);
	}

	node_map = (struct ctdb_node_map *)outdata->dptr;
	node_map->num = num_nodes;
	for (i=0; i<num_nodes; i++) {
		node_map->nodes[i].addr  = ctdb->nodes[i]->address;
		node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
		node_map->nodes[i].flags = ctdb->nodes[i]->flags;
	}

	return 0;
}
/*
  get an old style ipv4-only nodemap
 */
int
ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	uint32_t i, num_nodes;
	struct ctdb_node_mapv4 *node_map;

	CHECK_CONTROL_DATA_SIZE(0);

	num_nodes = ctdb->num_nodes;

	outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
	outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
	if (!outdata->dptr) {
		DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
		exit(1);
	}

	node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
	node_map->num = num_nodes;
	for (i=0; i<num_nodes; i++) {
		node_map->nodes[i].sin   = ctdb->nodes[i]->address.ip;
		node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
		node_map->nodes[i].flags = ctdb->nodes[i]->flags;
	}

	return 0;
}
static void
ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	int i, num_nodes;
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
	TALLOC_CTX *tmp_ctx;
	struct ctdb_node **nodes;

	tmp_ctx = talloc_new(ctdb);

	/* steal the old nodes file for a while */
	talloc_steal(tmp_ctx, ctdb->nodes);
	nodes = ctdb->nodes;
	ctdb->nodes = NULL;
	num_nodes = ctdb->num_nodes;
	ctdb->num_nodes = 0;

	/* load the new nodes file */
	ctdb_load_nodes_file(ctdb);

	for (i=0; i<ctdb->num_nodes; i++) {
		/* keep any identical pre-existing nodes and connections */
		if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
			talloc_free(ctdb->nodes[i]);
			ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
			continue;
		}

		if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
			continue;
		}

		/* any new or different nodes must be added */
		if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
			DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
			ctdb_fatal(ctdb, "failed to add node. shutting down\n");
		}
		if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
			DEBUG(DEBUG_CRIT, (__location__ " methods->connect_node failed at %d\n", i));
			ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
		}
	}

	/* tell the recovery daemon to reload the nodes file too */
	ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);

	talloc_free(tmp_ctx);
	return;
}
/*
  reload the nodes file after a short delay (so that we can send the response
  back first)
 */
int
ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
{
	event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);

	return 0;
}
/*
  a traverse function for pulling all relevant records from pulldb
 */
struct pulldb_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	struct ctdb_marshall_buffer *pulldata;
	uint32_t len;
	uint32_t allocated_len;
	bool failed;
};

static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
	struct pulldb_data *params = (struct pulldb_data *)p;
	struct ctdb_rec_data *rec;
	struct ctdb_context *ctdb = params->ctdb;
	struct ctdb_db_context *ctdb_db = params->ctdb_db;

	/* add the record to the blob */
	rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
	if (rec == NULL) {
		params->failed = true;
		return -1;
	}
	if (params->len + rec->length >= params->allocated_len) {
		params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
		params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
	}
	if (params->pulldata == NULL) {
		DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
		ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
	}
	params->pulldata->count++;
	memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
	params->len += rec->length;

	if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
		DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
	}

	talloc_free(rec);

	return 0;
}
/*
  pull a bunch of records from a ltdb, filtering by lmaster
 */
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_control_pulldb *pull;
	struct ctdb_db_context *ctdb_db;
	struct pulldb_data params;
	struct ctdb_marshall_buffer *reply;

	pull = (struct ctdb_control_pulldb *)indata.dptr;

	ctdb_db = find_ctdb_db(ctdb, pull->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
		return -1;
	}

	if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
		DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
		return -1;
	}

	reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
	CTDB_NO_MEMORY(ctdb, reply);

	reply->db_id = pull->db_id;

	params.ctdb = ctdb;
	params.ctdb_db = ctdb_db;
	params.pulldata = reply;
	params.len = offsetof(struct ctdb_marshall_buffer, data);
	params.allocated_len = params.len;
	params.failed = false;

	if (ctdb_db->unhealthy_reason) {
		/* this is just a warning, as the tdb should be empty anyway */
		DEBUG(DEBUG_WARNING,("db(%s) unhealthy in ctdb_control_pull_db: %s\n",
				     ctdb_db->db_name, ctdb_db->unhealthy_reason));
	}

	if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
		return -1;
	}

	if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to traverse db '%s'\n", ctdb_db->db_name));
		ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
		talloc_free(params.pulldata);
		return -1;
	}

	ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);

	outdata->dptr = (uint8_t *)params.pulldata;
	outdata->dsize = params.len;

	if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
		DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
	}
	if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
		DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
	}

	return 0;
}
/*
  push a bunch of records into a ltdb, filtering by rsn
 */
int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
{
	struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
	struct ctdb_db_context *ctdb_db;
	int i, ret;
	struct ctdb_rec_data *rec;

	if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
		DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
		return -1;
	}

	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
		return -1;
	}

	if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
		DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
		return -1;
	}

	if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
		return -1;
	}

	rec = (struct ctdb_rec_data *)&reply->data[0];

	DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
			  reply->count, reply->db_id));

	for (i=0;i<reply->count;i++) {
		TDB_DATA key, data;
		struct ctdb_ltdb_header *hdr;

		key.dptr = &rec->data[0];
		key.dsize = rec->keylen;
		data.dptr = &rec->data[key.dsize];
		data.dsize = rec->datalen;

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
			goto failed;
		}
		hdr = (struct ctdb_ltdb_header *)data.dptr;
		/* strip off any read only record flags. All readonly records
		   are revoked implicitly by a recovery.
		*/
		hdr->flags &= ~CTDB_REC_RO_FLAGS;

		data.dptr += sizeof(*hdr);
		data.dsize -= sizeof(*hdr);

		ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
		if (ret != 0) {
			DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
			goto failed;
		}

		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
	}

	DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
			   reply->count, reply->db_id));

	if (ctdb_db->readonly) {
		DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
				  ctdb_db->db_id));
		if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
			DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
			ctdb_db->readonly = false;
			tdb_close(ctdb_db->rottdb);
			ctdb_db->rottdb = NULL;
			ctdb_db->readonly = false;
		}
		while (ctdb_db->revokechild_active != NULL) {
			talloc_free(ctdb_db->revokechild_active);
		}
	}

	ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
	return 0;

failed:
	ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
	return -1;
}
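
/*
  state for an in-flight SET_RECMODE control while a helper child
  checks whether the recovery lock can be taken
 */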
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;
	struct ctdb_req_control *c;
	uint32_t recmode;
	int fd[2];
	struct timed_event *te;
	struct fd_event *fde;
	pid_t child;
	struct timeval start_time;
};
/*
  called if our set_recmode child times out. this would happen if
  ctdb_recovery_lock() would block.
 */
static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
				     struct timeval t, void *private_data)
{
	struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
					       struct ctdb_set_recmode_state);

	/* we consider this a success, not a failure, as we failed to
	   take the recovery lock, which is what we wanted.  This can be
	   caused by the cluster filesystem being very slow to
	   arbitrate locks immediately after a node failure.
	 */
	DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timed out. CFS slow to grant locks? (allowing recmode set anyway)\n"));
	state->ctdb->recovery_mode = state->recmode;
	ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
	talloc_free(state);
}
/* when we free the recmode state we must kill any child process.
*/
static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
{
	double l = timeval_elapsed(&state->start_time);

	CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);

	if (state->fd[0] != -1) {
		state->fd[0] = -1;
	}
	if (state->fd[1] != -1) {
		state->fd[1] = -1;
	}
	ctdb_kill(state->ctdb, state->child, SIGKILL);
	return 0;
}
/* this is called when the client process has completed ctdb_recovery_lock()
   and has written data back to us through the pipe.
*/
static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
				uint16_t flags, void *private_data)
{
	struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
					      struct ctdb_set_recmode_state);
	char c = 0;
	int ret;

	/* we got a response from our child process so we can abort the
	   timeout.
	*/
	talloc_free(state->te);
	state->te = NULL;


	/* If, as expected, the child was unable to take the recovery
	 * lock then it will have written 0 into the pipe, so
	 * continue.  However, any other value (e.g. 1) indicates that
	 * it was able to take the recovery lock when it should have
	 * been held by the recovery daemon on the recovery master.
	 */
	ret = sys_read(state->fd[0], &c, 1);
	if (ret != 1 || c != 0) {
		/* keep a copy of the ctdb pointer so we do not
		   dereference state after it has been freed */
		struct ctdb_context *ctdb = state->ctdb;
		const char *msg =
			"Took recovery lock from daemon - probably a cluster filesystem lock coherence problem";

		ctdb_request_control_reply(ctdb, state->c, NULL, -1, msg);
		talloc_free(state);
		ctdb_die(ctdb, msg);
	}

	state->ctdb->recovery_mode = state->recmode;

	/* release any deferred attach calls from clients */
	if (state->recmode == CTDB_RECOVERY_NORMAL) {
		ctdb_process_deferred_attach(state->ctdb);
	}

	ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
	talloc_free(state);
	return;
}
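
/*
  timer callback: we have stayed in recovery mode for too long, so drop
  all public IP addresses held by this node
 */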
static void
ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

	DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPs\n"));
	talloc_free(ctdb->release_ips_ctx);
	ctdb->release_ips_ctx = NULL;

	ctdb_release_all_ips(ctdb);
}
/*
 * Set up an event to drop all public ips if we remain in recovery for too
 * long
 */
int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
{
	if (ctdb->release_ips_ctx != NULL) {
		talloc_free(ctdb->release_ips_ctx);
	}
	ctdb->release_ips_ctx = talloc_new(ctdb);
	CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);

	event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
	return 0;
}
/*
  set the recovery mode
 */
int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
				 struct ctdb_req_control *c,
				 TDB_DATA indata, bool *async_reply,
				 const char **errormsg)
{
	uint32_t recmode = *(uint32_t *)indata.dptr;
	int i, ret;
	struct ctdb_set_recmode_state *state;
	pid_t parent = getpid();

	/* if we enter recovery but stay in recovery for too long
	   we will eventually drop all our ip addresses
	*/
	if (recmode == CTDB_RECOVERY_NORMAL) {
		talloc_free(ctdb->release_ips_ctx);
		ctdb->release_ips_ctx = NULL;
	} else {
		if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
			DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
		}
	}

	if (recmode != ctdb->recovery_mode) {
		DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
				    recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
	}

	if (recmode != CTDB_RECOVERY_NORMAL ||
	    ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
		ctdb->recovery_mode = recmode;
		return 0;
	}

	/* some special handling when ending recovery mode */

	/* force the databases to thaw */
	for (i=1; i<=NUM_DB_PRIORITIES; i++) {
		if (ctdb->freeze_handles[i] != NULL) {
			ctdb_control_thaw(ctdb, i, false);
		}
	}

	state = talloc(ctdb, struct ctdb_set_recmode_state);
	CTDB_NO_MEMORY(ctdb, state);

	state->start_time = timeval_current();
	state->fd[0] = -1;
	state->fd[1] = -1;

	/* release any deferred attach calls from clients */
	if (recmode == CTDB_RECOVERY_NORMAL) {
		ctdb_process_deferred_attach(ctdb);
	}

	if (ctdb->recovery_lock_file == NULL) {
		/* Not using recovery lock file */
		ctdb->recovery_mode = recmode;
		return 0;
	}

	/* For the rest of what needs to be done, we need to do this in
	   a child process since
	   1, the call to ctdb_recovery_lock() can block if the cluster
	      filesystem is in the process of recovery.
	*/
	ret = pipe(state->fd);
	if (ret != 0) {
		talloc_free(state);
		DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
		return -1;
	}

	state->child = ctdb_fork(ctdb);
	if (state->child == (pid_t)-1) {
		close(state->fd[0]);
		close(state->fd[1]);
		talloc_free(state);
		return -1;
	}

	if (state->child == 0) {
		char cc = 0;
		close(state->fd[0]);

		ctdb_set_process_name("ctdb_recmode");
		debug_extra = talloc_asprintf(NULL, "set_recmode:");
		/* Daemon should not be able to get the recover lock,
		 * as it should be held by the recovery master */
		if (ctdb_recovery_lock(ctdb)) {
			DEBUG(DEBUG_ERR,
			      ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
			       ctdb->recovery_lock_file));
			ctdb_recovery_unlock(ctdb);
			cc = 1;
		}

		sys_write(state->fd[1], &cc, 1);
		/* make sure we die when our parent dies */
		while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
			sleep(5);
			sys_write(state->fd[1], &cc, 1);
		}
		_exit(0);
	}
	close(state->fd[1]);
	set_close_on_exec(state->fd[0]);

	state->fd[1] = -1;

	talloc_set_destructor(state, set_recmode_destructor);

	DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));

	state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
				    ctdb_set_recmode_timeout, state);

	state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
				  EVENT_FD_READ,
				  set_recmode_handler,
				  (void *)state);

	if (state->fde == NULL) {
		talloc_free(state);
		return -1;
	}
	tevent_fd_set_auto_close(state->fde);

	state->ctdb    = ctdb;
	state->recmode = recmode;
	state->c       = talloc_steal(state, c);

	*async_reply = true;

	return 0;
}
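
/*
  report whether this process currently holds the recovery lock
 */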
bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
{
	return ctdb->recovery_lock_fd != -1;
}

/*
  try and get the recovery lock in shared storage - should only work
  on the recovery master recovery daemon. Anywhere else is a bug
 */
bool ctdb_recovery_lock(struct ctdb_context *ctdb)
{
	struct flock lock;

	ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
				      O_RDWR|O_CREAT, 0600);
	if (ctdb->recovery_lock_fd == -1) {
		DEBUG(DEBUG_ERR,
		      ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
		       ctdb->recovery_lock_file, strerror(errno)));
		return false;
	}

	set_close_on_exec(ctdb->recovery_lock_fd);

	lock.l_type = F_WRLCK;
	lock.l_whence = SEEK_SET;
	lock.l_start = 0;
	lock.l_len = 1;
	lock.l_pid = 0;

	if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
		int saved_errno = errno;
		close(ctdb->recovery_lock_fd);
		ctdb->recovery_lock_fd = -1;
		/* Fail silently on these errors, since they indicate
		 * lock contention, but log an error for any other
		 * failure. */
		if (saved_errno != EACCES &&
		    saved_errno != EAGAIN) {
			DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
					 "recovery lock on '%s' - (%s)\n",
					 ctdb->recovery_lock_file,
					 strerror(saved_errno)));
		}
		return false;
	}

	return true;
}
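
/*
  release the recovery lock, if held, by closing the lock file descriptor
 */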
void ctdb_recovery_unlock(struct ctdb_context *ctdb)
{
	if (ctdb->recovery_lock_fd != -1) {
		DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
		close(ctdb->recovery_lock_fd);
		ctdb->recovery_lock_fd = -1;
	}
}
/*
  delete a record as part of the vacuum process
  only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
  use non-blocking locks

  return 0 if the record was successfully deleted (i.e. it does not exist
  when the function returns)
  or !0 if the record still exists in the tdb after returning.
 */
static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
{
	TDB_DATA key, data, data2;
	struct ctdb_ltdb_header *hdr, *hdr2;

	/* these are really internal tdb functions - but we need them here for
	   non-blocking lock of the freelist */
	int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
	int tdb_unlock(struct tdb_context *tdb, int list, int ltype);


	key.dsize = rec->keylen;
	key.dptr  = &rec->data[0];
	data.dsize = rec->datalen;
	data.dptr = &rec->data[rec->keylen];

	if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
		DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
		return -1;
	}

	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
		DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
		return -1;
	}

	hdr = (struct ctdb_ltdb_header *)data.dptr;

	/* use a non-blocking lock */
	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
		return -1;
	}

	data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
	if (data2.dptr == NULL) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		return 0;
	}

	if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
		if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
			if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
				DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
			}
			tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
			DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
		}
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		free(data2.dptr);
		return 0;
	}

	hdr2 = (struct ctdb_ltdb_header *)data2.dptr;

	if (hdr2->rsn > hdr->rsn) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
				  (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
		free(data2.dptr);
		return -1;
	}

	/* do not allow deleting records that have readonly flags set. */
	if (hdr->flags & CTDB_REC_RO_FLAGS) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
		free(data2.dptr);
		return -1;
	}
	if (hdr2->flags & CTDB_REC_RO_FLAGS) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
		free(data2.dptr);
		return -1;
	}

	if (hdr2->dmaster == ctdb->pnn) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
		free(data2.dptr);
		return -1;
	}

	if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		free(data2.dptr);
		return -1;
	}

	if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
		tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
		free(data2.dptr);
		return -1;
	}

	tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
	free(data2.dptr);
	return 0;
}
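
/*
  state held while waiting for a recovery event script to complete,
  so the original control can be replied to afterwards
 */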
struct recovery_callback_state {
	struct ctdb_req_control *c;
};


/*
  called when the 'recovered' event script has finished
 */
static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
{
	struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);

	ctdb_enable_monitoring(ctdb);
	CTDB_INCREMENT_STAT(ctdb, num_recoveries);

	if (status != 0) {
		DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
		if (status == -ETIME) {
			ctdb_ban_self(ctdb);
		}
	}

	ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
	talloc_free(state);

	gettimeofday(&ctdb->last_recovery_finished, NULL);

	if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
		ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
	}
}
/*
  recovery has finished
 */
int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
				  struct ctdb_req_control *c,
				  bool *async_reply)
{
	int ret;
	struct recovery_callback_state *state;

	DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));

	ctdb_persistent_finish_trans3_commits(ctdb);

	state = talloc(ctdb, struct recovery_callback_state);
	CTDB_NO_MEMORY(ctdb, state);

	state->c = c;

	ctdb_disable_monitoring(ctdb);

	ret = ctdb_event_script_callback(ctdb, state,
					 ctdb_end_recovery_callback,
					 state,
					 CTDB_EVENT_RECOVERED, "%s", "");

	if (ret != 0) {
		ctdb_enable_monitoring(ctdb);

		DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
		talloc_free(state);
		return -1;
	}

	/* tell the caller that we will reply asynchronously */
	state->c = talloc_steal(state, c);
	*async_reply = true;
	return 0;
}
/*
  called when the 'startrecovery' event script has finished
 */
static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
{
	struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);

	if (status != 0) {
		DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
	}

	ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
	talloc_free(state);
}

/*
  run the startrecovery eventscript
 */
int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
				    struct ctdb_req_control *c,
				    bool *async_reply)
{
	int ret;
	struct recovery_callback_state *state;

	DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
	gettimeofday(&ctdb->last_recovery_started, NULL);

	state = talloc(ctdb, struct recovery_callback_state);
	CTDB_NO_MEMORY(ctdb, state);

	state->c = talloc_steal(state, c);

	ctdb_disable_monitoring(ctdb);

	ret = ctdb_event_script_callback(ctdb, state,
					 ctdb_start_recovery_callback,
					 state,
					 CTDB_EVENT_START_RECOVERY,
					 "%s", "");

	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
		talloc_free(state);
		return -1;
	}

	/* tell the caller that we will reply asynchronously */
	*async_reply = true;
	return 0;
}
/*
  try to delete all these records as part of the vacuuming process
  and return the records we failed to delete
 */
int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
	struct ctdb_db_context *ctdb_db;
	int i;
	struct ctdb_rec_data *rec;
	struct ctdb_marshall_buffer *records;

	if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
		DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
		return -1;
	}

	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
		return -1;
	}


	DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
			   reply->count, reply->db_id));


	/* create a blob to send back the records we couldn't delete */
	records = (struct ctdb_marshall_buffer *)
			talloc_zero_size(outdata,
					 offsetof(struct ctdb_marshall_buffer, data));
	if (records == NULL) {
		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
		return -1;
	}
	records->db_id = ctdb_db->db_id;


	rec = (struct ctdb_rec_data *)&reply->data[0];
	for (i=0;i<reply->count;i++) {
		TDB_DATA key, data;

		key.dptr = &rec->data[0];
		key.dsize = rec->keylen;
		data.dptr = &rec->data[key.dsize];
		data.dsize = rec->datalen;

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
			return -1;
		}

		/* If we can't delete the record we must add it to the reply
		   so the lmaster knows it may not purge this record
		*/
		if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
			size_t old_size;
			struct ctdb_ltdb_header *hdr;

			hdr = (struct ctdb_ltdb_header *)data.dptr;
			data.dptr += sizeof(*hdr);
			data.dsize -= sizeof(*hdr);

			DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));

			old_size = talloc_get_size(records);
			records = talloc_realloc_size(outdata, records, old_size + rec->length);
			if (records == NULL) {
				DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
				return -1;
			}
			records->count++;
			memcpy(old_size+(uint8_t *)records, rec, rec->length);
		}

		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
	}


	*outdata = ctdb_marshall_finish(records);

	return 0;
}
/**
 * Store a record as part of the vacuum process:
 * This is called from the RECEIVE_RECORD control which
 * the lmaster uses to send the current empty copy
 * to all nodes for storing, before it lets the other
 * nodes delete the records in the second phase with
 * the TRY_DELETE_RECORDS control.
 *
 * Only store if we are not lmaster or dmaster, and our
 * rsn is <= the provided rsn. Use non-blocking locks.
 *
 * return 0 if the record was successfully stored.
 * return !0 if the record still exists in the tdb after returning.
 */
static int store_tdb_record(struct ctdb_context *ctdb,
			    struct ctdb_db_context *ctdb_db,
			    struct ctdb_rec_data *rec)
{
	TDB_DATA key, data, data2;
	struct ctdb_ltdb_header *hdr, *hdr2;
	int ret;

	key.dsize = rec->keylen;
	key.dptr = &rec->data[0];
	data.dsize = rec->datalen;
	data.dptr = &rec->data[rec->keylen];

	if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
		DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
				   "where we are lmaster\n"));
		return -1;
	}

	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
		DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
		return -1;
	}

	hdr = (struct ctdb_ltdb_header *)data.dptr;

	/* use a non-blocking lock */
	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
		DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
		return -1;
	}

	data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
	if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
		if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
			DEBUG(DEBUG_ERR, (__location__ " Failed to store record\n"));
			ret = -1;
			goto done;
		}
		DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
		ret = 0;
		goto done;
	}

	hdr2 = (struct ctdb_ltdb_header *)data2.dptr;

	if (hdr2->rsn > hdr->rsn) {
		DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
				   "rsn=%llu - called with rsn=%llu\n",
				   (unsigned long long)hdr2->rsn,
				   (unsigned long long)hdr->rsn));
		ret = -1;
		goto done;
	}

	/* do not allow vacuuming of records that have readonly flags set. */
	if (hdr->flags & CTDB_REC_RO_FLAGS) {
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
				  "flags set\n"));
		ret = -1;
		goto done;
	}
	if (hdr2->flags & CTDB_REC_RO_FLAGS) {
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
				  "flags set\n"));
		ret = -1;
		goto done;
	}

	if (hdr2->dmaster == ctdb->pnn) {
		DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
				   "where we are the dmaster\n"));
		ret = -1;
		goto done;
	}

	if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
		DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
		ret = -1;
		goto done;
	}

	ret = 0;

done:
	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
	free(data2.dptr);
	return ret;
}
/**
 * Try to store all these records as part of the vacuuming process
 * and return the records we failed to store.
 */
int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
				     TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
	struct ctdb_db_context *ctdb_db;
	int i;
	struct ctdb_rec_data *rec;
	struct ctdb_marshall_buffer *records;

	if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
		DEBUG(DEBUG_ERR,
		      (__location__ " invalid data in receive_records\n"));
		return -1;
	}

	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
				  reply->db_id));
		return -1;
	}

	DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
			    "dbid 0x%x\n", reply->count, reply->db_id));

	/* create a blob to send back the records we could not store */
	records = (struct ctdb_marshall_buffer *)
		  talloc_zero_size(outdata,
				   offsetof(struct ctdb_marshall_buffer, data));
	if (records == NULL) {
		DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
		return -1;
	}
	records->db_id = ctdb_db->db_id;

	rec = (struct ctdb_rec_data *)&reply->data[0];
	for (i=0; i<reply->count; i++) {
		TDB_DATA key, data;

		key.dptr = &rec->data[0];
		key.dsize = rec->keylen;
		data.dptr = &rec->data[key.dsize];
		data.dsize = rec->datalen;

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
					   "in indata\n"));
			return -1;
		}

		/*
		 * If we can not store the record we must add it to the reply
		 * so the lmaster knows it may not purge this record.
		 */
		if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
			size_t old_size;
			struct ctdb_ltdb_header *hdr;

			hdr = (struct ctdb_ltdb_header *)data.dptr;
			data.dptr += sizeof(*hdr);
			data.dsize -= sizeof(*hdr);

			DEBUG(DEBUG_INFO, (__location__ " Failed to store "
					   "record with hash 0x%08x in vacuum "
					   "via RECEIVE_RECORDS\n",
					   ctdb_hash(&key)));

			old_size = talloc_get_size(records);
			records = talloc_realloc_size(outdata, records,
						      old_size + rec->length);
			if (records == NULL) {
				DEBUG(DEBUG_ERR, (__location__ " Failed to "
						  "expand\n"));
				return -1;
			}
			records->count++;
			memcpy(old_size+(uint8_t *)records, rec, rec->length);
		}

		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
	}

	*outdata = ctdb_marshall_finish(records);

	return 0;
}
/*
  report capabilities
 */
int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
{
	uint32_t *capabilities = NULL;

	capabilities = talloc(outdata, uint32_t);
	CTDB_NO_MEMORY(ctdb, capabilities);
	*capabilities = ctdb->capabilities;

	outdata->dsize = sizeof(uint32_t);
	outdata->dptr = (uint8_t *)capabilities;

	return 0;
}
/* The recovery daemon will ping us at regular intervals.
   If we haven't been pinged for a while we assume the recovery
   daemon is inoperable and we restart it.
*/
static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
{
	struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
	uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);

	DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));

	if (*count < ctdb->tunable.recd_ping_failcount) {
		(*count)++;
		event_add_timed(ctdb->ev, ctdb->recd_ping_count,
				timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
				ctdb_recd_ping_timeout, ctdb);
		return;
	}

	DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));

	ctdb_stop_recoverd(ctdb);
	ctdb_start_recoverd(ctdb);
}

int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
{
	talloc_free(ctdb->recd_ping_count);

	ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
	CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);

	if (ctdb->tunable.recd_ping_timeout != 0) {
		event_add_timed(ctdb->ev, ctdb->recd_ping_count,
				timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
				ctdb_recd_ping_timeout, ctdb);
	}

	return 0;
}
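
/*
  update which node this daemon believes is the recovery master
 */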
int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
{
	uint32_t new_recmaster;

	CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
	new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];

	if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
		DEBUG(DEBUG_NOTICE,
		      ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
	}

	if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
		DEBUG(DEBUG_NOTICE,
		      ("This node (%u) is now the recovery master\n", ctdb->pnn));
	}

	ctdb->recovery_master = new_recmaster;
	return 0;
}
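
/*
  STOP_NODE and CONTINUE_NODE controls: set or clear the STOPPED flag
  on this node (monitoring is disabled while stopping)
 */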
int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
{
	DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
	ctdb_disable_monitoring(ctdb);
	ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;

	return 0;
}

int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
{
	DEBUG(DEBUG_NOTICE, ("Continue node\n"));
	ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;

	return 0;
}