ctdb-recoverd: Improve error messages on recovery lock coherence fail
[Samba.git] / ctdb / server / ctdb_recover.c
blob4b9407f6c8e787588ef2c6b22495417edf2cd9c1
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
31 int
32 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
34 struct ctdb_vnn_map_wire *map;
35 size_t len;
37 CHECK_CONTROL_DATA_SIZE(0);
39 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
40 map = talloc_size(outdata, len);
41 CTDB_NO_MEMORY(ctdb, map);
43 map->generation = ctdb->vnn_map->generation;
44 map->size = ctdb->vnn_map->size;
45 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
47 outdata->dsize = len;
48 outdata->dptr = (uint8_t *)map;
50 return 0;
53 int
54 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
56 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
57 int i;
59 for(i=1; i<=NUM_DB_PRIORITIES; i++) {
60 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
61 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
62 return -1;
66 talloc_free(ctdb->vnn_map);
68 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
69 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
71 ctdb->vnn_map->generation = map->generation;
72 ctdb->vnn_map->size = map->size;
73 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
74 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
76 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
78 return 0;
81 int
82 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
84 uint32_t i, len;
85 struct ctdb_db_context *ctdb_db;
86 struct ctdb_dbid_map *dbid_map;
88 CHECK_CONTROL_DATA_SIZE(0);
90 len = 0;
91 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
92 len++;
96 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
97 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
98 if (!outdata->dptr) {
99 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
100 exit(1);
103 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
104 dbid_map->num = len;
105 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
106 dbid_map->dbs[i].dbid = ctdb_db->db_id;
107 if (ctdb_db->persistent != 0) {
108 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
110 if (ctdb_db->readonly != 0) {
111 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
113 if (ctdb_db->sticky != 0) {
114 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
118 return 0;
/*
  CTDB_CONTROL_GET_NODEMAP: return pnn, flags and parsed address of every
  configured node.  Note that an address-parse failure is only logged and
  the node entry is still emitted (the v4-only variant below returns -1
  instead — this asymmetry is deliberate per the existing code).
 */
int
ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	uint32_t i, num_nodes;
	struct ctdb_node_map *node_map;

	CHECK_CONTROL_DATA_SIZE(0);

	num_nodes = ctdb->num_nodes;

	outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
	outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
	if (!outdata->dptr) {
		/* fatal, consistent with the other map controls */
		DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
		exit(1);
	}

	node_map = (struct ctdb_node_map *)outdata->dptr;
	node_map->num = num_nodes;
	for (i=0; i<num_nodes; i++) {
		/* NOTE(review): the port argument line was lost in extraction;
		 * reconstructed as 0 — confirm against upstream. */
		if (parse_ip(ctdb->nodes[i]->address.address,
			     NULL, /* TODO: pass in the correct interface here*/
			     0,
			     &node_map->nodes[i].addr) == 0)
		{
			DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
		}

		node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
		node_map->nodes[i].flags = ctdb->nodes[i]->flags;
	}

	return 0;
}
157 get an old style ipv4-only nodemap
159 int
160 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
162 uint32_t i, num_nodes;
163 struct ctdb_node_mapv4 *node_map;
165 CHECK_CONTROL_DATA_SIZE(0);
167 num_nodes = ctdb->num_nodes;
169 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
170 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
171 if (!outdata->dptr) {
172 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
173 exit(1);
176 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
177 node_map->num = num_nodes;
178 for (i=0; i<num_nodes; i++) {
179 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
180 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
181 return -1;
184 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
185 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
188 return 0;
/*
  Timed-event body for CTDB_CONTROL_RELOAD_NODES_FILE: re-read the nodes
  file, keeping existing transport connections for nodes whose address is
  unchanged and creating connections for new/changed nodes.

  The old nodes array is parked on a temporary context so that entries can
  be selectively stolen back; anything not reused is freed with tmp_ctx.
 */
static void
ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
			       struct timeval t, void *private_data)
{
	int i, num_nodes;
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
	TALLOC_CTX *tmp_ctx;
	struct ctdb_node **nodes;

	tmp_ctx = talloc_new(ctdb);

	/* steal the old nodes file for a while */
	talloc_steal(tmp_ctx, ctdb->nodes);
	nodes = ctdb->nodes;
	ctdb->nodes = NULL;
	num_nodes = ctdb->num_nodes;
	ctdb->num_nodes = 0;

	/* load the new nodes file */
	ctdb_load_nodes_file(ctdb);

	for (i=0; i<ctdb->num_nodes; i++) {
		/* keep any identical pre-existing nodes and connections:
		 * steal the old entry back and drop the freshly-parsed one */
		if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
			talloc_free(ctdb->nodes[i]);
			ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
			continue;
		}

		/* deleted placeholder entries get no transport */
		if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
			continue;
		}

		/* any new or different nodes must be added; failure to set up
		 * transport is fatal for the daemon */
		if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
			DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
			ctdb_fatal(ctdb, "failed to add node. shutting down\n");
		}
		if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
			DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
			ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
		}
	}

	/* tell the recovery daemon to reload the nodes file too */
	ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);

	/* frees any old node entries that were not stolen back */
	talloc_free(tmp_ctx);
	return;
}
/*
  CTDB_CONTROL_RELOAD_NODES_FILE: reload the nodes file after a short delay
  (so that the control reply can be sent back to the client first).
  The real work happens in ctdb_reload_nodes_event one second later.
 */
int
ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
{
	event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);

	return 0;
}
/*
  state shared with traverse_pulldb(), the traverse callback that pulls all
  relevant records from a database into one marshall buffer
 */
struct pulldb_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	struct ctdb_marshall_buffer *pulldata;  /* growing reply blob */
	uint32_t len;                           /* bytes used in pulldata */
	uint32_t allocated_len;                 /* bytes allocated for pulldata */
	bool failed;                            /* set when marshalling a record fails */
};
/*
  tdb traverse callback: marshal one record and append it to params->pulldata,
  growing the buffer in pulldb_preallocation_size chunks.  Returns -1 (stopping
  the traverse) only when the record itself cannot be marshalled; allocation
  failure for the blob is treated as fatal for the daemon.
 */
static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
	struct pulldb_data *params = (struct pulldb_data *)p;
	struct ctdb_rec_data *rec;
	struct ctdb_context *ctdb = params->ctdb;
	struct ctdb_db_context *ctdb_db = params->ctdb_db;

	/* add the record to the blob */
	rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
	if (rec == NULL) {
		params->failed = true;
		return -1;
	}
	/* grow the blob with headroom so we do not realloc per record */
	if (params->len + rec->length >= params->allocated_len) {
		params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
		params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
	}
	if (params->pulldata == NULL) {
		DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
		/* cannot continue a recovery without memory */
		ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
	}
	params->pulldata->count++;
	memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
	params->len += rec->length;

	if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
		DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
	}

	/* rec was only a staging copy; the blob now owns the bytes */
	talloc_free(rec);

	return 0;
}
/*
  CTDB_CONTROL_PULL_DB: pull all records from a local tdb into a marshall
  buffer for the recovery master.  Only valid while the database's priority
  group is frozen; the whole-db lock is marked for the duration of the
  traverse and unmarked on every exit path after marking succeeds.
 */
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_control_pulldb *pull;
	struct ctdb_db_context *ctdb_db;
	struct pulldb_data params;
	struct ctdb_marshall_buffer *reply;

	pull = (struct ctdb_control_pulldb *)indata.dptr;

	ctdb_db = find_ctdb_db(ctdb, pull->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
		return -1;
	}

	if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
		DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
		return -1;
	}

	reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
	CTDB_NO_MEMORY(ctdb, reply);

	reply->db_id = pull->db_id;

	/* params.len starts at the header size; traverse_pulldb appends
	 * records after it */
	params.ctdb = ctdb;
	params.ctdb_db = ctdb_db;
	params.pulldata = reply;
	params.len = offsetof(struct ctdb_marshall_buffer, data);
	params.allocated_len = params.len;
	params.failed = false;

	if (ctdb_db->unhealthy_reason) {
		/* this is just a warning, as the tdb should be empty anyway */
		DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
				     ctdb_db->db_name, ctdb_db->unhealthy_reason));
	}

	if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
		return -1;
	}

	if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
		ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
		talloc_free(params.pulldata);
		return -1;
	}

	ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);

	/* note: params.pulldata may have been realloc'd away from reply */
	outdata->dptr = (uint8_t *)params.pulldata;
	outdata->dsize = params.len;

	if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
		DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
	}
	if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
		DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
	}

	return 0;
}
370 push a bunch of records into a ltdb, filtering by rsn
372 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
374 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
375 struct ctdb_db_context *ctdb_db;
376 int i, ret;
377 struct ctdb_rec_data *rec;
379 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
380 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
381 return -1;
384 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
385 if (!ctdb_db) {
386 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
387 return -1;
390 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
391 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
392 return -1;
395 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
396 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
397 return -1;
400 rec = (struct ctdb_rec_data *)&reply->data[0];
402 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
403 reply->count, reply->db_id));
405 for (i=0;i<reply->count;i++) {
406 TDB_DATA key, data;
407 struct ctdb_ltdb_header *hdr;
409 key.dptr = &rec->data[0];
410 key.dsize = rec->keylen;
411 data.dptr = &rec->data[key.dsize];
412 data.dsize = rec->datalen;
414 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
415 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
416 goto failed;
418 hdr = (struct ctdb_ltdb_header *)data.dptr;
419 /* strip off any read only record flags. All readonly records
420 are revoked implicitely by a recovery
422 hdr->flags &= ~CTDB_REC_RO_FLAGS;
424 data.dptr += sizeof(*hdr);
425 data.dsize -= sizeof(*hdr);
427 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
428 if (ret != 0) {
429 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
430 goto failed;
433 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
436 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
437 reply->count, reply->db_id));
439 if (ctdb_db->readonly) {
440 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
441 ctdb_db->db_id));
442 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
443 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
444 ctdb_db->readonly = false;
445 tdb_close(ctdb_db->rottdb);
446 ctdb_db->rottdb = NULL;
447 ctdb_db->readonly = false;
449 while (ctdb_db->revokechild_active != NULL) {
450 talloc_free(ctdb_db->revokechild_active);
454 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
455 return 0;
457 failed:
458 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
459 return -1;
/*
  per-request state for the asynchronous SET_RECMODE control
 */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;
	struct ctdb_req_control *c;   /* control to reply to, stolen onto this state */
	uint32_t recmode;             /* requested recovery mode */
	int fd[2];                    /* pipe: child writes result byte to fd[1], parent reads fd[0] */
	struct timed_event *te;       /* 5s timeout for the child */
	struct fd_event *fde;         /* read event on fd[0] */
	pid_t child;                  /* pid of the reclock-checking child */
	struct timeval start_time;    /* for reclock latency accounting */
};
/*
  called if our set_recmode child times out. this would happen if
  ctdb_recovery_lock() would block.
 */
static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
				     struct timeval t, void *private_data)
{
	struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
					       struct ctdb_set_recmode_state);

	/* we consider this a success, not a failure, as we failed to
	   set the recovery lock which is what we wanted.  This can be
	   caused by the cluster filesystem being very slow to
	   arbitrate locks immediately after a node failure.
	 */
	DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
	state->ctdb->recovery_mode = state->recmode;
	ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
	/* freeing state runs set_recmode_destructor, which kills the child */
	talloc_free(state);
}
/* when we free the recmode state we must kill any child process.
   Note on fds: fd[0] is closed by tevent via tevent_fd_set_auto_close()
   on state->fde, and fd[1] is closed in the parent right after the fork,
   so this destructor only invalidates the stored values.
 */
static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
{
	double l = timeval_elapsed(&state->start_time);

	/* record how long the daemon-side reclock check took */
	CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);

	if (state->fd[0] != -1) {
		state->fd[0] = -1;
	}
	if (state->fd[1] != -1) {
		state->fd[1] = -1;
	}
	ctdb_kill(state->ctdb, state->child, SIGKILL);
	return 0;
}
/* this is called when the child process has completed ctdb_recovery_lock()
   and has written data back to us through the pipe.
 */
static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
				uint16_t flags, void *private_data)
{
	struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
					      struct ctdb_set_recmode_state);
	char c = 0;
	int ret;

	/* we got a response from our child process so we can abort the
	   timeout.
	 */
	talloc_free(state->te);
	state->te = NULL;


	/* If, as expected, the child was unable to take the recovery
	 * lock then it will have written 0 into the pipe, so
	 * continue. However, any other value (e.g. 1) indicates that
	 * it was able to take the recovery lock when it should have
	 * been held by the recovery daemon on the recovery master.
	 */
	ret = sys_read(state->fd[0], &c, 1);
	if (ret != 1 || c != 0) {
		/* reply with an error; recmode is NOT changed */
		ctdb_request_control_reply(
			state->ctdb, state->c, NULL, -1,
			"Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
		talloc_free(state);
		return;
	}

	state->ctdb->recovery_mode = state->recmode;

	/* release any deferred attach calls from clients */
	if (state->recmode == CTDB_RECOVERY_NORMAL) {
		ctdb_process_deferred_attach(state->ctdb);
	}

	ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
	/* freeing state kills the child via the destructor */
	talloc_free(state);
	return;
}
/*
  timed-event body: fires when the node has stayed in recovery mode longer
  than the RecoveryDropAllIPs tunable allows; releases all public IPs.
  The context is freed first so the event cannot re-fire.
 */
static void
ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

	DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
	talloc_free(ctdb->release_ips_ctx);
	ctdb->release_ips_ctx = NULL;

	ctdb_release_all_ips(ctdb);
}
572 * Set up an event to drop all public ips if we remain in recovery for too
573 * long
575 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
577 if (ctdb->release_ips_ctx != NULL) {
578 talloc_free(ctdb->release_ips_ctx);
580 ctdb->release_ips_ctx = talloc_new(ctdb);
581 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
583 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
584 return 0;
588 set the recovery mode
590 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
591 struct ctdb_req_control *c,
592 TDB_DATA indata, bool *async_reply,
593 const char **errormsg)
595 uint32_t recmode = *(uint32_t *)indata.dptr;
596 int i, ret;
597 struct ctdb_set_recmode_state *state;
598 pid_t parent = getpid();
600 /* if we enter recovery but stay in recovery for too long
601 we will eventually drop all our ip addresses
603 if (recmode == CTDB_RECOVERY_NORMAL) {
604 talloc_free(ctdb->release_ips_ctx);
605 ctdb->release_ips_ctx = NULL;
606 } else {
607 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
608 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
612 if (recmode != ctdb->recovery_mode) {
613 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
614 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
617 if (recmode != CTDB_RECOVERY_NORMAL ||
618 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
619 ctdb->recovery_mode = recmode;
620 return 0;
623 /* some special handling when ending recovery mode */
625 /* force the databases to thaw */
626 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
627 if (ctdb->freeze_handles[i] != NULL) {
628 ctdb_control_thaw(ctdb, i, false);
632 state = talloc(ctdb, struct ctdb_set_recmode_state);
633 CTDB_NO_MEMORY(ctdb, state);
635 state->start_time = timeval_current();
636 state->fd[0] = -1;
637 state->fd[1] = -1;
639 /* release any deferred attach calls from clients */
640 if (recmode == CTDB_RECOVERY_NORMAL) {
641 ctdb_process_deferred_attach(ctdb);
644 if (ctdb->recovery_lock_file == NULL) {
645 /* Not using recovery lock file */
646 ctdb->recovery_mode = recmode;
647 return 0;
650 /* For the rest of what needs to be done, we need to do this in
651 a child process since
652 1, the call to ctdb_recovery_lock() can block if the cluster
653 filesystem is in the process of recovery.
655 ret = pipe(state->fd);
656 if (ret != 0) {
657 talloc_free(state);
658 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
659 return -1;
662 state->child = ctdb_fork(ctdb);
663 if (state->child == (pid_t)-1) {
664 close(state->fd[0]);
665 close(state->fd[1]);
666 talloc_free(state);
667 return -1;
670 if (state->child == 0) {
671 char cc = 0;
672 close(state->fd[0]);
674 ctdb_set_process_name("ctdb_recmode");
675 debug_extra = talloc_asprintf(NULL, "set_recmode:");
676 /* Daemon should not be able to get the recover lock,
677 * as it should be held by the recovery master */
678 if (ctdb_recovery_lock(ctdb)) {
679 DEBUG(DEBUG_ERR,
680 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
681 ctdb->recovery_lock_file));
682 ctdb_recovery_unlock(ctdb);
683 cc = 1;
686 sys_write(state->fd[1], &cc, 1);
687 /* make sure we die when our parent dies */
688 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
689 sleep(5);
690 sys_write(state->fd[1], &cc, 1);
692 _exit(0);
694 close(state->fd[1]);
695 set_close_on_exec(state->fd[0]);
697 state->fd[1] = -1;
699 talloc_set_destructor(state, set_recmode_destructor);
701 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
703 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
704 ctdb_set_recmode_timeout, state);
706 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
707 EVENT_FD_READ,
708 set_recmode_handler,
709 (void *)state);
711 if (state->fde == NULL) {
712 talloc_free(state);
713 return -1;
715 tevent_fd_set_auto_close(state->fde);
717 state->ctdb = ctdb;
718 state->recmode = recmode;
719 state->c = talloc_steal(state, c);
721 *async_reply = true;
723 return 0;
727 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
729 return ctdb->recovery_lock_fd != -1;
/*
  try and get the recovery lock in shared storage - should only work
  on the recovery master recovery daemon. Anywhere else is a bug.

  Takes an exclusive fcntl byte-range lock (1 byte at offset 0) on the
  recovery lock file.  On success the fd is kept open in
  ctdb->recovery_lock_fd; on failure the fd is closed and reset to -1.
  Lock contention (EACCES/EAGAIN) is the expected case and fails silently;
  any other error is logged.
 */
bool ctdb_recovery_lock(struct ctdb_context *ctdb)
{
	struct flock lock;

	ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
				      O_RDWR|O_CREAT, 0600);
	if (ctdb->recovery_lock_fd == -1) {
		DEBUG(DEBUG_ERR,
		      ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
		       ctdb->recovery_lock_file, strerror(errno)));
		return false;
	}

	set_close_on_exec(ctdb->recovery_lock_fd);

	lock.l_type = F_WRLCK;
	lock.l_whence = SEEK_SET;
	lock.l_start = 0;
	lock.l_len = 1;
	lock.l_pid = 0;

	if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
		/* close() can clobber errno, so save it first */
		int saved_errno = errno;
		close(ctdb->recovery_lock_fd);
		ctdb->recovery_lock_fd = -1;
		/* Fail silently on these errors, since they indicate
		 * lock contention, but log an error for any other
		 * failure. */
		if (saved_errno != EACCES &&
		    saved_errno != EAGAIN) {
			DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
					 "recovery lock on '%s' - (%s)\n",
					 ctdb->recovery_lock_file,
					 strerror(saved_errno)));
		}
		return false;
	}

	return true;
}
777 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
779 if (ctdb->recovery_lock_fd != -1) {
780 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
781 close(ctdb->recovery_lock_fd);
782 ctdb->recovery_lock_fd = -1;
/*
  delete a record as part of the vacuum process
  only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
  use non-blocking locks

  return 0 if the record was successfully deleted (i.e. it does not exist
  when the function returns)
  or !0 is the record still exists in the tdb after returning.

  Lock ordering: chainlock first, then the freelist lock (-1); both taken
  non-blocking so vacuuming never stalls the daemon.  Every exit path after
  the chainlock releases it, and data2 is freed on every path after tdb_fetch.
 */
static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
{
	TDB_DATA key, data, data2;
	struct ctdb_ltdb_header *hdr, *hdr2;

	/* these are really internal tdb functions - but we need them here for
	   non-blocking lock of the freelist */
	int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
	int tdb_unlock(struct tdb_context *tdb, int list, int ltype);


	key.dsize = rec->keylen;
	key.dptr  = &rec->data[0];
	data.dsize = rec->datalen;
	data.dptr = &rec->data[rec->keylen];

	/* the lmaster must keep its copy; it is the authoritative one */
	if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
		DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
		return -1;
	}

	/* the caller's payload must be exactly one ltdb header (empty record) */
	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
		DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
		return -1;
	}

	hdr = (struct ctdb_ltdb_header *)data.dptr;

	/* use a non-blocking lock */
	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
		return -1;
	}

	data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
	if (data2.dptr == NULL) {
		/* already gone - treat as successfully deleted */
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		return 0;
	}

	/* a local copy too small for a header is corrupt; try to delete it */
	if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
		if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
			if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
				DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
			}
			tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
			DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
		}
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		free(data2.dptr);
		return 0;
	}

	hdr2 = (struct ctdb_ltdb_header *)data2.dptr;

	/* our local copy is newer than the one vacuuming saw - keep it */
	if (hdr2->rsn > hdr->rsn) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
			 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
		free(data2.dptr);
		return -1;
	}

	/* do not allow deleting record that have readonly flags set. */
	if (hdr->flags & CTDB_REC_RO_FLAGS) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
		free(data2.dptr);
		return -1;
	}
	if (hdr2->flags & CTDB_REC_RO_FLAGS) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
		free(data2.dptr);
		return -1;
	}

	/* the dmaster's copy must not be vacuumed away */
	if (hdr2->dmaster == ctdb->pnn) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
		free(data2.dptr);
		return -1;
	}

	if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		free(data2.dptr);
		return -1;
	}

	if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
		tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
		DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
		free(data2.dptr);
		return -1;
	}

	tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
	free(data2.dptr);
	return 0;
}
/* carries the pending control across an event-script callback so the
 * reply can be sent once the script finishes */
struct recovery_callback_state {
	struct ctdb_req_control *c;
};
/*
  called when the 'recovered' event script has finished
 */
static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
{
	struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);

	/* monitoring was disabled while the script ran */
	ctdb_enable_monitoring(ctdb);
	CTDB_INCREMENT_STAT(ctdb, num_recoveries);

	if (status != 0) {
		DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
		/* a timed-out 'recovered' script gets this node banned */
		if (status == -ETIME) {
			ctdb_ban_self(ctdb);
		}
	}

	/* forward the script's status to the waiting client */
	ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
	talloc_free(state);

	gettimeofday(&ctdb->last_recovery_finished, NULL);

	/* first successful recovery moves the daemon out of startup */
	if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
		ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
	}
}
/*
  recovery has finished

  CTDB_CONTROL_END_RECOVERY: run the 'recovered' event script and reply
  asynchronously from ctdb_end_recovery_callback.
 */
int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
				  struct ctdb_req_control *c,
				  bool *async_reply)
{
	int ret;
	struct recovery_callback_state *state;

	DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));

	/* flush any trans3 commits that were queued up during recovery */
	ctdb_persistent_finish_trans3_commits(ctdb);

	state = talloc(ctdb, struct recovery_callback_state);
	CTDB_NO_MEMORY(ctdb, state);

	state->c = c;

	/* suppress monitoring while the event script runs */
	ctdb_disable_monitoring(ctdb);

	ret = ctdb_event_script_callback(ctdb, state,
					 ctdb_end_recovery_callback,
					 state,
					 CTDB_EVENT_RECOVERED, "%s", "");

	if (ret != 0) {
		ctdb_enable_monitoring(ctdb);

		DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
		talloc_free(state);
		return -1;
	}

	/* tell the control that we will be reply asynchronously */
	state->c    = talloc_steal(state, c);
	*async_reply = true;
	return 0;
}
973 called when the 'startrecovery' event script has finished
975 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
977 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
979 if (status != 0) {
980 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
983 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
984 talloc_free(state);
/*
  run the startrecovery eventscript

  CTDB_CONTROL_START_RECOVERY: disable monitoring, run the 'startrecovery'
  event script and reply asynchronously from ctdb_start_recovery_callback.
 */
int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
				    struct ctdb_req_control *c,
				    bool *async_reply)
{
	int ret;
	struct recovery_callback_state *state;

	DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
	gettimeofday(&ctdb->last_recovery_started, NULL);

	state = talloc(ctdb, struct recovery_callback_state);
	CTDB_NO_MEMORY(ctdb, state);

	/* the control is owned by state so it is released together with it */
	state->c    = talloc_steal(state, c);

	ctdb_disable_monitoring(ctdb);

	ret = ctdb_event_script_callback(ctdb, state,
					 ctdb_start_recovery_callback,
					 state,
					 CTDB_EVENT_START_RECOVERY,
					 "%s", "");

	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
		talloc_free(state);
		return -1;
	}

	/* tell the control that we will be reply asynchronously */
	*async_reply = true;
	return 0;
}
/*
  try to delete all these records as part of the vacuuming process
  and return the records we failed to delete

  The reply marshall buffer starts empty and each record that
  delete_tdb_record() refuses to remove is appended to it, so the lmaster
  knows it may not purge those records yet.
 */
int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
	struct ctdb_db_context *ctdb_db;
	int i;
	struct ctdb_rec_data *rec;
	struct ctdb_marshall_buffer *records;

	if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
		DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
		return -1;
	}

	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
	if (!ctdb_db) {
		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
		return -1;
	}


	DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
			   reply->count, reply->db_id));


	/* create a blob to send back the records we couldnt delete */
	records = (struct ctdb_marshall_buffer *)
			talloc_zero_size(outdata,
					 offsetof(struct ctdb_marshall_buffer, data));
	if (records == NULL) {
		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
		return -1;
	}
	records->db_id = ctdb_db->db_id;


	rec = (struct ctdb_rec_data *)&reply->data[0];
	for (i=0;i<reply->count;i++) {
		TDB_DATA key, data;

		key.dptr = &rec->data[0];
		key.dsize = rec->keylen;
		data.dptr = &rec->data[key.dsize];
		data.dsize = rec->datalen;

		/* each record payload must at least hold an ltdb header */
		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
			return -1;
		}

		/* If we cant delete the record we must add it to the reply
		   so the lmaster knows it may not purge this record
		 */
		if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
			size_t old_size;
			struct ctdb_ltdb_header *hdr;

			hdr = (struct ctdb_ltdb_header *)data.dptr;
			data.dptr += sizeof(*hdr);
			data.dsize -= sizeof(*hdr);

			DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));

			old_size = talloc_get_size(records);
			records = talloc_realloc_size(outdata, records, old_size + rec->length);
			if (records == NULL) {
				DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
				return -1;
			}
			records->count++;
			/* append the raw wire record at the end of the blob */
			memcpy(old_size+(uint8_t *)records, rec, rec->length);
		}

		/* advance to the next packed wire record */
		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
	}


	*outdata = ctdb_marshall_finish(records);

	return 0;
}
1110 * Store a record as part of the vacuum process:
1111 * This is called from the RECEIVE_RECORD control which
1112 * the lmaster uses to send the current empty copy
1113 * to all nodes for storing, before it lets the other
1114 * nodes delete the records in the second phase with
1115 * the TRY_DELETE_RECORDS control.
1117 * Only store if we are not lmaster or dmaster, and our
1118 * rsn is <= the provided rsn. Use non-blocking locks.
1120 * return 0 if the record was successfully stored.
1121 * return !0 if the record still exists in the tdb after returning.
1123 static int store_tdb_record(struct ctdb_context *ctdb,
1124 struct ctdb_db_context *ctdb_db,
1125 struct ctdb_rec_data *rec)
1127 TDB_DATA key, data, data2;
1128 struct ctdb_ltdb_header *hdr, *hdr2;
1129 int ret;
1131 key.dsize = rec->keylen;
1132 key.dptr = &rec->data[0];
1133 data.dsize = rec->datalen;
1134 data.dptr = &rec->data[rec->keylen];
1136 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1137 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1138 "where we are lmaster\n"));
1139 return -1;
1142 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1143 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1144 return -1;
1147 hdr = (struct ctdb_ltdb_header *)data.dptr;
1149 /* use a non-blocking lock */
1150 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1151 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1152 return -1;
1155 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1156 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1157 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1158 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1159 ret = -1;
1160 goto done;
1162 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1163 ret = 0;
1164 goto done;
1167 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1169 if (hdr2->rsn > hdr->rsn) {
1170 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1171 "rsn=%llu - called with rsn=%llu\n",
1172 (unsigned long long)hdr2->rsn,
1173 (unsigned long long)hdr->rsn));
1174 ret = -1;
1175 goto done;
1178 /* do not allow vacuuming of records that have readonly flags set. */
1179 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1180 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1181 "flags set\n"));
1182 ret = -1;
1183 goto done;
1185 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1186 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1187 "flags set\n"));
1188 ret = -1;
1189 goto done;
1192 if (hdr2->dmaster == ctdb->pnn) {
1193 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1194 "where we are the dmaster\n"));
1195 ret = -1;
1196 goto done;
1199 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1200 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1201 ret = -1;
1202 goto done;
1205 ret = 0;
1207 done:
1208 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1209 free(data2.dptr);
1210 return ret;
1216 * Try to store all these records as part of the vacuuming process
1217 * and return the records we failed to store.
1219 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1220 TDB_DATA indata, TDB_DATA *outdata)
1222 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1223 struct ctdb_db_context *ctdb_db;
1224 int i;
1225 struct ctdb_rec_data *rec;
1226 struct ctdb_marshall_buffer *records;
1228 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1229 DEBUG(DEBUG_ERR,
1230 (__location__ " invalid data in receive_records\n"));
1231 return -1;
1234 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1235 if (!ctdb_db) {
1236 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1237 reply->db_id));
1238 return -1;
1241 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1242 "dbid 0x%x\n", reply->count, reply->db_id));
1244 /* create a blob to send back the records we could not store */
1245 records = (struct ctdb_marshall_buffer *)
1246 talloc_zero_size(outdata,
1247 offsetof(struct ctdb_marshall_buffer, data));
1248 if (records == NULL) {
1249 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1250 return -1;
1252 records->db_id = ctdb_db->db_id;
1254 rec = (struct ctdb_rec_data *)&reply->data[0];
1255 for (i=0; i<reply->count; i++) {
1256 TDB_DATA key, data;
1258 key.dptr = &rec->data[0];
1259 key.dsize = rec->keylen;
1260 data.dptr = &rec->data[key.dsize];
1261 data.dsize = rec->datalen;
1263 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1264 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1265 "in indata\n"));
1266 return -1;
1270 * If we can not store the record we must add it to the reply
1271 * so the lmaster knows it may not purge this record.
1273 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1274 size_t old_size;
1275 struct ctdb_ltdb_header *hdr;
1277 hdr = (struct ctdb_ltdb_header *)data.dptr;
1278 data.dptr += sizeof(*hdr);
1279 data.dsize -= sizeof(*hdr);
1281 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1282 "record with hash 0x%08x in vacuum "
1283 "via RECEIVE_RECORDS\n",
1284 ctdb_hash(&key)));
1286 old_size = talloc_get_size(records);
1287 records = talloc_realloc_size(outdata, records,
1288 old_size + rec->length);
1289 if (records == NULL) {
1290 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1291 "expand\n"));
1292 return -1;
1294 records->count++;
1295 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1298 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1301 *outdata = ctdb_marshall_finish(records);
1303 return 0;
1308 report capabilities
1310 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1312 uint32_t *capabilities = NULL;
1314 capabilities = talloc(outdata, uint32_t);
1315 CTDB_NO_MEMORY(ctdb, capabilities);
1316 *capabilities = ctdb->capabilities;
1318 outdata->dsize = sizeof(uint32_t);
1319 outdata->dptr = (uint8_t *)capabilities;
1321 return 0;
1324 /* The recovery daemon will ping us at regular intervals.
1325 If we havent been pinged for a while we assume the recovery
1326 daemon is inoperable and we restart.
1328 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1330 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1331 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1333 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1335 if (*count < ctdb->tunable.recd_ping_failcount) {
1336 (*count)++;
1337 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1338 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1339 ctdb_recd_ping_timeout, ctdb);
1340 return;
1343 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1345 ctdb_stop_recoverd(ctdb);
1346 ctdb_start_recoverd(ctdb);
1349 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1351 talloc_free(ctdb->recd_ping_count);
1353 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1354 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1356 if (ctdb->tunable.recd_ping_timeout != 0) {
1357 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1358 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1359 ctdb_recd_ping_timeout, ctdb);
1362 return 0;
1367 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1369 uint32_t new_recmaster;
1371 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1372 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1374 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1375 DEBUG(DEBUG_NOTICE,
1376 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1379 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1380 DEBUG(DEBUG_NOTICE,
1381 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1384 ctdb->recovery_master = new_recmaster;
1385 return 0;
1389 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1391 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1392 ctdb_disable_monitoring(ctdb);
1393 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1395 return 0;
1398 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1400 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1401 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1403 return 0;