python: Port ntvfs posix bindings to Python 3 compatible form
[Samba.git] / ctdb / server / ctdb_recover.c
blobf4cd5f64eee5821e1884a1c6743b93f3c2868a8f
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
45 int
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
49 size_t len;
51 CHECK_CONTROL_DATA_SIZE(0);
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
61 outdata->dsize = len;
62 outdata->dptr = (uint8_t *)map;
64 return 0;
67 int
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
74 return -1;
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
89 return 0;
92 int
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
95 uint32_t i, len;
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
101 len = 0;
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
103 len++;
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
111 exit(1);
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
115 dbid_map->num = len;
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 dbid_map->dbs[i].flags = ctdb_db->db_flags;
121 return 0;
/*
 * Control handler: return the node list in outdata.
 *
 * The request must carry no payload.  The reply is whatever
 * ctdb_node_list_to_map() builds from ctdb->nodes / ctdb->num_nodes,
 * allocated on outdata; its length is taken from the talloc chunk size.
 *
 * Returns 0 on success, -1 if the map could not be built.
 *
 * NOTE(review): the return-type line of this definition is not visible
 * in this excerpt - confirm against the full file.
 */
125 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
127 CHECK_CONTROL_DATA_SIZE(0);
129 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
130 ctdb->num_nodes,
131 outdata);
132 if (outdata->dptr == NULL) {
133 return -1;
/* the reply length is the size of the talloc allocation itself */
136 outdata->dsize = talloc_get_size(outdata->dptr);
138 return 0;
142 reload the nodes file
/*
 * Control handler: re-read the nodes file and bring the transport in
 * line with it.
 *
 * The previous node array is parked on a temporary talloc context, the
 * nodes file is loaded again, and each slot of the new array is then
 * compared with the slot of the same index in the old array.  Slots
 * whose address is unchanged keep the old node object; other slots that
 * are not flagged deleted are added and connected through ctdb->methods.
 * A failure to add or connect a node is fatal.  Finally the local
 * recovery daemon is told to reload as well.
 *
 * Always returns 0.
 *
 * NOTE(review): the return-type line of this definition is not visible
 * in this excerpt, and the result of talloc_new() is not checked.
 */
145 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
147 int i, num_nodes;
148 TALLOC_CTX *tmp_ctx;
149 struct ctdb_node **nodes;
151 tmp_ctx = talloc_new(ctdb);
153 /* steal the old nodes file for a while */
154 talloc_steal(tmp_ctx, ctdb->nodes);
155 nodes = ctdb->nodes;
156 ctdb->nodes = NULL;
157 num_nodes = ctdb->num_nodes;
158 ctdb->num_nodes = 0;
160 /* load the new nodes file */
161 ctdb_load_nodes_file(ctdb);
163 for (i=0; i<ctdb->num_nodes; i++) {
164 /* keep any identical pre-existing nodes and connections */
165 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
/* drop the freshly loaded entry and re-parent the old one onto the new array */
166 talloc_free(ctdb->nodes[i]);
167 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
168 continue;
/* nodes flagged as deleted are neither added nor connected */
171 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
172 continue;
175 /* any new or different nodes must be added */
176 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
177 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
178 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
180 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
181 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
182 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
186 /* tell the recovery daemon to reload the nodes file too */
187 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* releases the old array together with any old nodes still parented to it */
189 talloc_free(tmp_ctx);
191 return 0;
195 a traverse function for pulling all relevent records from pulldb
197 struct pulldb_data {
198 struct ctdb_context *ctdb;
199 struct ctdb_db_context *ctdb_db;
200 struct ctdb_marshall_buffer *pulldata;
201 uint32_t len;
202 uint32_t allocated_len;
203 bool failed;
206 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
208 struct pulldb_data *params = (struct pulldb_data *)p;
209 struct ctdb_rec_data_old *rec;
210 struct ctdb_context *ctdb = params->ctdb;
211 struct ctdb_db_context *ctdb_db = params->ctdb_db;
213 /* add the record to the blob */
214 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
215 if (rec == NULL) {
216 params->failed = true;
217 return -1;
219 if (params->len + rec->length >= params->allocated_len) {
220 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
221 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
223 if (params->pulldata == NULL) {
224 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
225 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
227 params->pulldata->count++;
228 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
229 params->len += rec->length;
231 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
232 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
235 talloc_free(rec);
237 return 0;
241 pull a bunch of records from a ltdb, filtering by lmaster
243 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
245 struct ctdb_pulldb *pull;
246 struct ctdb_db_context *ctdb_db;
247 struct pulldb_data params;
248 struct ctdb_marshall_buffer *reply;
250 pull = (struct ctdb_pulldb *)indata.dptr;
252 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
253 if (!ctdb_db) {
254 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
255 return -1;
258 if (!ctdb_db_frozen(ctdb_db)) {
259 DEBUG(DEBUG_ERR,
260 ("rejecting ctdb_control_pull_db when not frozen\n"));
261 return -1;
264 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
265 CTDB_NO_MEMORY(ctdb, reply);
267 reply->db_id = pull->db_id;
269 params.ctdb = ctdb;
270 params.ctdb_db = ctdb_db;
271 params.pulldata = reply;
272 params.len = offsetof(struct ctdb_marshall_buffer, data);
273 params.allocated_len = params.len;
274 params.failed = false;
276 if (ctdb_db->unhealthy_reason) {
277 /* this is just a warning, as the tdb should be empty anyway */
278 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
279 ctdb_db->db_name, ctdb_db->unhealthy_reason));
282 if (ctdb_lockdb_mark(ctdb_db) != 0) {
283 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
284 return -1;
287 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
289 ctdb_lockdb_unmark(ctdb_db);
290 talloc_free(params.pulldata);
291 return -1;
294 ctdb_lockdb_unmark(ctdb_db);
296 outdata->dptr = (uint8_t *)params.pulldata;
297 outdata->dsize = params.len;
299 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
300 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
302 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
303 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
307 return 0;
310 struct db_pull_state {
311 struct ctdb_context *ctdb;
312 struct ctdb_db_context *ctdb_db;
313 struct ctdb_marshall_buffer *recs;
314 uint32_t pnn;
315 uint64_t srvid;
316 uint32_t num_records;
319 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
320 TDB_DATA data, void *private_data)
322 struct db_pull_state *state = (struct db_pull_state *)private_data;
323 struct ctdb_marshall_buffer *recs;
325 recs = ctdb_marshall_add(state->ctdb, state->recs,
326 state->ctdb_db->db_id, 0, key, NULL, data);
327 if (recs == NULL) {
328 TALLOC_FREE(state->recs);
329 return -1;
331 state->recs = recs;
333 if (talloc_get_size(state->recs) >=
334 state->ctdb->tunable.rec_buffer_size_limit) {
335 TDB_DATA buffer;
336 int ret;
338 buffer = ctdb_marshall_finish(state->recs);
339 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
340 state->srvid, buffer);
341 if (ret != 0) {
342 TALLOC_FREE(state->recs);
343 return -1;
346 state->num_records += state->recs->count;
347 TALLOC_FREE(state->recs);
350 return 0;
353 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
354 struct ctdb_req_control_old *c,
355 TDB_DATA indata, TDB_DATA *outdata)
357 struct ctdb_pulldb_ext *pulldb_ext;
358 struct ctdb_db_context *ctdb_db;
359 struct db_pull_state state;
360 int ret;
362 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
364 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
365 if (ctdb_db == NULL) {
366 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
367 pulldb_ext->db_id));
368 return -1;
371 if (!ctdb_db_frozen(ctdb_db)) {
372 DEBUG(DEBUG_ERR,
373 ("rejecting ctdb_control_pull_db when not frozen\n"));
374 return -1;
377 if (ctdb_db->unhealthy_reason) {
378 /* this is just a warning, as the tdb should be empty anyway */
379 DEBUG(DEBUG_WARNING,
380 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
381 ctdb_db->db_name, ctdb_db->unhealthy_reason));
384 state.ctdb = ctdb;
385 state.ctdb_db = ctdb_db;
386 state.recs = NULL;
387 state.pnn = c->hdr.srcnode;
388 state.srvid = pulldb_ext->srvid;
389 state.num_records = 0;
391 if (ctdb_lockdb_mark(ctdb_db) != 0) {
392 DEBUG(DEBUG_ERR,
393 (__location__ " Failed to get lock on entire db - failing\n"));
394 return -1;
397 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
398 if (ret == -1) {
399 DEBUG(DEBUG_ERR,
400 (__location__ " Failed to get traverse db '%s'\n",
401 ctdb_db->db_name));
402 ctdb_lockdb_unmark(ctdb_db);
403 return -1;
406 /* Last few records */
407 if (state.recs != NULL) {
408 TDB_DATA buffer;
410 buffer = ctdb_marshall_finish(state.recs);
411 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
412 state.srvid, buffer);
413 if (ret != 0) {
414 TALLOC_FREE(state.recs);
415 ctdb_lockdb_unmark(ctdb_db);
416 return -1;
419 state.num_records += state.recs->count;
420 TALLOC_FREE(state.recs);
423 ctdb_lockdb_unmark(ctdb_db);
425 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
426 if (outdata->dptr == NULL) {
427 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
428 return -1;
431 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
432 outdata->dsize = sizeof(uint32_t);
434 return 0;
438 push a bunch of records into a ltdb, filtering by rsn
440 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
442 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
443 struct ctdb_db_context *ctdb_db;
444 int i, ret;
445 struct ctdb_rec_data_old *rec;
447 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
448 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
449 return -1;
452 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
453 if (!ctdb_db) {
454 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
455 return -1;
458 if (!ctdb_db_frozen(ctdb_db)) {
459 DEBUG(DEBUG_ERR,
460 ("rejecting ctdb_control_push_db when not frozen\n"));
461 return -1;
464 if (ctdb_lockdb_mark(ctdb_db) != 0) {
465 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
466 return -1;
469 rec = (struct ctdb_rec_data_old *)&reply->data[0];
471 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
472 reply->count, reply->db_id));
474 for (i=0;i<reply->count;i++) {
475 TDB_DATA key, data;
476 struct ctdb_ltdb_header *hdr;
478 key.dptr = &rec->data[0];
479 key.dsize = rec->keylen;
480 data.dptr = &rec->data[key.dsize];
481 data.dsize = rec->datalen;
483 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
484 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
485 goto failed;
487 hdr = (struct ctdb_ltdb_header *)data.dptr;
488 /* strip off any read only record flags. All readonly records
489 are revoked implicitely by a recovery
491 hdr->flags &= ~CTDB_REC_RO_FLAGS;
493 data.dptr += sizeof(*hdr);
494 data.dsize -= sizeof(*hdr);
496 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
497 if (ret != 0) {
498 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
499 goto failed;
502 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
505 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
506 reply->count, reply->db_id));
508 if (ctdb_db_readonly(ctdb_db)) {
509 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
510 ctdb_db->db_id));
511 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
512 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
513 tdb_close(ctdb_db->rottdb);
514 ctdb_db->rottdb = NULL;
515 ctdb_db_reset_readonly(ctdb_db);
517 while (ctdb_db->revokechild_active != NULL) {
518 talloc_free(ctdb_db->revokechild_active);
522 ctdb_lockdb_unmark(ctdb_db);
523 return 0;
525 failed:
526 ctdb_lockdb_unmark(ctdb_db);
527 return -1;
530 struct db_push_state {
531 struct ctdb_context *ctdb;
532 struct ctdb_db_context *ctdb_db;
533 uint64_t srvid;
534 uint32_t num_records;
535 bool failed;
538 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
539 void *private_data)
541 struct db_push_state *state = talloc_get_type(
542 private_data, struct db_push_state);
543 struct ctdb_marshall_buffer *recs;
544 struct ctdb_rec_data_old *rec;
545 int i, ret;
547 if (state->failed) {
548 return;
551 recs = (struct ctdb_marshall_buffer *)indata.dptr;
552 rec = (struct ctdb_rec_data_old *)&recs->data[0];
554 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
555 recs->count, recs->db_id));
557 for (i=0; i<recs->count; i++) {
558 TDB_DATA key, data;
559 struct ctdb_ltdb_header *hdr;
561 key.dptr = &rec->data[0];
562 key.dsize = rec->keylen;
563 data.dptr = &rec->data[key.dsize];
564 data.dsize = rec->datalen;
566 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
567 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
568 goto failed;
571 hdr = (struct ctdb_ltdb_header *)data.dptr;
572 /* Strip off any read only record flags.
573 * All readonly records are revoked implicitely by a recovery.
575 hdr->flags &= ~CTDB_REC_RO_FLAGS;
577 data.dptr += sizeof(*hdr);
578 data.dsize -= sizeof(*hdr);
580 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
581 if (ret != 0) {
582 DEBUG(DEBUG_ERR,
583 (__location__ " Unable to store record\n"));
584 goto failed;
587 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
590 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
591 recs->count, recs->db_id));
593 state->num_records += recs->count;
594 return;
596 failed:
597 state->failed = true;
600 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
602 struct ctdb_pulldb_ext *pulldb_ext;
603 struct ctdb_db_context *ctdb_db;
604 struct db_push_state *state;
605 int ret;
607 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
609 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
610 if (ctdb_db == NULL) {
611 DEBUG(DEBUG_ERR,
612 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
613 return -1;
616 if (!ctdb_db_frozen(ctdb_db)) {
617 DEBUG(DEBUG_ERR,
618 ("rejecting ctdb_control_db_push_start when not frozen\n"));
619 return -1;
622 if (ctdb_db->push_started) {
623 DEBUG(DEBUG_WARNING,
624 (__location__ " DB push already started for %s\n",
625 ctdb_db->db_name));
627 /* De-register old state */
628 state = (struct db_push_state *)ctdb_db->push_state;
629 if (state != NULL) {
630 srvid_deregister(ctdb->srv, state->srvid, state);
631 talloc_free(state);
632 ctdb_db->push_state = NULL;
636 state = talloc_zero(ctdb_db, struct db_push_state);
637 if (state == NULL) {
638 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
639 return -1;
642 state->ctdb = ctdb;
643 state->ctdb_db = ctdb_db;
644 state->srvid = pulldb_ext->srvid;
645 state->failed = false;
647 ret = srvid_register(ctdb->srv, state, state->srvid,
648 db_push_msg_handler, state);
649 if (ret != 0) {
650 DEBUG(DEBUG_ERR,
651 (__location__ " Failed to register srvid for db push\n"));
652 talloc_free(state);
653 return -1;
656 if (ctdb_lockdb_mark(ctdb_db) != 0) {
657 DEBUG(DEBUG_ERR,
658 (__location__ " Failed to get lock on entire db - failing\n"));
659 srvid_deregister(ctdb->srv, state->srvid, state);
660 talloc_free(state);
661 return -1;
664 ctdb_db->push_started = true;
665 ctdb_db->push_state = state;
667 return 0;
670 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
671 TDB_DATA indata, TDB_DATA *outdata)
673 uint32_t db_id;
674 struct ctdb_db_context *ctdb_db;
675 struct db_push_state *state;
677 db_id = *(uint32_t *)indata.dptr;
679 ctdb_db = find_ctdb_db(ctdb, db_id);
680 if (ctdb_db == NULL) {
681 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
682 return -1;
685 if (!ctdb_db_frozen(ctdb_db)) {
686 DEBUG(DEBUG_ERR,
687 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
688 return -1;
691 if (!ctdb_db->push_started) {
692 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
693 return -1;
696 if (ctdb_db_readonly(ctdb_db)) {
697 DEBUG(DEBUG_ERR,
698 ("Clearing the tracking database for dbid 0x%x\n",
699 ctdb_db->db_id));
700 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
701 DEBUG(DEBUG_ERR,
702 ("Failed to wipe tracking database for 0x%x."
703 " Dropping read-only delegation support\n",
704 ctdb_db->db_id));
705 tdb_close(ctdb_db->rottdb);
706 ctdb_db->rottdb = NULL;
707 ctdb_db_reset_readonly(ctdb_db);
710 while (ctdb_db->revokechild_active != NULL) {
711 talloc_free(ctdb_db->revokechild_active);
715 ctdb_lockdb_unmark(ctdb_db);
717 state = (struct db_push_state *)ctdb_db->push_state;
718 if (state == NULL) {
719 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
720 return -1;
723 srvid_deregister(ctdb->srv, state->srvid, state);
725 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
726 if (outdata->dptr == NULL) {
727 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
728 talloc_free(state);
729 ctdb_db->push_state = NULL;
730 return -1;
733 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
734 outdata->dsize = sizeof(uint32_t);
736 talloc_free(state);
737 ctdb_db->push_started = false;
738 ctdb_db->push_state = NULL;
740 return 0;
743 struct set_recmode_state {
744 struct ctdb_context *ctdb;
745 struct ctdb_req_control_old *c;
748 static void set_recmode_handler(char status,
749 double latency,
750 void *private_data)
752 struct set_recmode_state *state = talloc_get_type_abort(
753 private_data, struct set_recmode_state);
754 int s = 0;
755 const char *err = NULL;
757 switch (status) {
758 case '0':
759 /* Mutex taken */
760 DEBUG(DEBUG_ERR,
761 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
762 state->ctdb->recovery_lock));
763 s = -1;
764 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
765 break;
767 case '1':
768 /* Contention */
769 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
770 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
771 ctdb_process_deferred_attach(state->ctdb);
773 s = 0;
775 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock",
776 reclock.ctdbd, latency);
777 break;
779 case '2':
780 /* Timeout. Consider this a success, not a failure,
781 * as we failed to set the recovery lock which is what
782 * we wanted. This can be caused by the cluster
783 * filesystem being very slow to arbitrate locks
784 * immediately after a node failure. */
785 DEBUG(DEBUG_WARNING,
786 (__location__
787 "Time out getting recovery lock, allowing recmode set anyway\n"));
788 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
789 ctdb_process_deferred_attach(state->ctdb);
791 s = 0;
792 break;
794 default:
795 DEBUG(DEBUG_ERR,
796 ("Unexpected error when testing recovery lock\n"));
797 s = -1;
798 err = "Unexpected error when testing recovery lock";
801 ctdb_request_control_reply(state->ctdb, state->c, NULL, s, err);
802 talloc_free(state);
805 static void
806 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
807 struct timeval t, void *private_data)
809 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
811 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
812 talloc_free(ctdb->release_ips_ctx);
813 ctdb->release_ips_ctx = NULL;
815 ctdb_release_all_ips(ctdb);
819 * Set up an event to drop all public ips if we remain in recovery for too
820 * long
822 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
824 if (ctdb->release_ips_ctx != NULL) {
825 talloc_free(ctdb->release_ips_ctx);
827 ctdb->release_ips_ctx = talloc_new(ctdb);
828 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
830 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
831 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
832 ctdb_drop_all_ips_event, ctdb);
833 return 0;
837 set the recovery mode
839 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
840 struct ctdb_req_control_old *c,
841 TDB_DATA indata, bool *async_reply,
842 const char **errormsg)
844 uint32_t recmode = *(uint32_t *)indata.dptr;
845 struct ctdb_db_context *ctdb_db;
846 struct set_recmode_state *state;
847 struct ctdb_cluster_mutex_handle *h;
849 if (recmode == ctdb->recovery_mode) {
850 D_INFO("Recovery mode already set to %s\n",
851 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
852 return 0;
855 D_NOTICE("Recovery mode set to %s\n",
856 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
858 /* if we enter recovery but stay in recovery for too long
859 we will eventually drop all our ip addresses
861 if (recmode == CTDB_RECOVERY_ACTIVE) {
862 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
863 D_ERR("Failed to set up deferred drop all ips\n");
866 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
867 return 0;
870 /* From this point: recmode == CTDB_RECOVERY_NORMAL
872 * Therefore, what follows is special handling when setting
873 * recovery mode back to normal */
875 TALLOC_FREE(ctdb->release_ips_ctx);
877 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
878 if (ctdb_db->generation != ctdb->vnn_map->generation) {
879 DEBUG(DEBUG_ERR,
880 ("Inconsistent DB generation %u for %s\n",
881 ctdb_db->generation, ctdb_db->db_name));
882 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
883 return -1;
887 /* force the databases to thaw */
888 if (ctdb_db_all_frozen(ctdb)) {
889 ctdb_control_thaw(ctdb, false);
892 if (ctdb->recovery_lock == NULL) {
893 /* Not using recovery lock file */
894 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
895 ctdb_process_deferred_attach(ctdb);
896 return 0;
899 state = talloc_zero(ctdb, struct set_recmode_state);
900 if (state == NULL) {
901 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
902 return -1;
904 state->ctdb = ctdb;
905 state->c = NULL;
907 h = ctdb_cluster_mutex(state, ctdb, ctdb->recovery_lock, 5,
908 set_recmode_handler, state, NULL, NULL);
909 if (h == NULL) {
910 talloc_free(state);
911 return -1;
914 state->c = talloc_steal(state, c);
915 *async_reply = true;
917 return 0;
922 delete a record as part of the vacuum process
923 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
924 use non-blocking locks
926 return 0 if the record was successfully deleted (i.e. it does not exist
927 when the function returns)
928 or !0 is the record still exists in the tdb after returning.
/*
 * Try to delete one record from the local tdb without blocking.
 *
 * rec carries the key followed by a data part that must be exactly one
 * struct ctdb_ltdb_header; the rsn and flags in that header are compared
 * with the copy currently stored in the tdb.
 *
 * The delete is refused (-1) when this node is the lmaster for the key,
 * when the chain lock or the lock on list -1 cannot be taken without
 * blocking, when the stored rsn is newer than the supplied one, when
 * either header has read-only flags set, or when this node is the
 * dmaster of the stored record.
 *
 * NOTE(review): in the short-stored-record branch 0 is returned after
 * the delete attempt; the code visible here does not check that the
 * record is actually gone on that path - confirm against the full file.
 */
930 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
932 TDB_DATA key, data, data2;
933 struct ctdb_ltdb_header *hdr, *hdr2;
935 /* these are really internal tdb functions - but we need them here for
936 non-blocking lock of the freelist */
937 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
938 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* the record body is the key bytes followed directly by the data bytes */
941 key.dsize = rec->keylen;
942 key.dptr = &rec->data[0];
943 data.dsize = rec->datalen;
944 data.dptr = &rec->data[rec->keylen];
946 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
947 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
948 return -1;
/* the caller must supply a bare header and nothing else */
951 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
952 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
953 return -1;
956 hdr = (struct ctdb_ltdb_header *)data.dptr;
958 /* use a non-blocking lock */
959 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
960 return -1;
/* nothing stored under this key: treated as already deleted */
963 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
964 if (data2.dptr == NULL) {
965 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
966 return 0;
/* a stored record too short to hold a header is treated as corrupt */
969 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
970 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
971 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
972 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
974 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
975 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
977 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
978 free(data2.dptr);
979 return 0;
982 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the stored copy is newer than the one the caller asked to delete */
984 if (hdr2->rsn > hdr->rsn) {
985 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
986 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
987 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
988 free(data2.dptr);
989 return -1;
992 /* do not allow deleting record that have readonly flags set. */
993 if (hdr->flags & CTDB_REC_RO_FLAGS) {
994 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
995 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
996 free(data2.dptr);
997 return -1;
/* same check on the stored copy of the header */
999 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1000 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1001 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1002 free(data2.dptr);
1003 return -1;
1006 if (hdr2->dmaster == ctdb->pnn) {
1007 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1008 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
1009 free(data2.dptr);
1010 return -1;
/* list -1 is locked non-blocking before the delete itself */
1013 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1014 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1015 free(data2.dptr);
1016 return -1;
1019 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1020 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1021 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1022 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
1023 free(data2.dptr);
1024 return -1;
/* success: release both locks and the fetched copy */
1027 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1028 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1029 free(data2.dptr);
1030 return 0;
1035 struct recovery_callback_state {
1036 struct ctdb_req_control_old *c;
1041 called when the 'recovered' event script has finished
1043 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1045 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1047 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1049 if (status != 0) {
1050 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1051 if (status == -ETIME) {
1052 ctdb_ban_self(ctdb);
1056 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1057 talloc_free(state);
1059 gettimeofday(&ctdb->last_recovery_finished, NULL);
1061 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1062 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1067 recovery has finished
1069 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1070 struct ctdb_req_control_old *c,
1071 bool *async_reply)
1073 int ret;
1074 struct recovery_callback_state *state;
1076 DEBUG(DEBUG_ERR,("Recovery has finished\n"));
1078 ctdb_persistent_finish_trans3_commits(ctdb);
1080 state = talloc(ctdb, struct recovery_callback_state);
1081 CTDB_NO_MEMORY(ctdb, state);
1083 state->c = c;
1085 ret = ctdb_event_script_callback(ctdb, state,
1086 ctdb_end_recovery_callback,
1087 state,
1088 CTDB_EVENT_RECOVERED, "%s", "");
1090 if (ret != 0) {
1091 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1092 talloc_free(state);
1093 return -1;
1096 /* tell the control that we will be reply asynchronously */
1097 state->c = talloc_steal(state, c);
1098 *async_reply = true;
1099 return 0;
1103 called when the 'startrecovery' event script has finished
1105 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1107 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1109 if (status != 0) {
1110 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1113 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1114 talloc_free(state);
1117 static void run_start_recovery_event(struct ctdb_context *ctdb,
1118 struct recovery_callback_state *state)
1120 int ret;
1122 ret = ctdb_event_script_callback(ctdb, state,
1123 ctdb_start_recovery_callback,
1124 state,
1125 CTDB_EVENT_START_RECOVERY,
1126 "%s", "");
1128 if (ret != 0) {
1129 DEBUG(DEBUG_ERR,("Unable to run startrecovery event\n"));
1130 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1131 talloc_free(state);
1132 return;
1135 return;
/*
  Compare two recovery lock settings.  Two NULL settings are equal, a
  NULL and a non-NULL setting differ, and two non-NULL settings are
  compared as strings.
 */
static bool reclock_strings_equal(const char *a, const char *b)
{
	if (a == NULL || b == NULL) {
		/* equal only when both are unset */
		return a == b;
	}

	return strcmp(a, b) == 0;
}
1144 static void start_recovery_reclock_callback(struct ctdb_context *ctdb,
1145 int32_t status,
1146 TDB_DATA data,
1147 const char *errormsg,
1148 void *private_data)
1150 struct recovery_callback_state *state = talloc_get_type_abort(
1151 private_data, struct recovery_callback_state);
1152 const char *local = ctdb->recovery_lock;
1153 const char *remote = NULL;
1155 if (status != 0) {
1156 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1157 ctdb_request_control_reply(ctdb, state->c, NULL,
1158 status, errormsg);
1159 talloc_free(state);
1160 return;
1163 /* Check reclock consistency */
1164 if (data.dsize > 0) {
1165 /* Ensure NUL-termination */
1166 data.dptr[data.dsize-1] = '\0';
1167 remote = (const char *)data.dptr;
1169 if (! reclock_strings_equal(local, remote)) {
1170 /* Inconsistent */
1171 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1172 DEBUG(DEBUG_ERR,
1173 ("Recovery lock configuration inconsistent: "
1174 "recmaster has %s, this node has %s, shutting down\n",
1175 remote == NULL ? "NULL" : remote,
1176 local == NULL ? "NULL" : local));
1177 talloc_free(state);
1178 ctdb_shutdown_sequence(ctdb, 1);
1180 DEBUG(DEBUG_INFO,
1181 ("Recovery lock consistency check successful\n"));
1183 run_start_recovery_event(ctdb, state);
1186 /* Check recovery lock consistency and run eventscripts for the
1187 * "startrecovery" event */
1188 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1189 struct ctdb_req_control_old *c,
1190 bool *async_reply)
1192 int ret;
1193 struct recovery_callback_state *state;
1194 uint32_t recmaster = c->hdr.srcnode;
1196 DEBUG(DEBUG_ERR, ("Recovery has started\n"));
1197 gettimeofday(&ctdb->last_recovery_started, NULL);
1199 state = talloc(ctdb, struct recovery_callback_state);
1200 CTDB_NO_MEMORY(ctdb, state);
1202 state->c = c;
1204 /* Although the recovery master sent this node a start
1205 * recovery control, this node might still think the recovery
1206 * master is disconnected. In this case defer the recovery
1207 * lock consistency check. */
1208 if (ctdb->nodes[recmaster]->flags & NODE_FLAGS_DISCONNECTED) {
1209 run_start_recovery_event(ctdb, state);
1210 } else {
1211 /* Ask the recovery master about its reclock setting */
1212 ret = ctdb_daemon_send_control(ctdb,
1213 recmaster,
1215 CTDB_CONTROL_GET_RECLOCK_FILE,
1216 0, 0,
1217 tdb_null,
1218 start_recovery_reclock_callback,
1219 state);
1221 if (ret != 0) {
1222 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1223 talloc_free(state);
1224 return -1;
1228 /* tell the control that we will be reply asynchronously */
1229 state->c = talloc_steal(state, c);
1230 *async_reply = true;
1232 return 0;
1236 try to delete all these records as part of the vacuuming process
1237 and return the records we failed to delete
1239 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1241 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1242 struct ctdb_db_context *ctdb_db;
1243 int i;
1244 struct ctdb_rec_data_old *rec;
1245 struct ctdb_marshall_buffer *records;
1247 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1248 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1249 return -1;
1252 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1253 if (!ctdb_db) {
1254 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1255 return -1;
1259 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1260 reply->count, reply->db_id));
1263 /* create a blob to send back the records we couldnt delete */
1264 records = (struct ctdb_marshall_buffer *)
1265 talloc_zero_size(outdata,
1266 offsetof(struct ctdb_marshall_buffer, data));
1267 if (records == NULL) {
1268 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1269 return -1;
1271 records->db_id = ctdb_db->db_id;
1274 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1275 for (i=0;i<reply->count;i++) {
1276 TDB_DATA key, data;
1278 key.dptr = &rec->data[0];
1279 key.dsize = rec->keylen;
1280 data.dptr = &rec->data[key.dsize];
1281 data.dsize = rec->datalen;
1283 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1284 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1285 talloc_free(records);
1286 return -1;
1289 /* If we cant delete the record we must add it to the reply
1290 so the lmaster knows it may not purge this record
1292 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1293 size_t old_size;
1294 struct ctdb_ltdb_header *hdr;
1296 hdr = (struct ctdb_ltdb_header *)data.dptr;
1297 data.dptr += sizeof(*hdr);
1298 data.dsize -= sizeof(*hdr);
1300 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1302 old_size = talloc_get_size(records);
1303 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1304 if (records == NULL) {
1305 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1306 return -1;
1308 records->count++;
1309 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1312 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1316 *outdata = ctdb_marshall_finish(records);
1318 return 0;
1322 * Store a record as part of the vacuum process:
1323 * This is called from the RECEIVE_RECORD control which
1324 * the lmaster uses to send the current empty copy
1325 * to all nodes for storing, before it lets the other
1326 * nodes delete the records in the second phase with
1327 * the TRY_DELETE_RECORDS control.
1329 * Only store if we are not lmaster or dmaster, and our
1330 * rsn is <= the provided rsn. Use non-blocking locks.
1332 * return 0 if the record was successfully stored.
1333 * return !0 if the record still exists in the tdb after returning.
1335 static int store_tdb_record(struct ctdb_context *ctdb,
1336 struct ctdb_db_context *ctdb_db,
1337 struct ctdb_rec_data_old *rec)
1339 TDB_DATA key, data, data2;
1340 struct ctdb_ltdb_header *hdr, *hdr2;
1341 int ret;
1343 key.dsize = rec->keylen;
1344 key.dptr = &rec->data[0];
1345 data.dsize = rec->datalen;
1346 data.dptr = &rec->data[rec->keylen];
1348 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1349 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1350 "where we are lmaster\n"));
1351 return -1;
1354 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1355 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1356 return -1;
1359 hdr = (struct ctdb_ltdb_header *)data.dptr;
1361 /* use a non-blocking lock */
1362 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1363 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1364 return -1;
1367 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1368 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1369 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1370 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1371 ret = -1;
1372 goto done;
1374 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1375 ret = 0;
1376 goto done;
1379 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1381 if (hdr2->rsn > hdr->rsn) {
1382 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1383 "rsn=%llu - called with rsn=%llu\n",
1384 (unsigned long long)hdr2->rsn,
1385 (unsigned long long)hdr->rsn));
1386 ret = -1;
1387 goto done;
1390 /* do not allow vacuuming of records that have readonly flags set. */
1391 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1392 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1393 "flags set\n"));
1394 ret = -1;
1395 goto done;
1397 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1398 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1399 "flags set\n"));
1400 ret = -1;
1401 goto done;
1404 if (hdr2->dmaster == ctdb->pnn) {
1405 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1406 "where we are the dmaster\n"));
1407 ret = -1;
1408 goto done;
1411 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1412 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1413 ret = -1;
1414 goto done;
1417 ret = 0;
1419 done:
1420 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1421 free(data2.dptr);
1422 return ret;
1428 * Try to store all these records as part of the vacuuming process
1429 * and return the records we failed to store.
1431 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1432 TDB_DATA indata, TDB_DATA *outdata)
1434 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1435 struct ctdb_db_context *ctdb_db;
1436 int i;
1437 struct ctdb_rec_data_old *rec;
1438 struct ctdb_marshall_buffer *records;
1440 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1441 DEBUG(DEBUG_ERR,
1442 (__location__ " invalid data in receive_records\n"));
1443 return -1;
1446 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1447 if (!ctdb_db) {
1448 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1449 reply->db_id));
1450 return -1;
1453 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1454 "dbid 0x%x\n", reply->count, reply->db_id));
1456 /* create a blob to send back the records we could not store */
1457 records = (struct ctdb_marshall_buffer *)
1458 talloc_zero_size(outdata,
1459 offsetof(struct ctdb_marshall_buffer, data));
1460 if (records == NULL) {
1461 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1462 return -1;
1464 records->db_id = ctdb_db->db_id;
1466 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1467 for (i=0; i<reply->count; i++) {
1468 TDB_DATA key, data;
1470 key.dptr = &rec->data[0];
1471 key.dsize = rec->keylen;
1472 data.dptr = &rec->data[key.dsize];
1473 data.dsize = rec->datalen;
1475 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1476 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1477 "in indata\n"));
1478 talloc_free(records);
1479 return -1;
1483 * If we can not store the record we must add it to the reply
1484 * so the lmaster knows it may not purge this record.
1486 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1487 size_t old_size;
1488 struct ctdb_ltdb_header *hdr;
1490 hdr = (struct ctdb_ltdb_header *)data.dptr;
1491 data.dptr += sizeof(*hdr);
1492 data.dsize -= sizeof(*hdr);
1494 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1495 "record with hash 0x%08x in vacuum "
1496 "via RECEIVE_RECORDS\n",
1497 ctdb_hash(&key)));
1499 old_size = talloc_get_size(records);
1500 records = talloc_realloc_size(outdata, records,
1501 old_size + rec->length);
1502 if (records == NULL) {
1503 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1504 "expand\n"));
1505 return -1;
1507 records->count++;
1508 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1511 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1514 *outdata = ctdb_marshall_finish(records);
1516 return 0;
1521 report capabilities
1523 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1525 uint32_t *capabilities = NULL;
1527 capabilities = talloc(outdata, uint32_t);
1528 CTDB_NO_MEMORY(ctdb, capabilities);
1529 *capabilities = ctdb->capabilities;
1531 outdata->dsize = sizeof(uint32_t);
1532 outdata->dptr = (uint8_t *)capabilities;
1534 return 0;
1537 /* The recovery daemon will ping us at regular intervals.
1538 If we havent been pinged for a while we assume the recovery
1539 daemon is inoperable and we restart.
1541 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1542 struct tevent_timer *te,
1543 struct timeval t, void *p)
1545 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1546 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1548 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1550 if (*count < ctdb->tunable.recd_ping_failcount) {
1551 (*count)++;
1552 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1553 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1554 ctdb_recd_ping_timeout, ctdb);
1555 return;
1558 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1560 ctdb_stop_recoverd(ctdb);
1561 ctdb_start_recoverd(ctdb);
1564 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1566 talloc_free(ctdb->recd_ping_count);
1568 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1569 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1571 if (ctdb->tunable.recd_ping_timeout != 0) {
1572 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1573 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1574 ctdb_recd_ping_timeout, ctdb);
1577 return 0;
1582 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1584 uint32_t new_recmaster;
1586 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1587 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1589 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1590 DEBUG(DEBUG_ERR,
1591 ("Remote node (%u) is now the recovery master\n",
1592 new_recmaster));
1595 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1596 DEBUG(DEBUG_ERR,
1597 ("This node (%u) is now the recovery master\n",
1598 ctdb->pnn));
1601 ctdb->recovery_master = new_recmaster;
1602 return 0;
1606 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1608 DEBUG(DEBUG_ERR, ("Stopping node\n"));
1609 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1611 return 0;
1614 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1616 DEBUG(DEBUG_ERR, ("Continue node\n"));
1617 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1619 return 0;