ctdbd_conn: Use sys_poll_intr
[Samba.git] / ctdb / server / ctdb_recover.c
blob102854564bc9e8fe97c48aa6b07bd5f6689f6125
1 /*
2 ctdb recovery code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 int
44 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
46 struct ctdb_vnn_map_wire *map;
47 size_t len;
49 CHECK_CONTROL_DATA_SIZE(0);
51 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
52 map = talloc_size(outdata, len);
53 CTDB_NO_MEMORY(ctdb, map);
55 map->generation = ctdb->vnn_map->generation;
56 map->size = ctdb->vnn_map->size;
57 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
59 outdata->dsize = len;
60 outdata->dptr = (uint8_t *)map;
62 return 0;
65 int
66 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
68 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
70 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
71 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
72 return -1;
75 talloc_free(ctdb->vnn_map);
77 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
78 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
80 ctdb->vnn_map->generation = map->generation;
81 ctdb->vnn_map->size = map->size;
82 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
83 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
85 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
87 return 0;
90 int
91 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
93 uint32_t i, len;
94 struct ctdb_db_context *ctdb_db;
95 struct ctdb_dbid_map_old *dbid_map;
97 CHECK_CONTROL_DATA_SIZE(0);
99 len = 0;
100 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
101 len++;
105 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
106 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
107 if (!outdata->dptr) {
108 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
109 exit(1);
112 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
113 dbid_map->num = len;
114 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
115 dbid_map->dbs[i].db_id = ctdb_db->db_id;
116 if (ctdb_db->persistent != 0) {
117 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
119 if (ctdb_db->readonly != 0) {
120 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
122 if (ctdb_db->sticky != 0) {
123 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
127 return 0;
131 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
133 CHECK_CONTROL_DATA_SIZE(0);
135 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
136 ctdb->num_nodes,
137 outdata);
138 if (outdata->dptr == NULL) {
139 return -1;
142 outdata->dsize = talloc_get_size(outdata->dptr);
144 return 0;
148 reload the nodes file
151 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
153 int i, num_nodes;
154 TALLOC_CTX *tmp_ctx;
155 struct ctdb_node **nodes;
157 tmp_ctx = talloc_new(ctdb);
159 /* steal the old nodes file for a while */
160 talloc_steal(tmp_ctx, ctdb->nodes);
161 nodes = ctdb->nodes;
162 ctdb->nodes = NULL;
163 num_nodes = ctdb->num_nodes;
164 ctdb->num_nodes = 0;
166 /* load the new nodes file */
167 ctdb_load_nodes_file(ctdb);
169 for (i=0; i<ctdb->num_nodes; i++) {
170 /* keep any identical pre-existing nodes and connections */
171 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
172 talloc_free(ctdb->nodes[i]);
173 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
174 continue;
177 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
178 continue;
181 /* any new or different nodes must be added */
182 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
183 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
184 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
186 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
187 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
188 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
192 /* tell the recovery daemon to reaload the nodes file too */
193 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
195 talloc_free(tmp_ctx);
197 return 0;
/*
  State shared between ctdb_control_pull_db() and its traverse callback
  traverse_pulldb(), which pulls all records from a database into one
  marshall buffer.
 */
struct pulldb_data {
	struct ctdb_context *ctdb;		/* daemon context (tunables, fatal errors) */
	struct ctdb_db_context *ctdb_db;	/* database being traversed */
	struct ctdb_marshall_buffer *pulldata;	/* reply buffer, grown as needed */
	uint32_t len;				/* bytes of pulldata in use */
	uint32_t allocated_len;			/* bytes currently allocated for pulldata */
	bool failed;				/* set when a record could not be marshalled */
};
212 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
214 struct pulldb_data *params = (struct pulldb_data *)p;
215 struct ctdb_rec_data_old *rec;
216 struct ctdb_context *ctdb = params->ctdb;
217 struct ctdb_db_context *ctdb_db = params->ctdb_db;
219 /* add the record to the blob */
220 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
221 if (rec == NULL) {
222 params->failed = true;
223 return -1;
225 if (params->len + rec->length >= params->allocated_len) {
226 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
227 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
229 if (params->pulldata == NULL) {
230 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
231 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
233 params->pulldata->count++;
234 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
235 params->len += rec->length;
237 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
238 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
241 talloc_free(rec);
243 return 0;
247 pull a bunch of records from a ltdb, filtering by lmaster
249 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
251 struct ctdb_pulldb *pull;
252 struct ctdb_db_context *ctdb_db;
253 struct pulldb_data params;
254 struct ctdb_marshall_buffer *reply;
256 pull = (struct ctdb_pulldb *)indata.dptr;
258 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
259 if (!ctdb_db) {
260 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
261 return -1;
264 if (!ctdb_db_frozen(ctdb_db)) {
265 DEBUG(DEBUG_ERR,
266 ("rejecting ctdb_control_pull_db when not frozen\n"));
267 return -1;
270 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
271 CTDB_NO_MEMORY(ctdb, reply);
273 reply->db_id = pull->db_id;
275 params.ctdb = ctdb;
276 params.ctdb_db = ctdb_db;
277 params.pulldata = reply;
278 params.len = offsetof(struct ctdb_marshall_buffer, data);
279 params.allocated_len = params.len;
280 params.failed = false;
282 if (ctdb_db->unhealthy_reason) {
283 /* this is just a warning, as the tdb should be empty anyway */
284 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
285 ctdb_db->db_name, ctdb_db->unhealthy_reason));
288 if (ctdb_lockdb_mark(ctdb_db) != 0) {
289 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
290 return -1;
293 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
295 ctdb_lockdb_unmark(ctdb_db);
296 talloc_free(params.pulldata);
297 return -1;
300 ctdb_lockdb_unmark(ctdb_db);
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
313 return 0;
/*
  State shared between ctdb_control_db_pull() and its traverse callback
  traverse_db_pull().
 */
struct db_pull_state {
	struct ctdb_context *ctdb;		/* daemon context */
	struct ctdb_db_context *ctdb_db;	/* database being traversed */
	struct ctdb_marshall_buffer *recs;	/* records collected but not yet sent */
	uint32_t pnn;				/* node the record messages are sent to */
	uint64_t srvid;				/* srvid the record messages are sent to */
	uint32_t num_records;			/* records sent so far */
};
325 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
326 TDB_DATA data, void *private_data)
328 struct db_pull_state *state = (struct db_pull_state *)private_data;
329 struct ctdb_marshall_buffer *recs;
331 recs = ctdb_marshall_add(state->ctdb, state->recs,
332 state->ctdb_db->db_id, 0, key, NULL, data);
333 if (recs == NULL) {
334 TALLOC_FREE(state->recs);
335 return -1;
337 state->recs = recs;
339 if (talloc_get_size(state->recs) >=
340 state->ctdb->tunable.rec_buffer_size_limit) {
341 TDB_DATA buffer;
342 int ret;
344 buffer = ctdb_marshall_finish(state->recs);
345 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
346 state->srvid, buffer);
347 if (ret != 0) {
348 TALLOC_FREE(state->recs);
349 return -1;
352 state->num_records += state->recs->count;
353 TALLOC_FREE(state->recs);
356 return 0;
359 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
360 struct ctdb_req_control_old *c,
361 TDB_DATA indata, TDB_DATA *outdata)
363 struct ctdb_pulldb_ext *pulldb_ext;
364 struct ctdb_db_context *ctdb_db;
365 struct db_pull_state state;
366 int ret;
368 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
370 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
371 if (ctdb_db == NULL) {
372 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
373 pulldb_ext->db_id));
374 return -1;
377 if (!ctdb_db_frozen(ctdb_db)) {
378 DEBUG(DEBUG_ERR,
379 ("rejecting ctdb_control_pull_db when not frozen\n"));
380 return -1;
383 if (ctdb_db->unhealthy_reason) {
384 /* this is just a warning, as the tdb should be empty anyway */
385 DEBUG(DEBUG_WARNING,
386 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
390 state.ctdb = ctdb;
391 state.ctdb_db = ctdb_db;
392 state.recs = NULL;
393 state.pnn = c->hdr.srcnode;
394 state.srvid = pulldb_ext->srvid;
395 state.num_records = 0;
397 if (ctdb_lockdb_mark(ctdb_db) != 0) {
398 DEBUG(DEBUG_ERR,
399 (__location__ " Failed to get lock on entire db - failing\n"));
400 return -1;
403 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
404 if (ret == -1) {
405 DEBUG(DEBUG_ERR,
406 (__location__ " Failed to get traverse db '%s'\n",
407 ctdb_db->db_name));
408 ctdb_lockdb_unmark(ctdb_db);
409 return -1;
412 /* Last few records */
413 if (state.recs != NULL) {
414 TDB_DATA buffer;
416 buffer = ctdb_marshall_finish(state.recs);
417 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
418 state.srvid, buffer);
419 if (ret != 0) {
420 TALLOC_FREE(state.recs);
421 ctdb_lockdb_unmark(ctdb_db);
422 return -1;
425 state.num_records += state.recs->count;
426 TALLOC_FREE(state.recs);
429 ctdb_lockdb_unmark(ctdb_db);
431 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
432 if (outdata->dptr == NULL) {
433 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
434 return -1;
437 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
438 outdata->dsize = sizeof(uint32_t);
440 return 0;
444 push a bunch of records into a ltdb, filtering by rsn
446 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
448 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
449 struct ctdb_db_context *ctdb_db;
450 int i, ret;
451 struct ctdb_rec_data_old *rec;
453 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
454 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
455 return -1;
458 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
459 if (!ctdb_db) {
460 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
461 return -1;
464 if (!ctdb_db_frozen(ctdb_db)) {
465 DEBUG(DEBUG_ERR,
466 ("rejecting ctdb_control_push_db when not frozen\n"));
467 return -1;
470 if (ctdb_lockdb_mark(ctdb_db) != 0) {
471 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
472 return -1;
475 rec = (struct ctdb_rec_data_old *)&reply->data[0];
477 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
478 reply->count, reply->db_id));
480 for (i=0;i<reply->count;i++) {
481 TDB_DATA key, data;
482 struct ctdb_ltdb_header *hdr;
484 key.dptr = &rec->data[0];
485 key.dsize = rec->keylen;
486 data.dptr = &rec->data[key.dsize];
487 data.dsize = rec->datalen;
489 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
490 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
491 goto failed;
493 hdr = (struct ctdb_ltdb_header *)data.dptr;
494 /* strip off any read only record flags. All readonly records
495 are revoked implicitely by a recovery
497 hdr->flags &= ~CTDB_REC_RO_FLAGS;
499 data.dptr += sizeof(*hdr);
500 data.dsize -= sizeof(*hdr);
502 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
503 if (ret != 0) {
504 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
505 goto failed;
508 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
511 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
512 reply->count, reply->db_id));
514 if (ctdb_db->readonly) {
515 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
516 ctdb_db->db_id));
517 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
518 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
519 ctdb_db->readonly = false;
520 tdb_close(ctdb_db->rottdb);
521 ctdb_db->rottdb = NULL;
522 ctdb_db->readonly = false;
524 while (ctdb_db->revokechild_active != NULL) {
525 talloc_free(ctdb_db->revokechild_active);
529 ctdb_lockdb_unmark(ctdb_db);
530 return 0;
532 failed:
533 ctdb_lockdb_unmark(ctdb_db);
534 return -1;
/*
  Per-database state of an in-progress DB push: created by
  ctdb_control_db_push_start(), updated by db_push_msg_handler() and
  consumed by ctdb_control_db_push_confirm().
 */
struct db_push_state {
	struct ctdb_context *ctdb;		/* daemon context */
	struct ctdb_db_context *ctdb_db;	/* database records are stored into */
	uint64_t srvid;				/* srvid the record messages arrive on */
	uint32_t num_records;			/* records stored so far */
	bool failed;				/* set once a message could not be processed */
};
545 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
546 void *private_data)
548 struct db_push_state *state = talloc_get_type(
549 private_data, struct db_push_state);
550 struct ctdb_marshall_buffer *recs;
551 struct ctdb_rec_data_old *rec;
552 int i, ret;
554 if (state->failed) {
555 return;
558 recs = (struct ctdb_marshall_buffer *)indata.dptr;
559 rec = (struct ctdb_rec_data_old *)&recs->data[0];
561 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
562 recs->count, recs->db_id));
564 for (i=0; i<recs->count; i++) {
565 TDB_DATA key, data;
566 struct ctdb_ltdb_header *hdr;
568 key.dptr = &rec->data[0];
569 key.dsize = rec->keylen;
570 data.dptr = &rec->data[key.dsize];
571 data.dsize = rec->datalen;
573 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
574 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
575 goto failed;
578 hdr = (struct ctdb_ltdb_header *)data.dptr;
579 /* Strip off any read only record flags.
580 * All readonly records are revoked implicitely by a recovery.
582 hdr->flags &= ~CTDB_REC_RO_FLAGS;
584 data.dptr += sizeof(*hdr);
585 data.dsize -= sizeof(*hdr);
587 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
588 if (ret != 0) {
589 DEBUG(DEBUG_ERR,
590 (__location__ " Unable to store record\n"));
591 goto failed;
594 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
597 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
598 recs->count, recs->db_id));
600 state->num_records += recs->count;
601 return;
603 failed:
604 state->failed = true;
607 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
609 struct ctdb_pulldb_ext *pulldb_ext;
610 struct ctdb_db_context *ctdb_db;
611 struct db_push_state *state;
612 int ret;
614 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
616 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
617 if (ctdb_db == NULL) {
618 DEBUG(DEBUG_ERR,
619 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
620 return -1;
623 if (!ctdb_db_frozen(ctdb_db)) {
624 DEBUG(DEBUG_ERR,
625 ("rejecting ctdb_control_db_push_start when not frozen\n"));
626 return -1;
629 if (ctdb_db->push_started) {
630 DEBUG(DEBUG_WARNING,
631 (__location__ " DB push already started for %s\n",
632 ctdb_db->db_name));
634 /* De-register old state */
635 state = (struct db_push_state *)ctdb_db->push_state;
636 if (state != NULL) {
637 srvid_deregister(ctdb->srv, state->srvid, state);
638 talloc_free(state);
639 ctdb_db->push_state = NULL;
643 state = talloc_zero(ctdb_db, struct db_push_state);
644 if (state == NULL) {
645 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
646 return -1;
649 state->ctdb = ctdb;
650 state->ctdb_db = ctdb_db;
651 state->srvid = pulldb_ext->srvid;
652 state->failed = false;
654 ret = srvid_register(ctdb->srv, state, state->srvid,
655 db_push_msg_handler, state);
656 if (ret != 0) {
657 DEBUG(DEBUG_ERR,
658 (__location__ " Failed to register srvid for db push\n"));
659 talloc_free(state);
660 return -1;
663 if (ctdb_lockdb_mark(ctdb_db) != 0) {
664 DEBUG(DEBUG_ERR,
665 (__location__ " Failed to get lock on entire db - failing\n"));
666 srvid_deregister(ctdb->srv, state->srvid, state);
667 talloc_free(state);
668 return -1;
671 ctdb_db->push_started = true;
672 ctdb_db->push_state = state;
674 return 0;
677 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
678 TDB_DATA indata, TDB_DATA *outdata)
680 uint32_t db_id;
681 struct ctdb_db_context *ctdb_db;
682 struct db_push_state *state;
684 db_id = *(uint32_t *)indata.dptr;
686 ctdb_db = find_ctdb_db(ctdb, db_id);
687 if (ctdb_db == NULL) {
688 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
689 return -1;
692 if (!ctdb_db_frozen(ctdb_db)) {
693 DEBUG(DEBUG_ERR,
694 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
695 return -1;
698 if (!ctdb_db->push_started) {
699 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
700 return -1;
703 if (ctdb_db->readonly) {
704 DEBUG(DEBUG_ERR,
705 ("Clearing the tracking database for dbid 0x%x\n",
706 ctdb_db->db_id));
707 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
708 DEBUG(DEBUG_ERR,
709 ("Failed to wipe tracking database for 0x%x."
710 " Dropping read-only delegation support\n",
711 ctdb_db->db_id));
712 ctdb_db->readonly = false;
713 tdb_close(ctdb_db->rottdb);
714 ctdb_db->rottdb = NULL;
715 ctdb_db->readonly = false;
718 while (ctdb_db->revokechild_active != NULL) {
719 talloc_free(ctdb_db->revokechild_active);
723 ctdb_lockdb_unmark(ctdb_db);
725 state = (struct db_push_state *)ctdb_db->push_state;
726 if (state == NULL) {
727 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
728 return -1;
731 srvid_deregister(ctdb->srv, state->srvid, state);
733 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
734 if (outdata->dptr == NULL) {
735 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
736 talloc_free(state);
737 ctdb_db->push_state = NULL;
738 return -1;
741 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
742 outdata->dsize = sizeof(uint32_t);
744 talloc_free(state);
745 ctdb_db->push_state = NULL;
747 return 0;
/*
  State of an asynchronous "set recovery mode" request while a child
  process tests the recovery lock.
 */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;		/* daemon context */
	struct ctdb_req_control_old *c;		/* control request to reply to */
	int fd[2];				/* pipe: child writes its result, parent reads */
	struct tevent_timer *te;		/* timeout for the child */
	struct tevent_fd *fde;			/* read event on fd[0] */
	pid_t child;				/* pid of the lock-testing child */
	struct timeval start_time;		/* when the request started (latency stats) */
};
761 called if our set_recmode child times out. this would happen if
762 ctdb_recovery_lock() would block.
764 static void ctdb_set_recmode_timeout(struct tevent_context *ev,
765 struct tevent_timer *te,
766 struct timeval t, void *private_data)
768 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
769 struct ctdb_set_recmode_state);
771 /* we consider this a success, not a failure, as we failed to
772 set the recovery lock which is what we wanted. This can be
773 caused by the cluster filesystem being very slow to
774 arbitrate locks immediately after a node failure.
776 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
777 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
778 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
779 talloc_free(state);
783 /* when we free the recmode state we must kill any child process.
785 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
787 if (state->fd[0] != -1) {
788 state->fd[0] = -1;
790 ctdb_kill(state->ctdb, state->child, SIGKILL);
791 return 0;
794 /* this is called when the client process has completed ctdb_recovery_lock()
795 and has written data back to us through the pipe.
797 static void set_recmode_handler(struct tevent_context *ev,
798 struct tevent_fd *fde,
799 uint16_t flags, void *private_data)
801 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
802 struct ctdb_set_recmode_state);
803 char c = 0;
804 int ret;
805 int status = 0;
806 const char *err = NULL;
808 /* we got a response from our child process so we can abort the
809 timeout.
811 talloc_free(state->te);
812 state->te = NULL;
814 ret = sys_read(state->fd[0], &c, 1);
815 if (ret == 1) {
816 /* Child wrote status. EACCES indicates that it was unable
817 * to take the lock, which is the expected outcome.
818 * 0 indicates that it was able to take the
819 * lock, which is an error because the recovery daemon
820 * should be holding the lock. */
821 double l = timeval_elapsed(&state->start_time);
823 if (c == EACCES) {
824 status = 0;
825 err = NULL;
827 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
829 /* release any deferred attach calls from clients */
830 ctdb_process_deferred_attach(state->ctdb);
832 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
833 } else {
834 status = -1;
835 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
837 } else {
838 /* Child did not write status. Unexpected error.
839 * Child may have received a signal. */
840 status = -1;
841 err = "Unexpected error when testing recovery lock";
844 ctdb_request_control_reply(state->ctdb, state->c, NULL, status, err);
845 talloc_free(state);
848 static void
849 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
850 struct timeval t, void *private_data)
852 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
854 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
855 talloc_free(ctdb->release_ips_ctx);
856 ctdb->release_ips_ctx = NULL;
858 ctdb_release_all_ips(ctdb);
862 * Set up an event to drop all public ips if we remain in recovery for too
863 * long
865 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
867 if (ctdb->release_ips_ctx != NULL) {
868 talloc_free(ctdb->release_ips_ctx);
870 ctdb->release_ips_ctx = talloc_new(ctdb);
871 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
873 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
874 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
875 ctdb_drop_all_ips_event, ctdb);
876 return 0;
880 set the recovery mode
882 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
883 struct ctdb_req_control_old *c,
884 TDB_DATA indata, bool *async_reply,
885 const char **errormsg)
887 uint32_t recmode = *(uint32_t *)indata.dptr;
888 int i, ret;
889 struct ctdb_set_recmode_state *state;
890 pid_t parent = getpid();
891 struct ctdb_db_context *ctdb_db;
893 /* if we enter recovery but stay in recovery for too long
894 we will eventually drop all our ip addresses
896 if (recmode == CTDB_RECOVERY_NORMAL) {
897 talloc_free(ctdb->release_ips_ctx);
898 ctdb->release_ips_ctx = NULL;
899 } else {
900 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
901 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
905 if (recmode != ctdb->recovery_mode) {
906 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
907 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
910 if (recmode != CTDB_RECOVERY_NORMAL ||
911 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
912 ctdb->recovery_mode = recmode;
913 return 0;
916 /* From this point: recmode == CTDB_RECOVERY_NORMAL
918 * Therefore, what follows is special handling when setting
919 * recovery mode back to normal */
921 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
922 if (ctdb_db->generation != ctdb->vnn_map->generation) {
923 DEBUG(DEBUG_ERR,
924 ("Inconsistent DB generation %u for %s\n",
925 ctdb_db->generation, ctdb_db->db_name));
926 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
927 return -1;
931 /* force the databases to thaw */
932 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
933 if (ctdb_db_prio_frozen(ctdb, i)) {
934 ctdb_control_thaw(ctdb, i, false);
938 /* release any deferred attach calls from clients */
939 if (recmode == CTDB_RECOVERY_NORMAL) {
940 ctdb_process_deferred_attach(ctdb);
943 if (ctdb->recovery_lock_file == NULL) {
944 /* Not using recovery lock file */
945 ctdb->recovery_mode = recmode;
946 return 0;
949 state = talloc(ctdb, struct ctdb_set_recmode_state);
950 CTDB_NO_MEMORY(ctdb, state);
952 state->start_time = timeval_current();
953 state->fd[0] = -1;
954 state->fd[1] = -1;
956 /* For the rest of what needs to be done, we need to do this in
957 a child process since
958 1, the call to ctdb_recovery_lock() can block if the cluster
959 filesystem is in the process of recovery.
961 ret = pipe(state->fd);
962 if (ret != 0) {
963 talloc_free(state);
964 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
965 return -1;
968 state->child = ctdb_fork(ctdb);
969 if (state->child == (pid_t)-1) {
970 close(state->fd[0]);
971 close(state->fd[1]);
972 talloc_free(state);
973 return -1;
976 if (state->child == 0) {
977 char cc = EACCES;
978 close(state->fd[0]);
980 prctl_set_comment("ctdb_recmode");
981 debug_extra = talloc_asprintf(NULL, "set_recmode:");
982 /* Daemon should not be able to get the recover lock,
983 * as it should be held by the recovery master */
984 if (ctdb_recovery_lock(ctdb)) {
985 DEBUG(DEBUG_ERR,
986 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
987 ctdb->recovery_lock_file));
988 ctdb_recovery_unlock(ctdb);
989 cc = 0;
992 sys_write(state->fd[1], &cc, 1);
993 ctdb_wait_for_process_to_exit(parent);
994 _exit(0);
996 close(state->fd[1]);
997 set_close_on_exec(state->fd[0]);
999 state->fd[1] = -1;
1001 talloc_set_destructor(state, set_recmode_destructor);
1003 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
1005 state->te = tevent_add_timer(ctdb->ev, state, timeval_current_ofs(5, 0),
1006 ctdb_set_recmode_timeout, state);
1008 state->fde = tevent_add_fd(ctdb->ev, state, state->fd[0], TEVENT_FD_READ,
1009 set_recmode_handler, (void *)state);
1011 if (state->fde == NULL) {
1012 talloc_free(state);
1013 return -1;
1015 tevent_fd_set_auto_close(state->fde);
1017 state->ctdb = ctdb;
1018 state->c = talloc_steal(state, c);
1020 *async_reply = true;
1022 return 0;
1026 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
1028 return ctdb->recovery_lock_fd != -1;
1032 try and get the recovery lock in shared storage - should only work
1033 on the recovery master recovery daemon. Anywhere else is a bug
1035 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
1037 struct flock lock;
1039 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
1040 O_RDWR|O_CREAT, 0600);
1041 if (ctdb->recovery_lock_fd == -1) {
1042 DEBUG(DEBUG_ERR,
1043 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
1044 ctdb->recovery_lock_file, strerror(errno)));
1045 return false;
1048 set_close_on_exec(ctdb->recovery_lock_fd);
1050 lock.l_type = F_WRLCK;
1051 lock.l_whence = SEEK_SET;
1052 lock.l_start = 0;
1053 lock.l_len = 1;
1054 lock.l_pid = 0;
1056 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
1057 int saved_errno = errno;
1058 close(ctdb->recovery_lock_fd);
1059 ctdb->recovery_lock_fd = -1;
1060 /* Fail silently on these errors, since they indicate
1061 * lock contention, but log an error for any other
1062 * failure. */
1063 if (saved_errno != EACCES &&
1064 saved_errno != EAGAIN) {
1065 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
1066 "recovery lock on '%s' - (%s)\n",
1067 ctdb->recovery_lock_file,
1068 strerror(saved_errno)));
1070 return false;
1073 return true;
1076 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
1078 if (ctdb->recovery_lock_fd != -1) {
1079 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
1080 close(ctdb->recovery_lock_fd);
1081 ctdb->recovery_lock_fd = -1;
1086 delete a record as part of the vacuum process
1087 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
1088 use non-blocking locks
1090 return 0 if the record was successfully deleted (i.e. it does not exist
1091 when the function returns)
1092 or !0 is the record still exists in the tdb after returning.
1094 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
1096 TDB_DATA key, data, data2;
1097 struct ctdb_ltdb_header *hdr, *hdr2;
1099 /* these are really internal tdb functions - but we need them here for
1100 non-blocking lock of the freelist */
1101 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
1102 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
1105 key.dsize = rec->keylen;
1106 key.dptr = &rec->data[0];
1107 data.dsize = rec->datalen;
1108 data.dptr = &rec->data[rec->keylen];
1110 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1111 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
1112 return -1;
1115 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1116 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
1117 return -1;
1120 hdr = (struct ctdb_ltdb_header *)data.dptr;
1122 /* use a non-blocking lock */
1123 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1124 return -1;
1127 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1128 if (data2.dptr == NULL) {
1129 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1130 return 0;
1133 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1134 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
1135 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1136 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
1138 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1139 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
1141 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1142 free(data2.dptr);
1143 return 0;
1146 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1148 if (hdr2->rsn > hdr->rsn) {
1149 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1150 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
1151 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
1152 free(data2.dptr);
1153 return -1;
1156 /* do not allow deleting record that have readonly flags set. */
1157 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1158 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1159 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1160 free(data2.dptr);
1161 return -1;
1163 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1164 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1165 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1166 free(data2.dptr);
1167 return -1;
1170 if (hdr2->dmaster == ctdb->pnn) {
1171 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1172 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
1173 free(data2.dptr);
1174 return -1;
1177 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1178 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1179 free(data2.dptr);
1180 return -1;
1183 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1184 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1185 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1186 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
1187 free(data2.dptr);
1188 return -1;
1191 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1192 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1193 free(data2.dptr);
1194 return 0;
/*
  state carried from the start/end recovery controls to the callback
  that runs once the corresponding event script has finished
 */
struct recovery_callback_state {
	/* the control request that the callback replies to */
	struct ctdb_req_control_old *c;
};
1205 called when the 'recovered' event script has finished
1207 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1209 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1211 ctdb_enable_monitoring(ctdb);
1212 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1214 if (status != 0) {
1215 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1216 if (status == -ETIME) {
1217 ctdb_ban_self(ctdb);
1221 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1222 talloc_free(state);
1224 gettimeofday(&ctdb->last_recovery_finished, NULL);
1226 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1227 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1232 recovery has finished
1234 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1235 struct ctdb_req_control_old *c,
1236 bool *async_reply)
1238 int ret;
1239 struct recovery_callback_state *state;
1241 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1243 ctdb_persistent_finish_trans3_commits(ctdb);
1245 state = talloc(ctdb, struct recovery_callback_state);
1246 CTDB_NO_MEMORY(ctdb, state);
1248 state->c = c;
1250 ctdb_disable_monitoring(ctdb);
1252 ret = ctdb_event_script_callback(ctdb, state,
1253 ctdb_end_recovery_callback,
1254 state,
1255 CTDB_EVENT_RECOVERED, "%s", "");
1257 if (ret != 0) {
1258 ctdb_enable_monitoring(ctdb);
1260 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1261 talloc_free(state);
1262 return -1;
1265 /* tell the control that we will be reply asynchronously */
1266 state->c = talloc_steal(state, c);
1267 *async_reply = true;
1268 return 0;
1272 called when the 'startrecovery' event script has finished
1274 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1276 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1278 if (status != 0) {
1279 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1282 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1283 talloc_free(state);
1287 run the startrecovery eventscript
1289 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1290 struct ctdb_req_control_old *c,
1291 bool *async_reply)
1293 int ret;
1294 struct recovery_callback_state *state;
1296 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1297 gettimeofday(&ctdb->last_recovery_started, NULL);
1299 state = talloc(ctdb, struct recovery_callback_state);
1300 CTDB_NO_MEMORY(ctdb, state);
1302 state->c = talloc_steal(state, c);
1304 ctdb_disable_monitoring(ctdb);
1306 ret = ctdb_event_script_callback(ctdb, state,
1307 ctdb_start_recovery_callback,
1308 state,
1309 CTDB_EVENT_START_RECOVERY,
1310 "%s", "");
1312 if (ret != 0) {
1313 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1314 talloc_free(state);
1315 return -1;
1318 /* tell the control that we will be reply asynchronously */
1319 *async_reply = true;
1320 return 0;
1324 try to delete all these records as part of the vacuuming process
1325 and return the records we failed to delete
1327 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1329 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1330 struct ctdb_db_context *ctdb_db;
1331 int i;
1332 struct ctdb_rec_data_old *rec;
1333 struct ctdb_marshall_buffer *records;
1335 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1336 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1337 return -1;
1340 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1341 if (!ctdb_db) {
1342 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1343 return -1;
1347 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1348 reply->count, reply->db_id));
1351 /* create a blob to send back the records we couldnt delete */
1352 records = (struct ctdb_marshall_buffer *)
1353 talloc_zero_size(outdata,
1354 offsetof(struct ctdb_marshall_buffer, data));
1355 if (records == NULL) {
1356 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1357 return -1;
1359 records->db_id = ctdb_db->db_id;
1362 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1363 for (i=0;i<reply->count;i++) {
1364 TDB_DATA key, data;
1366 key.dptr = &rec->data[0];
1367 key.dsize = rec->keylen;
1368 data.dptr = &rec->data[key.dsize];
1369 data.dsize = rec->datalen;
1371 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1372 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1373 return -1;
1376 /* If we cant delete the record we must add it to the reply
1377 so the lmaster knows it may not purge this record
1379 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1380 size_t old_size;
1381 struct ctdb_ltdb_header *hdr;
1383 hdr = (struct ctdb_ltdb_header *)data.dptr;
1384 data.dptr += sizeof(*hdr);
1385 data.dsize -= sizeof(*hdr);
1387 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1389 old_size = talloc_get_size(records);
1390 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1391 if (records == NULL) {
1392 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1393 return -1;
1395 records->count++;
1396 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1399 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1403 *outdata = ctdb_marshall_finish(records);
1405 return 0;
1409 * Store a record as part of the vacuum process:
1410 * This is called from the RECEIVE_RECORD control which
1411 * the lmaster uses to send the current empty copy
1412 * to all nodes for storing, before it lets the other
1413 * nodes delete the records in the second phase with
1414 * the TRY_DELETE_RECORDS control.
1416 * Only store if we are not lmaster or dmaster, and our
1417 * rsn is <= the provided rsn. Use non-blocking locks.
1419 * return 0 if the record was successfully stored.
1420 * return !0 if the record still exists in the tdb after returning.
1422 static int store_tdb_record(struct ctdb_context *ctdb,
1423 struct ctdb_db_context *ctdb_db,
1424 struct ctdb_rec_data_old *rec)
1426 TDB_DATA key, data, data2;
1427 struct ctdb_ltdb_header *hdr, *hdr2;
1428 int ret;
1430 key.dsize = rec->keylen;
1431 key.dptr = &rec->data[0];
1432 data.dsize = rec->datalen;
1433 data.dptr = &rec->data[rec->keylen];
1435 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1436 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1437 "where we are lmaster\n"));
1438 return -1;
1441 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1442 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1443 return -1;
1446 hdr = (struct ctdb_ltdb_header *)data.dptr;
1448 /* use a non-blocking lock */
1449 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1450 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1451 return -1;
1454 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1455 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1456 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1457 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1458 ret = -1;
1459 goto done;
1461 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1462 ret = 0;
1463 goto done;
1466 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1468 if (hdr2->rsn > hdr->rsn) {
1469 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1470 "rsn=%llu - called with rsn=%llu\n",
1471 (unsigned long long)hdr2->rsn,
1472 (unsigned long long)hdr->rsn));
1473 ret = -1;
1474 goto done;
1477 /* do not allow vacuuming of records that have readonly flags set. */
1478 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1479 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1480 "flags set\n"));
1481 ret = -1;
1482 goto done;
1484 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1485 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1486 "flags set\n"));
1487 ret = -1;
1488 goto done;
1491 if (hdr2->dmaster == ctdb->pnn) {
1492 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1493 "where we are the dmaster\n"));
1494 ret = -1;
1495 goto done;
1498 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1499 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1500 ret = -1;
1501 goto done;
1504 ret = 0;
1506 done:
1507 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1508 free(data2.dptr);
1509 return ret;
1515 * Try to store all these records as part of the vacuuming process
1516 * and return the records we failed to store.
1518 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1519 TDB_DATA indata, TDB_DATA *outdata)
1521 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1522 struct ctdb_db_context *ctdb_db;
1523 int i;
1524 struct ctdb_rec_data_old *rec;
1525 struct ctdb_marshall_buffer *records;
1527 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1528 DEBUG(DEBUG_ERR,
1529 (__location__ " invalid data in receive_records\n"));
1530 return -1;
1533 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1534 if (!ctdb_db) {
1535 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1536 reply->db_id));
1537 return -1;
1540 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1541 "dbid 0x%x\n", reply->count, reply->db_id));
1543 /* create a blob to send back the records we could not store */
1544 records = (struct ctdb_marshall_buffer *)
1545 talloc_zero_size(outdata,
1546 offsetof(struct ctdb_marshall_buffer, data));
1547 if (records == NULL) {
1548 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1549 return -1;
1551 records->db_id = ctdb_db->db_id;
1553 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1554 for (i=0; i<reply->count; i++) {
1555 TDB_DATA key, data;
1557 key.dptr = &rec->data[0];
1558 key.dsize = rec->keylen;
1559 data.dptr = &rec->data[key.dsize];
1560 data.dsize = rec->datalen;
1562 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1563 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1564 "in indata\n"));
1565 return -1;
1569 * If we can not store the record we must add it to the reply
1570 * so the lmaster knows it may not purge this record.
1572 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1573 size_t old_size;
1574 struct ctdb_ltdb_header *hdr;
1576 hdr = (struct ctdb_ltdb_header *)data.dptr;
1577 data.dptr += sizeof(*hdr);
1578 data.dsize -= sizeof(*hdr);
1580 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1581 "record with hash 0x%08x in vacuum "
1582 "via RECEIVE_RECORDS\n",
1583 ctdb_hash(&key)));
1585 old_size = talloc_get_size(records);
1586 records = talloc_realloc_size(outdata, records,
1587 old_size + rec->length);
1588 if (records == NULL) {
1589 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1590 "expand\n"));
1591 return -1;
1593 records->count++;
1594 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1597 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1600 *outdata = ctdb_marshall_finish(records);
1602 return 0;
1607 report capabilities
1609 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1611 uint32_t *capabilities = NULL;
1613 capabilities = talloc(outdata, uint32_t);
1614 CTDB_NO_MEMORY(ctdb, capabilities);
1615 *capabilities = ctdb->capabilities;
1617 outdata->dsize = sizeof(uint32_t);
1618 outdata->dptr = (uint8_t *)capabilities;
1620 return 0;
1623 /* The recovery daemon will ping us at regular intervals.
1624 If we havent been pinged for a while we assume the recovery
1625 daemon is inoperable and we restart.
1627 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1628 struct tevent_timer *te,
1629 struct timeval t, void *p)
1631 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1632 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1634 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1636 if (*count < ctdb->tunable.recd_ping_failcount) {
1637 (*count)++;
1638 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1639 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1640 ctdb_recd_ping_timeout, ctdb);
1641 return;
1644 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1646 ctdb_stop_recoverd(ctdb);
1647 ctdb_start_recoverd(ctdb);
1650 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1652 talloc_free(ctdb->recd_ping_count);
1654 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1655 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1657 if (ctdb->tunable.recd_ping_timeout != 0) {
1658 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1659 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1660 ctdb_recd_ping_timeout, ctdb);
1663 return 0;
1668 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1670 uint32_t new_recmaster;
1672 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1673 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1675 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1676 DEBUG(DEBUG_NOTICE,
1677 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1680 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1681 DEBUG(DEBUG_NOTICE,
1682 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1685 ctdb->recovery_master = new_recmaster;
1686 return 0;
1690 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1692 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1693 ctdb_disable_monitoring(ctdb);
1694 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1696 return 0;
1699 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1701 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1702 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1704 return 0;