s3:script/tests: add a test for VSS write behaviour
[Samba.git] / ctdb / server / ctdb_recover.c
blobf05052e8466866acbab2b25a79d62d5d18b23971
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
45 int
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
49 size_t len;
51 CHECK_CONTROL_DATA_SIZE(0);
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
61 outdata->dsize = len;
62 outdata->dptr = (uint8_t *)map;
64 return 0;
67 int
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
74 return -1;
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
89 return 0;
92 int
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
95 uint32_t i, len;
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
101 len = 0;
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
103 len++;
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
111 exit(1);
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
115 dbid_map->num = len;
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 dbid_map->dbs[i].flags = ctdb_db->db_flags;
121 return 0;
125 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
127 CHECK_CONTROL_DATA_SIZE(0);
129 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
130 ctdb->num_nodes,
131 outdata);
132 if (outdata->dptr == NULL) {
133 return -1;
136 outdata->dsize = talloc_get_size(outdata->dptr);
138 return 0;
142 reload the nodes file
145 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
147 int i, num_nodes;
148 TALLOC_CTX *tmp_ctx;
149 struct ctdb_node **nodes;
151 tmp_ctx = talloc_new(ctdb);
153 /* steal the old nodes file for a while */
154 talloc_steal(tmp_ctx, ctdb->nodes);
155 nodes = ctdb->nodes;
156 ctdb->nodes = NULL;
157 num_nodes = ctdb->num_nodes;
158 ctdb->num_nodes = 0;
160 /* load the new nodes file */
161 ctdb_load_nodes_file(ctdb);
163 for (i=0; i<ctdb->num_nodes; i++) {
164 /* keep any identical pre-existing nodes and connections */
165 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
166 talloc_free(ctdb->nodes[i]);
167 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
168 continue;
171 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
172 continue;
175 /* any new or different nodes must be added */
176 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
177 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
178 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
180 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
181 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
182 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
186 /* tell the recovery daemon to reaload the nodes file too */
187 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
189 talloc_free(tmp_ctx);
191 return 0;
195 a traverse function for pulling all relevent records from pulldb
197 struct pulldb_data {
198 struct ctdb_context *ctdb;
199 struct ctdb_db_context *ctdb_db;
200 struct ctdb_marshall_buffer *pulldata;
201 uint32_t len;
202 uint32_t allocated_len;
203 bool failed;
206 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
208 struct pulldb_data *params = (struct pulldb_data *)p;
209 struct ctdb_rec_data_old *rec;
210 struct ctdb_context *ctdb = params->ctdb;
211 struct ctdb_db_context *ctdb_db = params->ctdb_db;
213 /* add the record to the blob */
214 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
215 if (rec == NULL) {
216 params->failed = true;
217 return -1;
219 if (params->len + rec->length >= params->allocated_len) {
220 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
221 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
223 if (params->pulldata == NULL) {
224 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
225 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
227 params->pulldata->count++;
228 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
229 params->len += rec->length;
231 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
232 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
235 talloc_free(rec);
237 return 0;
241 pull a bunch of records from a ltdb, filtering by lmaster
243 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
245 struct ctdb_pulldb *pull;
246 struct ctdb_db_context *ctdb_db;
247 struct pulldb_data params;
248 struct ctdb_marshall_buffer *reply;
250 pull = (struct ctdb_pulldb *)indata.dptr;
252 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
253 if (!ctdb_db) {
254 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
255 return -1;
258 if (!ctdb_db_frozen(ctdb_db)) {
259 DEBUG(DEBUG_ERR,
260 ("rejecting ctdb_control_pull_db when not frozen\n"));
261 return -1;
264 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
265 CTDB_NO_MEMORY(ctdb, reply);
267 reply->db_id = pull->db_id;
269 params.ctdb = ctdb;
270 params.ctdb_db = ctdb_db;
271 params.pulldata = reply;
272 params.len = offsetof(struct ctdb_marshall_buffer, data);
273 params.allocated_len = params.len;
274 params.failed = false;
276 if (ctdb_db->unhealthy_reason) {
277 /* this is just a warning, as the tdb should be empty anyway */
278 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
279 ctdb_db->db_name, ctdb_db->unhealthy_reason));
282 /* If the records are invalid, we are done */
283 if (ctdb_db->invalid_records) {
284 goto done;
287 if (ctdb_lockdb_mark(ctdb_db) != 0) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
289 return -1;
292 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
293 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
294 ctdb_lockdb_unmark(ctdb_db);
295 talloc_free(params.pulldata);
296 return -1;
299 ctdb_lockdb_unmark(ctdb_db);
301 done:
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
313 return 0;
316 struct db_pull_state {
317 struct ctdb_context *ctdb;
318 struct ctdb_db_context *ctdb_db;
319 struct ctdb_marshall_buffer *recs;
320 uint32_t pnn;
321 uint64_t srvid;
322 uint32_t num_records;
325 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
326 TDB_DATA data, void *private_data)
328 struct db_pull_state *state = (struct db_pull_state *)private_data;
329 struct ctdb_marshall_buffer *recs;
331 recs = ctdb_marshall_add(state->ctdb, state->recs,
332 state->ctdb_db->db_id, 0, key, NULL, data);
333 if (recs == NULL) {
334 TALLOC_FREE(state->recs);
335 return -1;
337 state->recs = recs;
339 if (talloc_get_size(state->recs) >=
340 state->ctdb->tunable.rec_buffer_size_limit) {
341 TDB_DATA buffer;
342 int ret;
344 buffer = ctdb_marshall_finish(state->recs);
345 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
346 state->srvid, buffer);
347 if (ret != 0) {
348 TALLOC_FREE(state->recs);
349 return -1;
352 state->num_records += state->recs->count;
353 TALLOC_FREE(state->recs);
356 return 0;
359 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
360 struct ctdb_req_control_old *c,
361 TDB_DATA indata, TDB_DATA *outdata)
363 struct ctdb_pulldb_ext *pulldb_ext;
364 struct ctdb_db_context *ctdb_db;
365 struct db_pull_state state;
366 int ret;
368 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
370 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
371 if (ctdb_db == NULL) {
372 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
373 pulldb_ext->db_id));
374 return -1;
377 if (!ctdb_db_frozen(ctdb_db)) {
378 DEBUG(DEBUG_ERR,
379 ("rejecting ctdb_control_pull_db when not frozen\n"));
380 return -1;
383 if (ctdb_db->unhealthy_reason) {
384 /* this is just a warning, as the tdb should be empty anyway */
385 DEBUG(DEBUG_WARNING,
386 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
390 state.ctdb = ctdb;
391 state.ctdb_db = ctdb_db;
392 state.recs = NULL;
393 state.pnn = c->hdr.srcnode;
394 state.srvid = pulldb_ext->srvid;
395 state.num_records = 0;
397 /* If the records are invalid, we are done */
398 if (ctdb_db->invalid_records) {
399 goto done;
402 if (ctdb_lockdb_mark(ctdb_db) != 0) {
403 DEBUG(DEBUG_ERR,
404 (__location__ " Failed to get lock on entire db - failing\n"));
405 return -1;
408 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
409 if (ret == -1) {
410 DEBUG(DEBUG_ERR,
411 (__location__ " Failed to get traverse db '%s'\n",
412 ctdb_db->db_name));
413 ctdb_lockdb_unmark(ctdb_db);
414 return -1;
417 /* Last few records */
418 if (state.recs != NULL) {
419 TDB_DATA buffer;
421 buffer = ctdb_marshall_finish(state.recs);
422 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
423 state.srvid, buffer);
424 if (ret != 0) {
425 TALLOC_FREE(state.recs);
426 ctdb_lockdb_unmark(ctdb_db);
427 return -1;
430 state.num_records += state.recs->count;
431 TALLOC_FREE(state.recs);
434 ctdb_lockdb_unmark(ctdb_db);
436 done:
437 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
438 if (outdata->dptr == NULL) {
439 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
440 return -1;
443 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
444 outdata->dsize = sizeof(uint32_t);
446 return 0;
450 push a bunch of records into a ltdb, filtering by rsn
452 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
454 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
455 struct ctdb_db_context *ctdb_db;
456 int i, ret;
457 struct ctdb_rec_data_old *rec;
459 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
460 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
461 return -1;
464 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
465 if (!ctdb_db) {
466 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
467 return -1;
470 if (!ctdb_db_frozen(ctdb_db)) {
471 DEBUG(DEBUG_ERR,
472 ("rejecting ctdb_control_push_db when not frozen\n"));
473 return -1;
476 if (ctdb_lockdb_mark(ctdb_db) != 0) {
477 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
478 return -1;
481 rec = (struct ctdb_rec_data_old *)&reply->data[0];
483 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
484 reply->count, reply->db_id));
486 for (i=0;i<reply->count;i++) {
487 TDB_DATA key, data;
488 struct ctdb_ltdb_header *hdr;
490 key.dptr = &rec->data[0];
491 key.dsize = rec->keylen;
492 data.dptr = &rec->data[key.dsize];
493 data.dsize = rec->datalen;
495 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
496 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
497 goto failed;
499 hdr = (struct ctdb_ltdb_header *)data.dptr;
500 /* strip off any read only record flags. All readonly records
501 are revoked implicitely by a recovery
503 hdr->flags &= ~CTDB_REC_RO_FLAGS;
505 data.dptr += sizeof(*hdr);
506 data.dsize -= sizeof(*hdr);
508 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
509 if (ret != 0) {
510 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
511 goto failed;
514 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
517 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
518 reply->count, reply->db_id));
520 if (ctdb_db_readonly(ctdb_db)) {
521 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
522 ctdb_db->db_id));
523 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
524 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
525 tdb_close(ctdb_db->rottdb);
526 ctdb_db->rottdb = NULL;
527 ctdb_db_reset_readonly(ctdb_db);
529 while (ctdb_db->revokechild_active != NULL) {
530 talloc_free(ctdb_db->revokechild_active);
534 ctdb_lockdb_unmark(ctdb_db);
535 return 0;
537 failed:
538 ctdb_lockdb_unmark(ctdb_db);
539 return -1;
542 struct db_push_state {
543 struct ctdb_context *ctdb;
544 struct ctdb_db_context *ctdb_db;
545 uint64_t srvid;
546 uint32_t num_records;
547 bool failed;
550 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
551 void *private_data)
553 struct db_push_state *state = talloc_get_type(
554 private_data, struct db_push_state);
555 struct ctdb_marshall_buffer *recs;
556 struct ctdb_rec_data_old *rec;
557 int i, ret;
559 if (state->failed) {
560 return;
563 recs = (struct ctdb_marshall_buffer *)indata.dptr;
564 rec = (struct ctdb_rec_data_old *)&recs->data[0];
566 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
567 recs->count, recs->db_id));
569 for (i=0; i<recs->count; i++) {
570 TDB_DATA key, data;
571 struct ctdb_ltdb_header *hdr;
573 key.dptr = &rec->data[0];
574 key.dsize = rec->keylen;
575 data.dptr = &rec->data[key.dsize];
576 data.dsize = rec->datalen;
578 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
579 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
580 goto failed;
583 hdr = (struct ctdb_ltdb_header *)data.dptr;
584 /* Strip off any read only record flags.
585 * All readonly records are revoked implicitely by a recovery.
587 hdr->flags &= ~CTDB_REC_RO_FLAGS;
589 data.dptr += sizeof(*hdr);
590 data.dsize -= sizeof(*hdr);
592 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
593 if (ret != 0) {
594 DEBUG(DEBUG_ERR,
595 (__location__ " Unable to store record\n"));
596 goto failed;
599 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
602 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
603 recs->count, recs->db_id));
605 state->num_records += recs->count;
606 return;
608 failed:
609 state->failed = true;
612 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
614 struct ctdb_pulldb_ext *pulldb_ext;
615 struct ctdb_db_context *ctdb_db;
616 struct db_push_state *state;
617 int ret;
619 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
621 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
622 if (ctdb_db == NULL) {
623 DEBUG(DEBUG_ERR,
624 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
625 return -1;
628 if (!ctdb_db_frozen(ctdb_db)) {
629 DEBUG(DEBUG_ERR,
630 ("rejecting ctdb_control_db_push_start when not frozen\n"));
631 return -1;
634 if (ctdb_db->push_started) {
635 DEBUG(DEBUG_WARNING,
636 (__location__ " DB push already started for %s\n",
637 ctdb_db->db_name));
639 /* De-register old state */
640 state = (struct db_push_state *)ctdb_db->push_state;
641 if (state != NULL) {
642 srvid_deregister(ctdb->srv, state->srvid, state);
643 talloc_free(state);
644 ctdb_db->push_state = NULL;
648 state = talloc_zero(ctdb_db, struct db_push_state);
649 if (state == NULL) {
650 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
651 return -1;
654 state->ctdb = ctdb;
655 state->ctdb_db = ctdb_db;
656 state->srvid = pulldb_ext->srvid;
657 state->failed = false;
659 ret = srvid_register(ctdb->srv, state, state->srvid,
660 db_push_msg_handler, state);
661 if (ret != 0) {
662 DEBUG(DEBUG_ERR,
663 (__location__ " Failed to register srvid for db push\n"));
664 talloc_free(state);
665 return -1;
668 if (ctdb_lockdb_mark(ctdb_db) != 0) {
669 DEBUG(DEBUG_ERR,
670 (__location__ " Failed to get lock on entire db - failing\n"));
671 srvid_deregister(ctdb->srv, state->srvid, state);
672 talloc_free(state);
673 return -1;
676 ctdb_db->push_started = true;
677 ctdb_db->push_state = state;
679 return 0;
682 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
683 TDB_DATA indata, TDB_DATA *outdata)
685 uint32_t db_id;
686 struct ctdb_db_context *ctdb_db;
687 struct db_push_state *state;
689 db_id = *(uint32_t *)indata.dptr;
691 ctdb_db = find_ctdb_db(ctdb, db_id);
692 if (ctdb_db == NULL) {
693 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
694 return -1;
697 if (!ctdb_db_frozen(ctdb_db)) {
698 DEBUG(DEBUG_ERR,
699 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
700 return -1;
703 if (!ctdb_db->push_started) {
704 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
705 return -1;
708 if (ctdb_db_readonly(ctdb_db)) {
709 DEBUG(DEBUG_ERR,
710 ("Clearing the tracking database for dbid 0x%x\n",
711 ctdb_db->db_id));
712 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
713 DEBUG(DEBUG_ERR,
714 ("Failed to wipe tracking database for 0x%x."
715 " Dropping read-only delegation support\n",
716 ctdb_db->db_id));
717 tdb_close(ctdb_db->rottdb);
718 ctdb_db->rottdb = NULL;
719 ctdb_db_reset_readonly(ctdb_db);
722 while (ctdb_db->revokechild_active != NULL) {
723 talloc_free(ctdb_db->revokechild_active);
727 ctdb_lockdb_unmark(ctdb_db);
729 state = (struct db_push_state *)ctdb_db->push_state;
730 if (state == NULL) {
731 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
732 return -1;
735 srvid_deregister(ctdb->srv, state->srvid, state);
737 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
738 if (outdata->dptr == NULL) {
739 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
740 talloc_free(state);
741 ctdb_db->push_state = NULL;
742 return -1;
745 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
746 outdata->dsize = sizeof(uint32_t);
748 talloc_free(state);
749 ctdb_db->push_started = false;
750 ctdb_db->push_state = NULL;
752 return 0;
755 struct set_recmode_state {
756 struct ctdb_context *ctdb;
757 struct ctdb_req_control_old *c;
760 static void set_recmode_handler(char status,
761 double latency,
762 void *private_data)
764 struct set_recmode_state *state = talloc_get_type_abort(
765 private_data, struct set_recmode_state);
766 int s = 0;
767 const char *err = NULL;
769 switch (status) {
770 case '0':
771 /* Mutex taken */
772 DEBUG(DEBUG_ERR,
773 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
774 state->ctdb->recovery_lock));
775 s = -1;
776 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
777 break;
779 case '1':
780 /* Contention */
781 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
782 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
783 ctdb_process_deferred_attach(state->ctdb);
785 s = 0;
787 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock",
788 reclock.ctdbd, latency);
789 break;
791 case '2':
792 /* Timeout. Consider this a success, not a failure,
793 * as we failed to set the recovery lock which is what
794 * we wanted. This can be caused by the cluster
795 * filesystem being very slow to arbitrate locks
796 * immediately after a node failure. */
797 DEBUG(DEBUG_WARNING,
798 (__location__
799 "Time out getting recovery lock, allowing recmode set anyway\n"));
800 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
801 ctdb_process_deferred_attach(state->ctdb);
803 s = 0;
804 break;
806 default:
807 DEBUG(DEBUG_ERR,
808 ("Unexpected error when testing recovery lock\n"));
809 s = -1;
810 err = "Unexpected error when testing recovery lock";
813 ctdb_request_control_reply(state->ctdb, state->c, NULL, s, err);
814 talloc_free(state);
817 static void
818 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
819 struct timeval t, void *private_data)
821 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
823 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
824 talloc_free(ctdb->release_ips_ctx);
825 ctdb->release_ips_ctx = NULL;
827 ctdb_release_all_ips(ctdb);
831 * Set up an event to drop all public ips if we remain in recovery for too
832 * long
834 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
836 if (ctdb->release_ips_ctx != NULL) {
837 talloc_free(ctdb->release_ips_ctx);
839 ctdb->release_ips_ctx = talloc_new(ctdb);
840 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
842 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
843 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
844 ctdb_drop_all_ips_event, ctdb);
845 return 0;
849 set the recovery mode
851 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
852 struct ctdb_req_control_old *c,
853 TDB_DATA indata, bool *async_reply,
854 const char **errormsg)
856 uint32_t recmode = *(uint32_t *)indata.dptr;
857 struct ctdb_db_context *ctdb_db;
858 struct set_recmode_state *state;
859 struct ctdb_cluster_mutex_handle *h;
861 if (recmode == ctdb->recovery_mode) {
862 D_INFO("Recovery mode already set to %s\n",
863 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
864 return 0;
867 D_NOTICE("Recovery mode set to %s\n",
868 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
870 /* if we enter recovery but stay in recovery for too long
871 we will eventually drop all our ip addresses
873 if (recmode == CTDB_RECOVERY_ACTIVE) {
874 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
875 D_ERR("Failed to set up deferred drop all ips\n");
878 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
879 return 0;
882 /* From this point: recmode == CTDB_RECOVERY_NORMAL
884 * Therefore, what follows is special handling when setting
885 * recovery mode back to normal */
887 TALLOC_FREE(ctdb->release_ips_ctx);
889 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
890 if (ctdb_db->generation != ctdb->vnn_map->generation) {
891 DEBUG(DEBUG_ERR,
892 ("Inconsistent DB generation %u for %s\n",
893 ctdb_db->generation, ctdb_db->db_name));
894 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
895 return -1;
899 /* force the databases to thaw */
900 if (ctdb_db_all_frozen(ctdb)) {
901 ctdb_control_thaw(ctdb, false);
904 if (ctdb->recovery_lock == NULL) {
905 /* Not using recovery lock file */
906 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
907 ctdb_process_deferred_attach(ctdb);
908 return 0;
911 state = talloc_zero(ctdb, struct set_recmode_state);
912 if (state == NULL) {
913 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
914 return -1;
916 state->ctdb = ctdb;
917 state->c = NULL;
919 h = ctdb_cluster_mutex(state, ctdb, ctdb->recovery_lock, 5,
920 set_recmode_handler, state, NULL, NULL);
921 if (h == NULL) {
922 talloc_free(state);
923 return -1;
926 state->c = talloc_steal(state, c);
927 *async_reply = true;
929 return 0;
934 delete a record as part of the vacuum process
935 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
936 use non-blocking locks
938 return 0 if the record was successfully deleted (i.e. it does not exist
939 when the function returns)
940 or !0 is the record still exists in the tdb after returning.
942 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
944 TDB_DATA key, data, data2;
945 struct ctdb_ltdb_header *hdr, *hdr2;
947 /* these are really internal tdb functions - but we need them here for
948 non-blocking lock of the freelist */
949 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
950 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
953 key.dsize = rec->keylen;
954 key.dptr = &rec->data[0];
955 data.dsize = rec->datalen;
956 data.dptr = &rec->data[rec->keylen];
958 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
959 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
960 return -1;
963 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
964 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
965 return -1;
968 hdr = (struct ctdb_ltdb_header *)data.dptr;
970 /* use a non-blocking lock */
971 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
972 return -1;
975 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
976 if (data2.dptr == NULL) {
977 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
978 return 0;
981 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
982 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
983 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
984 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
986 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
987 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
989 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
990 free(data2.dptr);
991 return 0;
994 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
996 if (hdr2->rsn > hdr->rsn) {
997 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
998 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
999 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
1000 free(data2.dptr);
1001 return -1;
1004 /* do not allow deleting record that have readonly flags set. */
1005 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1006 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1007 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1008 free(data2.dptr);
1009 return -1;
1011 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1012 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1013 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1014 free(data2.dptr);
1015 return -1;
1018 if (hdr2->dmaster == ctdb->pnn) {
1019 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1020 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
1021 free(data2.dptr);
1022 return -1;
1025 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1026 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1027 free(data2.dptr);
1028 return -1;
1031 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1032 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1033 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1034 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
1035 free(data2.dptr);
1036 return -1;
1039 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1040 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1041 free(data2.dptr);
1042 return 0;
1047 struct recovery_callback_state {
1048 struct ctdb_req_control_old *c;
1053 called when the 'recovered' event script has finished
1055 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1057 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1059 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1061 if (status != 0) {
1062 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1063 if (status == -ETIMEDOUT) {
1064 ctdb_ban_self(ctdb);
1068 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1069 talloc_free(state);
1071 gettimeofday(&ctdb->last_recovery_finished, NULL);
1073 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1074 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1079 recovery has finished
1081 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1082 struct ctdb_req_control_old *c,
1083 bool *async_reply)
1085 int ret;
1086 struct recovery_callback_state *state;
1088 DEBUG(DEBUG_ERR,("Recovery has finished\n"));
1090 ctdb_persistent_finish_trans3_commits(ctdb);
1092 state = talloc(ctdb, struct recovery_callback_state);
1093 CTDB_NO_MEMORY(ctdb, state);
1095 state->c = c;
1097 ret = ctdb_event_script_callback(ctdb, state,
1098 ctdb_end_recovery_callback,
1099 state,
1100 CTDB_EVENT_RECOVERED, "%s", "");
1102 if (ret != 0) {
1103 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1104 talloc_free(state);
1105 return -1;
1108 /* tell the control that we will be reply asynchronously */
1109 state->c = talloc_steal(state, c);
1110 *async_reply = true;
1111 return 0;
1115 called when the 'startrecovery' event script has finished
1117 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1119 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1121 if (status != 0) {
1122 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1125 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1126 talloc_free(state);
1129 static void run_start_recovery_event(struct ctdb_context *ctdb,
1130 struct recovery_callback_state *state)
1132 int ret;
1134 ret = ctdb_event_script_callback(ctdb, state,
1135 ctdb_start_recovery_callback,
1136 state,
1137 CTDB_EVENT_START_RECOVERY,
1138 "%s", "");
1140 if (ret != 0) {
1141 DEBUG(DEBUG_ERR,("Unable to run startrecovery event\n"));
1142 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1143 talloc_free(state);
1144 return;
1147 return;
1150 static bool reclock_strings_equal(const char *a, const char *b)
1152 return (a == NULL && b == NULL) ||
1153 (a != NULL && b != NULL && strcmp(a, b) == 0);
1156 static void start_recovery_reclock_callback(struct ctdb_context *ctdb,
1157 int32_t status,
1158 TDB_DATA data,
1159 const char *errormsg,
1160 void *private_data)
1162 struct recovery_callback_state *state = talloc_get_type_abort(
1163 private_data, struct recovery_callback_state);
1164 const char *local = ctdb->recovery_lock;
1165 const char *remote = NULL;
1167 if (status != 0) {
1168 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1169 ctdb_request_control_reply(ctdb, state->c, NULL,
1170 status, errormsg);
1171 talloc_free(state);
1172 return;
1175 /* Check reclock consistency */
1176 if (data.dsize > 0) {
1177 /* Ensure NUL-termination */
1178 data.dptr[data.dsize-1] = '\0';
1179 remote = (const char *)data.dptr;
1181 if (! reclock_strings_equal(local, remote)) {
1182 /* Inconsistent */
1183 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1184 DEBUG(DEBUG_ERR,
1185 ("Recovery lock configuration inconsistent: "
1186 "recmaster has %s, this node has %s, shutting down\n",
1187 remote == NULL ? "NULL" : remote,
1188 local == NULL ? "NULL" : local));
1189 talloc_free(state);
1190 ctdb_shutdown_sequence(ctdb, 1);
1192 DEBUG(DEBUG_INFO,
1193 ("Recovery lock consistency check successful\n"));
1195 run_start_recovery_event(ctdb, state);
1198 /* Check recovery lock consistency and run eventscripts for the
1199 * "startrecovery" event */
1200 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1201 struct ctdb_req_control_old *c,
1202 bool *async_reply)
1204 int ret;
1205 struct recovery_callback_state *state;
1206 uint32_t recmaster = c->hdr.srcnode;
1208 DEBUG(DEBUG_ERR, ("Recovery has started\n"));
1209 gettimeofday(&ctdb->last_recovery_started, NULL);
1211 state = talloc(ctdb, struct recovery_callback_state);
1212 CTDB_NO_MEMORY(ctdb, state);
1214 state->c = c;
1216 /* Although the recovery master sent this node a start
1217 * recovery control, this node might still think the recovery
1218 * master is disconnected. In this case defer the recovery
1219 * lock consistency check. */
1220 if (ctdb->nodes[recmaster]->flags & NODE_FLAGS_DISCONNECTED) {
1221 run_start_recovery_event(ctdb, state);
1222 } else {
1223 /* Ask the recovery master about its reclock setting */
1224 ret = ctdb_daemon_send_control(ctdb,
1225 recmaster,
1227 CTDB_CONTROL_GET_RECLOCK_FILE,
1228 0, 0,
1229 tdb_null,
1230 start_recovery_reclock_callback,
1231 state);
1233 if (ret != 0) {
1234 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1235 talloc_free(state);
1236 return -1;
1240 /* tell the control that we will be reply asynchronously */
1241 state->c = talloc_steal(state, c);
1242 *async_reply = true;
1244 return 0;
1248 try to delete all these records as part of the vacuuming process
1249 and return the records we failed to delete
1251 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1253 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1254 struct ctdb_db_context *ctdb_db;
1255 int i;
1256 struct ctdb_rec_data_old *rec;
1257 struct ctdb_marshall_buffer *records;
1259 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1260 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1261 return -1;
1264 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1265 if (!ctdb_db) {
1266 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1267 return -1;
1271 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1272 reply->count, reply->db_id));
1275 /* create a blob to send back the records we couldnt delete */
1276 records = (struct ctdb_marshall_buffer *)
1277 talloc_zero_size(outdata,
1278 offsetof(struct ctdb_marshall_buffer, data));
1279 if (records == NULL) {
1280 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1281 return -1;
1283 records->db_id = ctdb_db->db_id;
1286 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1287 for (i=0;i<reply->count;i++) {
1288 TDB_DATA key, data;
1290 key.dptr = &rec->data[0];
1291 key.dsize = rec->keylen;
1292 data.dptr = &rec->data[key.dsize];
1293 data.dsize = rec->datalen;
1295 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1296 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1297 talloc_free(records);
1298 return -1;
1301 /* If we cant delete the record we must add it to the reply
1302 so the lmaster knows it may not purge this record
1304 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1305 size_t old_size;
1306 struct ctdb_ltdb_header *hdr;
1308 hdr = (struct ctdb_ltdb_header *)data.dptr;
1309 data.dptr += sizeof(*hdr);
1310 data.dsize -= sizeof(*hdr);
1312 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1314 old_size = talloc_get_size(records);
1315 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1316 if (records == NULL) {
1317 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1318 return -1;
1320 records->count++;
1321 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1324 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1328 *outdata = ctdb_marshall_finish(records);
1330 return 0;
1334 report capabilities
1336 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1338 uint32_t *capabilities = NULL;
1340 capabilities = talloc(outdata, uint32_t);
1341 CTDB_NO_MEMORY(ctdb, capabilities);
1342 *capabilities = ctdb->capabilities;
1344 outdata->dsize = sizeof(uint32_t);
1345 outdata->dptr = (uint8_t *)capabilities;
1347 return 0;
1350 /* The recovery daemon will ping us at regular intervals.
1351 If we havent been pinged for a while we assume the recovery
1352 daemon is inoperable and we restart.
1354 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1355 struct tevent_timer *te,
1356 struct timeval t, void *p)
1358 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1359 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1361 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1363 if (*count < ctdb->tunable.recd_ping_failcount) {
1364 (*count)++;
1365 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1366 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1367 ctdb_recd_ping_timeout, ctdb);
1368 return;
1371 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1373 ctdb_stop_recoverd(ctdb);
1374 ctdb_start_recoverd(ctdb);
1377 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1379 talloc_free(ctdb->recd_ping_count);
1381 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1382 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1384 if (ctdb->tunable.recd_ping_timeout != 0) {
1385 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1386 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1387 ctdb_recd_ping_timeout, ctdb);
1390 return 0;
1395 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1397 uint32_t new_recmaster;
1399 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1400 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1402 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1403 DEBUG(DEBUG_ERR,
1404 ("Remote node (%u) is now the recovery master\n",
1405 new_recmaster));
1408 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1409 DEBUG(DEBUG_ERR,
1410 ("This node (%u) is now the recovery master\n",
1411 ctdb->pnn));
1414 ctdb->recovery_master = new_recmaster;
1415 return 0;
1419 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1421 DEBUG(DEBUG_ERR, ("Stopping node\n"));
1422 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1424 return 0;
1427 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1429 DEBUG(DEBUG_ERR, ("Continue node\n"));
1430 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1432 return 0;