s3:smb2_server: remove unused get_min_receive_file_size() wrapper function
[Samba.git] / ctdb / server / ctdb_recover.c
blob7249f50ff4cdaf42eb6847e4f8554ee9765a9220
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "db_wrap.h"
31 int
32 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
34 CHECK_CONTROL_DATA_SIZE(0);
35 struct ctdb_vnn_map_wire *map;
36 size_t len;
38 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
39 map = talloc_size(outdata, len);
40 CTDB_NO_MEMORY(ctdb, map);
42 map->generation = ctdb->vnn_map->generation;
43 map->size = ctdb->vnn_map->size;
44 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
46 outdata->dsize = len;
47 outdata->dptr = (uint8_t *)map;
49 return 0;
52 int
53 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
55 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
56 int i;
58 for(i=1; i<=NUM_DB_PRIORITIES; i++) {
59 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
60 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
61 return -1;
65 talloc_free(ctdb->vnn_map);
67 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
68 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
70 ctdb->vnn_map->generation = map->generation;
71 ctdb->vnn_map->size = map->size;
72 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
73 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
75 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
77 return 0;
80 int
81 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
83 uint32_t i, len;
84 struct ctdb_db_context *ctdb_db;
85 struct ctdb_dbid_map *dbid_map;
87 CHECK_CONTROL_DATA_SIZE(0);
89 len = 0;
90 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
91 len++;
95 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
96 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
97 if (!outdata->dptr) {
98 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
99 exit(1);
102 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
103 dbid_map->num = len;
104 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
105 dbid_map->dbs[i].dbid = ctdb_db->db_id;
106 if (ctdb_db->persistent != 0) {
107 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
109 if (ctdb_db->readonly != 0) {
110 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
112 if (ctdb_db->sticky != 0) {
113 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
117 return 0;
120 int
121 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
123 uint32_t i, num_nodes;
124 struct ctdb_node_map *node_map;
126 CHECK_CONTROL_DATA_SIZE(0);
128 num_nodes = ctdb->num_nodes;
130 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
131 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
132 if (!outdata->dptr) {
133 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
134 exit(1);
137 node_map = (struct ctdb_node_map *)outdata->dptr;
138 node_map->num = num_nodes;
139 for (i=0; i<num_nodes; i++) {
140 if (parse_ip(ctdb->nodes[i]->address.address,
141 NULL, /* TODO: pass in the correct interface here*/
143 &node_map->nodes[i].addr) == 0)
145 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
148 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
149 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
152 return 0;
156 get an old style ipv4-only nodemap
158 int
159 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
161 uint32_t i, num_nodes;
162 struct ctdb_node_mapv4 *node_map;
164 CHECK_CONTROL_DATA_SIZE(0);
166 num_nodes = ctdb->num_nodes;
168 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
169 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
170 if (!outdata->dptr) {
171 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
172 exit(1);
175 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
176 node_map->num = num_nodes;
177 for (i=0; i<num_nodes; i++) {
178 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
179 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
180 return -1;
183 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
184 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
187 return 0;
190 static void
191 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
192 struct timeval t, void *private_data)
194 int i, num_nodes;
195 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
196 TALLOC_CTX *tmp_ctx;
197 struct ctdb_node **nodes;
199 tmp_ctx = talloc_new(ctdb);
201 /* steal the old nodes file for a while */
202 talloc_steal(tmp_ctx, ctdb->nodes);
203 nodes = ctdb->nodes;
204 ctdb->nodes = NULL;
205 num_nodes = ctdb->num_nodes;
206 ctdb->num_nodes = 0;
208 /* load the new nodes file */
209 ctdb_load_nodes_file(ctdb);
211 for (i=0; i<ctdb->num_nodes; i++) {
212 /* keep any identical pre-existing nodes and connections */
213 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
214 talloc_free(ctdb->nodes[i]);
215 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
216 continue;
219 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
220 continue;
223 /* any new or different nodes must be added */
224 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
225 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
226 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
228 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
229 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
230 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
234 /* tell the recovery daemon to reaload the nodes file too */
235 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
237 talloc_free(tmp_ctx);
238 return;
242 reload the nodes file after a short delay (so that we can send the response
243 back first
245 int
246 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
248 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
250 return 0;
/*
  State shared with traverse_pulldb, the traverse function that pulls
  all relevant records from a database for ctdb_control_pull_db.
 */
struct pulldb_data {
	struct ctdb_context *ctdb;		/* daemon context (tunables, fatal errors) */
	struct ctdb_db_context *ctdb_db;	/* database being traversed (used for log messages) */
	struct ctdb_marshall_buffer *pulldata;	/* growing blob of marshalled records */
	uint32_t len;				/* bytes of pulldata filled so far */
	uint32_t allocated_len;			/* bytes currently allocated for pulldata */
	bool failed;				/* set when a record could not be marshalled */
};
265 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
267 struct pulldb_data *params = (struct pulldb_data *)p;
268 struct ctdb_rec_data *rec;
269 struct ctdb_context *ctdb = params->ctdb;
270 struct ctdb_db_context *ctdb_db = params->ctdb_db;
272 /* add the record to the blob */
273 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
274 if (rec == NULL) {
275 params->failed = true;
276 return -1;
278 if (params->len + rec->length >= params->allocated_len) {
279 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
280 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
282 if (params->pulldata == NULL) {
283 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
284 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
286 params->pulldata->count++;
287 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
288 params->len += rec->length;
290 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
291 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
294 talloc_free(rec);
296 return 0;
300 pull a bunch of records from a ltdb, filtering by lmaster
302 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
304 struct ctdb_control_pulldb *pull;
305 struct ctdb_db_context *ctdb_db;
306 struct pulldb_data params;
307 struct ctdb_marshall_buffer *reply;
309 pull = (struct ctdb_control_pulldb *)indata.dptr;
311 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
312 if (!ctdb_db) {
313 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
314 return -1;
317 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
318 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
319 return -1;
322 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
323 CTDB_NO_MEMORY(ctdb, reply);
325 reply->db_id = pull->db_id;
327 params.ctdb = ctdb;
328 params.ctdb_db = ctdb_db;
329 params.pulldata = reply;
330 params.len = offsetof(struct ctdb_marshall_buffer, data);
331 params.allocated_len = params.len;
332 params.failed = false;
334 if (ctdb_db->unhealthy_reason) {
335 /* this is just a warning, as the tdb should be empty anyway */
336 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
337 ctdb_db->db_name, ctdb_db->unhealthy_reason));
340 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
341 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
342 return -1;
345 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
346 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
347 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
348 talloc_free(params.pulldata);
349 return -1;
352 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
354 outdata->dptr = (uint8_t *)params.pulldata;
355 outdata->dsize = params.len;
357 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
358 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
360 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
361 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
365 return 0;
369 push a bunch of records into a ltdb, filtering by rsn
371 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
373 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
374 struct ctdb_db_context *ctdb_db;
375 int i, ret;
376 struct ctdb_rec_data *rec;
378 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
379 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
380 return -1;
383 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
384 if (!ctdb_db) {
385 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
386 return -1;
389 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
390 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
391 return -1;
394 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
395 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
396 return -1;
399 rec = (struct ctdb_rec_data *)&reply->data[0];
401 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
402 reply->count, reply->db_id));
404 for (i=0;i<reply->count;i++) {
405 TDB_DATA key, data;
406 struct ctdb_ltdb_header *hdr;
408 key.dptr = &rec->data[0];
409 key.dsize = rec->keylen;
410 data.dptr = &rec->data[key.dsize];
411 data.dsize = rec->datalen;
413 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
414 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
415 goto failed;
417 hdr = (struct ctdb_ltdb_header *)data.dptr;
418 /* strip off any read only record flags. All readonly records
419 are revoked implicitely by a recovery
421 hdr->flags &= ~CTDB_REC_RO_FLAGS;
423 data.dptr += sizeof(*hdr);
424 data.dsize -= sizeof(*hdr);
426 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
427 if (ret != 0) {
428 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
429 goto failed;
432 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
435 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
436 reply->count, reply->db_id));
438 if (ctdb_db->readonly) {
439 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
440 ctdb_db->db_id));
441 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
442 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
443 ctdb_db->readonly = false;
444 tdb_close(ctdb_db->rottdb);
445 ctdb_db->rottdb = NULL;
446 ctdb_db->readonly = false;
448 while (ctdb_db->revokechild_active != NULL) {
449 talloc_free(ctdb_db->revokechild_active);
453 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
454 return 0;
456 failed:
457 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
458 return -1;
/*
  Bookkeeping for an asynchronous "set recovery mode" control while a
  child process checks the recovery lock.
 */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;	/* daemon context */
	struct ctdb_req_control *c;	/* request to reply to once the check completes */
	uint32_t recmode;		/* recovery mode to switch to */
	int fd[2];			/* pipe between child (fd[1]) and daemon (fd[0]) */
	struct timed_event *te;		/* timeout for the child */
	struct fd_event *fde;		/* read event on fd[0] */
	pid_t child;			/* pid of the lock-checking child */
	struct timeval start_time;	/* when the check was started */
};
473 called if our set_recmode child times out. this would happen if
474 ctdb_recovery_lock() would block.
476 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
477 struct timeval t, void *private_data)
479 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
480 struct ctdb_set_recmode_state);
482 /* we consider this a success, not a failure, as we failed to
483 set the recovery lock which is what we wanted. This can be
484 caused by the cluster filesystem being very slow to
485 arbitrate locks immediately after a node failure.
487 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
488 state->ctdb->recovery_mode = state->recmode;
489 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
490 talloc_free(state);
494 /* when we free the recmode state we must kill any child process.
496 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
498 double l = timeval_elapsed(&state->start_time);
500 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
502 if (state->fd[0] != -1) {
503 state->fd[0] = -1;
505 if (state->fd[1] != -1) {
506 state->fd[1] = -1;
508 ctdb_kill(state->ctdb, state->child, SIGKILL);
509 return 0;
512 /* this is called when the client process has completed ctdb_recovery_lock()
513 and has written data back to us through the pipe.
515 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
516 uint16_t flags, void *private_data)
518 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
519 struct ctdb_set_recmode_state);
520 char c = 0;
521 int ret;
523 /* we got a response from our child process so we can abort the
524 timeout.
526 talloc_free(state->te);
527 state->te = NULL;
530 /* read the childs status when trying to lock the reclock file.
531 child wrote 0 if everything is fine and 1 if it did manage
532 to lock the file, which would be a problem since that means
533 we got a request to exit from recovery but we could still lock
534 the file which at this time SHOULD be locked by the recovery
535 daemon on the recmaster
537 ret = read(state->fd[0], &c, 1);
538 if (ret != 1 || c != 0) {
539 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
540 talloc_free(state);
541 return;
544 state->ctdb->recovery_mode = state->recmode;
546 /* release any deferred attach calls from clients */
547 if (state->recmode == CTDB_RECOVERY_NORMAL) {
548 ctdb_process_deferred_attach(state->ctdb);
551 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
552 talloc_free(state);
553 return;
556 static void
557 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
558 struct timeval t, void *private_data)
560 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
562 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
563 talloc_free(ctdb->release_ips_ctx);
564 ctdb->release_ips_ctx = NULL;
566 ctdb_release_all_ips(ctdb);
570 * Set up an event to drop all public ips if we remain in recovery for too
571 * long
573 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
575 if (ctdb->release_ips_ctx != NULL) {
576 talloc_free(ctdb->release_ips_ctx);
578 ctdb->release_ips_ctx = talloc_new(ctdb);
579 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
581 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
582 return 0;
586 set the recovery mode
588 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
589 struct ctdb_req_control *c,
590 TDB_DATA indata, bool *async_reply,
591 const char **errormsg)
593 uint32_t recmode = *(uint32_t *)indata.dptr;
594 int i, ret;
595 struct ctdb_set_recmode_state *state;
596 pid_t parent = getpid();
598 /* if we enter recovery but stay in recovery for too long
599 we will eventually drop all our ip addresses
601 if (recmode == CTDB_RECOVERY_NORMAL) {
602 talloc_free(ctdb->release_ips_ctx);
603 ctdb->release_ips_ctx = NULL;
604 } else {
605 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
606 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
610 if (recmode != ctdb->recovery_mode) {
611 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
612 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
615 if (recmode != CTDB_RECOVERY_NORMAL ||
616 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
617 ctdb->recovery_mode = recmode;
618 return 0;
621 /* some special handling when ending recovery mode */
623 /* force the databases to thaw */
624 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
625 if (ctdb->freeze_handles[i] != NULL) {
626 ctdb_control_thaw(ctdb, i);
630 state = talloc(ctdb, struct ctdb_set_recmode_state);
631 CTDB_NO_MEMORY(ctdb, state);
633 state->start_time = timeval_current();
634 state->fd[0] = -1;
635 state->fd[1] = -1;
637 /* release any deferred attach calls from clients */
638 if (recmode == CTDB_RECOVERY_NORMAL) {
639 ctdb_process_deferred_attach(ctdb);
642 if (ctdb->tunable.verify_recovery_lock == 0) {
643 /* dont need to verify the reclock file */
644 ctdb->recovery_mode = recmode;
645 return 0;
648 /* For the rest of what needs to be done, we need to do this in
649 a child process since
650 1, the call to ctdb_recovery_lock() can block if the cluster
651 filesystem is in the process of recovery.
653 ret = pipe(state->fd);
654 if (ret != 0) {
655 talloc_free(state);
656 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
657 return -1;
660 state->child = ctdb_fork(ctdb);
661 if (state->child == (pid_t)-1) {
662 close(state->fd[0]);
663 close(state->fd[1]);
664 talloc_free(state);
665 return -1;
668 if (state->child == 0) {
669 char cc = 0;
670 close(state->fd[0]);
672 ctdb_set_process_name("ctdb_recmode");
673 debug_extra = talloc_asprintf(NULL, "set_recmode:");
674 /* we should not be able to get the lock on the reclock file,
675 as it should be held by the recovery master
677 if (ctdb_recovery_lock(ctdb, false)) {
678 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
679 cc = 1;
682 write(state->fd[1], &cc, 1);
683 /* make sure we die when our parent dies */
684 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
685 sleep(5);
686 write(state->fd[1], &cc, 1);
688 _exit(0);
690 close(state->fd[1]);
691 set_close_on_exec(state->fd[0]);
693 state->fd[1] = -1;
695 talloc_set_destructor(state, set_recmode_destructor);
697 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
699 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
700 ctdb_set_recmode_timeout, state);
702 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
703 EVENT_FD_READ,
704 set_recmode_handler,
705 (void *)state);
707 if (state->fde == NULL) {
708 talloc_free(state);
709 return -1;
711 tevent_fd_set_auto_close(state->fde);
713 state->ctdb = ctdb;
714 state->recmode = recmode;
715 state->c = talloc_steal(state, c);
717 *async_reply = true;
719 return 0;
724 try and get the recovery lock in shared storage - should only work
725 on the recovery master recovery daemon. Anywhere else is a bug
727 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
729 struct flock lock;
731 if (keep) {
732 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
734 if (ctdb->recovery_lock_fd != -1) {
735 close(ctdb->recovery_lock_fd);
736 ctdb->recovery_lock_fd = -1;
739 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
740 if (ctdb->recovery_lock_fd == -1) {
741 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
742 ctdb->recovery_lock_file, strerror(errno)));
743 return false;
746 set_close_on_exec(ctdb->recovery_lock_fd);
748 lock.l_type = F_WRLCK;
749 lock.l_whence = SEEK_SET;
750 lock.l_start = 0;
751 lock.l_len = 1;
752 lock.l_pid = 0;
754 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
755 close(ctdb->recovery_lock_fd);
756 ctdb->recovery_lock_fd = -1;
757 if (keep) {
758 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
760 return false;
763 if (!keep) {
764 close(ctdb->recovery_lock_fd);
765 ctdb->recovery_lock_fd = -1;
768 if (keep) {
769 DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
772 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
774 return true;
778 delete a record as part of the vacuum process
779 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
780 use non-blocking locks
782 return 0 if the record was successfully deleted (i.e. it does not exist
783 when the function returns)
784 or !0 is the record still exists in the tdb after returning.
786 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
788 TDB_DATA key, data, data2;
789 struct ctdb_ltdb_header *hdr, *hdr2;
791 /* these are really internal tdb functions - but we need them here for
792 non-blocking lock of the freelist */
793 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
794 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
797 key.dsize = rec->keylen;
798 key.dptr = &rec->data[0];
799 data.dsize = rec->datalen;
800 data.dptr = &rec->data[rec->keylen];
802 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
803 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
804 return -1;
807 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
808 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
809 return -1;
812 hdr = (struct ctdb_ltdb_header *)data.dptr;
814 /* use a non-blocking lock */
815 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
816 return -1;
819 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
820 if (data2.dptr == NULL) {
821 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
822 return 0;
825 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
826 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
827 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
828 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
830 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
831 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
833 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
834 free(data2.dptr);
835 return 0;
838 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
840 if (hdr2->rsn > hdr->rsn) {
841 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
842 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
843 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
844 free(data2.dptr);
845 return -1;
848 /* do not allow deleting record that have readonly flags set. */
849 if (hdr->flags & CTDB_REC_RO_FLAGS) {
850 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
851 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
852 free(data2.dptr);
853 return -1;
855 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
856 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
857 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
858 free(data2.dptr);
859 return -1;
862 if (hdr2->dmaster == ctdb->pnn) {
863 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
864 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
865 free(data2.dptr);
866 return -1;
869 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
870 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
871 free(data2.dptr);
872 return -1;
875 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
876 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
877 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
878 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
879 free(data2.dptr);
880 return -1;
883 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
884 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
885 free(data2.dptr);
886 return 0;
/*
  State handed to the start/end recovery event script callbacks.
 */
struct recovery_callback_state {
	struct ctdb_req_control *c;	/* request to answer when the script has finished */
};
897 called when the 'recovered' event script has finished
899 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
901 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
903 ctdb_enable_monitoring(ctdb);
904 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
906 if (status != 0) {
907 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
908 if (status == -ETIME) {
909 ctdb_ban_self(ctdb);
913 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
914 talloc_free(state);
916 gettimeofday(&ctdb->last_recovery_finished, NULL);
918 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
919 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
924 recovery has finished
926 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
927 struct ctdb_req_control *c,
928 bool *async_reply)
930 int ret;
931 struct recovery_callback_state *state;
933 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
935 ctdb_persistent_finish_trans3_commits(ctdb);
937 state = talloc(ctdb, struct recovery_callback_state);
938 CTDB_NO_MEMORY(ctdb, state);
940 state->c = c;
942 ctdb_disable_monitoring(ctdb);
944 ret = ctdb_event_script_callback(ctdb, state,
945 ctdb_end_recovery_callback,
946 state,
947 CTDB_EVENT_RECOVERED, "%s", "");
949 if (ret != 0) {
950 ctdb_enable_monitoring(ctdb);
952 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
953 talloc_free(state);
954 return -1;
957 /* tell the control that we will be reply asynchronously */
958 state->c = talloc_steal(state, c);
959 *async_reply = true;
960 return 0;
964 called when the 'startrecovery' event script has finished
966 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
968 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
970 if (status != 0) {
971 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
974 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
975 talloc_free(state);
979 run the startrecovery eventscript
981 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
982 struct ctdb_req_control *c,
983 bool *async_reply)
985 int ret;
986 struct recovery_callback_state *state;
988 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
989 gettimeofday(&ctdb->last_recovery_started, NULL);
991 state = talloc(ctdb, struct recovery_callback_state);
992 CTDB_NO_MEMORY(ctdb, state);
994 state->c = talloc_steal(state, c);
996 ctdb_disable_monitoring(ctdb);
998 ret = ctdb_event_script_callback(ctdb, state,
999 ctdb_start_recovery_callback,
1000 state,
1001 CTDB_EVENT_START_RECOVERY,
1002 "%s", "");
1004 if (ret != 0) {
1005 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1006 talloc_free(state);
1007 return -1;
1010 /* tell the control that we will be reply asynchronously */
1011 *async_reply = true;
1012 return 0;
1016 try to delete all these records as part of the vacuuming process
1017 and return the records we failed to delete
1019 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1021 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1022 struct ctdb_db_context *ctdb_db;
1023 int i;
1024 struct ctdb_rec_data *rec;
1025 struct ctdb_marshall_buffer *records;
1027 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1028 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1029 return -1;
1032 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1033 if (!ctdb_db) {
1034 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1035 return -1;
1039 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1040 reply->count, reply->db_id));
1043 /* create a blob to send back the records we couldnt delete */
1044 records = (struct ctdb_marshall_buffer *)
1045 talloc_zero_size(outdata,
1046 offsetof(struct ctdb_marshall_buffer, data));
1047 if (records == NULL) {
1048 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1049 return -1;
1051 records->db_id = ctdb_db->db_id;
1054 rec = (struct ctdb_rec_data *)&reply->data[0];
1055 for (i=0;i<reply->count;i++) {
1056 TDB_DATA key, data;
1058 key.dptr = &rec->data[0];
1059 key.dsize = rec->keylen;
1060 data.dptr = &rec->data[key.dsize];
1061 data.dsize = rec->datalen;
1063 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1064 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1065 return -1;
1068 /* If we cant delete the record we must add it to the reply
1069 so the lmaster knows it may not purge this record
1071 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1072 size_t old_size;
1073 struct ctdb_ltdb_header *hdr;
1075 hdr = (struct ctdb_ltdb_header *)data.dptr;
1076 data.dptr += sizeof(*hdr);
1077 data.dsize -= sizeof(*hdr);
1079 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1081 old_size = talloc_get_size(records);
1082 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1083 if (records == NULL) {
1084 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1085 return -1;
1087 records->count++;
1088 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1091 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1095 outdata->dptr = (uint8_t *)records;
1096 outdata->dsize = talloc_get_size(records);
1098 return 0;
1102 * Store a record as part of the vacuum process:
1103 * This is called from the RECEIVE_RECORD control which
1104 * the lmaster uses to send the current empty copy
1105 * to all nodes for storing, before it lets the other
1106 * nodes delete the records in the second phase with
1107 * the TRY_DELETE_RECORDS control.
1109 * Only store if we are not lmaster or dmaster, and our
1110 * rsn is <= the provided rsn. Use non-blocking locks.
1112 * return 0 if the record was successfully stored.
1113 * return !0 if the record still exists in the tdb after returning.
1115 static int store_tdb_record(struct ctdb_context *ctdb,
1116 struct ctdb_db_context *ctdb_db,
1117 struct ctdb_rec_data *rec)
1119 TDB_DATA key, data, data2;
1120 struct ctdb_ltdb_header *hdr, *hdr2;
1121 int ret;
1123 key.dsize = rec->keylen;
1124 key.dptr = &rec->data[0];
1125 data.dsize = rec->datalen;
1126 data.dptr = &rec->data[rec->keylen];
1128 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1129 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1130 "where we are lmaster\n"));
1131 return -1;
1134 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1135 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1136 return -1;
1139 hdr = (struct ctdb_ltdb_header *)data.dptr;
1141 /* use a non-blocking lock */
1142 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1143 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1144 return -1;
1147 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1148 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1149 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1150 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1151 ret = -1;
1152 goto done;
1154 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1155 ret = 0;
1156 goto done;
1159 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1161 if (hdr2->rsn > hdr->rsn) {
1162 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1163 "rsn=%llu - called with rsn=%llu\n",
1164 (unsigned long long)hdr2->rsn,
1165 (unsigned long long)hdr->rsn));
1166 ret = -1;
1167 goto done;
1170 /* do not allow vacuuming of records that have readonly flags set. */
1171 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1172 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1173 "flags set\n"));
1174 ret = -1;
1175 goto done;
1177 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1178 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1179 "flags set\n"));
1180 ret = -1;
1181 goto done;
1184 if (hdr2->dmaster == ctdb->pnn) {
1185 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1186 "where we are the dmaster\n"));
1187 ret = -1;
1188 goto done;
1191 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1192 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1193 ret = -1;
1194 goto done;
1197 ret = 0;
1199 done:
1200 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1201 free(data2.dptr);
1202 return ret;
1208 * Try to store all these records as part of the vacuuming process
1209 * and return the records we failed to store.
1211 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1212 TDB_DATA indata, TDB_DATA *outdata)
1214 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1215 struct ctdb_db_context *ctdb_db;
1216 int i;
1217 struct ctdb_rec_data *rec;
1218 struct ctdb_marshall_buffer *records;
1220 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1221 DEBUG(DEBUG_ERR,
1222 (__location__ " invalid data in receive_records\n"));
1223 return -1;
1226 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1227 if (!ctdb_db) {
1228 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1229 reply->db_id));
1230 return -1;
1233 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1234 "dbid 0x%x\n", reply->count, reply->db_id));
1236 /* create a blob to send back the records we could not store */
1237 records = (struct ctdb_marshall_buffer *)
1238 talloc_zero_size(outdata,
1239 offsetof(struct ctdb_marshall_buffer, data));
1240 if (records == NULL) {
1241 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1242 return -1;
1244 records->db_id = ctdb_db->db_id;
1246 rec = (struct ctdb_rec_data *)&reply->data[0];
1247 for (i=0; i<reply->count; i++) {
1248 TDB_DATA key, data;
1250 key.dptr = &rec->data[0];
1251 key.dsize = rec->keylen;
1252 data.dptr = &rec->data[key.dsize];
1253 data.dsize = rec->datalen;
1255 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1256 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1257 "in indata\n"));
1258 return -1;
1262 * If we can not store the record we must add it to the reply
1263 * so the lmaster knows it may not purge this record.
1265 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1266 size_t old_size;
1267 struct ctdb_ltdb_header *hdr;
1269 hdr = (struct ctdb_ltdb_header *)data.dptr;
1270 data.dptr += sizeof(*hdr);
1271 data.dsize -= sizeof(*hdr);
1273 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1274 "record with hash 0x%08x in vacuum "
1275 "via RECEIVE_RECORDS\n",
1276 ctdb_hash(&key)));
1278 old_size = talloc_get_size(records);
1279 records = talloc_realloc_size(outdata, records,
1280 old_size + rec->length);
1281 if (records == NULL) {
1282 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1283 "expand\n"));
1284 return -1;
1286 records->count++;
1287 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1290 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1294 outdata->dptr = (uint8_t *)records;
1295 outdata->dsize = talloc_get_size(records);
1297 return 0;
1302 report capabilities
1304 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1306 uint32_t *capabilities = NULL;
1308 capabilities = talloc(outdata, uint32_t);
1309 CTDB_NO_MEMORY(ctdb, capabilities);
1310 *capabilities = ctdb->capabilities;
1312 outdata->dsize = sizeof(uint32_t);
1313 outdata->dptr = (uint8_t *)capabilities;
1315 return 0;
1318 /* The recovery daemon will ping us at regular intervals.
1319 If we havent been pinged for a while we assume the recovery
1320 daemon is inoperable and we restart.
1322 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1324 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1325 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1327 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1329 if (*count < ctdb->tunable.recd_ping_failcount) {
1330 (*count)++;
1331 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1332 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1333 ctdb_recd_ping_timeout, ctdb);
1334 return;
1337 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1339 ctdb_stop_recoverd(ctdb);
1340 ctdb_start_recoverd(ctdb);
1343 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1345 talloc_free(ctdb->recd_ping_count);
1347 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1348 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1350 if (ctdb->tunable.recd_ping_timeout != 0) {
1351 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1352 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1353 ctdb_recd_ping_timeout, ctdb);
1356 return 0;
1361 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1363 uint32_t new_recmaster;
1365 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1366 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1368 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1369 DEBUG(DEBUG_NOTICE,
1370 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1373 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1374 DEBUG(DEBUG_NOTICE,
1375 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1378 ctdb->recovery_master = new_recmaster;
1379 return 0;
1383 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1385 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1386 ctdb_disable_monitoring(ctdb);
1387 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1389 return 0;
1392 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1394 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1395 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1397 return 0;