s3:unix_msg: use a buffer pointer instead of array indexes for the iov buffer
[Samba.git] / ctdb / server / ctdb_recover.c
blobc26a048c8bbf43597594ca83f0ba45f6de057dcd
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
/*
  CTDB_CONTROL_GETVNNMAP handler: marshall the current VNN map
  (generation number plus the PNN array) into wire format.
  The reply buffer is talloc'd off outdata; returns 0 on success.
 */
int
ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_vnn_map_wire *map;
	size_t len;

	CHECK_CONTROL_DATA_SIZE(0);

	/* wire format is the fixed header followed by size uint32 PNNs */
	len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
	map = talloc_size(outdata, len);
	CTDB_NO_MEMORY(ctdb, map);

	map->generation = ctdb->vnn_map->generation;
	map->size = ctdb->vnn_map->size;
	memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);

	outdata->dsize = len;
	outdata->dptr = (uint8_t *)map;

	return 0;
}
53 int
54 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
56 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
57 int i;
59 for(i=1; i<=NUM_DB_PRIORITIES; i++) {
60 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
61 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
62 return -1;
66 talloc_free(ctdb->vnn_map);
68 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
69 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
71 ctdb->vnn_map->generation = map->generation;
72 ctdb->vnn_map->size = map->size;
73 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
74 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
76 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
78 return 0;
/*
  CTDB_CONTROL_GET_DBMAP handler: return the list of attached databases
  with their ids and flags (persistent/readonly/sticky).
  Allocation failure is fatal (exit) since the daemon cannot proceed.
 */
int
ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	uint32_t i, len;
	struct ctdb_db_context *ctdb_db;
	struct ctdb_dbid_map *dbid_map;

	CHECK_CONTROL_DATA_SIZE(0);

	/* count the attached databases */
	len = 0;
	for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
		len++;
	}

	outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
	outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
	if (!outdata->dptr) {
		DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
		exit(1);
	}

	dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
	dbid_map->num = len;
	for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
		dbid_map->dbs[i].dbid = ctdb_db->db_id;
		/* flags start at 0 (buffer is zeroed above) */
		if (ctdb_db->persistent != 0) {
			dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
		}
		if (ctdb_db->readonly != 0) {
			dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
		}
		if (ctdb_db->sticky != 0) {
			dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
		}
	}

	return 0;
}
/*
  CTDB_CONTROL_GET_NODEMAP handler: return pnn, flags and parsed address
  for every configured node.  A node address that fails to parse is only
  logged; the entry is still returned (with a zeroed addr).
 */
int
ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	uint32_t i, num_nodes;
	struct ctdb_node_map *node_map;

	CHECK_CONTROL_DATA_SIZE(0);

	num_nodes = ctdb->num_nodes;

	outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
	outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
	if (!outdata->dptr) {
		DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
		exit(1);
	}

	node_map = (struct ctdb_node_map *)outdata->dptr;
	node_map->num = num_nodes;
	for (i=0; i<num_nodes; i++) {
		if (parse_ip(ctdb->nodes[i]->address.address,
			     NULL, /* TODO: pass in the correct interface here*/
			     0,
			     &node_map->nodes[i].addr) == 0)
		{
			/* non-fatal: log and carry on with the next node */
			DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
		}

		node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
		node_map->nodes[i].flags = ctdb->nodes[i]->flags;
	}

	return 0;
}
/*
  get an old style ipv4-only nodemap
  (legacy wire format for clients that predate ipv6 support)
 */
int
ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	uint32_t i, num_nodes;
	struct ctdb_node_mapv4 *node_map;

	CHECK_CONTROL_DATA_SIZE(0);

	num_nodes = ctdb->num_nodes;

	outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
	outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
	if (!outdata->dptr) {
		DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
		exit(1);
	}

	node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
	node_map->num = num_nodes;
	for (i=0; i<num_nodes; i++) {
		/* unlike the ipv6-capable variant, a parse failure here
		   aborts the whole control */
		if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
			DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
			return -1;
		}

		node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
		node_map->nodes[i].flags = ctdb->nodes[i]->flags;
	}

	return 0;
}
/*
  timed-event callback that re-reads the nodes file and reconciles the
  new node list with the old one: identical nodes keep their existing
  connection; new/changed nodes get added and connected.  Failure to
  add or connect a node is fatal for the daemon.
 */
static void
ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	int i, num_nodes;
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
	TALLOC_CTX *tmp_ctx;
	struct ctdb_node **nodes;

	tmp_ctx = talloc_new(ctdb);

	/* steal the old nodes file for a while */
	talloc_steal(tmp_ctx, ctdb->nodes);
	nodes = ctdb->nodes;
	ctdb->nodes = NULL;
	num_nodes = ctdb->num_nodes;
	ctdb->num_nodes = 0;

	/* load the new nodes file */
	ctdb_load_nodes_file(ctdb);

	for (i=0; i<ctdb->num_nodes; i++) {
		/* keep any identical pre-existing nodes and connections:
		   steal the old node object back so its transport state
		   (and connection) survives the reload */
		if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
			talloc_free(ctdb->nodes[i]);
			ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
			continue;
		}

		if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
			continue;
		}

		/* any new or different nodes must be added */
		if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
			DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
			ctdb_fatal(ctdb, "failed to add node. shutting down\n");
		}
		if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
			DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
			ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
		}
	}

	/* tell the recovery daemon to reload the nodes file too */
	ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);

	/* frees the old nodes array (and any nodes not stolen back) */
	talloc_free(tmp_ctx);
	return;
}
243 reload the nodes file after a short delay (so that we can send the response
244 back first
246 int
247 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
249 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
251 return 0;
/*
  state shared by the traverse callback that pulls all relevant
  records from a database during recovery (see traverse_pulldb)
 */
struct pulldb_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	struct ctdb_marshall_buffer *pulldata; /* marshall blob being built */
	uint32_t len;           /* bytes of pulldata currently in use */
	uint32_t allocated_len; /* bytes currently allocated for pulldata */
	bool failed;            /* set when marshalling a record failed */
};
/*
  tdb traverse callback: marshall one record and append it to the
  blob in params->pulldata, growing the blob in chunks of
  pulldb_preallocation_size.  Returns -1 (aborting the traverse) if
  the record cannot be marshalled; allocation failure of the blob
  itself is fatal via ctdb_fatal().
 */
static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
	struct pulldb_data *params = (struct pulldb_data *)p;
	struct ctdb_rec_data *rec;
	struct ctdb_context *ctdb = params->ctdb;
	struct ctdb_db_context *ctdb_db = params->ctdb_db;

	/* add the record to the blob */
	rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
	if (rec == NULL) {
		params->failed = true;
		return -1;
	}
	if (params->len + rec->length >= params->allocated_len) {
		/* over-allocate so we do not realloc on every record */
		params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
		params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
	}
	if (params->pulldata == NULL) {
		DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
		ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
	}
	params->pulldata->count++;
	memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
	params->len += rec->length;

	if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
		DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
	}

	/* rec was only a staging copy; the blob now owns the data */
	talloc_free(rec);

	return 0;
}
/*
  pull a bunch of records from a ltdb, filtering by lmaster
  (CTDB_CONTROL_PULL_DB handler).  Only valid while the database's
  priority is frozen.  The marshalled blob is returned in outdata.
 */
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_control_pulldb *pull;
	struct ctdb_db_context *ctdb_db;
	struct pulldb_data params;
	struct ctdb_marshall_buffer *reply;

	pull = (struct ctdb_control_pulldb *)indata.dptr;

	ctdb_db = find_ctdb_db(ctdb, pull->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
		return -1;
	}

	if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
		DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
		return -1;
	}

	reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
	CTDB_NO_MEMORY(ctdb, reply);

	reply->db_id = pull->db_id;

	params.ctdb = ctdb;
	params.ctdb_db = ctdb_db;
	params.pulldata = reply;
	/* len starts at the header size; records are appended after it */
	params.len = offsetof(struct ctdb_marshall_buffer, data);
	params.allocated_len = params.len;
	params.failed = false;

	if (ctdb_db->unhealthy_reason) {
		/* this is just a warning, as the tdb should be empty anyway */
		DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
				     ctdb_db->db_name, ctdb_db->unhealthy_reason));
	}

	if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
		return -1;
	}

	if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
		ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
		talloc_free(params.pulldata);
		return -1;
	}

	ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);

	/* note: params.pulldata may have been realloc'd by the traverse
	   callback, so use it (not reply) for the result */
	outdata->dptr = (uint8_t *)params.pulldata;
	outdata->dsize = params.len;

	if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
		DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
	}
	if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
		DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
	}

	return 0;
}
370 push a bunch of records into a ltdb, filtering by rsn
372 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
374 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
375 struct ctdb_db_context *ctdb_db;
376 int i, ret;
377 struct ctdb_rec_data *rec;
379 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
380 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
381 return -1;
384 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
385 if (!ctdb_db) {
386 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
387 return -1;
390 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
391 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
392 return -1;
395 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
396 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
397 return -1;
400 rec = (struct ctdb_rec_data *)&reply->data[0];
402 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
403 reply->count, reply->db_id));
405 for (i=0;i<reply->count;i++) {
406 TDB_DATA key, data;
407 struct ctdb_ltdb_header *hdr;
409 key.dptr = &rec->data[0];
410 key.dsize = rec->keylen;
411 data.dptr = &rec->data[key.dsize];
412 data.dsize = rec->datalen;
414 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
415 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
416 goto failed;
418 hdr = (struct ctdb_ltdb_header *)data.dptr;
419 /* strip off any read only record flags. All readonly records
420 are revoked implicitely by a recovery
422 hdr->flags &= ~CTDB_REC_RO_FLAGS;
424 data.dptr += sizeof(*hdr);
425 data.dsize -= sizeof(*hdr);
427 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
428 if (ret != 0) {
429 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
430 goto failed;
433 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
436 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
437 reply->count, reply->db_id));
439 if (ctdb_db->readonly) {
440 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
441 ctdb_db->db_id));
442 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
443 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
444 ctdb_db->readonly = false;
445 tdb_close(ctdb_db->rottdb);
446 ctdb_db->rottdb = NULL;
447 ctdb_db->readonly = false;
449 while (ctdb_db->revokechild_active != NULL) {
450 talloc_free(ctdb_db->revokechild_active);
454 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
455 return 0;
457 failed:
458 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
459 return -1;
/*
  per-request state for an asynchronous SETRECMODE control while a
  forked child verifies that the recovery lock is held elsewhere
 */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;
	struct ctdb_req_control *c;	/* the control to reply to asynchronously */
	uint32_t recmode;		/* the requested recovery mode */
	int fd[2];			/* pipe: child writes its status, parent reads */
	struct timed_event *te;		/* timeout in case the child hangs */
	struct fd_event *fde;		/* read event on fd[0] */
	pid_t child;			/* pid of the reclock-checking child */
	struct timeval start_time;	/* for reclock latency accounting */
};
/*
  called if our set_recmode child times out. this would happen if
  ctdb_recovery_lock() would block.
 */
static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
					 struct timeval t, void *private_data)
{
	struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
					   struct ctdb_set_recmode_state);

	/* we consider this a success, not a failure, as we failed to
	   set the recovery lock which is what we wanted.  This can be
	   caused by the cluster filesystem being very slow to
	   arbitrate locks immediately after a node failure.
	 */
	DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
	state->ctdb->recovery_mode = state->recmode;
	ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
	/* freeing state triggers set_recmode_destructor, which kills the child */
	talloc_free(state);
}
/* when we free the recmode state we must kill any child process.
 */
static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
{
	double l = timeval_elapsed(&state->start_time);

	/* record how long the reclock verification took */
	CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);

	/* NOTE(review): the fds are only invalidated here, not close()d.
	   Presumably fd[0] is closed by the fd event (tevent_fd_set_auto_close
	   is used by the caller) and fd[1] is closed in the parent right
	   after fork — confirm no descriptor can leak on early-error paths. */
	if (state->fd[0] != -1) {
		state->fd[0] = -1;
	}
	if (state->fd[1] != -1) {
		state->fd[1] = -1;
	}
	ctdb_kill(state->ctdb, state->child, SIGKILL);
	return 0;
}
/* this is called when the client process has completed ctdb_recovery_lock()
   and has written data back to us through the pipe.
*/
static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
				uint16_t flags, void *private_data)
{
	struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
					     struct ctdb_set_recmode_state);
	char c = 0;
	int ret;

	/* we got a response from our child process so we can abort the
	   timeout.
	*/
	talloc_free(state->te);
	state->te = NULL;


	/* read the childs status when trying to lock the reclock file.
	   child wrote 0 if everything is fine and 1 if it did manage
	   to lock the file, which would be a problem since that means
	   we got a request to exit from recovery but we could still lock
	   the file which at this time SHOULD be locked by the recovery
	   daemon on the recmaster
	*/
	ret = sys_read(state->fd[0], &c, 1);
	if (ret != 1 || c != 0) {
		/* child grabbed the lock (or the pipe broke): fail the control */
		ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
		talloc_free(state);
		return;
	}

	state->ctdb->recovery_mode = state->recmode;

	/* release any deferred attach calls from clients */
	if (state->recmode == CTDB_RECOVERY_NORMAL) {
		ctdb_process_deferred_attach(state->ctdb);
	}

	ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
	/* destructor kills the child and records reclock latency */
	talloc_free(state);
	return;
}
/*
  timed-event callback fired when the node has been stuck in recovery
  for longer than RecoveryDropAllIPs: release every public address.
 */
static void
ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

	DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
	/* free the context owning this very event first, then NULL the
	   pointer so a later ctdb_deferred_drop_all_ips starts fresh */
	talloc_free(ctdb->release_ips_ctx);
	ctdb->release_ips_ctx = NULL;

	ctdb_release_all_ips(ctdb);
}
571 * Set up an event to drop all public ips if we remain in recovery for too
572 * long
574 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
576 if (ctdb->release_ips_ctx != NULL) {
577 talloc_free(ctdb->release_ips_ctx);
579 ctdb->release_ips_ctx = talloc_new(ctdb);
580 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
582 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
583 return 0;
/*
  set the recovery mode (CTDB_CONTROL_SET_RECMODE handler).

  Entering recovery arms a deferred drop-all-ips timer; leaving it
  cancels that timer.  When leaving recovery (ACTIVE -> NORMAL) and
  reclock verification is enabled, a child process is forked to check
  that the recovery lock is NOT obtainable locally (it should be held
  by the recovery daemon on the recmaster); the control is then
  answered asynchronously from set_recmode_handler / the timeout.
 */
int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
				 struct ctdb_req_control *c,
				 TDB_DATA indata, bool *async_reply,
				 const char **errormsg)
{
	uint32_t recmode = *(uint32_t *)indata.dptr;
	int i, ret;
	struct ctdb_set_recmode_state *state;
	pid_t parent = getpid();

	/* if we enter recovery but stay in recovery for too long
	   we will eventually drop all our ip addresses
	*/
	if (recmode == CTDB_RECOVERY_NORMAL) {
		talloc_free(ctdb->release_ips_ctx);
		ctdb->release_ips_ctx = NULL;
	} else {
		if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
			DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
		}
	}

	if (recmode != ctdb->recovery_mode) {
		DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
			 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
	}

	/* the only transition needing extra work is ACTIVE -> NORMAL;
	   everything else just records the new mode */
	if (recmode != CTDB_RECOVERY_NORMAL ||
	    ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
		ctdb->recovery_mode = recmode;
		return 0;
	}

	/* some special handling when ending recovery mode */

	/* force the databases to thaw */
	for (i=1; i<=NUM_DB_PRIORITIES; i++) {
		if (ctdb->freeze_handles[i] != NULL) {
			ctdb_control_thaw(ctdb, i, false);
		}
	}

	state = talloc(ctdb, struct ctdb_set_recmode_state);
	CTDB_NO_MEMORY(ctdb, state);

	state->start_time = timeval_current();
	state->fd[0] = -1;
	state->fd[1] = -1;

	/* release any deferred attach calls from clients */
	if (recmode == CTDB_RECOVERY_NORMAL) {
		ctdb_process_deferred_attach(ctdb);
	}

	if (ctdb->tunable.verify_recovery_lock == 0) {
		/* dont need to verify the reclock file */
		ctdb->recovery_mode = recmode;
		return 0;
	}

	/* For the rest of what needs to be done, we need to do this in
	   a child process since
	   1, the call to ctdb_recovery_lock() can block if the cluster
	      filesystem is in the process of recovery.
	*/
	ret = pipe(state->fd);
	if (ret != 0) {
		talloc_free(state);
		DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
		return -1;
	}

	state->child = ctdb_fork(ctdb);
	if (state->child == (pid_t)-1) {
		close(state->fd[0]);
		close(state->fd[1]);
		talloc_free(state);
		return -1;
	}

	if (state->child == 0) {
		char cc = 0;
		close(state->fd[0]);

		ctdb_set_process_name("ctdb_recmode");
		debug_extra = talloc_asprintf(NULL, "set_recmode:");
		/* we should not be able to get the lock on the reclock file,
		   as it should be held by the recovery master
		*/
		if (ctdb_recovery_lock(ctdb, false)) {
			DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
			cc = 1;
		}

		sys_write(state->fd[1], &cc, 1);
		/* make sure we die when our parent dies */
		while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
			sleep(5);
			sys_write(state->fd[1], &cc, 1);
		}
		_exit(0);
	}
	/* parent: keep only the read end of the pipe */
	close(state->fd[1]);
	set_close_on_exec(state->fd[0]);

	state->fd[1] = -1;

	talloc_set_destructor(state, set_recmode_destructor);

	DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));

	/* give the child at most 5 seconds to report back */
	state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
				    ctdb_set_recmode_timeout, state);

	state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
				  EVENT_FD_READ,
				  set_recmode_handler,
				  (void *)state);

	if (state->fde == NULL) {
		talloc_free(state);
		return -1;
	}
	tevent_fd_set_auto_close(state->fde);

	state->ctdb = ctdb;
	state->recmode = recmode;
	state->c = talloc_steal(state, c);

	*async_reply = true;

	return 0;
}
/*
  try and get the recovery lock in shared storage - should only work
  on the recovery master recovery daemon. Anywhere else is a bug.

  keep=true  : take and hold the lock (fd stays open in ctdb->recovery_lock_fd)
  keep=false : probe only; the fd is closed again before returning
  Returns true if the fcntl write-lock was obtained.
 */
bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
{
	struct flock lock;

	if (keep) {
		DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
	}
	/* drop any lock we currently hold before re-acquiring */
	if (ctdb->recovery_lock_fd != -1) {
		close(ctdb->recovery_lock_fd);
		ctdb->recovery_lock_fd = -1;
	}

	ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
	if (ctdb->recovery_lock_fd == -1) {
		DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
				 ctdb->recovery_lock_file, strerror(errno)));
		return false;
	}

	set_close_on_exec(ctdb->recovery_lock_fd);

	/* non-blocking exclusive lock on the first byte of the file */
	lock.l_type = F_WRLCK;
	lock.l_whence = SEEK_SET;
	lock.l_start = 0;
	lock.l_len = 1;
	lock.l_pid = 0;

	if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
		close(ctdb->recovery_lock_fd);
		ctdb->recovery_lock_fd = -1;
		if (keep) {
			DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
		}
		return false;
	}

	/* a probe releases the lock immediately */
	if (!keep) {
		close(ctdb->recovery_lock_fd);
		ctdb->recovery_lock_fd = -1;
	}

	if (keep) {
		DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
	}

	DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));

	return true;
}
/*
  delete a record as part of the vacuum process
  only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
  use non-blocking locks

  return 0 if the record was successfully deleted (i.e. it does not exist
  when the function returns)
  or !0 is the record still exists in the tdb after returning.
 */
static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
{
	TDB_DATA key, data, data2;
	struct ctdb_ltdb_header *hdr, *hdr2;

	/* these are really internal tdb functions - but we need them here for
	   non-blocking lock of the freelist */
	int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
	int tdb_unlock(struct tdb_context *tdb, int list, int ltype);


	/* unpack key and data from the marshalled record */
	key.dsize = rec->keylen;
	key.dptr = &rec->data[0];
	data.dsize = rec->datalen;
	data.dptr = &rec->data[rec->keylen];

	if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
		DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
		return -1;
	}

	/* vacuum candidates carry only an ltdb header, no payload */
	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
		DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
		return -1;
	}

	hdr = (struct ctdb_ltdb_header *)data.dptr;

	/* use a non-blocking lock */
	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
		return -1;
	}

	data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
	if (data2.dptr == NULL) {
		/* already gone - nothing to do */
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		return 0;
	}

	/* a record smaller than the header is corrupt; delete it outright */
	if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
		if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
			if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
				DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
			}
			tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
			DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
		}
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		free(data2.dptr);
		return 0;
	}

	hdr2 = (struct ctdb_ltdb_header *)data2.dptr;

	/* the local copy has been updated since the vacuum request */
	if (hdr2->rsn > hdr->rsn) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
			 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
		free(data2.dptr);
		return -1;
	}

	/* do not allow deleting record that have readonly flags set. */
	if (hdr->flags & CTDB_REC_RO_FLAGS) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
		free(data2.dptr);
		return -1;
	}
	if (hdr2->flags & CTDB_REC_RO_FLAGS) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
		free(data2.dptr);
		return -1;
	}

	if (hdr2->dmaster == ctdb->pnn) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
		free(data2.dptr);
		return -1;
	}

	/* grab the freelist lock non-blockingly before deleting */
	if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		free(data2.dptr);
		return -1;
	}

	if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
		tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
		free(data2.dptr);
		return -1;
	}

	tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
	free(data2.dptr);
	return 0;
}
/* state carried across an event-script callback: the control to reply to */
struct recovery_callback_state {
	struct ctdb_req_control *c;
};
/*
  called when the 'recovered' event script has finished
 */
static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
{
	struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);

	ctdb_enable_monitoring(ctdb);
	CTDB_INCREMENT_STAT(ctdb, num_recoveries);

	if (status != 0) {
		DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
		/* a timed-out event script means this node is misbehaving;
		   ban ourselves */
		if (status == -ETIME) {
			ctdb_ban_self(ctdb);
		}
	}

	/* deliver the deferred reply to the END_RECOVERY control */
	ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
	talloc_free(state);

	gettimeofday(&ctdb->last_recovery_finished, NULL);

	/* the first completed recovery moves the daemon on to startup */
	if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
		ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
	}
}
/*
  recovery has finished (CTDB_CONTROL_END_RECOVERY handler): run the
  'recovered' event script and reply asynchronously from its callback.
 */
int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
				 struct ctdb_req_control *c,
				 bool *async_reply)
{
	int ret;
	struct recovery_callback_state *state;

	DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));

	ctdb_persistent_finish_trans3_commits(ctdb);

	state = talloc(ctdb, struct recovery_callback_state);
	CTDB_NO_MEMORY(ctdb, state);

	/* stash the control; ownership is only stolen below once the
	   event script has been successfully started */
	state->c    = c;

	ctdb_disable_monitoring(ctdb);

	ret = ctdb_event_script_callback(ctdb, state,
					 ctdb_end_recovery_callback,
					 state,
					 CTDB_EVENT_RECOVERED, "%s", "");

	if (ret != 0) {
		ctdb_enable_monitoring(ctdb);

		DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
		/* c was not stolen yet, so freeing state leaves it intact
		   for the caller to reply with the error */
		talloc_free(state);
		return -1;
	}

	/* tell the control that we will be reply asynchronously */
	state->c    = talloc_steal(state, c);
	*async_reply = true;
	return 0;
}
965 called when the 'startrecovery' event script has finished
967 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
969 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
971 if (status != 0) {
972 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
975 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
976 talloc_free(state);
/*
  run the startrecovery eventscript (CTDB_CONTROL_START_RECOVERY
  handler); the control is answered asynchronously from the callback.
 */
int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
				 struct ctdb_req_control *c,
				 bool *async_reply)
{
	int ret;
	struct recovery_callback_state *state;

	DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
	gettimeofday(&ctdb->last_recovery_started, NULL);

	state = talloc(ctdb, struct recovery_callback_state);
	CTDB_NO_MEMORY(ctdb, state);

	/* NOTE(review): c is stolen onto state BEFORE the event script is
	   started, so the error path below frees c together with state —
	   unlike ctdb_control_end_recovery which steals only on success.
	   Confirm callers do not use c after a -1 return. */
	state->c    = talloc_steal(state, c);

	ctdb_disable_monitoring(ctdb);

	ret = ctdb_event_script_callback(ctdb, state,
					 ctdb_start_recovery_callback,
					 state,
					 CTDB_EVENT_START_RECOVERY,
					 "%s", "");

	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
		talloc_free(state);
		return -1;
	}

	/* tell the control that we will be reply asynchronously */
	*async_reply = true;
	return 0;
}
/*
  try to delete all these records as part of the vacuuming process
  and return the records we failed to delete
 */
int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
	struct ctdb_db_context *ctdb_db;
	int i;
	struct ctdb_rec_data *rec;
	struct ctdb_marshall_buffer *records;

	if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
		DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
		return -1;
	}

	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
		return -1;
	}


	DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
		 reply->count, reply->db_id));


	/* create a blob to send back the records we couldnt delete */
	records = (struct ctdb_marshall_buffer *)
			talloc_zero_size(outdata,
				    offsetof(struct ctdb_marshall_buffer, data));
	if (records == NULL) {
		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
		return -1;
	}
	records->db_id = ctdb_db->db_id;


	rec = (struct ctdb_rec_data *)&reply->data[0];
	for (i=0;i<reply->count;i++) {
		TDB_DATA key, data;

		/* each record is [keylen][datalen][key bytes][data bytes] */
		key.dptr = &rec->data[0];
		key.dsize = rec->keylen;
		data.dptr = &rec->data[key.dsize];
		data.dsize = rec->datalen;

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
			return -1;
		}

		/* If we cant delete the record we must add it to the reply
		   so the lmaster knows it may not purge this record
		*/
		if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
			size_t old_size;
			struct ctdb_ltdb_header *hdr;

			hdr = (struct ctdb_ltdb_header *)data.dptr;
			data.dptr += sizeof(*hdr);
			data.dsize -= sizeof(*hdr);

			DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));

			/* append the full marshalled record to the reply blob */
			old_size = talloc_get_size(records);
			records = talloc_realloc_size(outdata, records, old_size + rec->length);
			if (records == NULL) {
				DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
				return -1;
			}
			records->count++;
			memcpy(old_size+(uint8_t *)records, rec, rec->length);
		}

		/* advance to the next marshalled record */
		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
	}


	*outdata = ctdb_marshall_finish(records);

	return 0;
}
/**
 * Store a record as part of the vacuum process:
 * This is called from the RECEIVE_RECORD control which
 * the lmaster uses to send the current empty copy
 * to all nodes for storing, before it lets the other
 * nodes delete the records in the second phase with
 * the TRY_DELETE_RECORDS control.
 *
 * Only store if we are not lmaster or dmaster, and our
 * rsn is <= the provided rsn. Use non-blocking locks.
 *
 * return 0 if the record was successfully stored.
 * return !0 if the record still exists in the tdb after returning.
 */
static int store_tdb_record(struct ctdb_context *ctdb,
			    struct ctdb_db_context *ctdb_db,
			    struct ctdb_rec_data *rec)
{
	TDB_DATA key, data, data2;
	struct ctdb_ltdb_header *hdr, *hdr2;
	int ret;

	/* the wire record packs key bytes first, then the data bytes */
	key.dsize = rec->keylen;
	key.dptr = &rec->data[0];
	data.dsize = rec->datalen;
	data.dptr = &rec->data[rec->keylen];

	if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
		/* the lmaster keeps its own copy; it must not overwrite it here */
		DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
				   "where we are lmaster\n"));
		return -1;
	}

	/* the received copy must be exactly an empty record: header only */
	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
		DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
		return -1;
	}

	hdr = (struct ctdb_ltdb_header *)data.dptr;

	/* use a non-blocking lock */
	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
		DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
		return -1;
	}

	data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
	if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
		/* no usable local copy - store the received empty copy as-is */
		if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
			DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
			ret = -1;
			goto done;
		}
		DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
		ret = 0;
		goto done;
	}

	hdr2 = (struct ctdb_ltdb_header *)data2.dptr;

	/* our local copy is newer than what the lmaster sent - keep ours */
	if (hdr2->rsn > hdr->rsn) {
		DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
				   "rsn=%llu - called with rsn=%llu\n",
				   (unsigned long long)hdr2->rsn,
				   (unsigned long long)hdr->rsn));
		ret = -1;
		goto done;
	}

	/* do not allow vacuuming of records that have readonly flags set. */
	if (hdr->flags & CTDB_REC_RO_FLAGS) {
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
				  "flags set\n"));
		ret = -1;
		goto done;
	}
	if (hdr2->flags & CTDB_REC_RO_FLAGS) {
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
				  "flags set\n"));
		ret = -1;
		goto done;
	}

	/* refuse while we are dmaster of the local copy */
	if (hdr2->dmaster == ctdb->pnn) {
		DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
				   "where we are the dmaster\n"));
		ret = -1;
		goto done;
	}

	if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
		DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
		ret = -1;
		goto done;
	}

	ret = 0;

done:
	/* always drop the chain lock; data2.dptr is malloc'ed by tdb_fetch */
	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
	free(data2.dptr);
	return ret;
}
/*
 * Try to store all these records as part of the vacuuming process
 * and return the records we failed to store.
 */
int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
				     TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
	struct ctdb_db_context *ctdb_db;
	int i;
	struct ctdb_rec_data *rec;
	struct ctdb_marshall_buffer *records;

	/* reject payloads too small to even hold the marshall header */
	if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
		DEBUG(DEBUG_ERR,
		      (__location__ " invalid data in receive_records\n"));
		return -1;
	}

	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
				  reply->db_id));
		return -1;
	}

	DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
			    "dbid 0x%x\n", reply->count, reply->db_id));

	/* create a blob to send back the records we could not store */
	records = (struct ctdb_marshall_buffer *)
		talloc_zero_size(outdata,
				 offsetof(struct ctdb_marshall_buffer, data));
	if (records == NULL) {
		DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
		return -1;
	}
	records->db_id = ctdb_db->db_id;

	/* walk the variable-length records packed after the header */
	rec = (struct ctdb_rec_data *)&reply->data[0];
	for (i=0; i<reply->count; i++) {
		TDB_DATA key, data;

		key.dptr = &rec->data[0];
		key.dsize = rec->keylen;
		data.dptr = &rec->data[key.dsize];
		data.dsize = rec->datalen;

		/* each record's data must at least hold an ltdb header */
		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
					   "in indata\n"));
			return -1;
		}

		/*
		 * If we can not store the record we must add it to the reply
		 * so the lmaster knows it may not purge this record.
		 */
		if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
			size_t old_size;
			struct ctdb_ltdb_header *hdr;

			/* skip past the header when hashing the key below */
			hdr = (struct ctdb_ltdb_header *)data.dptr;
			data.dptr += sizeof(*hdr);
			data.dsize -= sizeof(*hdr);

			DEBUG(DEBUG_INFO, (__location__ " Failed to store "
					   "record with hash 0x%08x in vacuum "
					   "via RECEIVE_RECORDS\n",
					   ctdb_hash(&key)));

			/* grow the reply blob and append the raw wire record */
			old_size = talloc_get_size(records);
			records = talloc_realloc_size(outdata, records,
						      old_size + rec->length);
			if (records == NULL) {
				DEBUG(DEBUG_ERR, (__location__ " Failed to "
						  "expand\n"));
				return -1;
			}
			records->count++;
			memcpy(old_size+(uint8_t *)records, rec, rec->length);
		}

		/* advance to the next packed record */
		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
	}

	*outdata = ctdb_marshall_finish(records);

	return 0;
}
1300 report capabilities
1302 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1304 uint32_t *capabilities = NULL;
1306 capabilities = talloc(outdata, uint32_t);
1307 CTDB_NO_MEMORY(ctdb, capabilities);
1308 *capabilities = ctdb->capabilities;
1310 outdata->dsize = sizeof(uint32_t);
1311 outdata->dptr = (uint8_t *)capabilities;
1313 return 0;
1316 /* The recovery daemon will ping us at regular intervals.
1317 If we havent been pinged for a while we assume the recovery
1318 daemon is inoperable and we restart.
1320 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1322 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1323 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1325 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1327 if (*count < ctdb->tunable.recd_ping_failcount) {
1328 (*count)++;
1329 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1330 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1331 ctdb_recd_ping_timeout, ctdb);
1332 return;
1335 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1337 ctdb_stop_recoverd(ctdb);
1338 ctdb_start_recoverd(ctdb);
1341 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1343 talloc_free(ctdb->recd_ping_count);
1345 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1346 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1348 if (ctdb->tunable.recd_ping_timeout != 0) {
1349 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1350 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1351 ctdb_recd_ping_timeout, ctdb);
1354 return 0;
1359 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1361 uint32_t new_recmaster;
1363 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1364 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1366 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1367 DEBUG(DEBUG_NOTICE,
1368 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1371 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1372 DEBUG(DEBUG_NOTICE,
1373 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1376 ctdb->recovery_master = new_recmaster;
1377 return 0;
1381 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1383 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1384 ctdb_disable_monitoring(ctdb);
1385 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1387 return 0;
1390 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1392 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1393 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1395 return 0;