ctdb-daemon: Fix CID 1363233 Resource leak (RESOURCE_LEAK)
[Samba.git] / ctdb / server / ctdb_recover.c
blobb42f00173109f7d87f04bdb2723c5ab81b4c305e
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
45 int
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
49 size_t len;
51 CHECK_CONTROL_DATA_SIZE(0);
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
61 outdata->dsize = len;
62 outdata->dptr = (uint8_t *)map;
64 return 0;
67 int
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
74 return -1;
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
89 return 0;
92 int
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
95 uint32_t i, len;
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
101 len = 0;
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
103 len++;
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
111 exit(1);
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
115 dbid_map->num = len;
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 if (ctdb_db->persistent != 0) {
119 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
121 if (ctdb_db->readonly != 0) {
122 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
124 if (ctdb_db->sticky != 0) {
125 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
129 return 0;
133 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
135 CHECK_CONTROL_DATA_SIZE(0);
137 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
138 ctdb->num_nodes,
139 outdata);
140 if (outdata->dptr == NULL) {
141 return -1;
144 outdata->dsize = talloc_get_size(outdata->dptr);
146 return 0;
150 reload the nodes file
153 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
155 int i, num_nodes;
156 TALLOC_CTX *tmp_ctx;
157 struct ctdb_node **nodes;
159 tmp_ctx = talloc_new(ctdb);
161 /* steal the old nodes file for a while */
162 talloc_steal(tmp_ctx, ctdb->nodes);
163 nodes = ctdb->nodes;
164 ctdb->nodes = NULL;
165 num_nodes = ctdb->num_nodes;
166 ctdb->num_nodes = 0;
168 /* load the new nodes file */
169 ctdb_load_nodes_file(ctdb);
171 for (i=0; i<ctdb->num_nodes; i++) {
172 /* keep any identical pre-existing nodes and connections */
173 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
174 talloc_free(ctdb->nodes[i]);
175 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
176 continue;
179 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
180 continue;
183 /* any new or different nodes must be added */
184 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
185 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
186 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
188 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
189 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
190 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
194 /* tell the recovery daemon to reaload the nodes file too */
195 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
197 talloc_free(tmp_ctx);
199 return 0;
/*
  state for the traverse function that pulls all relevant records
  from a database (see traverse_pulldb)
*/
struct pulldb_data {
	struct ctdb_context *ctdb;		/* daemon context (tunables, fatal errors) */
	struct ctdb_db_context *ctdb_db;	/* database being traversed */
	struct ctdb_marshall_buffer *pulldata;	/* reply buffer, grown as records are appended */
	uint32_t len;				/* bytes of pulldata filled in so far */
	uint32_t allocated_len;			/* size last requested for pulldata */
	bool failed;				/* set when a record could not be marshalled */
};
214 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
216 struct pulldb_data *params = (struct pulldb_data *)p;
217 struct ctdb_rec_data_old *rec;
218 struct ctdb_context *ctdb = params->ctdb;
219 struct ctdb_db_context *ctdb_db = params->ctdb_db;
221 /* add the record to the blob */
222 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
223 if (rec == NULL) {
224 params->failed = true;
225 return -1;
227 if (params->len + rec->length >= params->allocated_len) {
228 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
229 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
231 if (params->pulldata == NULL) {
232 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
233 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
235 params->pulldata->count++;
236 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
237 params->len += rec->length;
239 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
240 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
243 talloc_free(rec);
245 return 0;
249 pull a bunch of records from a ltdb, filtering by lmaster
251 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
253 struct ctdb_pulldb *pull;
254 struct ctdb_db_context *ctdb_db;
255 struct pulldb_data params;
256 struct ctdb_marshall_buffer *reply;
258 pull = (struct ctdb_pulldb *)indata.dptr;
260 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
261 if (!ctdb_db) {
262 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
263 return -1;
266 if (!ctdb_db_frozen(ctdb_db)) {
267 DEBUG(DEBUG_ERR,
268 ("rejecting ctdb_control_pull_db when not frozen\n"));
269 return -1;
272 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
273 CTDB_NO_MEMORY(ctdb, reply);
275 reply->db_id = pull->db_id;
277 params.ctdb = ctdb;
278 params.ctdb_db = ctdb_db;
279 params.pulldata = reply;
280 params.len = offsetof(struct ctdb_marshall_buffer, data);
281 params.allocated_len = params.len;
282 params.failed = false;
284 if (ctdb_db->unhealthy_reason) {
285 /* this is just a warning, as the tdb should be empty anyway */
286 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
287 ctdb_db->db_name, ctdb_db->unhealthy_reason));
290 if (ctdb_lockdb_mark(ctdb_db) != 0) {
291 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
292 return -1;
295 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
296 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
297 ctdb_lockdb_unmark(ctdb_db);
298 talloc_free(params.pulldata);
299 return -1;
302 ctdb_lockdb_unmark(ctdb_db);
304 outdata->dptr = (uint8_t *)params.pulldata;
305 outdata->dsize = params.len;
307 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
308 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
310 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
311 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
315 return 0;
/*
  state for the traverse function that streams records to the
  requesting node (see traverse_db_pull)
*/
struct db_pull_state {
	struct ctdb_context *ctdb;		/* daemon context */
	struct ctdb_db_context *ctdb_db;	/* database being traversed */
	struct ctdb_marshall_buffer *recs;	/* records collected since the last send, or NULL */
	uint32_t pnn;				/* node the messages are sent to */
	uint64_t srvid;				/* srvid the messages are sent to */
	uint32_t num_records;			/* records sent so far */
};
327 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
328 TDB_DATA data, void *private_data)
330 struct db_pull_state *state = (struct db_pull_state *)private_data;
331 struct ctdb_marshall_buffer *recs;
333 recs = ctdb_marshall_add(state->ctdb, state->recs,
334 state->ctdb_db->db_id, 0, key, NULL, data);
335 if (recs == NULL) {
336 TALLOC_FREE(state->recs);
337 return -1;
339 state->recs = recs;
341 if (talloc_get_size(state->recs) >=
342 state->ctdb->tunable.rec_buffer_size_limit) {
343 TDB_DATA buffer;
344 int ret;
346 buffer = ctdb_marshall_finish(state->recs);
347 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
348 state->srvid, buffer);
349 if (ret != 0) {
350 TALLOC_FREE(state->recs);
351 return -1;
354 state->num_records += state->recs->count;
355 TALLOC_FREE(state->recs);
358 return 0;
361 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
362 struct ctdb_req_control_old *c,
363 TDB_DATA indata, TDB_DATA *outdata)
365 struct ctdb_pulldb_ext *pulldb_ext;
366 struct ctdb_db_context *ctdb_db;
367 struct db_pull_state state;
368 int ret;
370 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
372 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
373 if (ctdb_db == NULL) {
374 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
375 pulldb_ext->db_id));
376 return -1;
379 if (!ctdb_db_frozen(ctdb_db)) {
380 DEBUG(DEBUG_ERR,
381 ("rejecting ctdb_control_pull_db when not frozen\n"));
382 return -1;
385 if (ctdb_db->unhealthy_reason) {
386 /* this is just a warning, as the tdb should be empty anyway */
387 DEBUG(DEBUG_WARNING,
388 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
389 ctdb_db->db_name, ctdb_db->unhealthy_reason));
392 state.ctdb = ctdb;
393 state.ctdb_db = ctdb_db;
394 state.recs = NULL;
395 state.pnn = c->hdr.srcnode;
396 state.srvid = pulldb_ext->srvid;
397 state.num_records = 0;
399 if (ctdb_lockdb_mark(ctdb_db) != 0) {
400 DEBUG(DEBUG_ERR,
401 (__location__ " Failed to get lock on entire db - failing\n"));
402 return -1;
405 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
406 if (ret == -1) {
407 DEBUG(DEBUG_ERR,
408 (__location__ " Failed to get traverse db '%s'\n",
409 ctdb_db->db_name));
410 ctdb_lockdb_unmark(ctdb_db);
411 return -1;
414 /* Last few records */
415 if (state.recs != NULL) {
416 TDB_DATA buffer;
418 buffer = ctdb_marshall_finish(state.recs);
419 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
420 state.srvid, buffer);
421 if (ret != 0) {
422 TALLOC_FREE(state.recs);
423 ctdb_lockdb_unmark(ctdb_db);
424 return -1;
427 state.num_records += state.recs->count;
428 TALLOC_FREE(state.recs);
431 ctdb_lockdb_unmark(ctdb_db);
433 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
434 if (outdata->dptr == NULL) {
435 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
436 return -1;
439 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
440 outdata->dsize = sizeof(uint32_t);
442 return 0;
446 push a bunch of records into a ltdb, filtering by rsn
448 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
450 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
451 struct ctdb_db_context *ctdb_db;
452 int i, ret;
453 struct ctdb_rec_data_old *rec;
455 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
456 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
457 return -1;
460 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
461 if (!ctdb_db) {
462 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
463 return -1;
466 if (!ctdb_db_frozen(ctdb_db)) {
467 DEBUG(DEBUG_ERR,
468 ("rejecting ctdb_control_push_db when not frozen\n"));
469 return -1;
472 if (ctdb_lockdb_mark(ctdb_db) != 0) {
473 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
474 return -1;
477 rec = (struct ctdb_rec_data_old *)&reply->data[0];
479 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
480 reply->count, reply->db_id));
482 for (i=0;i<reply->count;i++) {
483 TDB_DATA key, data;
484 struct ctdb_ltdb_header *hdr;
486 key.dptr = &rec->data[0];
487 key.dsize = rec->keylen;
488 data.dptr = &rec->data[key.dsize];
489 data.dsize = rec->datalen;
491 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
492 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
493 goto failed;
495 hdr = (struct ctdb_ltdb_header *)data.dptr;
496 /* strip off any read only record flags. All readonly records
497 are revoked implicitely by a recovery
499 hdr->flags &= ~CTDB_REC_RO_FLAGS;
501 data.dptr += sizeof(*hdr);
502 data.dsize -= sizeof(*hdr);
504 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
505 if (ret != 0) {
506 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
507 goto failed;
510 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
513 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
514 reply->count, reply->db_id));
516 if (ctdb_db->readonly) {
517 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
518 ctdb_db->db_id));
519 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
520 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
521 ctdb_db->readonly = false;
522 tdb_close(ctdb_db->rottdb);
523 ctdb_db->rottdb = NULL;
524 ctdb_db->readonly = false;
526 while (ctdb_db->revokechild_active != NULL) {
527 talloc_free(ctdb_db->revokechild_active);
531 ctdb_lockdb_unmark(ctdb_db);
532 return 0;
534 failed:
535 ctdb_lockdb_unmark(ctdb_db);
536 return -1;
/*
  state of a streamed database push, from ctdb_control_db_push_start
  until ctdb_control_db_push_confirm
*/
struct db_push_state {
	struct ctdb_context *ctdb;		/* daemon context */
	struct ctdb_db_context *ctdb_db;	/* database the records are stored into */
	uint64_t srvid;				/* srvid the message handler is registered on */
	uint32_t num_records;			/* records stored so far */
	bool failed;				/* set on the first error; later messages are ignored */
};
547 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
548 void *private_data)
550 struct db_push_state *state = talloc_get_type(
551 private_data, struct db_push_state);
552 struct ctdb_marshall_buffer *recs;
553 struct ctdb_rec_data_old *rec;
554 int i, ret;
556 if (state->failed) {
557 return;
560 recs = (struct ctdb_marshall_buffer *)indata.dptr;
561 rec = (struct ctdb_rec_data_old *)&recs->data[0];
563 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
564 recs->count, recs->db_id));
566 for (i=0; i<recs->count; i++) {
567 TDB_DATA key, data;
568 struct ctdb_ltdb_header *hdr;
570 key.dptr = &rec->data[0];
571 key.dsize = rec->keylen;
572 data.dptr = &rec->data[key.dsize];
573 data.dsize = rec->datalen;
575 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
576 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
577 goto failed;
580 hdr = (struct ctdb_ltdb_header *)data.dptr;
581 /* Strip off any read only record flags.
582 * All readonly records are revoked implicitely by a recovery.
584 hdr->flags &= ~CTDB_REC_RO_FLAGS;
586 data.dptr += sizeof(*hdr);
587 data.dsize -= sizeof(*hdr);
589 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
590 if (ret != 0) {
591 DEBUG(DEBUG_ERR,
592 (__location__ " Unable to store record\n"));
593 goto failed;
596 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
599 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
600 recs->count, recs->db_id));
602 state->num_records += recs->count;
603 return;
605 failed:
606 state->failed = true;
609 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
611 struct ctdb_pulldb_ext *pulldb_ext;
612 struct ctdb_db_context *ctdb_db;
613 struct db_push_state *state;
614 int ret;
616 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
618 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
619 if (ctdb_db == NULL) {
620 DEBUG(DEBUG_ERR,
621 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
622 return -1;
625 if (!ctdb_db_frozen(ctdb_db)) {
626 DEBUG(DEBUG_ERR,
627 ("rejecting ctdb_control_db_push_start when not frozen\n"));
628 return -1;
631 if (ctdb_db->push_started) {
632 DEBUG(DEBUG_WARNING,
633 (__location__ " DB push already started for %s\n",
634 ctdb_db->db_name));
636 /* De-register old state */
637 state = (struct db_push_state *)ctdb_db->push_state;
638 if (state != NULL) {
639 srvid_deregister(ctdb->srv, state->srvid, state);
640 talloc_free(state);
641 ctdb_db->push_state = NULL;
645 state = talloc_zero(ctdb_db, struct db_push_state);
646 if (state == NULL) {
647 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
648 return -1;
651 state->ctdb = ctdb;
652 state->ctdb_db = ctdb_db;
653 state->srvid = pulldb_ext->srvid;
654 state->failed = false;
656 ret = srvid_register(ctdb->srv, state, state->srvid,
657 db_push_msg_handler, state);
658 if (ret != 0) {
659 DEBUG(DEBUG_ERR,
660 (__location__ " Failed to register srvid for db push\n"));
661 talloc_free(state);
662 return -1;
665 if (ctdb_lockdb_mark(ctdb_db) != 0) {
666 DEBUG(DEBUG_ERR,
667 (__location__ " Failed to get lock on entire db - failing\n"));
668 srvid_deregister(ctdb->srv, state->srvid, state);
669 talloc_free(state);
670 return -1;
673 ctdb_db->push_started = true;
674 ctdb_db->push_state = state;
676 return 0;
679 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
680 TDB_DATA indata, TDB_DATA *outdata)
682 uint32_t db_id;
683 struct ctdb_db_context *ctdb_db;
684 struct db_push_state *state;
686 db_id = *(uint32_t *)indata.dptr;
688 ctdb_db = find_ctdb_db(ctdb, db_id);
689 if (ctdb_db == NULL) {
690 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
691 return -1;
694 if (!ctdb_db_frozen(ctdb_db)) {
695 DEBUG(DEBUG_ERR,
696 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
697 return -1;
700 if (!ctdb_db->push_started) {
701 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
702 return -1;
705 if (ctdb_db->readonly) {
706 DEBUG(DEBUG_ERR,
707 ("Clearing the tracking database for dbid 0x%x\n",
708 ctdb_db->db_id));
709 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
710 DEBUG(DEBUG_ERR,
711 ("Failed to wipe tracking database for 0x%x."
712 " Dropping read-only delegation support\n",
713 ctdb_db->db_id));
714 ctdb_db->readonly = false;
715 tdb_close(ctdb_db->rottdb);
716 ctdb_db->rottdb = NULL;
717 ctdb_db->readonly = false;
720 while (ctdb_db->revokechild_active != NULL) {
721 talloc_free(ctdb_db->revokechild_active);
725 ctdb_lockdb_unmark(ctdb_db);
727 state = (struct db_push_state *)ctdb_db->push_state;
728 if (state == NULL) {
729 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
730 return -1;
733 srvid_deregister(ctdb->srv, state->srvid, state);
735 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
736 if (outdata->dptr == NULL) {
737 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
738 talloc_free(state);
739 ctdb_db->push_state = NULL;
740 return -1;
743 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
744 outdata->dsize = sizeof(uint32_t);
746 talloc_free(state);
747 ctdb_db->push_started = false;
748 ctdb_db->push_state = NULL;
750 return 0;
/*
  state carried from ctdb_control_set_recmode to set_recmode_handler
  while the recovery lock is being tested
*/
struct set_recmode_state {
	struct ctdb_context *ctdb;		/* daemon context */
	struct ctdb_req_control_old *c;		/* control to reply to once the test has finished */
};
758 static void set_recmode_handler(char status,
759 double latency,
760 void *private_data)
762 struct set_recmode_state *state = talloc_get_type_abort(
763 private_data, struct set_recmode_state);
764 int s = 0;
765 const char *err = NULL;
767 switch (status) {
768 case '0':
769 /* Mutex taken */
770 DEBUG(DEBUG_ERR,
771 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
772 state->ctdb->recovery_lock));
773 s = -1;
774 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
775 break;
777 case '1':
778 /* Contention */
779 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
780 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
781 ctdb_process_deferred_attach(state->ctdb);
783 s = 0;
785 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock",
786 reclock.ctdbd, latency);
787 break;
789 case '2':
790 /* Timeout. Consider this a success, not a failure,
791 * as we failed to set the recovery lock which is what
792 * we wanted. This can be caused by the cluster
793 * filesystem being very slow to arbitrate locks
794 * immediately after a node failure. */
795 DEBUG(DEBUG_WARNING,
796 (__location__
797 "Time out getting recovery lock, allowing recmode set anyway\n"));
798 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
799 ctdb_process_deferred_attach(state->ctdb);
801 s = 0;
802 break;
804 default:
805 DEBUG(DEBUG_ERR,
806 ("Unexpected error when testing recovery lock\n"));
807 s = -1;
808 err = "Unexpected error when testing recovery lock";
811 ctdb_request_control_reply(state->ctdb, state->c, NULL, s, err);
812 talloc_free(state);
815 static void
816 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
817 struct timeval t, void *private_data)
819 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
821 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
822 talloc_free(ctdb->release_ips_ctx);
823 ctdb->release_ips_ctx = NULL;
825 ctdb_release_all_ips(ctdb);
829 * Set up an event to drop all public ips if we remain in recovery for too
830 * long
832 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
834 if (ctdb->release_ips_ctx != NULL) {
835 talloc_free(ctdb->release_ips_ctx);
837 ctdb->release_ips_ctx = talloc_new(ctdb);
838 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
840 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
841 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
842 ctdb_drop_all_ips_event, ctdb);
843 return 0;
847 set the recovery mode
849 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
850 struct ctdb_req_control_old *c,
851 TDB_DATA indata, bool *async_reply,
852 const char **errormsg)
854 uint32_t recmode = *(uint32_t *)indata.dptr;
855 struct ctdb_db_context *ctdb_db;
856 struct set_recmode_state *state;
857 struct ctdb_cluster_mutex_handle *h;
859 /* if we enter recovery but stay in recovery for too long
860 we will eventually drop all our ip addresses
862 if (recmode == CTDB_RECOVERY_NORMAL) {
863 talloc_free(ctdb->release_ips_ctx);
864 ctdb->release_ips_ctx = NULL;
865 } else {
866 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
867 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
871 if (recmode != ctdb->recovery_mode) {
872 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
873 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
876 if (recmode != CTDB_RECOVERY_NORMAL ||
877 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
878 ctdb->recovery_mode = recmode;
879 return 0;
882 /* From this point: recmode == CTDB_RECOVERY_NORMAL
884 * Therefore, what follows is special handling when setting
885 * recovery mode back to normal */
887 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
888 if (ctdb_db->generation != ctdb->vnn_map->generation) {
889 DEBUG(DEBUG_ERR,
890 ("Inconsistent DB generation %u for %s\n",
891 ctdb_db->generation, ctdb_db->db_name));
892 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
893 return -1;
897 /* force the databases to thaw */
898 if (ctdb_db_all_frozen(ctdb)) {
899 ctdb_control_thaw(ctdb, false);
902 if (ctdb->recovery_lock == NULL) {
903 /* Not using recovery lock file */
904 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
905 ctdb_process_deferred_attach(ctdb);
906 return 0;
909 state = talloc_zero(ctdb, struct set_recmode_state);
910 if (state == NULL) {
911 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
912 return -1;
914 state->ctdb = ctdb;
915 state->c = NULL;
917 h = ctdb_cluster_mutex(state, ctdb, ctdb->recovery_lock, 5,
918 set_recmode_handler, state, NULL, NULL);
919 if (h == NULL) {
920 talloc_free(state);
921 return -1;
924 state->c = talloc_steal(state, c);
925 *async_reply = true;
927 return 0;
932 delete a record as part of the vacuum process
933 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
934 use non-blocking locks
936 return 0 if the record was successfully deleted (i.e. it does not exist
937 when the function returns)
938 or !0 is the record still exists in the tdb after returning.
940 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
942 TDB_DATA key, data, data2;
943 struct ctdb_ltdb_header *hdr, *hdr2;
945 /* these are really internal tdb functions - but we need them here for
946 non-blocking lock of the freelist */
947 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
948 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
951 key.dsize = rec->keylen;
952 key.dptr = &rec->data[0];
953 data.dsize = rec->datalen;
954 data.dptr = &rec->data[rec->keylen];
956 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
957 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
958 return -1;
961 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
962 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
963 return -1;
966 hdr = (struct ctdb_ltdb_header *)data.dptr;
968 /* use a non-blocking lock */
969 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
970 return -1;
973 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
974 if (data2.dptr == NULL) {
975 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
976 return 0;
979 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
980 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
981 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
982 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
984 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
985 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
987 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
988 free(data2.dptr);
989 return 0;
992 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
994 if (hdr2->rsn > hdr->rsn) {
995 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
996 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
997 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
998 free(data2.dptr);
999 return -1;
1002 /* do not allow deleting record that have readonly flags set. */
1003 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1004 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1005 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1006 free(data2.dptr);
1007 return -1;
1009 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1010 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1011 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1012 free(data2.dptr);
1013 return -1;
1016 if (hdr2->dmaster == ctdb->pnn) {
1017 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1018 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
1019 free(data2.dptr);
1020 return -1;
1023 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1024 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1025 free(data2.dptr);
1026 return -1;
1029 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1030 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1031 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1032 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
1033 free(data2.dptr);
1034 return -1;
1037 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1038 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1039 free(data2.dptr);
1040 return 0;
/*
  state handed to the event script callbacks of the start/end recovery
  controls
*/
struct recovery_callback_state {
	struct ctdb_req_control_old *c;		/* control to reply to when the event has finished */
};
1051 called when the 'recovered' event script has finished
1053 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1055 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1057 ctdb_enable_monitoring(ctdb);
1058 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1060 if (status != 0) {
1061 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1062 if (status == -ETIME) {
1063 ctdb_ban_self(ctdb);
1067 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1068 talloc_free(state);
1070 gettimeofday(&ctdb->last_recovery_finished, NULL);
1072 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1073 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1078 recovery has finished
1080 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1081 struct ctdb_req_control_old *c,
1082 bool *async_reply)
1084 int ret;
1085 struct recovery_callback_state *state;
1087 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1089 ctdb_persistent_finish_trans3_commits(ctdb);
1091 state = talloc(ctdb, struct recovery_callback_state);
1092 CTDB_NO_MEMORY(ctdb, state);
1094 state->c = c;
1096 ctdb_disable_monitoring(ctdb);
1098 ret = ctdb_event_script_callback(ctdb, state,
1099 ctdb_end_recovery_callback,
1100 state,
1101 CTDB_EVENT_RECOVERED, "%s", "");
1103 if (ret != 0) {
1104 ctdb_enable_monitoring(ctdb);
1106 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1107 talloc_free(state);
1108 return -1;
1111 /* tell the control that we will be reply asynchronously */
1112 state->c = talloc_steal(state, c);
1113 *async_reply = true;
1114 return 0;
1118 called when the 'startrecovery' event script has finished
1120 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1122 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1124 if (status != 0) {
1125 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1128 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1129 talloc_free(state);
1132 static void run_start_recovery_event(struct ctdb_context *ctdb,
1133 struct recovery_callback_state *state)
1135 int ret;
1137 ctdb_disable_monitoring(ctdb);
1139 ret = ctdb_event_script_callback(ctdb, state,
1140 ctdb_start_recovery_callback,
1141 state,
1142 CTDB_EVENT_START_RECOVERY,
1143 "%s", "");
1145 if (ret != 0) {
1146 DEBUG(DEBUG_ERR,("Unable to run startrecovery event\n"));
1147 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1148 talloc_free(state);
1149 return;
1152 return;
/*
  compare two recovery lock settings, either of which may be NULL

  Two NULL settings are equal, a NULL and a non-NULL setting are not,
  and two non-NULL settings are equal when the strings match exactly.
*/
static bool reclock_strings_equal(const char *a, const char *b)
{
	if (a == NULL || b == NULL) {
		/* equal only when both are unset */
		return a == b;
	}

	return strcmp(a, b) == 0;
}
1161 static void start_recovery_reclock_callback(struct ctdb_context *ctdb,
1162 int32_t status,
1163 TDB_DATA data,
1164 const char *errormsg,
1165 void *private_data)
1167 struct recovery_callback_state *state = talloc_get_type_abort(
1168 private_data, struct recovery_callback_state);
1169 const char *local = ctdb->recovery_lock;
1170 const char *remote = NULL;
1172 if (status != 0) {
1173 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1174 ctdb_request_control_reply(ctdb, state->c, NULL,
1175 status, errormsg);
1176 talloc_free(state);
1177 return;
1180 /* Check reclock consistency */
1181 if (data.dsize > 0) {
1182 /* Ensure NUL-termination */
1183 data.dptr[data.dsize-1] = '\0';
1184 remote = (const char *)data.dptr;
1186 if (! reclock_strings_equal(local, remote)) {
1187 /* Inconsistent */
1188 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1189 DEBUG(DEBUG_ERR,
1190 ("Recovery lock configuration inconsistent: "
1191 "recmaster has %s, this node has %s, shutting down\n",
1192 remote == NULL ? "NULL" : remote,
1193 local == NULL ? "NULL" : local));
1194 talloc_free(state);
1195 ctdb_shutdown_sequence(ctdb, 1);
1197 DEBUG(DEBUG_INFO,
1198 ("Recovery lock consistency check successful\n"));
1200 run_start_recovery_event(ctdb, state);
1203 /* Check recovery lock consistency and run eventscripts for the
1204 * "startrecovery" event */
1205 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1206 struct ctdb_req_control_old *c,
1207 bool *async_reply)
1209 int ret;
1210 struct recovery_callback_state *state;
1211 uint32_t recmaster = c->hdr.srcnode;
1213 DEBUG(DEBUG_NOTICE, ("Recovery has started\n"));
1214 gettimeofday(&ctdb->last_recovery_started, NULL);
1216 state = talloc(ctdb, struct recovery_callback_state);
1217 CTDB_NO_MEMORY(ctdb, state);
1219 state->c = c;
1221 /* Although the recovery master sent this node a start
1222 * recovery control, this node might still think the recovery
1223 * master is disconnected. In this case defer the recovery
1224 * lock consistency check. */
1225 if (ctdb->nodes[recmaster]->flags & NODE_FLAGS_DISCONNECTED) {
1226 run_start_recovery_event(ctdb, state);
1227 } else {
1228 /* Ask the recovery master about its reclock setting */
1229 ret = ctdb_daemon_send_control(ctdb,
1230 recmaster,
1232 CTDB_CONTROL_GET_RECLOCK_FILE,
1233 0, 0,
1234 tdb_null,
1235 start_recovery_reclock_callback,
1236 state);
1238 if (ret != 0) {
1239 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1240 talloc_free(state);
1241 return -1;
1245 /* tell the control that we will be reply asynchronously */
1246 state->c = talloc_steal(state, c);
1247 *async_reply = true;
1249 return 0;
1253 try to delete all these records as part of the vacuuming process
1254 and return the records we failed to delete
1256 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1258 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1259 struct ctdb_db_context *ctdb_db;
1260 int i;
1261 struct ctdb_rec_data_old *rec;
1262 struct ctdb_marshall_buffer *records;
1264 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1265 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1266 return -1;
1269 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1270 if (!ctdb_db) {
1271 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1272 return -1;
1276 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1277 reply->count, reply->db_id));
1280 /* create a blob to send back the records we couldnt delete */
1281 records = (struct ctdb_marshall_buffer *)
1282 talloc_zero_size(outdata,
1283 offsetof(struct ctdb_marshall_buffer, data));
1284 if (records == NULL) {
1285 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1286 return -1;
1288 records->db_id = ctdb_db->db_id;
1291 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1292 for (i=0;i<reply->count;i++) {
1293 TDB_DATA key, data;
1295 key.dptr = &rec->data[0];
1296 key.dsize = rec->keylen;
1297 data.dptr = &rec->data[key.dsize];
1298 data.dsize = rec->datalen;
1300 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1301 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1302 talloc_free(records);
1303 return -1;
1306 /* If we cant delete the record we must add it to the reply
1307 so the lmaster knows it may not purge this record
1309 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1310 size_t old_size;
1311 struct ctdb_ltdb_header *hdr;
1313 hdr = (struct ctdb_ltdb_header *)data.dptr;
1314 data.dptr += sizeof(*hdr);
1315 data.dsize -= sizeof(*hdr);
1317 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1319 old_size = talloc_get_size(records);
1320 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1321 if (records == NULL) {
1322 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1323 return -1;
1325 records->count++;
1326 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1329 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1333 *outdata = ctdb_marshall_finish(records);
1335 return 0;
1339 * Store a record as part of the vacuum process:
1340 * This is called from the RECEIVE_RECORD control which
1341 * the lmaster uses to send the current empty copy
1342 * to all nodes for storing, before it lets the other
1343 * nodes delete the records in the second phase with
1344 * the TRY_DELETE_RECORDS control.
1346 * Only store if we are not lmaster or dmaster, and our
1347 * rsn is <= the provided rsn. Use non-blocking locks.
1349 * return 0 if the record was successfully stored.
1350 * return !0 if the record still exists in the tdb after returning.
1352 static int store_tdb_record(struct ctdb_context *ctdb,
1353 struct ctdb_db_context *ctdb_db,
1354 struct ctdb_rec_data_old *rec)
1356 TDB_DATA key, data, data2;
1357 struct ctdb_ltdb_header *hdr, *hdr2;
1358 int ret;
1360 key.dsize = rec->keylen;
1361 key.dptr = &rec->data[0];
1362 data.dsize = rec->datalen;
1363 data.dptr = &rec->data[rec->keylen];
1365 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1366 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1367 "where we are lmaster\n"));
1368 return -1;
1371 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1372 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1373 return -1;
1376 hdr = (struct ctdb_ltdb_header *)data.dptr;
1378 /* use a non-blocking lock */
1379 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1380 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1381 return -1;
1384 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1385 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1386 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1387 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1388 ret = -1;
1389 goto done;
1391 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1392 ret = 0;
1393 goto done;
1396 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1398 if (hdr2->rsn > hdr->rsn) {
1399 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1400 "rsn=%llu - called with rsn=%llu\n",
1401 (unsigned long long)hdr2->rsn,
1402 (unsigned long long)hdr->rsn));
1403 ret = -1;
1404 goto done;
1407 /* do not allow vacuuming of records that have readonly flags set. */
1408 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1409 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1410 "flags set\n"));
1411 ret = -1;
1412 goto done;
1414 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1415 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1416 "flags set\n"));
1417 ret = -1;
1418 goto done;
1421 if (hdr2->dmaster == ctdb->pnn) {
1422 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1423 "where we are the dmaster\n"));
1424 ret = -1;
1425 goto done;
1428 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1429 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1430 ret = -1;
1431 goto done;
1434 ret = 0;
1436 done:
1437 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1438 free(data2.dptr);
1439 return ret;
1445 * Try to store all these records as part of the vacuuming process
1446 * and return the records we failed to store.
1448 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1449 TDB_DATA indata, TDB_DATA *outdata)
1451 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1452 struct ctdb_db_context *ctdb_db;
1453 int i;
1454 struct ctdb_rec_data_old *rec;
1455 struct ctdb_marshall_buffer *records;
1457 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1458 DEBUG(DEBUG_ERR,
1459 (__location__ " invalid data in receive_records\n"));
1460 return -1;
1463 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1464 if (!ctdb_db) {
1465 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1466 reply->db_id));
1467 return -1;
1470 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1471 "dbid 0x%x\n", reply->count, reply->db_id));
1473 /* create a blob to send back the records we could not store */
1474 records = (struct ctdb_marshall_buffer *)
1475 talloc_zero_size(outdata,
1476 offsetof(struct ctdb_marshall_buffer, data));
1477 if (records == NULL) {
1478 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1479 return -1;
1481 records->db_id = ctdb_db->db_id;
1483 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1484 for (i=0; i<reply->count; i++) {
1485 TDB_DATA key, data;
1487 key.dptr = &rec->data[0];
1488 key.dsize = rec->keylen;
1489 data.dptr = &rec->data[key.dsize];
1490 data.dsize = rec->datalen;
1492 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1493 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1494 "in indata\n"));
1495 return -1;
1499 * If we can not store the record we must add it to the reply
1500 * so the lmaster knows it may not purge this record.
1502 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1503 size_t old_size;
1504 struct ctdb_ltdb_header *hdr;
1506 hdr = (struct ctdb_ltdb_header *)data.dptr;
1507 data.dptr += sizeof(*hdr);
1508 data.dsize -= sizeof(*hdr);
1510 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1511 "record with hash 0x%08x in vacuum "
1512 "via RECEIVE_RECORDS\n",
1513 ctdb_hash(&key)));
1515 old_size = talloc_get_size(records);
1516 records = talloc_realloc_size(outdata, records,
1517 old_size + rec->length);
1518 if (records == NULL) {
1519 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1520 "expand\n"));
1521 return -1;
1523 records->count++;
1524 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1527 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1530 *outdata = ctdb_marshall_finish(records);
1532 return 0;
1537 report capabilities
1539 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1541 uint32_t *capabilities = NULL;
1543 capabilities = talloc(outdata, uint32_t);
1544 CTDB_NO_MEMORY(ctdb, capabilities);
1545 *capabilities = ctdb->capabilities;
1547 outdata->dsize = sizeof(uint32_t);
1548 outdata->dptr = (uint8_t *)capabilities;
1550 return 0;
1553 /* The recovery daemon will ping us at regular intervals.
1554 If we havent been pinged for a while we assume the recovery
1555 daemon is inoperable and we restart.
1557 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1558 struct tevent_timer *te,
1559 struct timeval t, void *p)
1561 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1562 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1564 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1566 if (*count < ctdb->tunable.recd_ping_failcount) {
1567 (*count)++;
1568 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1569 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1570 ctdb_recd_ping_timeout, ctdb);
1571 return;
1574 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1576 ctdb_stop_recoverd(ctdb);
1577 ctdb_start_recoverd(ctdb);
1580 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1582 talloc_free(ctdb->recd_ping_count);
1584 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1585 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1587 if (ctdb->tunable.recd_ping_timeout != 0) {
1588 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1589 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1590 ctdb_recd_ping_timeout, ctdb);
1593 return 0;
1598 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1600 uint32_t new_recmaster;
1602 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1603 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1605 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1606 DEBUG(DEBUG_NOTICE,
1607 ("Remote node (%u) is now the recovery master\n",
1608 new_recmaster));
1611 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1612 DEBUG(DEBUG_NOTICE,
1613 ("This node (%u) is now the recovery master\n",
1614 ctdb->pnn));
1617 ctdb->recovery_master = new_recmaster;
1618 return 0;
1622 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1624 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1625 ctdb_disable_monitoring(ctdb);
1626 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1628 return 0;
1631 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1633 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1634 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1636 return 0;