ctdb-recovery: Replace use of ctdb_dbid_map with local db_list
[Samba.git] / ctdb / server / ctdb_recovery_helper.c
blobdf96240d8dafafd72963b220b35e15a7ba3c74db
1 /*
2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "client/client.h"
40 #include "common/logging.h"
/* Number of seconds passed to timeval_current_ofs() by TIMEOUT() */
static int recover_timeout = 30;

/* Retry limit (not referenced in this part of the file) */
#define NUM_RETRIES	3

/*
 * Timeout value handed to the client control calls below.
 * NOTE(review): presumably "now + recover_timeout seconds" - confirm
 * against timeval_current_ofs().
 */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * Utility functions
 */
/*
 * Shared _recv() helper for the tevent requests in this file.
 *
 * Returns true when the request carries no unix error.  Otherwise returns
 * false and, if perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;
	bool failed;

	failed = tevent_req_is_unix_error(req, &unix_err);
	if (!failed) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
66 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
68 static uint64_t srvid_next(void)
70 rec_srvid += 1;
71 return rec_srvid;
/*
 * Node related functions
 */
/*
 * Fixed-capacity list of nodes with per-node attributes.
 * The three arrays are parallel: index i describes the same node in each.
 */
struct node_list {
	uint32_t *pnn_list;	/* node numbers; first `count` entries are in use */
	uint32_t *caps;		/* capability bits per node */
	uint32_t *ban_credits;	/* ban credits accumulated per node */
	unsigned int size;	/* allocated length of each array */
	unsigned int count;	/* number of nodes added so far */
};
86 static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
88 struct node_list *nlist;
89 unsigned int i;
91 nlist = talloc_zero(mem_ctx, struct node_list);
92 if (nlist == NULL) {
93 return NULL;
96 nlist->pnn_list = talloc_array(nlist, uint32_t, size);
97 nlist->caps = talloc_zero_array(nlist, uint32_t, size);
98 nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
100 if (nlist->pnn_list == NULL ||
101 nlist->caps == NULL ||
102 nlist->ban_credits == NULL) {
103 talloc_free(nlist);
104 return NULL;
106 nlist->size = size;
108 for (i=0; i<nlist->size; i++) {
109 nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
112 return nlist;
115 static bool node_list_add(struct node_list *nlist, uint32_t pnn)
117 unsigned int i;
119 if (nlist->count == nlist->size) {
120 return false;
123 for (i=0; i<nlist->count; i++) {
124 if (nlist->pnn_list[i] == pnn) {
125 return false;
129 nlist->pnn_list[nlist->count] = pnn;
130 nlist->count += 1;
132 return true;
135 static uint32_t *node_list_lmaster(struct node_list *nlist,
136 TALLOC_CTX *mem_ctx,
137 unsigned int *pnn_count)
139 uint32_t *pnn_list;
140 unsigned int count, i;
142 pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
143 if (pnn_list == NULL) {
144 return NULL;
147 count = 0;
148 for (i=0; i<nlist->count; i++) {
149 if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
150 continue;
153 pnn_list[count] = nlist->pnn_list[i];
154 count += 1;
157 *pnn_count = count;
158 return pnn_list;
161 static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
163 unsigned int i;
165 for (i=0; i<nlist->count; i++) {
166 if (nlist->pnn_list[i] == pnn) {
167 nlist->ban_credits[i] += 1;
168 break;
/*
 * Database list functions
 *
 * Simple, naive implementation that could be updated to a db_hash or similar
 */
/* One database together with the nodes that reported it */
struct db {
	struct db *prev, *next;	/* list linkage (see DLIST_ADD_END in db_list_add) */

	uint32_t db_id;
	uint32_t db_flags;
	uint32_t *pnn_list;	/* nodes that have this database */
	unsigned int num_nodes;	/* entries of pnn_list in use */
};
/* Linked list of databases seen across the cluster */
struct db_list {
	unsigned int num_dbs;	/* number of entries in the list */
	struct db *db;		/* list head */
	unsigned int num_nodes;	/* capacity of each entry's pnn_list */
};
194 static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
196 struct db_list *l;
198 l = talloc_zero(mem_ctx, struct db_list);
199 l->num_nodes = num_nodes;
201 return l;
204 static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
206 struct db *db;
208 if (dblist == NULL) {
209 return NULL;
212 db = dblist->db;
213 while (db != NULL && db->db_id != db_id) {
214 db = db->next;
217 return db;
220 static int db_list_add(struct db_list *dblist,
221 uint32_t db_id,
222 uint32_t db_flags,
223 uint32_t node)
225 struct db *db = NULL;
227 if (dblist == NULL) {
228 return EINVAL;
231 db = talloc_zero(dblist, struct db);
232 if (db == NULL) {
233 return ENOMEM;
236 db->db_id = db_id;
237 db->db_flags = db_flags;
238 db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
239 if (db->pnn_list == NULL) {
240 talloc_free(db);
241 return ENOMEM;
243 db->pnn_list[0] = node;
244 db->num_nodes = 1;
246 DLIST_ADD_END(dblist->db, db);
247 dblist->num_dbs++;
249 return 0;
252 static int db_list_check_and_add(struct db_list *dblist,
253 uint32_t db_id,
254 uint32_t db_flags,
255 uint32_t node)
257 struct db *db = NULL;
258 int ret;
261 * These flags are masked out because they are only set on a
262 * node when a client attaches to that node, so they might not
263 * be set yet. They can't be passed as part of the attch, so
264 * they're no use here.
266 db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
268 if (dblist == NULL) {
269 return EINVAL;
272 db = db_list_find(dblist, db_id);
273 if (db == NULL) {
274 ret = db_list_add(dblist, db_id, db_flags, node);
275 return ret;
278 if (db->db_flags != db_flags) {
279 D_ERR("Incompatible database flags for 0x%"PRIx32" "
280 "(0x%"PRIx32" != 0x%"PRIx32")\n",
281 db_id,
282 db_flags,
283 db->db_flags);
284 return EINVAL;
287 if (db->num_nodes >= dblist->num_nodes) {
288 return EINVAL;
291 db->pnn_list[db->num_nodes] = node;
292 db->num_nodes++;
294 return 0;
/*
 * Recovery database functions
 */
/* Temporary tdb used to collect the records of one database */
struct recdb_context {
	uint32_t db_id;
	const char *db_name;	/* caller's pointer, stored as-is (not copied) */
	const char *db_path;	/* path of the recovery tdb file */
	struct tdb_wrap *db;	/* open handle on db_path */
	bool persistent;
};
309 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
310 const char *db_name,
311 const char *db_path,
312 uint32_t hash_size, bool persistent)
314 static char *db_dir_state = NULL;
315 struct recdb_context *recdb;
316 unsigned int tdb_flags;
318 recdb = talloc(mem_ctx, struct recdb_context);
319 if (recdb == NULL) {
320 return NULL;
323 if (db_dir_state == NULL) {
324 db_dir_state = getenv("CTDB_DBDIR_STATE");
327 recdb->db_name = db_name;
328 recdb->db_id = db_id;
329 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
330 db_dir_state != NULL ?
331 db_dir_state :
332 dirname(discard_const(db_path)),
333 db_name);
334 if (recdb->db_path == NULL) {
335 talloc_free(recdb);
336 return NULL;
338 unlink(recdb->db_path);
340 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
341 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
342 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
343 if (recdb->db == NULL) {
344 talloc_free(recdb);
345 D_ERR("failed to create recovery db %s\n", recdb->db_path);
346 return NULL;
349 recdb->persistent = persistent;
351 return recdb;
354 static uint32_t recdb_id(struct recdb_context *recdb)
356 return recdb->db_id;
359 static const char *recdb_name(struct recdb_context *recdb)
361 return recdb->db_name;
364 static const char *recdb_path(struct recdb_context *recdb)
366 return recdb->db_path;
369 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
371 return recdb->db->tdb;
374 static bool recdb_persistent(struct recdb_context *recdb)
376 return recdb->persistent;
/* Context handed to recdb_add_traverse() through private_data */
struct recdb_add_traverse_state {
	struct recdb_context *recdb;	/* recovery database being filled */
	uint32_t mypnn;			/* compared against the stored dmaster */
};
384 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
385 TDB_DATA key, TDB_DATA data,
386 void *private_data)
388 struct recdb_add_traverse_state *state =
389 (struct recdb_add_traverse_state *)private_data;
390 struct ctdb_ltdb_header *hdr;
391 TDB_DATA prev_data;
392 int ret;
394 /* header is not marshalled separately in the pulldb control */
395 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
396 return -1;
399 hdr = (struct ctdb_ltdb_header *)data.dptr;
401 /* fetch the existing record, if any */
402 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
404 if (prev_data.dptr != NULL) {
405 struct ctdb_ltdb_header prev_hdr;
407 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
408 free(prev_data.dptr);
409 if (hdr->rsn < prev_hdr.rsn ||
410 (hdr->rsn == prev_hdr.rsn &&
411 prev_hdr.dmaster != state->mypnn)) {
412 return 0;
416 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
417 if (ret != 0) {
418 return -1;
420 return 0;
423 static bool recdb_add(struct recdb_context *recdb, int mypnn,
424 struct ctdb_rec_buffer *recbuf)
426 struct recdb_add_traverse_state state;
427 int ret;
429 state.recdb = recdb;
430 state.mypnn = mypnn;
432 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
433 if (ret != 0) {
434 return false;
437 return true;
440 /* This function decides which records from recdb are retained */
441 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
442 uint32_t reqid, uint32_t dmaster,
443 TDB_DATA key, TDB_DATA data)
445 struct ctdb_ltdb_header *header;
446 int ret;
448 /* Skip empty records */
449 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
450 return 0;
453 /* update the dmaster field to point to us */
454 header = (struct ctdb_ltdb_header *)data.dptr;
455 if (!persistent) {
456 header->dmaster = dmaster;
457 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
460 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
461 if (ret != 0) {
462 return ret;
465 return 0;
/* Context handed to recdb_records_traverse() through private_data */
struct recdb_records_traverse_state {
	struct ctdb_rec_buffer *recbuf;	/* buffer being filled */
	uint32_t dmaster;		/* passed on to recbuf_filter_add() */
	uint32_t reqid;
	bool persistent;
	bool failed;			/* set when adding a record fails */
};
476 static int recdb_records_traverse(struct tdb_context *tdb,
477 TDB_DATA key, TDB_DATA data,
478 void *private_data)
480 struct recdb_records_traverse_state *state =
481 (struct recdb_records_traverse_state *)private_data;
482 int ret;
484 ret = recbuf_filter_add(state->recbuf, state->persistent,
485 state->reqid, state->dmaster, key, data);
486 if (ret != 0) {
487 state->failed = true;
488 return ret;
491 return 0;
494 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
495 TALLOC_CTX *mem_ctx,
496 uint32_t dmaster)
498 struct recdb_records_traverse_state state;
499 int ret;
501 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
502 if (state.recbuf == NULL) {
503 return NULL;
505 state.dmaster = dmaster;
506 state.reqid = 0;
507 state.persistent = recdb_persistent(recdb);
508 state.failed = false;
510 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
511 &state);
512 if (ret == -1 || state.failed) {
513 D_ERR("Failed to marshall recovery records for %s\n",
514 recdb_name(recdb));
515 TALLOC_FREE(state.recbuf);
516 return NULL;
519 return state.recbuf;
522 struct recdb_file_traverse_state {
523 struct ctdb_rec_buffer *recbuf;
524 struct recdb_context *recdb;
525 TALLOC_CTX *mem_ctx;
526 uint32_t dmaster;
527 uint32_t reqid;
528 bool persistent;
529 bool failed;
530 int fd;
531 size_t max_size;
532 unsigned int num_buffers;
535 static int recdb_file_traverse(struct tdb_context *tdb,
536 TDB_DATA key, TDB_DATA data,
537 void *private_data)
539 struct recdb_file_traverse_state *state =
540 (struct recdb_file_traverse_state *)private_data;
541 int ret;
543 ret = recbuf_filter_add(state->recbuf, state->persistent,
544 state->reqid, state->dmaster, key, data);
545 if (ret != 0) {
546 state->failed = true;
547 return ret;
550 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
551 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
552 if (ret != 0) {
553 D_ERR("Failed to collect recovery records for %s\n",
554 recdb_name(state->recdb));
555 state->failed = true;
556 return ret;
559 state->num_buffers += 1;
561 TALLOC_FREE(state->recbuf);
562 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
563 recdb_id(state->recdb));
564 if (state->recbuf == NULL) {
565 state->failed = true;
566 return ENOMEM;
570 return 0;
573 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
574 uint32_t dmaster, int fd, int max_size)
576 struct recdb_file_traverse_state state;
577 int ret;
579 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
580 if (state.recbuf == NULL) {
581 return -1;
583 state.recdb = recdb;
584 state.mem_ctx = mem_ctx;
585 state.dmaster = dmaster;
586 state.reqid = 0;
587 state.persistent = recdb_persistent(recdb);
588 state.failed = false;
589 state.fd = fd;
590 state.max_size = max_size;
591 state.num_buffers = 0;
593 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
594 if (ret == -1 || state.failed) {
595 TALLOC_FREE(state.recbuf);
596 return -1;
599 ret = ctdb_rec_buffer_write(state.recbuf, fd);
600 if (ret != 0) {
601 D_ERR("Failed to collect recovery records for %s\n",
602 recdb_name(recdb));
603 TALLOC_FREE(state.recbuf);
604 return -1;
606 state.num_buffers += 1;
608 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
609 state.num_buffers, recdb_name(recdb));
611 return state.num_buffers;
/*
 * Pull database from a single node
 */
/* State of one pull_database request */
struct pull_database_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* destination for pulled records */
	uint32_t pnn;			/* node being pulled from */
	uint64_t srvid;			/* srvid used for DB_PULL messages */
	unsigned int num_records;	/* records received so far */
	int result;			/* error kept while the handler is removed */
};
628 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
629 void *private_data);
630 static void pull_database_register_done(struct tevent_req *subreq);
631 static void pull_database_old_done(struct tevent_req *subreq);
632 static void pull_database_unregister_done(struct tevent_req *subreq);
633 static void pull_database_new_done(struct tevent_req *subreq);
635 static struct tevent_req *pull_database_send(
636 TALLOC_CTX *mem_ctx,
637 struct tevent_context *ev,
638 struct ctdb_client_context *client,
639 uint32_t pnn, uint32_t caps,
640 struct recdb_context *recdb)
642 struct tevent_req *req, *subreq;
643 struct pull_database_state *state;
644 struct ctdb_req_control request;
646 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
647 if (req == NULL) {
648 return NULL;
651 state->ev = ev;
652 state->client = client;
653 state->recdb = recdb;
654 state->pnn = pnn;
655 state->srvid = srvid_next();
657 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
658 subreq = ctdb_client_set_message_handler_send(
659 state, state->ev, state->client,
660 state->srvid, pull_database_handler,
661 req);
662 if (tevent_req_nomem(subreq, req)) {
663 return tevent_req_post(req, ev);
666 tevent_req_set_callback(subreq, pull_database_register_done,
667 req);
669 } else {
670 struct ctdb_pulldb pulldb;
672 pulldb.db_id = recdb_id(recdb);
673 pulldb.lmaster = CTDB_LMASTER_ANY;
675 ctdb_req_control_pull_db(&request, &pulldb);
676 subreq = ctdb_client_control_send(state, state->ev,
677 state->client,
678 pnn, TIMEOUT(),
679 &request);
680 if (tevent_req_nomem(subreq, req)) {
681 return tevent_req_post(req, ev);
683 tevent_req_set_callback(subreq, pull_database_old_done, req);
686 return req;
689 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
690 void *private_data)
692 struct tevent_req *req = talloc_get_type_abort(
693 private_data, struct tevent_req);
694 struct pull_database_state *state = tevent_req_data(
695 req, struct pull_database_state);
696 struct ctdb_rec_buffer *recbuf;
697 size_t np;
698 int ret;
699 bool status;
701 if (srvid != state->srvid) {
702 return;
705 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
706 if (ret != 0) {
707 D_ERR("Invalid data received for DB_PULL messages\n");
708 return;
711 if (recbuf->db_id != recdb_id(state->recdb)) {
712 talloc_free(recbuf);
713 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
714 recbuf->db_id, recdb_name(state->recdb));
715 return;
718 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
719 recbuf);
720 if (! status) {
721 talloc_free(recbuf);
722 D_ERR("Failed to add records to recdb for %s\n",
723 recdb_name(state->recdb));
724 return;
727 state->num_records += recbuf->count;
728 talloc_free(recbuf);
731 static void pull_database_register_done(struct tevent_req *subreq)
733 struct tevent_req *req = tevent_req_callback_data(
734 subreq, struct tevent_req);
735 struct pull_database_state *state = tevent_req_data(
736 req, struct pull_database_state);
737 struct ctdb_req_control request;
738 struct ctdb_pulldb_ext pulldb_ext;
739 int ret;
740 bool status;
742 status = ctdb_client_set_message_handler_recv(subreq, &ret);
743 TALLOC_FREE(subreq);
744 if (! status) {
745 D_ERR("Failed to set message handler for DB_PULL for %s\n",
746 recdb_name(state->recdb));
747 tevent_req_error(req, ret);
748 return;
751 pulldb_ext.db_id = recdb_id(state->recdb);
752 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
753 pulldb_ext.srvid = state->srvid;
755 ctdb_req_control_db_pull(&request, &pulldb_ext);
756 subreq = ctdb_client_control_send(state, state->ev, state->client,
757 state->pnn, TIMEOUT(), &request);
758 if (tevent_req_nomem(subreq, req)) {
759 return;
761 tevent_req_set_callback(subreq, pull_database_new_done, req);
764 static void pull_database_old_done(struct tevent_req *subreq)
766 struct tevent_req *req = tevent_req_callback_data(
767 subreq, struct tevent_req);
768 struct pull_database_state *state = tevent_req_data(
769 req, struct pull_database_state);
770 struct ctdb_reply_control *reply;
771 struct ctdb_rec_buffer *recbuf;
772 int ret;
773 bool status;
775 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
776 TALLOC_FREE(subreq);
777 if (! status) {
778 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
779 recdb_name(state->recdb), state->pnn, ret);
780 tevent_req_error(req, ret);
781 return;
784 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
785 talloc_free(reply);
786 if (ret != 0) {
787 tevent_req_error(req, ret);
788 return;
791 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
792 recbuf);
793 if (! status) {
794 talloc_free(recbuf);
795 tevent_req_error(req, EIO);
796 return;
799 state->num_records = recbuf->count;
800 talloc_free(recbuf);
802 D_INFO("Pulled %d records for db %s from node %d\n",
803 state->num_records, recdb_name(state->recdb), state->pnn);
805 tevent_req_done(req);
808 static void pull_database_new_done(struct tevent_req *subreq)
810 struct tevent_req *req = tevent_req_callback_data(
811 subreq, struct tevent_req);
812 struct pull_database_state *state = tevent_req_data(
813 req, struct pull_database_state);
814 struct ctdb_reply_control *reply;
815 uint32_t num_records;
816 int ret;
817 bool status;
819 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
820 TALLOC_FREE(subreq);
821 if (! status) {
822 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
823 recdb_name(state->recdb), state->pnn, ret);
824 state->result = ret;
825 goto unregister;
828 ret = ctdb_reply_control_db_pull(reply, &num_records);
829 talloc_free(reply);
830 if (num_records != state->num_records) {
831 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
832 num_records, state->num_records,
833 recdb_name(state->recdb));
834 state->result = EIO;
835 goto unregister;
838 D_INFO("Pulled %d records for db %s from node %d\n",
839 state->num_records, recdb_name(state->recdb), state->pnn);
841 unregister:
843 subreq = ctdb_client_remove_message_handler_send(
844 state, state->ev, state->client,
845 state->srvid, req);
846 if (tevent_req_nomem(subreq, req)) {
847 return;
849 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
852 static void pull_database_unregister_done(struct tevent_req *subreq)
854 struct tevent_req *req = tevent_req_callback_data(
855 subreq, struct tevent_req);
856 struct pull_database_state *state = tevent_req_data(
857 req, struct pull_database_state);
858 int ret;
859 bool status;
861 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
862 TALLOC_FREE(subreq);
863 if (! status) {
864 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
865 recdb_name(state->recdb));
866 tevent_req_error(req, ret);
867 return;
870 if (state->result != 0) {
871 tevent_req_error(req, state->result);
872 return;
875 tevent_req_done(req);
/* Collect the result of pull_database_send(); see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
/*
 * Push database to specified nodes (old style)
 */
/* State of one old-style push: nodes are pushed to one after another */
struct push_database_old_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;
	uint32_t *pnn_list;		/* target nodes */
	unsigned int count;		/* number of target nodes */
	struct ctdb_rec_buffer *recbuf;	/* records sent to every node */
	unsigned int index;		/* position in pnn_list being pushed */
};
/* Completion callback for each PUSH_DB control */
static void push_database_old_push_done(struct tevent_req *subreq);
899 static struct tevent_req *push_database_old_send(
900 TALLOC_CTX *mem_ctx,
901 struct tevent_context *ev,
902 struct ctdb_client_context *client,
903 uint32_t *pnn_list,
904 unsigned int count,
905 struct recdb_context *recdb)
907 struct tevent_req *req, *subreq;
908 struct push_database_old_state *state;
909 struct ctdb_req_control request;
910 uint32_t pnn;
912 req = tevent_req_create(mem_ctx, &state,
913 struct push_database_old_state);
914 if (req == NULL) {
915 return NULL;
918 state->ev = ev;
919 state->client = client;
920 state->recdb = recdb;
921 state->pnn_list = pnn_list;
922 state->count = count;
923 state->index = 0;
925 state->recbuf = recdb_records(recdb, state,
926 ctdb_client_pnn(client));
927 if (tevent_req_nomem(state->recbuf, req)) {
928 return tevent_req_post(req, ev);
931 pnn = state->pnn_list[state->index];
933 ctdb_req_control_push_db(&request, state->recbuf);
934 subreq = ctdb_client_control_send(state, ev, client, pnn,
935 TIMEOUT(), &request);
936 if (tevent_req_nomem(subreq, req)) {
937 return tevent_req_post(req, ev);
939 tevent_req_set_callback(subreq, push_database_old_push_done, req);
941 return req;
944 static void push_database_old_push_done(struct tevent_req *subreq)
946 struct tevent_req *req = tevent_req_callback_data(
947 subreq, struct tevent_req);
948 struct push_database_old_state *state = tevent_req_data(
949 req, struct push_database_old_state);
950 struct ctdb_req_control request;
951 uint32_t pnn;
952 int ret;
953 bool status;
955 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
956 TALLOC_FREE(subreq);
957 if (! status) {
958 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
959 recdb_name(state->recdb), state->pnn_list[state->index],
960 ret);
961 tevent_req_error(req, ret);
962 return;
965 state->index += 1;
966 if (state->index == state->count) {
967 TALLOC_FREE(state->recbuf);
968 tevent_req_done(req);
969 return;
972 pnn = state->pnn_list[state->index];
974 ctdb_req_control_push_db(&request, state->recbuf);
975 subreq = ctdb_client_control_send(state, state->ev, state->client,
976 pnn, TIMEOUT(), &request);
977 if (tevent_req_nomem(subreq, req)) {
978 return;
980 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Collect the result of push_database_old_send(); see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
/*
 * Push database to specified nodes (new style)
 */
/* State of one new-style push: buffers are sent to all nodes as messages */
struct push_database_new_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;
	uint32_t *pnn_list;		/* target nodes */
	unsigned int count;		/* number of target nodes */
	uint64_t srvid;			/* srvid the record messages are sent to */
	uint32_t dmaster;		/* local pnn, applied to the records */
	int fd;				/* spool file holding the record buffers */
	int num_buffers;		/* buffers written to the spool file */
	int num_buffers_sent;		/* buffers sent so far */
	unsigned int num_records;	/* records sent so far */
};
/* Steps of the new-style push, in the order they run */
static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
1011 static struct tevent_req *push_database_new_send(
1012 TALLOC_CTX *mem_ctx,
1013 struct tevent_context *ev,
1014 struct ctdb_client_context *client,
1015 uint32_t *pnn_list,
1016 unsigned int count,
1017 struct recdb_context *recdb,
1018 int max_size)
1020 struct tevent_req *req, *subreq;
1021 struct push_database_new_state *state;
1022 struct ctdb_req_control request;
1023 struct ctdb_pulldb_ext pulldb_ext;
1024 char *filename;
1025 off_t offset;
1027 req = tevent_req_create(mem_ctx, &state,
1028 struct push_database_new_state);
1029 if (req == NULL) {
1030 return NULL;
1033 state->ev = ev;
1034 state->client = client;
1035 state->recdb = recdb;
1036 state->pnn_list = pnn_list;
1037 state->count = count;
1039 state->srvid = srvid_next();
1040 state->dmaster = ctdb_client_pnn(client);
1041 state->num_buffers_sent = 0;
1042 state->num_records = 0;
1044 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
1045 if (tevent_req_nomem(filename, req)) {
1046 return tevent_req_post(req, ev);
1049 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
1050 if (state->fd == -1) {
1051 tevent_req_error(req, errno);
1052 return tevent_req_post(req, ev);
1054 unlink(filename);
1055 talloc_free(filename);
1057 state->num_buffers = recdb_file(recdb, state, state->dmaster,
1058 state->fd, max_size);
1059 if (state->num_buffers == -1) {
1060 tevent_req_error(req, ENOMEM);
1061 return tevent_req_post(req, ev);
1064 offset = lseek(state->fd, 0, SEEK_SET);
1065 if (offset != 0) {
1066 tevent_req_error(req, EIO);
1067 return tevent_req_post(req, ev);
1070 pulldb_ext.db_id = recdb_id(recdb);
1071 pulldb_ext.srvid = state->srvid;
1073 ctdb_req_control_db_push_start(&request, &pulldb_ext);
1074 subreq = ctdb_client_control_multi_send(state, ev, client,
1075 pnn_list, count,
1076 TIMEOUT(), &request);
1077 if (tevent_req_nomem(subreq, req)) {
1078 return tevent_req_post(req, ev);
1080 tevent_req_set_callback(subreq, push_database_new_started, req);
1082 return req;
1085 static void push_database_new_started(struct tevent_req *subreq)
1087 struct tevent_req *req = tevent_req_callback_data(
1088 subreq, struct tevent_req);
1089 struct push_database_new_state *state = tevent_req_data(
1090 req, struct push_database_new_state);
1091 int *err_list;
1092 int ret;
1093 bool status;
1095 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1096 &err_list, NULL);
1097 TALLOC_FREE(subreq);
1098 if (! status) {
1099 int ret2;
1100 uint32_t pnn;
1102 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1103 state->count,
1104 err_list, &pnn);
1105 if (ret2 != 0) {
1106 D_ERR("control DB_PUSH_START failed for db %s"
1107 " on node %u, ret=%d\n",
1108 recdb_name(state->recdb), pnn, ret2);
1109 } else {
1110 D_ERR("control DB_PUSH_START failed for db %s,"
1111 " ret=%d\n",
1112 recdb_name(state->recdb), ret);
1114 talloc_free(err_list);
1116 tevent_req_error(req, ret);
1117 return;
1120 push_database_new_send_msg(req);
1123 static void push_database_new_send_msg(struct tevent_req *req)
1125 struct push_database_new_state *state = tevent_req_data(
1126 req, struct push_database_new_state);
1127 struct tevent_req *subreq;
1128 struct ctdb_rec_buffer *recbuf;
1129 struct ctdb_req_message message;
1130 TDB_DATA data;
1131 size_t np;
1132 int ret;
1134 if (state->num_buffers_sent == state->num_buffers) {
1135 struct ctdb_req_control request;
1137 ctdb_req_control_db_push_confirm(&request,
1138 recdb_id(state->recdb));
1139 subreq = ctdb_client_control_multi_send(state, state->ev,
1140 state->client,
1141 state->pnn_list,
1142 state->count,
1143 TIMEOUT(), &request);
1144 if (tevent_req_nomem(subreq, req)) {
1145 return;
1147 tevent_req_set_callback(subreq, push_database_new_confirmed,
1148 req);
1149 return;
1152 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
1153 if (ret != 0) {
1154 tevent_req_error(req, ret);
1155 return;
1158 data.dsize = ctdb_rec_buffer_len(recbuf);
1159 data.dptr = talloc_size(state, data.dsize);
1160 if (tevent_req_nomem(data.dptr, req)) {
1161 return;
1164 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
1166 message.srvid = state->srvid;
1167 message.data.data = data;
1169 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1170 state->num_buffers_sent, recbuf->count,
1171 recdb_name(state->recdb));
1173 subreq = ctdb_client_message_multi_send(state, state->ev,
1174 state->client,
1175 state->pnn_list, state->count,
1176 &message);
1177 if (tevent_req_nomem(subreq, req)) {
1178 return;
1180 tevent_req_set_callback(subreq, push_database_new_send_done, req);
1182 state->num_records += recbuf->count;
1184 talloc_free(data.dptr);
1185 talloc_free(recbuf);
1188 static void push_database_new_send_done(struct tevent_req *subreq)
1190 struct tevent_req *req = tevent_req_callback_data(
1191 subreq, struct tevent_req);
1192 struct push_database_new_state *state = tevent_req_data(
1193 req, struct push_database_new_state);
1194 bool status;
1195 int ret;
1197 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1198 TALLOC_FREE(subreq);
1199 if (! status) {
1200 D_ERR("Sending recovery records failed for %s\n",
1201 recdb_name(state->recdb));
1202 tevent_req_error(req, ret);
1203 return;
1206 state->num_buffers_sent += 1;
1208 push_database_new_send_msg(req);
1211 static void push_database_new_confirmed(struct tevent_req *subreq)
1213 struct tevent_req *req = tevent_req_callback_data(
1214 subreq, struct tevent_req);
1215 struct push_database_new_state *state = tevent_req_data(
1216 req, struct push_database_new_state);
1217 struct ctdb_reply_control **reply;
1218 int *err_list;
1219 bool status;
1220 unsigned int i;
1221 int ret;
1222 uint32_t num_records;
1224 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1225 &err_list, &reply);
1226 TALLOC_FREE(subreq);
1227 if (! status) {
1228 int ret2;
1229 uint32_t pnn;
1231 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1232 state->count, err_list,
1233 &pnn);
1234 if (ret2 != 0) {
1235 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1236 " on node %u, ret=%d\n",
1237 recdb_name(state->recdb), pnn, ret2);
1238 } else {
1239 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1240 " ret=%d\n",
1241 recdb_name(state->recdb), ret);
1243 tevent_req_error(req, ret);
1244 return;
1247 for (i=0; i<state->count; i++) {
1248 ret = ctdb_reply_control_db_push_confirm(reply[i],
1249 &num_records);
1250 if (ret != 0) {
1251 tevent_req_error(req, EPROTO);
1252 return;
1255 if (num_records != state->num_records) {
1256 D_ERR("Node %u received %d of %d records for %s\n",
1257 state->pnn_list[i], num_records,
1258 state->num_records, recdb_name(state->recdb));
1259 tevent_req_error(req, EPROTO);
1260 return;
1264 talloc_free(reply);
1266 D_INFO("Pushed %d records for db %s\n",
1267 state->num_records, recdb_name(state->recdb));
1269 tevent_req_done(req);
/* Collect the result of push_database_new_send(); see generic_recv() */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
/*
 * Wrapper for push_database_old and push_database_new
 */
/* Tracks which of the two push variants have finished */
struct push_database_state {
	bool old_done, new_done;
};
/* Completion callbacks for the two sub-requests of push_database_send() */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1288 static struct tevent_req *push_database_send(
1289 TALLOC_CTX *mem_ctx,
1290 struct tevent_context *ev,
1291 struct ctdb_client_context *client,
1292 struct node_list *nlist,
1293 struct ctdb_tunable_list *tun_list,
1294 struct recdb_context *recdb)
1296 struct tevent_req *req, *subreq;
1297 struct push_database_state *state;
1298 uint32_t *old_list, *new_list;
1299 unsigned int old_count, new_count;
1300 unsigned int i;
1302 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1303 if (req == NULL) {
1304 return NULL;
1307 state->old_done = false;
1308 state->new_done = false;
1310 old_count = 0;
1311 new_count = 0;
1312 old_list = talloc_array(state, uint32_t, nlist->count);
1313 new_list = talloc_array(state, uint32_t, nlist->count);
1314 if (tevent_req_nomem(old_list, req) ||
1315 tevent_req_nomem(new_list,req)) {
1316 return tevent_req_post(req, ev);
1319 for (i=0; i<nlist->count; i++) {
1320 if (nlist->caps[i] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1321 new_list[new_count] = nlist->pnn_list[i];
1322 new_count += 1;
1323 } else {
1324 old_list[old_count] = nlist->pnn_list[i];
1325 old_count += 1;
1329 if (old_count > 0) {
1330 subreq = push_database_old_send(state, ev, client,
1331 old_list, old_count, recdb);
1332 if (tevent_req_nomem(subreq, req)) {
1333 return tevent_req_post(req, ev);
1335 tevent_req_set_callback(subreq, push_database_old_done, req);
1336 } else {
1337 state->old_done = true;
1340 if (new_count > 0) {
1341 subreq = push_database_new_send(state, ev, client,
1342 new_list, new_count, recdb,
1343 tun_list->rec_buffer_size_limit);
1344 if (tevent_req_nomem(subreq, req)) {
1345 return tevent_req_post(req, ev);
1347 tevent_req_set_callback(subreq, push_database_new_done, req);
1348 } else {
1349 state->new_done = true;
1352 return req;
1355 static void push_database_old_done(struct tevent_req *subreq)
1357 struct tevent_req *req = tevent_req_callback_data(
1358 subreq, struct tevent_req);
1359 struct push_database_state *state = tevent_req_data(
1360 req, struct push_database_state);
1361 bool status;
1362 int ret;
1364 status = push_database_old_recv(subreq, &ret);
1365 if (! status) {
1366 tevent_req_error(req, ret);
1367 return;
1370 state->old_done = true;
1372 if (state->old_done && state->new_done) {
1373 tevent_req_done(req);
1377 static void push_database_new_done(struct tevent_req *subreq)
1379 struct tevent_req *req = tevent_req_callback_data(
1380 subreq, struct tevent_req);
1381 struct push_database_state *state = tevent_req_data(
1382 req, struct push_database_state);
1383 bool status;
1384 int ret;
1386 status = push_database_new_recv(subreq, &ret);
1387 if (! status) {
1388 tevent_req_error(req, ret);
1389 return;
1392 state->new_done = true;
1394 if (state->old_done && state->new_done) {
1395 tevent_req_done(req);
/*
 * Fetch the result of push_database_send().
 *
 * Returns true on success. On failure returns false and, when perr is
 * not NULL, stores the error code there (see generic_recv()).
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1405 * Collect databases using highest sequence number
/* State for collecting a database from the node with the highest seqnum */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct node_list *nlist;
	uint32_t db_id;
	struct recdb_context *recdb;

	/* Node chosen to pull from; set once the seqnum replies are in */
	uint32_t max_pnn;
};

static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1421 static struct tevent_req *collect_highseqnum_db_send(
1422 TALLOC_CTX *mem_ctx,
1423 struct tevent_context *ev,
1424 struct ctdb_client_context *client,
1425 struct node_list *nlist,
1426 uint32_t db_id,
1427 struct recdb_context *recdb)
1429 struct tevent_req *req, *subreq;
1430 struct collect_highseqnum_db_state *state;
1431 struct ctdb_req_control request;
1433 req = tevent_req_create(mem_ctx, &state,
1434 struct collect_highseqnum_db_state);
1435 if (req == NULL) {
1436 return NULL;
1439 state->ev = ev;
1440 state->client = client;
1441 state->nlist = nlist;
1442 state->db_id = db_id;
1443 state->recdb = recdb;
1445 ctdb_req_control_get_db_seqnum(&request, db_id);
1446 subreq = ctdb_client_control_multi_send(mem_ctx,
1448 client,
1449 nlist->pnn_list,
1450 nlist->count,
1451 TIMEOUT(),
1452 &request);
1453 if (tevent_req_nomem(subreq, req)) {
1454 return tevent_req_post(req, ev);
1456 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1457 req);
1459 return req;
1462 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1464 struct tevent_req *req = tevent_req_callback_data(
1465 subreq, struct tevent_req);
1466 struct collect_highseqnum_db_state *state = tevent_req_data(
1467 req, struct collect_highseqnum_db_state);
1468 struct ctdb_reply_control **reply;
1469 int *err_list;
1470 bool status;
1471 unsigned int i;
1472 int ret;
1473 uint64_t seqnum, max_seqnum;
1474 uint32_t max_caps;
1476 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1477 &err_list, &reply);
1478 TALLOC_FREE(subreq);
1479 if (! status) {
1480 int ret2;
1481 uint32_t pnn;
1483 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1484 state->nlist->count,
1485 err_list,
1486 &pnn);
1487 if (ret2 != 0) {
1488 D_ERR("control GET_DB_SEQNUM failed for db %s"
1489 " on node %u, ret=%d\n",
1490 recdb_name(state->recdb), pnn, ret2);
1491 } else {
1492 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1493 " ret=%d\n",
1494 recdb_name(state->recdb), ret);
1496 tevent_req_error(req, ret);
1497 return;
1500 max_seqnum = 0;
1501 state->max_pnn = state->nlist->pnn_list[0];
1502 max_caps = state->nlist->caps[0];
1503 for (i=0; i<state->nlist->count; i++) {
1504 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1505 if (ret != 0) {
1506 tevent_req_error(req, EPROTO);
1507 return;
1510 if (max_seqnum < seqnum) {
1511 max_seqnum = seqnum;
1512 state->max_pnn = state->nlist->pnn_list[i];
1513 max_caps = state->nlist->caps[i];
1517 talloc_free(reply);
1519 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1520 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1522 subreq = pull_database_send(state,
1523 state->ev,
1524 state->client,
1525 state->max_pnn,
1526 max_caps,
1527 state->recdb);
1528 if (tevent_req_nomem(subreq, req)) {
1529 return;
1531 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1532 req);
1535 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1537 struct tevent_req *req = tevent_req_callback_data(
1538 subreq, struct tevent_req);
1539 struct collect_highseqnum_db_state *state = tevent_req_data(
1540 req, struct collect_highseqnum_db_state);
1541 int ret;
1542 bool status;
1544 status = pull_database_recv(subreq, &ret);
1545 TALLOC_FREE(subreq);
1546 if (! status) {
1547 node_list_ban_credits(state->nlist, state->max_pnn);
1548 tevent_req_error(req, ret);
1549 return;
1552 tevent_req_done(req);
/*
 * Fetch the result of collect_highseqnum_db_send().
 *
 * Returns true on success. On failure returns false and, when perr is
 * not NULL, stores the error code there (see generic_recv()).
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1561 * Collect all databases
1564 struct collect_all_db_state {
1565 struct tevent_context *ev;
1566 struct ctdb_client_context *client;
1567 struct node_list *nlist;
1568 uint32_t db_id;
1569 struct recdb_context *recdb;
1571 struct ctdb_pulldb pulldb;
1572 unsigned int index;
1575 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1577 static struct tevent_req *collect_all_db_send(
1578 TALLOC_CTX *mem_ctx,
1579 struct tevent_context *ev,
1580 struct ctdb_client_context *client,
1581 struct node_list *nlist,
1582 uint32_t db_id,
1583 struct recdb_context *recdb)
1585 struct tevent_req *req, *subreq;
1586 struct collect_all_db_state *state;
1588 req = tevent_req_create(mem_ctx, &state,
1589 struct collect_all_db_state);
1590 if (req == NULL) {
1591 return NULL;
1594 state->ev = ev;
1595 state->client = client;
1596 state->nlist = nlist;
1597 state->db_id = db_id;
1598 state->recdb = recdb;
1599 state->index = 0;
1601 subreq = pull_database_send(state,
1603 client,
1604 nlist->pnn_list[state->index],
1605 nlist->caps[state->index],
1606 recdb);
1607 if (tevent_req_nomem(subreq, req)) {
1608 return tevent_req_post(req, ev);
1610 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1612 return req;
1615 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1617 struct tevent_req *req = tevent_req_callback_data(
1618 subreq, struct tevent_req);
1619 struct collect_all_db_state *state = tevent_req_data(
1620 req, struct collect_all_db_state);
1621 int ret;
1622 bool status;
1624 status = pull_database_recv(subreq, &ret);
1625 TALLOC_FREE(subreq);
1626 if (! status) {
1627 node_list_ban_credits(state->nlist,
1628 state->nlist->pnn_list[state->index]);
1629 tevent_req_error(req, ret);
1630 return;
1633 state->index += 1;
1634 if (state->index == state->nlist->count) {
1635 tevent_req_done(req);
1636 return;
1639 subreq = pull_database_send(state,
1640 state->ev,
1641 state->client,
1642 state->nlist->pnn_list[state->index],
1643 state->nlist->caps[state->index],
1644 state->recdb);
1645 if (tevent_req_nomem(subreq, req)) {
1646 return;
1648 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Fetch the result of collect_all_db_send().
 *
 * Returns true on success. On failure returns false and, when perr is
 * not NULL, stores the error code there (see generic_recv()).
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1658 * For each database do the following:
1659 * - Get DB name
1660 * - Get DB path
1661 * - Freeze database on all nodes
1662 * - Start transaction on all nodes
1663 * - Collect database from all nodes
1664 * - Wipe database on all nodes
1665 * - Push database to all nodes
1666 * - Commit transaction on all nodes
1667 * - Thaw database on all nodes
1670 struct recover_db_state {
1671 struct tevent_context *ev;
1672 struct ctdb_client_context *client;
1673 struct ctdb_tunable_list *tun_list;
1674 struct node_list *nlist;
1675 uint32_t db_id;
1676 uint8_t db_flags;
1678 uint32_t destnode;
1679 struct ctdb_transdb transdb;
1681 const char *db_name, *db_path;
1682 struct recdb_context *recdb;
1685 static void recover_db_name_done(struct tevent_req *subreq);
1686 static void recover_db_path_done(struct tevent_req *subreq);
1687 static void recover_db_freeze_done(struct tevent_req *subreq);
1688 static void recover_db_transaction_started(struct tevent_req *subreq);
1689 static void recover_db_collect_done(struct tevent_req *subreq);
1690 static void recover_db_wipedb_done(struct tevent_req *subreq);
1691 static void recover_db_pushdb_done(struct tevent_req *subreq);
1692 static void recover_db_transaction_committed(struct tevent_req *subreq);
1693 static void recover_db_thaw_done(struct tevent_req *subreq);
1695 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1696 struct tevent_context *ev,
1697 struct ctdb_client_context *client,
1698 struct ctdb_tunable_list *tun_list,
1699 struct node_list *nlist,
1700 uint32_t generation,
1701 uint32_t db_id,
1702 uint8_t db_flags)
1704 struct tevent_req *req, *subreq;
1705 struct recover_db_state *state;
1706 struct ctdb_req_control request;
1708 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1709 if (req == NULL) {
1710 return NULL;
1713 state->ev = ev;
1714 state->client = client;
1715 state->tun_list = tun_list;
1716 state->nlist = nlist;
1717 state->db_id = db_id;
1718 state->db_flags = db_flags;
1720 state->destnode = ctdb_client_pnn(client);
1721 state->transdb.db_id = db_id;
1722 state->transdb.tid = generation;
1724 ctdb_req_control_get_dbname(&request, db_id);
1725 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1726 TIMEOUT(), &request);
1727 if (tevent_req_nomem(subreq, req)) {
1728 return tevent_req_post(req, ev);
1730 tevent_req_set_callback(subreq, recover_db_name_done, req);
1732 return req;
1735 static void recover_db_name_done(struct tevent_req *subreq)
1737 struct tevent_req *req = tevent_req_callback_data(
1738 subreq, struct tevent_req);
1739 struct recover_db_state *state = tevent_req_data(
1740 req, struct recover_db_state);
1741 struct ctdb_reply_control *reply;
1742 struct ctdb_req_control request;
1743 int ret;
1744 bool status;
1746 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1747 TALLOC_FREE(subreq);
1748 if (! status) {
1749 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1750 state->db_id, ret);
1751 tevent_req_error(req, ret);
1752 return;
1755 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1756 if (ret != 0) {
1757 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1758 state->db_id, ret);
1759 tevent_req_error(req, EPROTO);
1760 return;
1763 talloc_free(reply);
1765 ctdb_req_control_getdbpath(&request, state->db_id);
1766 subreq = ctdb_client_control_send(state, state->ev, state->client,
1767 state->destnode, TIMEOUT(),
1768 &request);
1769 if (tevent_req_nomem(subreq, req)) {
1770 return;
1772 tevent_req_set_callback(subreq, recover_db_path_done, req);
1775 static void recover_db_path_done(struct tevent_req *subreq)
1777 struct tevent_req *req = tevent_req_callback_data(
1778 subreq, struct tevent_req);
1779 struct recover_db_state *state = tevent_req_data(
1780 req, struct recover_db_state);
1781 struct ctdb_reply_control *reply;
1782 struct ctdb_req_control request;
1783 int ret;
1784 bool status;
1786 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1787 TALLOC_FREE(subreq);
1788 if (! status) {
1789 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1790 state->db_name, ret);
1791 tevent_req_error(req, ret);
1792 return;
1795 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1796 if (ret != 0) {
1797 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1798 state->db_name, ret);
1799 tevent_req_error(req, EPROTO);
1800 return;
1803 talloc_free(reply);
1805 ctdb_req_control_db_freeze(&request, state->db_id);
1806 subreq = ctdb_client_control_multi_send(state,
1807 state->ev,
1808 state->client,
1809 state->nlist->pnn_list,
1810 state->nlist->count,
1811 TIMEOUT(),
1812 &request);
1813 if (tevent_req_nomem(subreq, req)) {
1814 return;
1816 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1819 static void recover_db_freeze_done(struct tevent_req *subreq)
1821 struct tevent_req *req = tevent_req_callback_data(
1822 subreq, struct tevent_req);
1823 struct recover_db_state *state = tevent_req_data(
1824 req, struct recover_db_state);
1825 struct ctdb_req_control request;
1826 int *err_list;
1827 int ret;
1828 bool status;
1830 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1831 NULL);
1832 TALLOC_FREE(subreq);
1833 if (! status) {
1834 int ret2;
1835 uint32_t pnn;
1837 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1838 state->nlist->count,
1839 err_list,
1840 &pnn);
1841 if (ret2 != 0) {
1842 D_ERR("control FREEZE_DB failed for db %s"
1843 " on node %u, ret=%d\n",
1844 state->db_name, pnn, ret2);
1846 node_list_ban_credits(state->nlist, pnn);
1847 } else {
1848 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1849 state->db_name, ret);
1851 tevent_req_error(req, ret);
1852 return;
1855 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1856 subreq = ctdb_client_control_multi_send(state,
1857 state->ev,
1858 state->client,
1859 state->nlist->pnn_list,
1860 state->nlist->count,
1861 TIMEOUT(),
1862 &request);
1863 if (tevent_req_nomem(subreq, req)) {
1864 return;
1866 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1869 static void recover_db_transaction_started(struct tevent_req *subreq)
1871 struct tevent_req *req = tevent_req_callback_data(
1872 subreq, struct tevent_req);
1873 struct recover_db_state *state = tevent_req_data(
1874 req, struct recover_db_state);
1875 int *err_list;
1876 int ret;
1877 bool status;
1879 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1880 NULL);
1881 TALLOC_FREE(subreq);
1882 if (! status) {
1883 int ret2;
1884 uint32_t pnn;
1886 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1887 state->nlist->count,
1888 err_list,
1889 &pnn);
1890 if (ret2 != 0) {
1891 D_ERR("control TRANSACTION_DB failed for db=%s"
1892 " on node %u, ret=%d\n",
1893 state->db_name, pnn, ret2);
1894 } else {
1895 D_ERR("control TRANSACTION_DB failed for db=%s,"
1896 " ret=%d\n", state->db_name, ret);
1898 tevent_req_error(req, ret);
1899 return;
1902 state->recdb = recdb_create(state, state->db_id, state->db_name,
1903 state->db_path,
1904 state->tun_list->database_hash_size,
1905 state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1906 if (tevent_req_nomem(state->recdb, req)) {
1907 return;
1910 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1911 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1912 subreq = collect_highseqnum_db_send(state,
1913 state->ev,
1914 state->client,
1915 state->nlist,
1916 state->db_id,
1917 state->recdb);
1918 } else {
1919 subreq = collect_all_db_send(state,
1920 state->ev,
1921 state->client,
1922 state->nlist,
1923 state->db_id,
1924 state->recdb);
1926 if (tevent_req_nomem(subreq, req)) {
1927 return;
1929 tevent_req_set_callback(subreq, recover_db_collect_done, req);
1932 static void recover_db_collect_done(struct tevent_req *subreq)
1934 struct tevent_req *req = tevent_req_callback_data(
1935 subreq, struct tevent_req);
1936 struct recover_db_state *state = tevent_req_data(
1937 req, struct recover_db_state);
1938 struct ctdb_req_control request;
1939 int ret;
1940 bool status;
1942 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1943 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1944 status = collect_highseqnum_db_recv(subreq, &ret);
1945 } else {
1946 status = collect_all_db_recv(subreq, &ret);
1948 TALLOC_FREE(subreq);
1949 if (! status) {
1950 tevent_req_error(req, ret);
1951 return;
1954 ctdb_req_control_wipe_database(&request, &state->transdb);
1955 subreq = ctdb_client_control_multi_send(state,
1956 state->ev,
1957 state->client,
1958 state->nlist->pnn_list,
1959 state->nlist->count,
1960 TIMEOUT(),
1961 &request);
1962 if (tevent_req_nomem(subreq, req)) {
1963 return;
1965 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1968 static void recover_db_wipedb_done(struct tevent_req *subreq)
1970 struct tevent_req *req = tevent_req_callback_data(
1971 subreq, struct tevent_req);
1972 struct recover_db_state *state = tevent_req_data(
1973 req, struct recover_db_state);
1974 int *err_list;
1975 int ret;
1976 bool status;
1978 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1979 NULL);
1980 TALLOC_FREE(subreq);
1981 if (! status) {
1982 int ret2;
1983 uint32_t pnn;
1985 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1986 state->nlist->count,
1987 err_list,
1988 &pnn);
1989 if (ret2 != 0) {
1990 D_ERR("control WIPEDB failed for db %s on node %u,"
1991 " ret=%d\n", state->db_name, pnn, ret2);
1992 } else {
1993 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1994 state->db_name, ret);
1996 tevent_req_error(req, ret);
1997 return;
2000 subreq = push_database_send(state,
2001 state->ev,
2002 state->client,
2003 state->nlist,
2004 state->tun_list,
2005 state->recdb);
2006 if (tevent_req_nomem(subreq, req)) {
2007 return;
2009 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
2012 static void recover_db_pushdb_done(struct tevent_req *subreq)
2014 struct tevent_req *req = tevent_req_callback_data(
2015 subreq, struct tevent_req);
2016 struct recover_db_state *state = tevent_req_data(
2017 req, struct recover_db_state);
2018 struct ctdb_req_control request;
2019 int ret;
2020 bool status;
2022 status = push_database_recv(subreq, &ret);
2023 TALLOC_FREE(subreq);
2024 if (! status) {
2025 tevent_req_error(req, ret);
2026 return;
2029 TALLOC_FREE(state->recdb);
2031 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
2032 subreq = ctdb_client_control_multi_send(state,
2033 state->ev,
2034 state->client,
2035 state->nlist->pnn_list,
2036 state->nlist->count,
2037 TIMEOUT(),
2038 &request);
2039 if (tevent_req_nomem(subreq, req)) {
2040 return;
2042 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
2045 static void recover_db_transaction_committed(struct tevent_req *subreq)
2047 struct tevent_req *req = tevent_req_callback_data(
2048 subreq, struct tevent_req);
2049 struct recover_db_state *state = tevent_req_data(
2050 req, struct recover_db_state);
2051 struct ctdb_req_control request;
2052 int *err_list;
2053 int ret;
2054 bool status;
2056 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2057 NULL);
2058 TALLOC_FREE(subreq);
2059 if (! status) {
2060 int ret2;
2061 uint32_t pnn;
2063 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2064 state->nlist->count,
2065 err_list,
2066 &pnn);
2067 if (ret2 != 0) {
2068 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
2069 " on node %u, ret=%d\n",
2070 state->db_name, pnn, ret2);
2071 } else {
2072 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
2073 " ret=%d\n", state->db_name, ret);
2075 tevent_req_error(req, ret);
2076 return;
2079 ctdb_req_control_db_thaw(&request, state->db_id);
2080 subreq = ctdb_client_control_multi_send(state,
2081 state->ev,
2082 state->client,
2083 state->nlist->pnn_list,
2084 state->nlist->count,
2085 TIMEOUT(),
2086 &request);
2087 if (tevent_req_nomem(subreq, req)) {
2088 return;
2090 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
2093 static void recover_db_thaw_done(struct tevent_req *subreq)
2095 struct tevent_req *req = tevent_req_callback_data(
2096 subreq, struct tevent_req);
2097 struct recover_db_state *state = tevent_req_data(
2098 req, struct recover_db_state);
2099 int *err_list;
2100 int ret;
2101 bool status;
2103 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2104 NULL);
2105 TALLOC_FREE(subreq);
2106 if (! status) {
2107 int ret2;
2108 uint32_t pnn;
2110 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2111 state->nlist->count,
2112 err_list,
2113 &pnn);
2114 if (ret2 != 0) {
2115 D_ERR("control DB_THAW failed for db %s on node %u,"
2116 " ret=%d\n", state->db_name, pnn, ret2);
2117 } else {
2118 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
2119 state->db_name, ret);
2121 tevent_req_error(req, ret);
2122 return;
2125 tevent_req_done(req);
/*
 * Fetch the result of recover_db_send().
 *
 * Returns true if the database was recovered; the error code is not
 * reported to the caller.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
2135 * Start database recovery for each database
2137 * Try to recover each database up to NUM_RETRIES (3) times before failing recovery.
/* Overall state for recovering all databases in parallel */
struct db_recovery_state {
	struct tevent_context *ev;
	struct db_list *dblist;
	/* Databases whose recovery has finished, successfully or not */
	unsigned int num_replies;
	/* Databases whose recovery failed for good */
	unsigned int num_failed;
};
/* Per-database state, kept so that a failed recovery can be retried */
struct db_recovery_one_state {
	/* The parent db_recovery request */
	struct tevent_req *req;
	struct ctdb_client_context *client;
	struct db_list *dblist;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	uint32_t generation;
	uint32_t db_id;
	uint8_t db_flags;
	/* Failed attempts so far, compared against NUM_RETRIES */
	int num_fails;
};

static void db_recovery_one_done(struct tevent_req *subreq);
2161 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
2162 struct tevent_context *ev,
2163 struct ctdb_client_context *client,
2164 struct db_list *dblist,
2165 struct ctdb_tunable_list *tun_list,
2166 struct node_list *nlist,
2167 uint32_t generation)
2169 struct tevent_req *req, *subreq;
2170 struct db_recovery_state *state;
2171 struct db *db;
2173 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
2174 if (req == NULL) {
2175 return NULL;
2178 state->ev = ev;
2179 state->dblist = dblist;
2180 state->num_replies = 0;
2181 state->num_failed = 0;
2183 if (dblist->num_dbs == 0) {
2184 tevent_req_done(req);
2185 return tevent_req_post(req, ev);
2188 for (db = dblist->db; db != NULL; db = db->next) {
2189 struct db_recovery_one_state *substate;
2191 substate = talloc_zero(state, struct db_recovery_one_state);
2192 if (tevent_req_nomem(substate, req)) {
2193 return tevent_req_post(req, ev);
2196 substate->req = req;
2197 substate->client = client;
2198 substate->dblist = dblist;
2199 substate->tun_list = tun_list;
2200 substate->nlist = nlist;
2201 substate->generation = generation;
2202 substate->db_id = db->db_id;
2203 substate->db_flags = db->db_flags;
2205 subreq = recover_db_send(state,
2207 client,
2208 tun_list,
2209 nlist,
2210 generation,
2211 substate->db_id,
2212 substate->db_flags);
2213 if (tevent_req_nomem(subreq, req)) {
2214 return tevent_req_post(req, ev);
2216 tevent_req_set_callback(subreq, db_recovery_one_done,
2217 substate);
2218 D_NOTICE("recover database 0x%08x\n", substate->db_id);
2221 return req;
2224 static void db_recovery_one_done(struct tevent_req *subreq)
2226 struct db_recovery_one_state *substate = tevent_req_callback_data(
2227 subreq, struct db_recovery_one_state);
2228 struct tevent_req *req = substate->req;
2229 struct db_recovery_state *state = tevent_req_data(
2230 req, struct db_recovery_state);
2231 bool status;
2233 status = recover_db_recv(subreq);
2234 TALLOC_FREE(subreq);
2236 if (status) {
2237 talloc_free(substate);
2238 goto done;
2241 substate->num_fails += 1;
2242 if (substate->num_fails < NUM_RETRIES) {
2243 subreq = recover_db_send(state,
2244 state->ev,
2245 substate->client,
2246 substate->tun_list,
2247 substate->nlist,
2248 substate->generation,
2249 substate->db_id,
2250 substate->db_flags);
2251 if (tevent_req_nomem(subreq, req)) {
2252 goto failed;
2254 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2255 D_NOTICE("recover database 0x%08x, attempt %d\n",
2256 substate->db_id, substate->num_fails+1);
2257 return;
2260 failed:
2261 state->num_failed += 1;
2263 done:
2264 state->num_replies += 1;
2266 if (state->num_replies == state->dblist->num_dbs) {
2267 tevent_req_done(req);
2271 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2273 struct db_recovery_state *state = tevent_req_data(
2274 req, struct db_recovery_state);
2275 int err;
2277 if (tevent_req_is_unix_error(req, &err)) {
2278 *count = 0;
2279 return false;
2282 *count = state->num_replies - state->num_failed;
2284 if (state->num_failed > 0) {
2285 return false;
2288 return true;
/* State for banning the node that collected the most ban credits */
struct ban_node_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	uint32_t destnode;

	/* Node currently considered for banning */
	uint32_t max_pnn;
};

static bool ban_node_check(struct tevent_req *req);
static void ban_node_check_done(struct tevent_req *subreq);
static void ban_node_done(struct tevent_req *subreq);
2305 static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
2306 struct tevent_context *ev,
2307 struct ctdb_client_context *client,
2308 struct ctdb_tunable_list *tun_list,
2309 struct node_list *nlist)
2311 struct tevent_req *req;
2312 struct ban_node_state *state;
2313 bool ok;
2315 req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
2316 if (req == NULL) {
2317 return NULL;
2320 state->ev = ev;
2321 state->client = client;
2322 state->tun_list = tun_list;
2323 state->nlist = nlist;
2324 state->destnode = ctdb_client_pnn(client);
2326 /* Bans are not enabled */
2327 if (state->tun_list->enable_bans == 0) {
2328 D_ERR("Bans are not enabled\n");
2329 tevent_req_done(req);
2330 return tevent_req_post(req, ev);
2333 ok = ban_node_check(req);
2334 if (!ok) {
2335 return tevent_req_post(req, ev);
2338 return req;
2341 static bool ban_node_check(struct tevent_req *req)
2343 struct tevent_req *subreq;
2344 struct ban_node_state *state = tevent_req_data(
2345 req, struct ban_node_state);
2346 struct ctdb_req_control request;
2347 unsigned max_credits = 0, i;
2349 for (i=0; i<state->nlist->count; i++) {
2350 if (state->nlist->ban_credits[i] > max_credits) {
2351 state->max_pnn = state->nlist->pnn_list[i];
2352 max_credits = state->nlist->ban_credits[i];
2356 if (max_credits < NUM_RETRIES) {
2357 tevent_req_done(req);
2358 return false;
2361 ctdb_req_control_get_nodemap(&request);
2362 subreq = ctdb_client_control_send(state,
2363 state->ev,
2364 state->client,
2365 state->max_pnn,
2366 TIMEOUT(),
2367 &request);
2368 if (tevent_req_nomem(subreq, req)) {
2369 return false;
2371 tevent_req_set_callback(subreq, ban_node_check_done, req);
2373 return true;
2376 static void ban_node_check_done(struct tevent_req *subreq)
2378 struct tevent_req *req = tevent_req_callback_data(
2379 subreq, struct tevent_req);
2380 struct ban_node_state *state = tevent_req_data(
2381 req, struct ban_node_state);
2382 struct ctdb_reply_control *reply;
2383 struct ctdb_node_map *nodemap;
2384 struct ctdb_req_control request;
2385 struct ctdb_ban_state ban;
2386 unsigned int i;
2387 int ret;
2388 bool ok;
2390 ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
2391 TALLOC_FREE(subreq);
2392 if (!ok) {
2393 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2394 state->max_pnn, ret);
2395 tevent_req_error(req, ret);
2396 return;
2399 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2400 if (ret != 0) {
2401 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2402 tevent_req_error(req, ret);
2403 return;
2406 for (i=0; i<nodemap->num; i++) {
2407 if (nodemap->node[i].pnn != state->max_pnn) {
2408 continue;
2411 /* If the node became inactive, reset ban_credits */
2412 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2413 unsigned int j;
2415 for (j=0; j<state->nlist->count; j++) {
2416 if (state->nlist->pnn_list[j] ==
2417 state->max_pnn) {
2418 state->nlist->ban_credits[j] = 0;
2419 break;
2422 state->max_pnn = CTDB_UNKNOWN_PNN;
2426 talloc_free(nodemap);
2427 talloc_free(reply);
2429 /* If node becames inactive during recovery, pick next */
2430 if (state->max_pnn == CTDB_UNKNOWN_PNN) {
2431 (void) ban_node_check(req);
2432 return;
2435 ban = (struct ctdb_ban_state) {
2436 .pnn = state->max_pnn,
2437 .time = state->tun_list->recovery_ban_period,
2440 D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
2442 ctdb_req_control_set_ban_state(&request, &ban);
2443 subreq = ctdb_client_control_send(state,
2444 state->ev,
2445 state->client,
2446 ban.pnn,
2447 TIMEOUT(),
2448 &request);
2449 if (tevent_req_nomem(subreq, req)) {
2450 return;
2452 tevent_req_set_callback(subreq, ban_node_done, req);
2455 static void ban_node_done(struct tevent_req *subreq)
2457 struct tevent_req *req = tevent_req_callback_data(
2458 subreq, struct tevent_req);
2459 struct node_ban_state *state = tevent_req_data(
2460 req, struct node_ban_state);
2461 struct ctdb_reply_control *reply;
2462 int ret;
2463 bool status;
2465 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2466 TALLOC_FREE(subreq);
2467 if (! status) {
2468 tevent_req_error(req, ret);
2469 return;
2472 ret = ctdb_reply_control_set_ban_state(reply);
2473 if (ret != 0) {
2474 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2475 tevent_req_error(req, ret);
2476 return;
2479 talloc_free(reply);
2480 tevent_req_done(req);
/*
 * Fetch the result of ban_node_send().
 *
 * Returns true on success. On failure returns false and, when perr is
 * not NULL, stores the error code there.
 *
 * Goes through generic_recv() like the other *_recv helpers in this
 * file, so a NULL perr is accepted instead of being handed straight to
 * tevent_req_is_unix_error().
 */
static bool ban_node_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
2493 * Run the parallel database recovery
2495 * - Get tunables
2496 * - Get nodemap from all nodes
2497 * - Get capabilities from all nodes
2498 * - Get dbmap
2499 * - Set RECOVERY_ACTIVE
2500 * - Send START_RECOVERY
2501 * - Update vnnmap on all nodes
2502 * - Run database recovery
2503 * - Set RECOVERY_NORMAL
2504 * - Send END_RECOVERY
/* State for one complete run of the parallel database recovery */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t generation;
	/* PNN of the local node, taken from ctdb_client_pnn() */
	uint32_t destnode;
	struct node_list *nlist;
	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct db_list *dblist;
};

/* Callbacks for the recovery steps */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_nodemap_verify(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2531 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2532 struct tevent_context *ev,
2533 struct ctdb_client_context *client,
2534 uint32_t generation)
2536 struct tevent_req *req, *subreq;
2537 struct recovery_state *state;
2538 struct ctdb_req_control request;
2540 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2541 if (req == NULL) {
2542 return NULL;
2545 state->ev = ev;
2546 state->client = client;
2547 state->generation = generation;
2548 state->destnode = ctdb_client_pnn(client);
2550 ctdb_req_control_get_all_tunables(&request);
2551 subreq = ctdb_client_control_send(state, state->ev, state->client,
2552 state->destnode, TIMEOUT(),
2553 &request);
2554 if (tevent_req_nomem(subreq, req)) {
2555 return tevent_req_post(req, ev);
2557 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2559 return req;
2562 static void recovery_tunables_done(struct tevent_req *subreq)
2564 struct tevent_req *req = tevent_req_callback_data(
2565 subreq, struct tevent_req);
2566 struct recovery_state *state = tevent_req_data(
2567 req, struct recovery_state);
2568 struct ctdb_reply_control *reply;
2569 struct ctdb_req_control request;
2570 int ret;
2571 bool status;
2573 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2574 TALLOC_FREE(subreq);
2575 if (! status) {
2576 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2577 tevent_req_error(req, ret);
2578 return;
2581 ret = ctdb_reply_control_get_all_tunables(reply, state,
2582 &state->tun_list);
2583 if (ret != 0) {
2584 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2585 tevent_req_error(req, EPROTO);
2586 return;
2589 talloc_free(reply);
2591 recover_timeout = state->tun_list->recover_timeout;
2593 ctdb_req_control_get_nodemap(&request);
2594 subreq = ctdb_client_control_send(state, state->ev, state->client,
2595 state->destnode, TIMEOUT(),
2596 &request);
2597 if (tevent_req_nomem(subreq, req)) {
2598 return;
2600 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2603 static void recovery_nodemap_done(struct tevent_req *subreq)
2605 struct tevent_req *req = tevent_req_callback_data(
2606 subreq, struct tevent_req);
2607 struct recovery_state *state = tevent_req_data(
2608 req, struct recovery_state);
2609 struct ctdb_reply_control *reply;
2610 struct ctdb_req_control request;
2611 struct ctdb_node_map *nodemap;
2612 unsigned int i;
2613 bool status;
2614 int ret;
2616 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2617 TALLOC_FREE(subreq);
2618 if (! status) {
2619 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2620 state->destnode, ret);
2621 tevent_req_error(req, ret);
2622 return;
2625 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2626 if (ret != 0) {
2627 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2628 tevent_req_error(req, ret);
2629 return;
2632 state->nlist = node_list_init(state, nodemap->num);
2633 if (tevent_req_nomem(state->nlist, req)) {
2634 return;
2637 for (i=0; i<nodemap->num; i++) {
2638 bool ok;
2640 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2641 continue;
2644 ok = node_list_add(state->nlist, nodemap->node[i].pnn);
2645 if (!ok) {
2646 tevent_req_error(req, EINVAL);
2647 return;
2651 talloc_free(nodemap);
2652 talloc_free(reply);
2654 /* Verify flags by getting local node information from each node */
2655 ctdb_req_control_get_nodemap(&request);
2656 subreq = ctdb_client_control_multi_send(state,
2657 state->ev,
2658 state->client,
2659 state->nlist->pnn_list,
2660 state->nlist->count,
2661 TIMEOUT(),
2662 &request);
2663 if (tevent_req_nomem(subreq, req)) {
2664 return;
2666 tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
2669 static void recovery_nodemap_verify(struct tevent_req *subreq)
2671 struct tevent_req *req = tevent_req_callback_data(
2672 subreq, struct tevent_req);
2673 struct recovery_state *state = tevent_req_data(
2674 req, struct recovery_state);
2675 struct ctdb_req_control request;
2676 struct ctdb_reply_control **reply;
2677 struct node_list *nlist;
2678 unsigned int i;
2679 int *err_list;
2680 int ret;
2681 bool status;
2683 status = ctdb_client_control_multi_recv(subreq,
2684 &ret,
2685 state,
2686 &err_list,
2687 &reply);
2688 TALLOC_FREE(subreq);
2689 if (! status) {
2690 int ret2;
2691 uint32_t pnn;
2693 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2694 state->nlist->count,
2695 err_list,
2696 &pnn);
2697 if (ret2 != 0) {
2698 D_ERR("control GET_NODEMAP failed on node %u,"
2699 " ret=%d\n", pnn, ret2);
2700 } else {
2701 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2703 tevent_req_error(req, ret);
2704 return;
2707 nlist = node_list_init(state, state->nlist->size);
2708 if (tevent_req_nomem(nlist, req)) {
2709 return;
2712 for (i=0; i<state->nlist->count; i++) {
2713 struct ctdb_node_map *nodemap = NULL;
2714 uint32_t pnn, flags;
2715 unsigned int j;
2716 bool ok;
2718 pnn = state->nlist->pnn_list[i];
2719 ret = ctdb_reply_control_get_nodemap(reply[i],
2720 state,
2721 &nodemap);
2722 if (ret != 0) {
2723 D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
2724 tevent_req_error(req, EPROTO);
2725 return;
2728 flags = NODE_FLAGS_DISCONNECTED;
2729 for (j=0; j<nodemap->num; j++) {
2730 if (nodemap->node[j].pnn == pnn) {
2731 flags = nodemap->node[j].flags;
2732 break;
2736 TALLOC_FREE(nodemap);
2738 if (flags & NODE_FLAGS_INACTIVE) {
2739 continue;
2742 ok = node_list_add(nlist, pnn);
2743 if (!ok) {
2744 tevent_req_error(req, EINVAL);
2745 return;
2749 talloc_free(reply);
2751 talloc_free(state->nlist);
2752 state->nlist = nlist;
2754 ctdb_req_control_get_capabilities(&request);
2755 subreq = ctdb_client_control_multi_send(state,
2756 state->ev,
2757 state->client,
2758 state->nlist->pnn_list,
2759 state->nlist->count,
2760 TIMEOUT(),
2761 &request);
2762 if (tevent_req_nomem(subreq, req)) {
2763 return;
2765 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2768 static void recovery_capabilities_done(struct tevent_req *subreq)
2770 struct tevent_req *req = tevent_req_callback_data(
2771 subreq, struct tevent_req);
2772 struct recovery_state *state = tevent_req_data(
2773 req, struct recovery_state);
2774 struct ctdb_reply_control **reply;
2775 struct ctdb_req_control request;
2776 int *err_list;
2777 unsigned int i;
2778 int ret;
2779 bool status;
2781 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2782 &reply);
2783 TALLOC_FREE(subreq);
2784 if (! status) {
2785 int ret2;
2786 uint32_t pnn;
2788 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2789 state->nlist->count,
2790 err_list,
2791 &pnn);
2792 if (ret2 != 0) {
2793 D_ERR("control GET_CAPABILITIES failed on node %u,"
2794 " ret=%d\n", pnn, ret2);
2795 } else {
2796 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2797 ret);
2799 tevent_req_error(req, ret);
2800 return;
2803 for (i=0; i<state->nlist->count; i++) {
2804 uint32_t caps;
2806 ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
2807 if (ret != 0) {
2808 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2809 state->nlist->pnn_list[i]);
2810 tevent_req_error(req, EPROTO);
2811 return;
2814 state->nlist->caps[i] = caps;
2817 talloc_free(reply);
2819 ctdb_req_control_get_dbmap(&request);
2820 subreq = ctdb_client_control_send(state, state->ev, state->client,
2821 state->destnode, TIMEOUT(),
2822 &request);
2823 if (tevent_req_nomem(subreq, req)) {
2824 return;
2826 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2829 static void recovery_dbmap_done(struct tevent_req *subreq)
2831 struct tevent_req *req = tevent_req_callback_data(
2832 subreq, struct tevent_req);
2833 struct recovery_state *state = tevent_req_data(
2834 req, struct recovery_state);
2835 struct ctdb_reply_control *reply;
2836 struct ctdb_req_control request;
2837 struct ctdb_dbid_map *dbmap = NULL;
2838 unsigned int j;
2839 int ret;
2840 bool status;
2842 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2843 TALLOC_FREE(subreq);
2844 if (! status) {
2845 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2846 state->destnode, ret);
2847 tevent_req_error(req, ret);
2848 return;
2851 state->dblist = db_list_init(state, state->nlist->count);
2852 if (tevent_req_nomem(state->dblist, req)) {
2853 D_ERR("memory allocation error\n");
2854 return;
2857 ret = ctdb_reply_control_get_dbmap(reply, state, &dbmap);
2858 if (ret != 0) {
2859 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2860 tevent_req_error(req, ret);
2861 return;
2864 for (j = 0; j < dbmap->num; j++) {
2865 ret = db_list_check_and_add(state->dblist,
2866 dbmap->dbs[j].db_id,
2867 dbmap->dbs[j].flags,
2868 state->destnode);
2869 if (ret != 0) {
2870 D_ERR("failed to add database list entry, ret=%d\n",
2871 ret);
2872 tevent_req_error(req, ret);
2873 return;
2877 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2878 subreq = ctdb_client_control_multi_send(state,
2879 state->ev,
2880 state->client,
2881 state->nlist->pnn_list,
2882 state->nlist->count,
2883 TIMEOUT(),
2884 &request);
2885 if (tevent_req_nomem(subreq, req)) {
2886 return;
2888 tevent_req_set_callback(subreq, recovery_active_done, req);
2891 static void recovery_active_done(struct tevent_req *subreq)
2893 struct tevent_req *req = tevent_req_callback_data(
2894 subreq, struct tevent_req);
2895 struct recovery_state *state = tevent_req_data(
2896 req, struct recovery_state);
2897 struct ctdb_req_control request;
2898 struct ctdb_vnn_map *vnnmap;
2899 int *err_list;
2900 int ret;
2901 bool status;
2903 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2904 NULL);
2905 TALLOC_FREE(subreq);
2906 if (! status) {
2907 int ret2;
2908 uint32_t pnn;
2910 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2911 state->nlist->count,
2912 err_list,
2913 &pnn);
2914 if (ret2 != 0) {
2915 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2916 " ret=%d\n", pnn, ret2);
2917 } else {
2918 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2919 ret);
2921 tevent_req_error(req, ret);
2922 return;
2925 D_ERR("Set recovery mode to ACTIVE\n");
2927 /* Calculate new VNNMAP */
2928 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2929 if (tevent_req_nomem(vnnmap, req)) {
2930 return;
2933 vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
2934 if (tevent_req_nomem(vnnmap->map, req)) {
2935 return;
2938 if (vnnmap->size == 0) {
2939 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2940 vnnmap->map[0] = state->destnode;
2941 vnnmap->size = 1;
2944 vnnmap->generation = state->generation;
2946 state->vnnmap = vnnmap;
2948 ctdb_req_control_start_recovery(&request);
2949 subreq = ctdb_client_control_multi_send(state,
2950 state->ev,
2951 state->client,
2952 state->nlist->pnn_list,
2953 state->nlist->count,
2954 TIMEOUT(),
2955 &request);
2956 if (tevent_req_nomem(subreq, req)) {
2957 return;
2959 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2962 static void recovery_start_recovery_done(struct tevent_req *subreq)
2964 struct tevent_req *req = tevent_req_callback_data(
2965 subreq, struct tevent_req);
2966 struct recovery_state *state = tevent_req_data(
2967 req, struct recovery_state);
2968 struct ctdb_req_control request;
2969 int *err_list;
2970 int ret;
2971 bool status;
2973 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2974 NULL);
2975 TALLOC_FREE(subreq);
2976 if (! status) {
2977 int ret2;
2978 uint32_t pnn;
2980 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2981 state->nlist->count,
2982 err_list,
2983 &pnn);
2984 if (ret2 != 0) {
2985 D_ERR("failed to run start_recovery event on node %u,"
2986 " ret=%d\n", pnn, ret2);
2987 } else {
2988 D_ERR("failed to run start_recovery event, ret=%d\n",
2989 ret);
2991 tevent_req_error(req, ret);
2992 return;
2995 D_ERR("start_recovery event finished\n");
2997 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2998 subreq = ctdb_client_control_multi_send(state,
2999 state->ev,
3000 state->client,
3001 state->nlist->pnn_list,
3002 state->nlist->count,
3003 TIMEOUT(),
3004 &request);
3005 if (tevent_req_nomem(subreq, req)) {
3006 return;
3008 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
3011 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
3013 struct tevent_req *req = tevent_req_callback_data(
3014 subreq, struct tevent_req);
3015 struct recovery_state *state = tevent_req_data(
3016 req, struct recovery_state);
3017 int *err_list;
3018 int ret;
3019 bool status;
3021 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3022 NULL);
3023 TALLOC_FREE(subreq);
3024 if (! status) {
3025 int ret2;
3026 uint32_t pnn;
3028 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3029 state->nlist->count,
3030 err_list,
3031 &pnn);
3032 if (ret2 != 0) {
3033 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
3034 pnn, ret2);
3035 } else {
3036 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
3038 tevent_req_error(req, ret);
3039 return;
3042 D_NOTICE("updated VNNMAP\n");
3044 subreq = db_recovery_send(state,
3045 state->ev,
3046 state->client,
3047 state->dblist,
3048 state->tun_list,
3049 state->nlist,
3050 state->vnnmap->generation);
3051 if (tevent_req_nomem(subreq, req)) {
3052 return;
3054 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
3057 static void recovery_db_recovery_done(struct tevent_req *subreq)
3059 struct tevent_req *req = tevent_req_callback_data(
3060 subreq, struct tevent_req);
3061 struct recovery_state *state = tevent_req_data(
3062 req, struct recovery_state);
3063 struct ctdb_req_control request;
3064 bool status;
3065 unsigned int count;
3067 status = db_recovery_recv(subreq, &count);
3068 TALLOC_FREE(subreq);
3070 D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
3072 if (! status) {
3073 subreq = ban_node_send(state,
3074 state->ev,
3075 state->client,
3076 state->tun_list,
3077 state->nlist);
3078 if (tevent_req_nomem(subreq, req)) {
3079 return;
3081 tevent_req_set_callback(subreq, recovery_failed_done, req);
3082 return;
3085 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
3086 subreq = ctdb_client_control_multi_send(state,
3087 state->ev,
3088 state->client,
3089 state->nlist->pnn_list,
3090 state->nlist->count,
3091 TIMEOUT(),
3092 &request);
3093 if (tevent_req_nomem(subreq, req)) {
3094 return;
3096 tevent_req_set_callback(subreq, recovery_normal_done, req);
3099 static void recovery_failed_done(struct tevent_req *subreq)
3101 struct tevent_req *req = tevent_req_callback_data(
3102 subreq, struct tevent_req);
3103 int ret;
3104 bool status;
3106 status = ban_node_recv(subreq, &ret);
3107 TALLOC_FREE(subreq);
3108 if (! status) {
3109 D_ERR("failed to ban node, ret=%d\n", ret);
3112 tevent_req_error(req, EIO);
3115 static void recovery_normal_done(struct tevent_req *subreq)
3117 struct tevent_req *req = tevent_req_callback_data(
3118 subreq, struct tevent_req);
3119 struct recovery_state *state = tevent_req_data(
3120 req, struct recovery_state);
3121 struct ctdb_req_control request;
3122 int *err_list;
3123 int ret;
3124 bool status;
3126 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3127 NULL);
3128 TALLOC_FREE(subreq);
3129 if (! status) {
3130 int ret2;
3131 uint32_t pnn;
3133 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3134 state->nlist->count,
3135 err_list,
3136 &pnn);
3137 if (ret2 != 0) {
3138 D_ERR("failed to set recovery mode NORMAL on node %u,"
3139 " ret=%d\n", pnn, ret2);
3140 } else {
3141 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3142 ret);
3144 tevent_req_error(req, ret);
3145 return;
3148 D_ERR("Set recovery mode to NORMAL\n");
3150 ctdb_req_control_end_recovery(&request);
3151 subreq = ctdb_client_control_multi_send(state,
3152 state->ev,
3153 state->client,
3154 state->nlist->pnn_list,
3155 state->nlist->count,
3156 TIMEOUT(),
3157 &request);
3158 if (tevent_req_nomem(subreq, req)) {
3159 return;
3161 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
3164 static void recovery_end_recovery_done(struct tevent_req *subreq)
3166 struct tevent_req *req = tevent_req_callback_data(
3167 subreq, struct tevent_req);
3168 struct recovery_state *state = tevent_req_data(
3169 req, struct recovery_state);
3170 int *err_list;
3171 int ret;
3172 bool status;
3174 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3175 NULL);
3176 TALLOC_FREE(subreq);
3177 if (! status) {
3178 int ret2;
3179 uint32_t pnn;
3181 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3182 state->nlist->count,
3183 err_list,
3184 &pnn);
3185 if (ret2 != 0) {
3186 D_ERR("failed to run recovered event on node %u,"
3187 " ret=%d\n", pnn, ret2);
3188 } else {
3189 D_ERR("failed to run recovered event, ret=%d\n", ret);
3191 tevent_req_error(req, ret);
3192 return;
3195 D_ERR("recovered event finished\n");
3197 tevent_req_done(req);
/*
 * Collect the result of a recovery request.
 *
 * The error, if any, is reported through perr by generic_recv(); its
 * boolean result is deliberately not propagated.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void)generic_recv(req, perr);
}
/*
 * Print the command line synopsis to stderr
 */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
3213 * Arguments - log fd, write fd, socket path, generation
3215 int main(int argc, char *argv[])
3217 int write_fd;
3218 const char *sockpath;
3219 TALLOC_CTX *mem_ctx = NULL;
3220 struct tevent_context *ev;
3221 struct ctdb_client_context *client;
3222 int ret = 0;
3223 struct tevent_req *req;
3224 uint32_t generation;
3226 if (argc != 4) {
3227 usage(argv[0]);
3228 exit(1);
3231 write_fd = atoi(argv[1]);
3232 sockpath = argv[2];
3233 generation = (uint32_t)smb_strtoul(argv[3],
3234 NULL,
3236 &ret,
3237 SMB_STR_STANDARD);
3238 if (ret != 0) {
3239 fprintf(stderr, "recovery: unable to initialize generation\n");
3240 goto failed;
3243 mem_ctx = talloc_new(NULL);
3244 if (mem_ctx == NULL) {
3245 fprintf(stderr, "recovery: talloc_new() failed\n");
3246 goto failed;
3249 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
3250 if (ret != 0) {
3251 fprintf(stderr, "recovery: Unable to initialize logging\n");
3252 goto failed;
3255 ev = tevent_context_init(mem_ctx);
3256 if (ev == NULL) {
3257 D_ERR("tevent_context_init() failed\n");
3258 goto failed;
3261 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
3262 if (ret != 0) {
3263 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
3264 goto failed;
3267 req = recovery_send(mem_ctx, ev, client, generation);
3268 if (req == NULL) {
3269 D_ERR("database_recover_send() failed\n");
3270 goto failed;
3273 if (! tevent_req_poll(req, ev)) {
3274 D_ERR("tevent_req_poll() failed\n");
3275 goto failed;
3278 recovery_recv(req, &ret);
3279 TALLOC_FREE(req);
3280 if (ret != 0) {
3281 D_ERR("database recovery failed, ret=%d\n", ret);
3282 goto failed;
3285 sys_write(write_fd, &ret, sizeof(ret));
3286 return 0;
3288 failed:
3289 TALLOC_FREE(mem_ctx);
3290 return 1;