ctdb-daemon: Make node inactive in the NODE_STOP control
[Samba.git] / ctdb / server / ctdb_recover.c
blob1654c6d3978c1c97b976d20edc4e269e7941750b
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
45 int
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
49 size_t len;
51 CHECK_CONTROL_DATA_SIZE(0);
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
61 outdata->dsize = len;
62 outdata->dptr = (uint8_t *)map;
64 return 0;
67 int
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
74 return -1;
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
89 return 0;
92 int
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
95 uint32_t i, len;
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
101 len = 0;
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
103 len++;
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
111 exit(1);
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
115 dbid_map->num = len;
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 dbid_map->dbs[i].flags = ctdb_db->db_flags;
121 return 0;
125 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
127 CHECK_CONTROL_DATA_SIZE(0);
129 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
130 ctdb->num_nodes,
131 outdata);
132 if (outdata->dptr == NULL) {
133 return -1;
136 outdata->dsize = talloc_get_size(outdata->dptr);
138 return 0;
142 reload the nodes file
145 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
147 unsigned int i, num_nodes;
148 TALLOC_CTX *tmp_ctx;
149 struct ctdb_node **nodes;
151 tmp_ctx = talloc_new(ctdb);
153 /* steal the old nodes file for a while */
154 talloc_steal(tmp_ctx, ctdb->nodes);
155 nodes = ctdb->nodes;
156 ctdb->nodes = NULL;
157 num_nodes = ctdb->num_nodes;
158 ctdb->num_nodes = 0;
160 /* load the new nodes file */
161 ctdb_load_nodes_file(ctdb);
163 for (i=0; i<ctdb->num_nodes; i++) {
164 /* keep any identical pre-existing nodes and connections */
165 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
166 talloc_free(ctdb->nodes[i]);
167 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
168 continue;
171 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
172 continue;
175 /* any new or different nodes must be added */
176 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
177 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
178 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
180 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
181 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
182 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
186 /* tell the recovery daemon to reaload the nodes file too */
187 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
189 talloc_free(tmp_ctx);
191 return 0;
195 a traverse function for pulling all relevent records from pulldb
197 struct pulldb_data {
198 struct ctdb_context *ctdb;
199 struct ctdb_db_context *ctdb_db;
200 struct ctdb_marshall_buffer *pulldata;
201 uint32_t len;
202 uint32_t allocated_len;
203 bool failed;
206 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
208 struct pulldb_data *params = (struct pulldb_data *)p;
209 struct ctdb_rec_data_old *rec;
210 struct ctdb_context *ctdb = params->ctdb;
211 struct ctdb_db_context *ctdb_db = params->ctdb_db;
213 /* add the record to the blob */
214 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
215 if (rec == NULL) {
216 params->failed = true;
217 return -1;
219 if (params->len + rec->length >= params->allocated_len) {
220 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
221 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
223 if (params->pulldata == NULL) {
224 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
225 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
227 params->pulldata->count++;
228 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
229 params->len += rec->length;
231 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
232 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
235 talloc_free(rec);
237 return 0;
241 pull a bunch of records from a ltdb, filtering by lmaster
243 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
245 struct ctdb_pulldb *pull;
246 struct ctdb_db_context *ctdb_db;
247 struct pulldb_data params;
248 struct ctdb_marshall_buffer *reply;
250 pull = (struct ctdb_pulldb *)indata.dptr;
252 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
253 if (!ctdb_db) {
254 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
255 return -1;
258 if (!ctdb_db_frozen(ctdb_db)) {
259 DEBUG(DEBUG_ERR,
260 ("rejecting ctdb_control_pull_db when not frozen\n"));
261 return -1;
264 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
265 CTDB_NO_MEMORY(ctdb, reply);
267 reply->db_id = pull->db_id;
269 params.ctdb = ctdb;
270 params.ctdb_db = ctdb_db;
271 params.pulldata = reply;
272 params.len = offsetof(struct ctdb_marshall_buffer, data);
273 params.allocated_len = params.len;
274 params.failed = false;
276 if (ctdb_db->unhealthy_reason) {
277 /* this is just a warning, as the tdb should be empty anyway */
278 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
279 ctdb_db->db_name, ctdb_db->unhealthy_reason));
282 /* If the records are invalid, we are done */
283 if (ctdb_db->invalid_records) {
284 goto done;
287 if (ctdb_lockdb_mark(ctdb_db) != 0) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
289 return -1;
292 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
293 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
294 ctdb_lockdb_unmark(ctdb_db);
295 talloc_free(params.pulldata);
296 return -1;
299 ctdb_lockdb_unmark(ctdb_db);
301 done:
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
313 return 0;
316 struct db_pull_state {
317 struct ctdb_context *ctdb;
318 struct ctdb_db_context *ctdb_db;
319 struct ctdb_marshall_buffer *recs;
320 uint32_t pnn;
321 uint64_t srvid;
322 uint32_t num_records;
325 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
326 TDB_DATA data, void *private_data)
328 struct db_pull_state *state = (struct db_pull_state *)private_data;
329 struct ctdb_marshall_buffer *recs;
331 recs = ctdb_marshall_add(state->ctdb, state->recs,
332 state->ctdb_db->db_id, 0, key, NULL, data);
333 if (recs == NULL) {
334 TALLOC_FREE(state->recs);
335 return -1;
337 state->recs = recs;
339 if (talloc_get_size(state->recs) >=
340 state->ctdb->tunable.rec_buffer_size_limit) {
341 TDB_DATA buffer;
342 int ret;
344 buffer = ctdb_marshall_finish(state->recs);
345 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
346 state->srvid, buffer);
347 if (ret != 0) {
348 TALLOC_FREE(state->recs);
349 return -1;
352 state->num_records += state->recs->count;
353 TALLOC_FREE(state->recs);
356 return 0;
359 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
360 struct ctdb_req_control_old *c,
361 TDB_DATA indata, TDB_DATA *outdata)
363 struct ctdb_pulldb_ext *pulldb_ext;
364 struct ctdb_db_context *ctdb_db;
365 struct db_pull_state state;
366 int ret;
368 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
370 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
371 if (ctdb_db == NULL) {
372 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
373 pulldb_ext->db_id));
374 return -1;
377 if (!ctdb_db_frozen(ctdb_db)) {
378 DEBUG(DEBUG_ERR,
379 ("rejecting ctdb_control_pull_db when not frozen\n"));
380 return -1;
383 if (ctdb_db->unhealthy_reason) {
384 /* this is just a warning, as the tdb should be empty anyway */
385 DEBUG(DEBUG_WARNING,
386 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
390 state.ctdb = ctdb;
391 state.ctdb_db = ctdb_db;
392 state.recs = NULL;
393 state.pnn = c->hdr.srcnode;
394 state.srvid = pulldb_ext->srvid;
395 state.num_records = 0;
397 /* If the records are invalid, we are done */
398 if (ctdb_db->invalid_records) {
399 goto done;
402 if (ctdb_lockdb_mark(ctdb_db) != 0) {
403 DEBUG(DEBUG_ERR,
404 (__location__ " Failed to get lock on entire db - failing\n"));
405 return -1;
408 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
409 if (ret == -1) {
410 DEBUG(DEBUG_ERR,
411 (__location__ " Failed to get traverse db '%s'\n",
412 ctdb_db->db_name));
413 ctdb_lockdb_unmark(ctdb_db);
414 return -1;
417 /* Last few records */
418 if (state.recs != NULL) {
419 TDB_DATA buffer;
421 buffer = ctdb_marshall_finish(state.recs);
422 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
423 state.srvid, buffer);
424 if (ret != 0) {
425 TALLOC_FREE(state.recs);
426 ctdb_lockdb_unmark(ctdb_db);
427 return -1;
430 state.num_records += state.recs->count;
431 TALLOC_FREE(state.recs);
434 ctdb_lockdb_unmark(ctdb_db);
436 done:
437 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
438 if (outdata->dptr == NULL) {
439 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
440 return -1;
443 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
444 outdata->dsize = sizeof(uint32_t);
446 return 0;
450 push a bunch of records into a ltdb, filtering by rsn
452 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
454 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
455 struct ctdb_db_context *ctdb_db;
456 unsigned int i;
457 int ret;
458 struct ctdb_rec_data_old *rec;
460 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
461 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
462 return -1;
465 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
466 if (!ctdb_db) {
467 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
468 return -1;
471 if (!ctdb_db_frozen(ctdb_db)) {
472 DEBUG(DEBUG_ERR,
473 ("rejecting ctdb_control_push_db when not frozen\n"));
474 return -1;
477 if (ctdb_lockdb_mark(ctdb_db) != 0) {
478 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
479 return -1;
482 rec = (struct ctdb_rec_data_old *)&reply->data[0];
484 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
485 reply->count, reply->db_id));
487 for (i=0;i<reply->count;i++) {
488 TDB_DATA key, data;
489 struct ctdb_ltdb_header *hdr;
491 key.dptr = &rec->data[0];
492 key.dsize = rec->keylen;
493 data.dptr = &rec->data[key.dsize];
494 data.dsize = rec->datalen;
496 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
497 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
498 goto failed;
500 hdr = (struct ctdb_ltdb_header *)data.dptr;
501 /* strip off any read only record flags. All readonly records
502 are revoked implicitely by a recovery
504 hdr->flags &= ~CTDB_REC_RO_FLAGS;
506 data.dptr += sizeof(*hdr);
507 data.dsize -= sizeof(*hdr);
509 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
510 if (ret != 0) {
511 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
512 goto failed;
515 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
518 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
519 reply->count, reply->db_id));
521 if (ctdb_db_readonly(ctdb_db)) {
522 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
523 ctdb_db->db_id));
524 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
525 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
526 tdb_close(ctdb_db->rottdb);
527 ctdb_db->rottdb = NULL;
528 ctdb_db_reset_readonly(ctdb_db);
530 while (ctdb_db->revokechild_active != NULL) {
531 talloc_free(ctdb_db->revokechild_active);
535 ctdb_lockdb_unmark(ctdb_db);
536 return 0;
538 failed:
539 ctdb_lockdb_unmark(ctdb_db);
540 return -1;
543 struct db_push_state {
544 struct ctdb_context *ctdb;
545 struct ctdb_db_context *ctdb_db;
546 uint64_t srvid;
547 uint32_t num_records;
548 bool failed;
551 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
552 void *private_data)
554 struct db_push_state *state = talloc_get_type(
555 private_data, struct db_push_state);
556 struct ctdb_marshall_buffer *recs;
557 struct ctdb_rec_data_old *rec;
558 unsigned int i;
559 int ret;
561 if (state->failed) {
562 return;
565 recs = (struct ctdb_marshall_buffer *)indata.dptr;
566 rec = (struct ctdb_rec_data_old *)&recs->data[0];
568 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
569 recs->count, recs->db_id));
571 for (i=0; i<recs->count; i++) {
572 TDB_DATA key, data;
573 struct ctdb_ltdb_header *hdr;
575 key.dptr = &rec->data[0];
576 key.dsize = rec->keylen;
577 data.dptr = &rec->data[key.dsize];
578 data.dsize = rec->datalen;
580 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
581 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
582 goto failed;
585 hdr = (struct ctdb_ltdb_header *)data.dptr;
586 /* Strip off any read only record flags.
587 * All readonly records are revoked implicitely by a recovery.
589 hdr->flags &= ~CTDB_REC_RO_FLAGS;
591 data.dptr += sizeof(*hdr);
592 data.dsize -= sizeof(*hdr);
594 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
595 if (ret != 0) {
596 DEBUG(DEBUG_ERR,
597 (__location__ " Unable to store record\n"));
598 goto failed;
601 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
604 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
605 recs->count, recs->db_id));
607 state->num_records += recs->count;
608 return;
610 failed:
611 state->failed = true;
614 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
616 struct ctdb_pulldb_ext *pulldb_ext;
617 struct ctdb_db_context *ctdb_db;
618 struct db_push_state *state;
619 int ret;
621 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
623 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
624 if (ctdb_db == NULL) {
625 DEBUG(DEBUG_ERR,
626 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
627 return -1;
630 if (!ctdb_db_frozen(ctdb_db)) {
631 DEBUG(DEBUG_ERR,
632 ("rejecting ctdb_control_db_push_start when not frozen\n"));
633 return -1;
636 if (ctdb_db->push_started) {
637 DEBUG(DEBUG_WARNING,
638 (__location__ " DB push already started for %s\n",
639 ctdb_db->db_name));
641 /* De-register old state */
642 state = (struct db_push_state *)ctdb_db->push_state;
643 if (state != NULL) {
644 srvid_deregister(ctdb->srv, state->srvid, state);
645 talloc_free(state);
646 ctdb_db->push_state = NULL;
650 state = talloc_zero(ctdb_db, struct db_push_state);
651 if (state == NULL) {
652 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
653 return -1;
656 state->ctdb = ctdb;
657 state->ctdb_db = ctdb_db;
658 state->srvid = pulldb_ext->srvid;
659 state->failed = false;
661 ret = srvid_register(ctdb->srv, state, state->srvid,
662 db_push_msg_handler, state);
663 if (ret != 0) {
664 DEBUG(DEBUG_ERR,
665 (__location__ " Failed to register srvid for db push\n"));
666 talloc_free(state);
667 return -1;
670 if (ctdb_lockdb_mark(ctdb_db) != 0) {
671 DEBUG(DEBUG_ERR,
672 (__location__ " Failed to get lock on entire db - failing\n"));
673 srvid_deregister(ctdb->srv, state->srvid, state);
674 talloc_free(state);
675 return -1;
678 ctdb_db->push_started = true;
679 ctdb_db->push_state = state;
681 return 0;
684 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
685 TDB_DATA indata, TDB_DATA *outdata)
687 uint32_t db_id;
688 struct ctdb_db_context *ctdb_db;
689 struct db_push_state *state;
691 db_id = *(uint32_t *)indata.dptr;
693 ctdb_db = find_ctdb_db(ctdb, db_id);
694 if (ctdb_db == NULL) {
695 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
696 return -1;
699 if (!ctdb_db_frozen(ctdb_db)) {
700 DEBUG(DEBUG_ERR,
701 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
702 return -1;
705 if (!ctdb_db->push_started) {
706 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
707 return -1;
710 if (ctdb_db_readonly(ctdb_db)) {
711 DEBUG(DEBUG_ERR,
712 ("Clearing the tracking database for dbid 0x%x\n",
713 ctdb_db->db_id));
714 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
715 DEBUG(DEBUG_ERR,
716 ("Failed to wipe tracking database for 0x%x."
717 " Dropping read-only delegation support\n",
718 ctdb_db->db_id));
719 tdb_close(ctdb_db->rottdb);
720 ctdb_db->rottdb = NULL;
721 ctdb_db_reset_readonly(ctdb_db);
724 while (ctdb_db->revokechild_active != NULL) {
725 talloc_free(ctdb_db->revokechild_active);
729 ctdb_lockdb_unmark(ctdb_db);
731 state = (struct db_push_state *)ctdb_db->push_state;
732 if (state == NULL) {
733 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
734 return -1;
737 srvid_deregister(ctdb->srv, state->srvid, state);
739 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
740 if (outdata->dptr == NULL) {
741 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
742 talloc_free(state);
743 ctdb_db->push_state = NULL;
744 return -1;
747 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
748 outdata->dsize = sizeof(uint32_t);
750 talloc_free(state);
751 ctdb_db->push_started = false;
752 ctdb_db->push_state = NULL;
754 return 0;
757 struct set_recmode_state {
758 struct ctdb_context *ctdb;
759 struct ctdb_req_control_old *c;
762 static void set_recmode_handler(char status,
763 double latency,
764 void *private_data)
766 struct set_recmode_state *state = talloc_get_type_abort(
767 private_data, struct set_recmode_state);
768 int s = 0;
769 const char *err = NULL;
771 switch (status) {
772 case '0':
773 /* Mutex taken */
774 DEBUG(DEBUG_ERR,
775 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
776 state->ctdb->recovery_lock));
777 s = -1;
778 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
779 break;
781 case '1':
782 /* Contention */
783 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
784 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
785 ctdb_process_deferred_attach(state->ctdb);
787 s = 0;
789 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock",
790 reclock.ctdbd, latency);
791 break;
793 case '2':
794 /* Timeout. Consider this a success, not a failure,
795 * as we failed to set the recovery lock which is what
796 * we wanted. This can be caused by the cluster
797 * filesystem being very slow to arbitrate locks
798 * immediately after a node failure. */
799 DEBUG(DEBUG_WARNING,
800 (__location__
801 "Time out getting recovery lock, allowing recmode set anyway\n"));
802 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
803 ctdb_process_deferred_attach(state->ctdb);
805 s = 0;
806 break;
808 default:
809 DEBUG(DEBUG_ERR,
810 ("Unexpected error when testing recovery lock\n"));
811 s = -1;
812 err = "Unexpected error when testing recovery lock";
815 ctdb_request_control_reply(state->ctdb, state->c, NULL, s, err);
816 talloc_free(state);
819 static void
820 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
821 struct timeval t, void *private_data)
823 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
825 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
826 talloc_free(ctdb->release_ips_ctx);
827 ctdb->release_ips_ctx = NULL;
829 ctdb_release_all_ips(ctdb);
833 * Set up an event to drop all public ips if we remain in recovery for too
834 * long
836 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
838 if (ctdb->release_ips_ctx != NULL) {
839 talloc_free(ctdb->release_ips_ctx);
841 ctdb->release_ips_ctx = talloc_new(ctdb);
842 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
844 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
845 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
846 ctdb_drop_all_ips_event, ctdb);
847 return 0;
851 set the recovery mode
853 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
854 struct ctdb_req_control_old *c,
855 TDB_DATA indata, bool *async_reply,
856 const char **errormsg)
858 uint32_t recmode = *(uint32_t *)indata.dptr;
859 struct ctdb_db_context *ctdb_db;
860 struct set_recmode_state *state;
861 struct ctdb_cluster_mutex_handle *h;
863 if (recmode == ctdb->recovery_mode) {
864 D_INFO("Recovery mode already set to %s\n",
865 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
866 return 0;
869 D_NOTICE("Recovery mode set to %s\n",
870 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
872 /* if we enter recovery but stay in recovery for too long
873 we will eventually drop all our ip addresses
875 if (recmode == CTDB_RECOVERY_ACTIVE) {
876 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
877 D_ERR("Failed to set up deferred drop all ips\n");
880 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
881 return 0;
884 /* From this point: recmode == CTDB_RECOVERY_NORMAL
886 * Therefore, what follows is special handling when setting
887 * recovery mode back to normal */
889 TALLOC_FREE(ctdb->release_ips_ctx);
891 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
892 if (ctdb_db->generation != ctdb->vnn_map->generation) {
893 DEBUG(DEBUG_ERR,
894 ("Inconsistent DB generation %u for %s\n",
895 ctdb_db->generation, ctdb_db->db_name));
896 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
897 return -1;
901 /* force the databases to thaw */
902 if (ctdb_db_all_frozen(ctdb)) {
903 ctdb_control_thaw(ctdb, false);
906 if (ctdb->recovery_lock == NULL) {
907 /* Not using recovery lock file */
908 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
909 ctdb_process_deferred_attach(ctdb);
910 return 0;
913 state = talloc_zero(ctdb, struct set_recmode_state);
914 if (state == NULL) {
915 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
916 return -1;
918 state->ctdb = ctdb;
919 state->c = NULL;
921 h = ctdb_cluster_mutex(state, ctdb, ctdb->recovery_lock, 5,
922 set_recmode_handler, state, NULL, NULL);
923 if (h == NULL) {
924 talloc_free(state);
925 return -1;
928 state->c = talloc_steal(state, c);
929 *async_reply = true;
931 return 0;
936 delete a record as part of the vacuum process
937 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
938 use non-blocking locks
940 return 0 if the record was successfully deleted (i.e. it does not exist
941 when the function returns)
942 or !0 is the record still exists in the tdb after returning.
944 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
946 TDB_DATA key, data, data2;
947 struct ctdb_ltdb_header *hdr, *hdr2;
949 /* these are really internal tdb functions - but we need them here for
950 non-blocking lock of the freelist */
951 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
952 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
955 key.dsize = rec->keylen;
956 key.dptr = &rec->data[0];
957 data.dsize = rec->datalen;
958 data.dptr = &rec->data[rec->keylen];
960 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
961 DBG_INFO("Called delete on record where we are lmaster\n");
962 return -1;
965 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
966 DBG_ERR("Bad record size\n");
967 return -1;
970 hdr = (struct ctdb_ltdb_header *)data.dptr;
972 /* use a non-blocking lock */
973 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
974 DBG_INFO("Failed to get non-blocking chain lock\n");
975 return -1;
978 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
979 if (data2.dptr == NULL) {
980 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
981 return 0;
984 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
985 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
986 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
987 DBG_ERR("Failed to delete corrupt record\n");
989 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
990 DBG_ERR("Deleted corrupt record\n");
992 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
993 free(data2.dptr);
994 return 0;
997 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
999 if (hdr2->rsn > hdr->rsn) {
1000 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1001 DBG_INFO("Skipping record with rsn=%llu - called with rsn=%llu\n",
1002 (unsigned long long)hdr2->rsn,
1003 (unsigned long long)hdr->rsn);
1004 free(data2.dptr);
1005 return -1;
1008 /* do not allow deleting record that have readonly flags set. */
1009 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1010 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1011 DBG_INFO("Skipping record with readonly flags set\n");
1012 free(data2.dptr);
1013 return -1;
1015 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1016 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1017 DBG_INFO("Skipping record with readonly flags set locally\n");
1018 free(data2.dptr);
1019 return -1;
1022 if (hdr2->dmaster == ctdb->pnn) {
1023 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1024 DBG_INFO("Attempted delete record where we are the dmaster\n");
1025 free(data2.dptr);
1026 return -1;
1029 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1030 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1031 DBG_INFO("Failed to get non-blocking freelist lock\n");
1032 free(data2.dptr);
1033 return -1;
1036 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1037 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1038 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1039 DBG_INFO("Failed to delete record\n");
1040 free(data2.dptr);
1041 return -1;
1044 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1045 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1046 free(data2.dptr);
1047 return 0;
1052 struct recovery_callback_state {
1053 struct ctdb_req_control_old *c;
1058 called when the 'recovered' event script has finished
1060 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1062 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1064 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1066 if (status != 0) {
1067 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1068 if (status == -ETIMEDOUT) {
1069 ctdb_ban_self(ctdb);
1073 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1074 talloc_free(state);
1076 gettimeofday(&ctdb->last_recovery_finished, NULL);
1078 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1079 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1084 recovery has finished
1086 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1087 struct ctdb_req_control_old *c,
1088 bool *async_reply)
1090 int ret;
1091 struct recovery_callback_state *state;
1093 DEBUG(DEBUG_ERR,("Recovery has finished\n"));
1095 ctdb_persistent_finish_trans3_commits(ctdb);
1097 state = talloc(ctdb, struct recovery_callback_state);
1098 CTDB_NO_MEMORY(ctdb, state);
1100 state->c = c;
1102 ret = ctdb_event_script_callback(ctdb, state,
1103 ctdb_end_recovery_callback,
1104 state,
1105 CTDB_EVENT_RECOVERED, "%s", "");
1107 if (ret != 0) {
1108 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1109 talloc_free(state);
1110 return -1;
1113 /* tell the control that we will be reply asynchronously */
1114 state->c = talloc_steal(state, c);
1115 *async_reply = true;
1116 return 0;
1120 called when the 'startrecovery' event script has finished
1122 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1124 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1126 if (status != 0) {
1127 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1130 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1131 talloc_free(state);
1134 static void run_start_recovery_event(struct ctdb_context *ctdb,
1135 struct recovery_callback_state *state)
1137 int ret;
1139 ret = ctdb_event_script_callback(ctdb, state,
1140 ctdb_start_recovery_callback,
1141 state,
1142 CTDB_EVENT_START_RECOVERY,
1143 "%s", "");
1145 if (ret != 0) {
1146 DEBUG(DEBUG_ERR,("Unable to run startrecovery event\n"));
1147 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1148 talloc_free(state);
1149 return;
1152 return;
/*
  Compare two recovery lock settings, either of which may be NULL.
  Two NULLs are equal; a NULL never equals a string.
*/
static bool reclock_strings_equal(const char *a, const char *b)
{
	if (a == NULL || b == NULL) {
		return a == NULL && b == NULL;
	}

	return strcmp(a, b) == 0;
}
1161 static void start_recovery_reclock_callback(struct ctdb_context *ctdb,
1162 int32_t status,
1163 TDB_DATA data,
1164 const char *errormsg,
1165 void *private_data)
1167 struct recovery_callback_state *state = talloc_get_type_abort(
1168 private_data, struct recovery_callback_state);
1169 const char *local = ctdb->recovery_lock;
1170 const char *remote = NULL;
1172 if (status != 0) {
1173 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1174 ctdb_request_control_reply(ctdb, state->c, NULL,
1175 status, errormsg);
1176 talloc_free(state);
1177 return;
1180 /* Check reclock consistency */
1181 if (data.dsize > 0) {
1182 /* Ensure NUL-termination */
1183 data.dptr[data.dsize-1] = '\0';
1184 remote = (const char *)data.dptr;
1186 if (! reclock_strings_equal(local, remote)) {
1187 /* Inconsistent */
1188 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1189 DEBUG(DEBUG_ERR,
1190 ("Recovery lock configuration inconsistent: "
1191 "recmaster has %s, this node has %s, shutting down\n",
1192 remote == NULL ? "NULL" : remote,
1193 local == NULL ? "NULL" : local));
1194 talloc_free(state);
1195 ctdb_shutdown_sequence(ctdb, 1);
1197 DEBUG(DEBUG_INFO,
1198 ("Recovery lock consistency check successful\n"));
1200 run_start_recovery_event(ctdb, state);
1203 /* Check recovery lock consistency and run eventscripts for the
1204 * "startrecovery" event */
1205 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1206 struct ctdb_req_control_old *c,
1207 bool *async_reply)
1209 int ret;
1210 struct recovery_callback_state *state;
1211 uint32_t recmaster = c->hdr.srcnode;
1213 DEBUG(DEBUG_ERR, ("Recovery has started\n"));
1214 gettimeofday(&ctdb->last_recovery_started, NULL);
1216 state = talloc(ctdb, struct recovery_callback_state);
1217 CTDB_NO_MEMORY(ctdb, state);
1219 state->c = c;
1221 /* Although the recovery master sent this node a start
1222 * recovery control, this node might still think the recovery
1223 * master is disconnected. In this case defer the recovery
1224 * lock consistency check. */
1225 if (ctdb->nodes[recmaster]->flags & NODE_FLAGS_DISCONNECTED) {
1226 run_start_recovery_event(ctdb, state);
1227 } else {
1228 /* Ask the recovery master about its reclock setting */
1229 ret = ctdb_daemon_send_control(ctdb,
1230 recmaster,
1232 CTDB_CONTROL_GET_RECLOCK_FILE,
1233 0, 0,
1234 tdb_null,
1235 start_recovery_reclock_callback,
1236 state);
1238 if (ret != 0) {
1239 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1240 talloc_free(state);
1241 return -1;
1245 /* tell the control that we will be reply asynchronously */
1246 state->c = talloc_steal(state, c);
1247 *async_reply = true;
1249 return 0;
1253 try to delete all these records as part of the vacuuming process
1254 and return the records we failed to delete
1256 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1258 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1259 struct ctdb_db_context *ctdb_db;
1260 unsigned int i;
1261 struct ctdb_rec_data_old *rec;
1262 struct ctdb_marshall_buffer *records;
1264 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1265 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1266 return -1;
1269 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1270 if (!ctdb_db) {
1271 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1272 return -1;
1276 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1277 reply->count, reply->db_id));
1280 /* create a blob to send back the records we couldnt delete */
1281 records = (struct ctdb_marshall_buffer *)
1282 talloc_zero_size(outdata,
1283 offsetof(struct ctdb_marshall_buffer, data));
1284 if (records == NULL) {
1285 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1286 return -1;
1288 records->db_id = ctdb_db->db_id;
1291 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1292 for (i=0;i<reply->count;i++) {
1293 TDB_DATA key, data;
1295 key.dptr = &rec->data[0];
1296 key.dsize = rec->keylen;
1297 data.dptr = &rec->data[key.dsize];
1298 data.dsize = rec->datalen;
1300 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1301 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1302 talloc_free(records);
1303 return -1;
1306 /* If we cant delete the record we must add it to the reply
1307 so the lmaster knows it may not purge this record
1309 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1310 size_t old_size;
1311 struct ctdb_ltdb_header *hdr;
1313 hdr = (struct ctdb_ltdb_header *)data.dptr;
1314 data.dptr += sizeof(*hdr);
1315 data.dsize -= sizeof(*hdr);
1317 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1319 old_size = talloc_get_size(records);
1320 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1321 if (records == NULL) {
1322 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1323 return -1;
1325 records->count++;
1326 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1329 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1333 *outdata = ctdb_marshall_finish(records);
1335 return 0;
1339 report capabilities
1341 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1343 uint32_t *capabilities = NULL;
1345 capabilities = talloc(outdata, uint32_t);
1346 CTDB_NO_MEMORY(ctdb, capabilities);
1347 *capabilities = ctdb->capabilities;
1349 outdata->dsize = sizeof(uint32_t);
1350 outdata->dptr = (uint8_t *)capabilities;
1352 return 0;
1355 /* The recovery daemon will ping us at regular intervals.
1356 If we havent been pinged for a while we assume the recovery
1357 daemon is inoperable and we restart.
1359 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1360 struct tevent_timer *te,
1361 struct timeval t, void *p)
1363 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1364 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1366 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1368 if (*count < ctdb->tunable.recd_ping_failcount) {
1369 (*count)++;
1370 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1371 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1372 ctdb_recd_ping_timeout, ctdb);
1373 return;
1376 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1378 ctdb_stop_recoverd(ctdb);
1379 ctdb_start_recoverd(ctdb);
1382 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1384 talloc_free(ctdb->recd_ping_count);
1386 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1387 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1389 if (ctdb->tunable.recd_ping_timeout != 0) {
1390 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1391 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1392 ctdb_recd_ping_timeout, ctdb);
1395 return 0;
1400 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1402 uint32_t new_recmaster;
1404 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1405 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1407 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1408 DEBUG(DEBUG_ERR,
1409 ("Remote node (%u) is now the recovery master\n",
1410 new_recmaster));
1413 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1414 DEBUG(DEBUG_ERR,
1415 ("This node (%u) is now the recovery master\n",
1416 ctdb->pnn));
1419 ctdb->recovery_master = new_recmaster;
1420 return 0;
1423 void ctdb_node_become_inactive(struct ctdb_context *ctdb)
1425 struct ctdb_db_context *ctdb_db;
1427 D_WARNING("Making node INACTIVE\n");
1430 * Do not service database calls - reset generation to invalid
1431 * so this node ignores any REQ/REPLY CALL/DMASTER
1433 ctdb->vnn_map->generation = INVALID_GENERATION;
1434 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
1435 ctdb_db->generation = INVALID_GENERATION;
1439 * Although this bypasses the control, the only thing missing
1440 * is the deferred drop of all public IPs, which isn't
1441 * necessary because they are dropped below
1443 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
1444 D_NOTICE("Recovery mode set to ACTIVE\n");
1445 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1449 * Initiate database freeze - this will be scheduled for
1450 * immediate execution and will be in progress long before the
1451 * calling control returns
1453 ctdb_daemon_send_control(ctdb,
1454 ctdb->pnn,
1456 CTDB_CONTROL_FREEZE,
1458 CTDB_CTRL_FLAG_NOREPLY,
1459 tdb_null,
1460 NULL,
1461 NULL);
1463 D_NOTICE("Dropping all public IP addresses\n");
1464 ctdb_release_all_ips(ctdb);
1467 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1469 DEBUG(DEBUG_ERR, ("Stopping node\n"));
1470 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1472 ctdb_node_become_inactive(ctdb);
1474 return 0;
1477 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1479 DEBUG(DEBUG_ERR, ("Continue node\n"));
1480 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1482 return 0;