idmap_hash: mirror the NT_STATUS_NONE_MAPPED/STATUS_SOME_UNMAPPED logic from idmap_au...
[Samba.git] / ctdb / server / ctdb_recovery_helper.c
blobe0d32199ab90413e68f08b1d6a017f6b23997ba9
1 /*
2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
35 #include "lib/util/smb_strtox.h"
37 #include "protocol/protocol.h"
38 #include "protocol/protocol_api.h"
39 #include "client/client.h"
41 #include "common/logging.h"
/* Timeout in seconds; TIMEOUT() below turns it into a deadline for controls */
static int recover_timeout = 30;

/* NOTE(review): presumably the retry limit for recovery; not used in this part of the file */
#define NUM_RETRIES	3

/* Timeval recover_timeout seconds ahead, as built by timeval_current_ofs() */
#define TIMEOUT()	timeval_current_ofs(recover_timeout, 0)
/*
 * Utility functions
 */
/*
 * Shared receive helper for the tevent requests in this file.
 *
 * Returns true when the request carries no unix error.  Otherwise returns
 * false and, if perr is not NULL, stores the error reported by
 * tevent_req_is_unix_error() in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (!tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
67 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
69 static uint64_t srvid_next(void)
71 rec_srvid += 1;
72 return rec_srvid;
/*
 * Node related functions
 */
/*
 * Fixed-capacity list of nodes.  The three arrays are parallel: entry i of
 * caps and ban_credits belongs to pnn_list[i].
 */
struct node_list {
	uint32_t *pnn_list;	/* node numbers; unused slots hold CTDB_UNKNOWN_PNN */
	uint32_t *caps;		/* capability bits per node (tested for CTDB_CAP_LMASTER) */
	uint32_t *ban_credits;	/* per-node counter bumped by node_list_ban_credits() */
	unsigned int size;	/* allocated capacity of each array */
	unsigned int count;	/* number of entries in use */
};
87 static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
89 struct node_list *nlist;
90 unsigned int i;
92 nlist = talloc_zero(mem_ctx, struct node_list);
93 if (nlist == NULL) {
94 return NULL;
97 nlist->pnn_list = talloc_array(nlist, uint32_t, size);
98 nlist->caps = talloc_zero_array(nlist, uint32_t, size);
99 nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
101 if (nlist->pnn_list == NULL ||
102 nlist->caps == NULL ||
103 nlist->ban_credits == NULL) {
104 talloc_free(nlist);
105 return NULL;
107 nlist->size = size;
109 for (i=0; i<nlist->size; i++) {
110 nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
113 return nlist;
116 static bool node_list_add(struct node_list *nlist, uint32_t pnn)
118 unsigned int i;
120 if (nlist->count == nlist->size) {
121 return false;
124 for (i=0; i<nlist->count; i++) {
125 if (nlist->pnn_list[i] == pnn) {
126 return false;
130 nlist->pnn_list[nlist->count] = pnn;
131 nlist->count += 1;
133 return true;
136 static uint32_t *node_list_lmaster(struct node_list *nlist,
137 TALLOC_CTX *mem_ctx,
138 unsigned int *pnn_count)
140 uint32_t *pnn_list;
141 unsigned int count, i;
143 pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
144 if (pnn_list == NULL) {
145 return NULL;
148 count = 0;
149 for (i=0; i<nlist->count; i++) {
150 if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
151 continue;
154 pnn_list[count] = nlist->pnn_list[i];
155 count += 1;
158 *pnn_count = count;
159 return pnn_list;
162 static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
164 unsigned int i;
166 for (i=0; i<nlist->count; i++) {
167 if (nlist->pnn_list[i] == pnn) {
168 nlist->ban_credits[i] += 1;
169 break;
/*
 * Database list functions
 *
 * Simple, naive implementation that could be updated to a db_hash or similar
 */
/* One database and the nodes it has been recorded on */
struct db {
	struct db *prev, *next;	/* list linkage used by DLIST_ADD_END() */

	uint32_t db_id;		/* database id */
	uint32_t db_flags;	/* flags as passed to db_list_add() */
	uint32_t *pnn_list;	/* nodes recorded for this database */
	unsigned int num_nodes;	/* entries of pnn_list in use */
};
/* Linked list of databases */
struct db_list {
	unsigned int num_dbs;	/* number of entries in the list */
	struct db *db;		/* list head */
	unsigned int num_nodes;	/* capacity of each entry's pnn_list */
};
195 static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
197 struct db_list *l;
199 l = talloc_zero(mem_ctx, struct db_list);
200 l->num_nodes = num_nodes;
202 return l;
205 static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
207 struct db *db;
209 if (dblist == NULL) {
210 return NULL;
213 db = dblist->db;
214 while (db != NULL && db->db_id != db_id) {
215 db = db->next;
218 return db;
221 static int db_list_add(struct db_list *dblist,
222 uint32_t db_id,
223 uint32_t db_flags,
224 uint32_t node)
226 struct db *db = NULL;
228 if (dblist == NULL) {
229 return EINVAL;
232 db = talloc_zero(dblist, struct db);
233 if (db == NULL) {
234 return ENOMEM;
237 db->db_id = db_id;
238 db->db_flags = db_flags;
239 db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
240 if (db->pnn_list == NULL) {
241 talloc_free(db);
242 return ENOMEM;
244 db->pnn_list[0] = node;
245 db->num_nodes = 1;
247 DLIST_ADD_END(dblist->db, db);
248 dblist->num_dbs++;
250 return 0;
253 static int db_list_check_and_add(struct db_list *dblist,
254 uint32_t db_id,
255 uint32_t db_flags,
256 uint32_t node)
258 struct db *db = NULL;
259 int ret;
262 * These flags are masked out because they are only set on a
263 * node when a client attaches to that node, so they might not
264 * be set yet. They can't be passed as part of the attch, so
265 * they're no use here.
267 db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
269 if (dblist == NULL) {
270 return EINVAL;
273 db = db_list_find(dblist, db_id);
274 if (db == NULL) {
275 ret = db_list_add(dblist, db_id, db_flags, node);
276 return ret;
279 if (db->db_flags != db_flags) {
280 D_ERR("Incompatible database flags for 0x%"PRIx32" "
281 "(0x%"PRIx32" != 0x%"PRIx32")\n",
282 db_id,
283 db_flags,
284 db->db_flags);
285 return EINVAL;
288 if (db->num_nodes >= dblist->num_nodes) {
289 return EINVAL;
292 db->pnn_list[db->num_nodes] = node;
293 db->num_nodes++;
295 return 0;
/*
 * Create database on nodes where it is missing
 */
/* Request state for db_create_missing_send() */
struct db_create_missing_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;

	struct node_list *nlist;	/* all nodes; also receives ban credits */

	const char *db_name;
	uint32_t *missing_pnn_list;	/* nodes that do not have the database */
	int missing_num_nodes;		/* entries of missing_pnn_list in use */
};
313 static void db_create_missing_done(struct tevent_req *subreq);
315 static struct tevent_req *db_create_missing_send(
316 TALLOC_CTX *mem_ctx,
317 struct tevent_context *ev,
318 struct ctdb_client_context *client,
319 struct node_list *nlist,
320 const char *db_name,
321 struct db *db)
323 struct tevent_req *req, *subreq;
324 struct db_create_missing_state *state;
325 struct ctdb_req_control request;
326 unsigned int i, j;
328 req = tevent_req_create(mem_ctx,
329 &state,
330 struct db_create_missing_state);
331 if (req == NULL) {
332 return NULL;
335 state->ev = ev;
336 state->client = client;
337 state->nlist = nlist;
338 state->db_name = db_name;
340 if (nlist->count == db->num_nodes) {
341 tevent_req_done(req);
342 return tevent_req_post(req, ev);
345 state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count);
346 if (tevent_req_nomem(state->missing_pnn_list, req)) {
347 return tevent_req_post(req, ev);
350 for (i = 0; i < nlist->count; i++) {
351 uint32_t pnn = nlist->pnn_list[i] ;
353 for (j = 0; j < db->num_nodes; j++) {
354 if (pnn == db->pnn_list[j]) {
355 break;
359 if (j < db->num_nodes) {
360 continue;
363 DBG_INFO("Create database %s on node %u\n",
364 state->db_name,
365 pnn);
366 state->missing_pnn_list[state->missing_num_nodes] = pnn;
367 state->missing_num_nodes++;
370 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
371 ctdb_req_control_db_attach_persistent(&request, db_name);
372 } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
373 ctdb_req_control_db_attach_replicated(&request, db_name);
374 } else {
375 ctdb_req_control_db_attach(&request, db_name);
377 request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;
378 subreq = ctdb_client_control_multi_send(state,
379 state->ev,
380 state->client,
381 state->missing_pnn_list,
382 state->missing_num_nodes,
383 TIMEOUT(),
384 &request);
385 if (tevent_req_nomem(subreq, req)) {
386 return tevent_req_post(req, ev);
388 tevent_req_set_callback(subreq, db_create_missing_done, req);
390 return req;
393 static void db_create_missing_done(struct tevent_req *subreq)
395 struct tevent_req *req = tevent_req_callback_data(
396 subreq, struct tevent_req);
397 struct db_create_missing_state *state = tevent_req_data(
398 req, struct db_create_missing_state);
399 int *err_list;
400 int ret;
401 bool status;
403 status = ctdb_client_control_multi_recv(subreq,
404 &ret,
405 NULL,
406 &err_list,
407 NULL);
408 TALLOC_FREE(subreq);
409 if (! status) {
410 int ret2;
411 uint32_t pnn;
413 ret2 = ctdb_client_control_multi_error(
414 state->missing_pnn_list,
415 state->missing_num_nodes,
416 err_list,
417 &pnn);
418 if (ret2 != 0) {
419 D_ERR("control DB_ATTACH failed for db %s"
420 " on node %u, ret=%d\n",
421 state->db_name,
422 pnn,
423 ret2);
424 node_list_ban_credits(state->nlist, pnn);
425 } else {
426 D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
427 state->db_name,
428 ret);
430 tevent_req_error(req, ret);
431 return;
434 tevent_req_done(req);
/* Collect the result of db_create_missing_send(); see generic_recv() */
static bool db_create_missing_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Recovery database functions
 */
/* Handle for the temporary recovery database of one ctdb database */
struct recdb_context {
	uint32_t db_id;		/* id of the database being recovered */
	const char *db_name;	/* caller-supplied name (not copied) */
	const char *db_path;	/* path of the recovery tdb file */
	struct tdb_wrap *db;	/* open recovery tdb */
	bool persistent;	/* persistent flag supplied at creation */
};
454 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
455 const char *db_name,
456 const char *db_path,
457 uint32_t hash_size, bool persistent)
459 static char *db_dir_state = NULL;
460 struct recdb_context *recdb;
461 unsigned int tdb_flags;
463 recdb = talloc(mem_ctx, struct recdb_context);
464 if (recdb == NULL) {
465 return NULL;
468 if (db_dir_state == NULL) {
469 db_dir_state = getenv("CTDB_DBDIR_STATE");
472 recdb->db_name = db_name;
473 recdb->db_id = db_id;
474 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
475 db_dir_state != NULL ?
476 db_dir_state :
477 dirname(discard_const(db_path)),
478 db_name);
479 if (recdb->db_path == NULL) {
480 talloc_free(recdb);
481 return NULL;
483 unlink(recdb->db_path);
485 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
486 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
487 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
488 if (recdb->db == NULL) {
489 talloc_free(recdb);
490 D_ERR("failed to create recovery db %s\n", recdb->db_path);
491 return NULL;
494 recdb->persistent = persistent;
496 return recdb;
499 static uint32_t recdb_id(struct recdb_context *recdb)
501 return recdb->db_id;
504 static const char *recdb_name(struct recdb_context *recdb)
506 return recdb->db_name;
509 static const char *recdb_path(struct recdb_context *recdb)
511 return recdb->db_path;
514 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
516 return recdb->db->tdb;
519 static bool recdb_persistent(struct recdb_context *recdb)
521 return recdb->persistent;
/* Context handed to recdb_add_traverse() */
struct recdb_add_traverse_state {
	struct recdb_context *recdb;	/* destination recovery database */
	uint32_t mypnn;			/* compared with dmaster on an RSN tie */
};
529 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
530 TDB_DATA key, TDB_DATA data,
531 void *private_data)
533 struct recdb_add_traverse_state *state =
534 (struct recdb_add_traverse_state *)private_data;
535 struct ctdb_ltdb_header *hdr;
536 TDB_DATA prev_data;
537 int ret;
539 /* header is not marshalled separately in the pulldb control */
540 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
541 return -1;
544 hdr = (struct ctdb_ltdb_header *)data.dptr;
546 /* fetch the existing record, if any */
547 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
549 if (prev_data.dptr != NULL) {
550 struct ctdb_ltdb_header prev_hdr;
552 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
553 free(prev_data.dptr);
554 if (hdr->rsn < prev_hdr.rsn ||
555 (hdr->rsn == prev_hdr.rsn &&
556 prev_hdr.dmaster != state->mypnn)) {
557 return 0;
561 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
562 if (ret != 0) {
563 return -1;
565 return 0;
568 static bool recdb_add(struct recdb_context *recdb, int mypnn,
569 struct ctdb_rec_buffer *recbuf)
571 struct recdb_add_traverse_state state;
572 int ret;
574 state.recdb = recdb;
575 state.mypnn = mypnn;
577 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
578 if (ret != 0) {
579 return false;
582 return true;
585 /* This function decides which records from recdb are retained */
586 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
587 uint32_t reqid, uint32_t dmaster,
588 TDB_DATA key, TDB_DATA data)
590 struct ctdb_ltdb_header *header;
591 int ret;
593 /* Skip empty records */
594 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
595 return 0;
598 /* update the dmaster field to point to us */
599 header = (struct ctdb_ltdb_header *)data.dptr;
600 if (!persistent) {
601 header->dmaster = dmaster;
602 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
605 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
606 if (ret != 0) {
607 return ret;
610 return 0;
613 struct recdb_file_traverse_state {
614 struct ctdb_rec_buffer *recbuf;
615 struct recdb_context *recdb;
616 TALLOC_CTX *mem_ctx;
617 uint32_t dmaster;
618 uint32_t reqid;
619 bool persistent;
620 bool failed;
621 int fd;
622 size_t max_size;
623 unsigned int num_buffers;
626 static int recdb_file_traverse(struct tdb_context *tdb,
627 TDB_DATA key, TDB_DATA data,
628 void *private_data)
630 struct recdb_file_traverse_state *state =
631 (struct recdb_file_traverse_state *)private_data;
632 int ret;
634 ret = recbuf_filter_add(state->recbuf, state->persistent,
635 state->reqid, state->dmaster, key, data);
636 if (ret != 0) {
637 state->failed = true;
638 return ret;
641 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
642 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
643 if (ret != 0) {
644 D_ERR("Failed to collect recovery records for %s\n",
645 recdb_name(state->recdb));
646 state->failed = true;
647 return ret;
650 state->num_buffers += 1;
652 TALLOC_FREE(state->recbuf);
653 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
654 recdb_id(state->recdb));
655 if (state->recbuf == NULL) {
656 state->failed = true;
657 return ENOMEM;
661 return 0;
664 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
665 uint32_t dmaster, int fd, int max_size)
667 struct recdb_file_traverse_state state;
668 int ret;
670 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
671 if (state.recbuf == NULL) {
672 return -1;
674 state.recdb = recdb;
675 state.mem_ctx = mem_ctx;
676 state.dmaster = dmaster;
677 state.reqid = 0;
678 state.persistent = recdb_persistent(recdb);
679 state.failed = false;
680 state.fd = fd;
681 state.max_size = max_size;
682 state.num_buffers = 0;
684 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
685 if (ret == -1 || state.failed) {
686 TALLOC_FREE(state.recbuf);
687 return -1;
690 ret = ctdb_rec_buffer_write(state.recbuf, fd);
691 if (ret != 0) {
692 D_ERR("Failed to collect recovery records for %s\n",
693 recdb_name(recdb));
694 TALLOC_FREE(state.recbuf);
695 return -1;
697 state.num_buffers += 1;
699 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
700 state.num_buffers, recdb_name(recdb));
702 return state.num_buffers;
/*
 * Pull database from a single node
 */
/* Request state for pull_database_send() */
struct pull_database_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* destination for pulled records */
	uint32_t pnn;			/* node to pull from */
	uint64_t srvid;			/* service id the records arrive on */
	unsigned int num_records;	/* records received so far */
	int result;			/* deferred error, reported after unregister */
};
719 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
720 void *private_data);
721 static void pull_database_register_done(struct tevent_req *subreq);
722 static void pull_database_unregister_done(struct tevent_req *subreq);
723 static void pull_database_done(struct tevent_req *subreq);
725 static struct tevent_req *pull_database_send(
726 TALLOC_CTX *mem_ctx,
727 struct tevent_context *ev,
728 struct ctdb_client_context *client,
729 uint32_t pnn,
730 struct recdb_context *recdb)
732 struct tevent_req *req, *subreq;
733 struct pull_database_state *state;
735 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
736 if (req == NULL) {
737 return NULL;
740 state->ev = ev;
741 state->client = client;
742 state->recdb = recdb;
743 state->pnn = pnn;
744 state->srvid = srvid_next();
746 subreq = ctdb_client_set_message_handler_send(
747 state, state->ev, state->client,
748 state->srvid, pull_database_handler,
749 req);
750 if (tevent_req_nomem(subreq, req)) {
751 return tevent_req_post(req, ev);
754 tevent_req_set_callback(subreq, pull_database_register_done, req);
756 return req;
759 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
760 void *private_data)
762 struct tevent_req *req = talloc_get_type_abort(
763 private_data, struct tevent_req);
764 struct pull_database_state *state = tevent_req_data(
765 req, struct pull_database_state);
766 struct ctdb_rec_buffer *recbuf;
767 size_t np;
768 int ret;
769 bool status;
771 if (srvid != state->srvid) {
772 return;
775 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
776 if (ret != 0) {
777 D_ERR("Invalid data received for DB_PULL messages\n");
778 return;
781 if (recbuf->db_id != recdb_id(state->recdb)) {
782 talloc_free(recbuf);
783 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
784 recbuf->db_id, recdb_name(state->recdb));
785 return;
788 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
789 recbuf);
790 if (! status) {
791 talloc_free(recbuf);
792 D_ERR("Failed to add records to recdb for %s\n",
793 recdb_name(state->recdb));
794 return;
797 state->num_records += recbuf->count;
798 talloc_free(recbuf);
801 static void pull_database_register_done(struct tevent_req *subreq)
803 struct tevent_req *req = tevent_req_callback_data(
804 subreq, struct tevent_req);
805 struct pull_database_state *state = tevent_req_data(
806 req, struct pull_database_state);
807 struct ctdb_req_control request;
808 struct ctdb_pulldb_ext pulldb_ext;
809 int ret;
810 bool status;
812 status = ctdb_client_set_message_handler_recv(subreq, &ret);
813 TALLOC_FREE(subreq);
814 if (! status) {
815 D_ERR("Failed to set message handler for DB_PULL for %s\n",
816 recdb_name(state->recdb));
817 tevent_req_error(req, ret);
818 return;
821 pulldb_ext.db_id = recdb_id(state->recdb);
822 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
823 pulldb_ext.srvid = state->srvid;
825 ctdb_req_control_db_pull(&request, &pulldb_ext);
826 subreq = ctdb_client_control_send(state, state->ev, state->client,
827 state->pnn, TIMEOUT(), &request);
828 if (tevent_req_nomem(subreq, req)) {
829 return;
831 tevent_req_set_callback(subreq, pull_database_done, req);
834 static void pull_database_done(struct tevent_req *subreq)
836 struct tevent_req *req = tevent_req_callback_data(
837 subreq, struct tevent_req);
838 struct pull_database_state *state = tevent_req_data(
839 req, struct pull_database_state);
840 struct ctdb_reply_control *reply;
841 uint32_t num_records;
842 int ret;
843 bool status;
845 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
846 TALLOC_FREE(subreq);
847 if (! status) {
848 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
849 recdb_name(state->recdb), state->pnn, ret);
850 state->result = ret;
851 goto unregister;
854 ret = ctdb_reply_control_db_pull(reply, &num_records);
855 talloc_free(reply);
856 if (num_records != state->num_records) {
857 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
858 num_records, state->num_records,
859 recdb_name(state->recdb));
860 state->result = EIO;
861 goto unregister;
864 D_INFO("Pulled %d records for db %s from node %d\n",
865 state->num_records, recdb_name(state->recdb), state->pnn);
867 unregister:
869 subreq = ctdb_client_remove_message_handler_send(
870 state, state->ev, state->client,
871 state->srvid, req);
872 if (tevent_req_nomem(subreq, req)) {
873 return;
875 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
878 static void pull_database_unregister_done(struct tevent_req *subreq)
880 struct tevent_req *req = tevent_req_callback_data(
881 subreq, struct tevent_req);
882 struct pull_database_state *state = tevent_req_data(
883 req, struct pull_database_state);
884 int ret;
885 bool status;
887 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
888 TALLOC_FREE(subreq);
889 if (! status) {
890 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
891 recdb_name(state->recdb));
892 tevent_req_error(req, ret);
893 return;
896 if (state->result != 0) {
897 tevent_req_error(req, state->result);
898 return;
901 tevent_req_done(req);
/* Collect the result of pull_database_send(); see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (new style)
 */
/* Request state for push_database_send() */
struct push_database_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* source of the records */
	uint32_t *pnn_list;		/* destination nodes */
	unsigned int count;		/* number of destination nodes */
	uint64_t srvid;			/* service id the buffers are sent on */
	uint32_t dmaster;		/* this node's pnn */
	int fd;				/* spool file holding the record buffers */
	int num_buffers;		/* buffers in the spool file */
	int num_buffers_sent;		/* buffers sent so far */
	unsigned int num_records;	/* records sent so far */
};
927 static void push_database_started(struct tevent_req *subreq);
928 static void push_database_send_msg(struct tevent_req *req);
929 static void push_database_send_done(struct tevent_req *subreq);
930 static void push_database_confirmed(struct tevent_req *subreq);
932 static struct tevent_req *push_database_send(
933 TALLOC_CTX *mem_ctx,
934 struct tevent_context *ev,
935 struct ctdb_client_context *client,
936 uint32_t *pnn_list,
937 unsigned int count,
938 struct recdb_context *recdb,
939 int max_size)
941 struct tevent_req *req, *subreq;
942 struct push_database_state *state;
943 struct ctdb_req_control request;
944 struct ctdb_pulldb_ext pulldb_ext;
945 char *filename;
946 off_t offset;
948 req = tevent_req_create(mem_ctx, &state,
949 struct push_database_state);
950 if (req == NULL) {
951 return NULL;
954 state->ev = ev;
955 state->client = client;
956 state->recdb = recdb;
957 state->pnn_list = pnn_list;
958 state->count = count;
960 state->srvid = srvid_next();
961 state->dmaster = ctdb_client_pnn(client);
962 state->num_buffers_sent = 0;
963 state->num_records = 0;
965 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
966 if (tevent_req_nomem(filename, req)) {
967 return tevent_req_post(req, ev);
970 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
971 if (state->fd == -1) {
972 tevent_req_error(req, errno);
973 return tevent_req_post(req, ev);
975 unlink(filename);
976 talloc_free(filename);
978 state->num_buffers = recdb_file(recdb, state, state->dmaster,
979 state->fd, max_size);
980 if (state->num_buffers == -1) {
981 tevent_req_error(req, ENOMEM);
982 return tevent_req_post(req, ev);
985 offset = lseek(state->fd, 0, SEEK_SET);
986 if (offset != 0) {
987 tevent_req_error(req, EIO);
988 return tevent_req_post(req, ev);
991 pulldb_ext.db_id = recdb_id(recdb);
992 pulldb_ext.srvid = state->srvid;
994 ctdb_req_control_db_push_start(&request, &pulldb_ext);
995 subreq = ctdb_client_control_multi_send(state, ev, client,
996 pnn_list, count,
997 TIMEOUT(), &request);
998 if (tevent_req_nomem(subreq, req)) {
999 return tevent_req_post(req, ev);
1001 tevent_req_set_callback(subreq, push_database_started, req);
1003 return req;
1006 static void push_database_started(struct tevent_req *subreq)
1008 struct tevent_req *req = tevent_req_callback_data(
1009 subreq, struct tevent_req);
1010 struct push_database_state *state = tevent_req_data(
1011 req, struct push_database_state);
1012 int *err_list;
1013 int ret;
1014 bool status;
1016 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1017 &err_list, NULL);
1018 TALLOC_FREE(subreq);
1019 if (! status) {
1020 int ret2;
1021 uint32_t pnn;
1023 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1024 state->count,
1025 err_list, &pnn);
1026 if (ret2 != 0) {
1027 D_ERR("control DB_PUSH_START failed for db %s"
1028 " on node %u, ret=%d\n",
1029 recdb_name(state->recdb), pnn, ret2);
1030 } else {
1031 D_ERR("control DB_PUSH_START failed for db %s,"
1032 " ret=%d\n",
1033 recdb_name(state->recdb), ret);
1035 talloc_free(err_list);
1037 tevent_req_error(req, ret);
1038 return;
1041 push_database_send_msg(req);
1044 static void push_database_send_msg(struct tevent_req *req)
1046 struct push_database_state *state = tevent_req_data(
1047 req, struct push_database_state);
1048 struct tevent_req *subreq;
1049 struct ctdb_rec_buffer *recbuf;
1050 struct ctdb_req_message message;
1051 TDB_DATA data;
1052 size_t np;
1053 int ret;
1055 if (state->num_buffers_sent == state->num_buffers) {
1056 struct ctdb_req_control request;
1058 ctdb_req_control_db_push_confirm(&request,
1059 recdb_id(state->recdb));
1060 subreq = ctdb_client_control_multi_send(state, state->ev,
1061 state->client,
1062 state->pnn_list,
1063 state->count,
1064 TIMEOUT(), &request);
1065 if (tevent_req_nomem(subreq, req)) {
1066 return;
1068 tevent_req_set_callback(subreq, push_database_confirmed, req);
1069 return;
1072 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
1073 if (ret != 0) {
1074 tevent_req_error(req, ret);
1075 return;
1078 data.dsize = ctdb_rec_buffer_len(recbuf);
1079 data.dptr = talloc_size(state, data.dsize);
1080 if (tevent_req_nomem(data.dptr, req)) {
1081 return;
1084 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
1086 message.srvid = state->srvid;
1087 message.data.data = data;
1089 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1090 state->num_buffers_sent, recbuf->count,
1091 recdb_name(state->recdb));
1093 subreq = ctdb_client_message_multi_send(state, state->ev,
1094 state->client,
1095 state->pnn_list, state->count,
1096 &message);
1097 if (tevent_req_nomem(subreq, req)) {
1098 return;
1100 tevent_req_set_callback(subreq, push_database_send_done, req);
1102 state->num_records += recbuf->count;
1104 talloc_free(data.dptr);
1105 talloc_free(recbuf);
1108 static void push_database_send_done(struct tevent_req *subreq)
1110 struct tevent_req *req = tevent_req_callback_data(
1111 subreq, struct tevent_req);
1112 struct push_database_state *state = tevent_req_data(
1113 req, struct push_database_state);
1114 bool status;
1115 int ret;
1117 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1118 TALLOC_FREE(subreq);
1119 if (! status) {
1120 D_ERR("Sending recovery records failed for %s\n",
1121 recdb_name(state->recdb));
1122 tevent_req_error(req, ret);
1123 return;
1126 state->num_buffers_sent += 1;
1128 push_database_send_msg(req);
1131 static void push_database_confirmed(struct tevent_req *subreq)
1133 struct tevent_req *req = tevent_req_callback_data(
1134 subreq, struct tevent_req);
1135 struct push_database_state *state = tevent_req_data(
1136 req, struct push_database_state);
1137 struct ctdb_reply_control **reply;
1138 int *err_list;
1139 bool status;
1140 unsigned int i;
1141 int ret;
1142 uint32_t num_records;
1144 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1145 &err_list, &reply);
1146 TALLOC_FREE(subreq);
1147 if (! status) {
1148 int ret2;
1149 uint32_t pnn;
1151 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1152 state->count, err_list,
1153 &pnn);
1154 if (ret2 != 0) {
1155 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1156 " on node %u, ret=%d\n",
1157 recdb_name(state->recdb), pnn, ret2);
1158 } else {
1159 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1160 " ret=%d\n",
1161 recdb_name(state->recdb), ret);
1163 tevent_req_error(req, ret);
1164 return;
1167 for (i=0; i<state->count; i++) {
1168 ret = ctdb_reply_control_db_push_confirm(reply[i],
1169 &num_records);
1170 if (ret != 0) {
1171 tevent_req_error(req, EPROTO);
1172 return;
1175 if (num_records != state->num_records) {
1176 D_ERR("Node %u received %d of %d records for %s\n",
1177 state->pnn_list[i], num_records,
1178 state->num_records, recdb_name(state->recdb));
1179 tevent_req_error(req, EPROTO);
1180 return;
1184 talloc_free(reply);
1186 D_INFO("Pushed %d records for db %s\n",
1187 state->num_records, recdb_name(state->recdb));
1189 tevent_req_done(req);
/* Collect the result of push_database_send(); see generic_recv() */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect databases using highest sequence number
 */
/* Request state for collect_highseqnum_db_send() */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct node_list *nlist;	/* nodes queried for their seqnum */
	uint32_t db_id;
	struct recdb_context *recdb;	/* destination for the pulled records */

	uint32_t max_pnn;		/* node chosen as the pull source */
};
1211 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1212 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1214 static struct tevent_req *collect_highseqnum_db_send(
1215 TALLOC_CTX *mem_ctx,
1216 struct tevent_context *ev,
1217 struct ctdb_client_context *client,
1218 struct node_list *nlist,
1219 uint32_t db_id,
1220 struct recdb_context *recdb)
1222 struct tevent_req *req, *subreq;
1223 struct collect_highseqnum_db_state *state;
1224 struct ctdb_req_control request;
1226 req = tevent_req_create(mem_ctx, &state,
1227 struct collect_highseqnum_db_state);
1228 if (req == NULL) {
1229 return NULL;
1232 state->ev = ev;
1233 state->client = client;
1234 state->nlist = nlist;
1235 state->db_id = db_id;
1236 state->recdb = recdb;
1238 ctdb_req_control_get_db_seqnum(&request, db_id);
1239 subreq = ctdb_client_control_multi_send(mem_ctx,
1241 client,
1242 nlist->pnn_list,
1243 nlist->count,
1244 TIMEOUT(),
1245 &request);
1246 if (tevent_req_nomem(subreq, req)) {
1247 return tevent_req_post(req, ev);
1249 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1250 req);
1252 return req;
1255 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1257 struct tevent_req *req = tevent_req_callback_data(
1258 subreq, struct tevent_req);
1259 struct collect_highseqnum_db_state *state = tevent_req_data(
1260 req, struct collect_highseqnum_db_state);
1261 struct ctdb_reply_control **reply;
1262 int *err_list;
1263 bool status;
1264 unsigned int i;
1265 int ret;
1266 uint64_t seqnum, max_seqnum;
1268 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1269 &err_list, &reply);
1270 TALLOC_FREE(subreq);
1271 if (! status) {
1272 int ret2;
1273 uint32_t pnn;
1275 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1276 state->nlist->count,
1277 err_list,
1278 &pnn);
1279 if (ret2 != 0) {
1280 D_ERR("control GET_DB_SEQNUM failed for db %s"
1281 " on node %u, ret=%d\n",
1282 recdb_name(state->recdb), pnn, ret2);
1283 } else {
1284 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1285 " ret=%d\n",
1286 recdb_name(state->recdb), ret);
1288 tevent_req_error(req, ret);
1289 return;
1292 max_seqnum = 0;
1293 state->max_pnn = state->nlist->pnn_list[0];
1294 for (i=0; i<state->nlist->count; i++) {
1295 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1296 if (ret != 0) {
1297 tevent_req_error(req, EPROTO);
1298 return;
1301 if (max_seqnum < seqnum) {
1302 max_seqnum = seqnum;
1303 state->max_pnn = state->nlist->pnn_list[i];
1307 talloc_free(reply);
1309 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1310 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1312 subreq = pull_database_send(state,
1313 state->ev,
1314 state->client,
1315 state->max_pnn,
1316 state->recdb);
1317 if (tevent_req_nomem(subreq, req)) {
1318 return;
1320 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1321 req);
1324 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1326 struct tevent_req *req = tevent_req_callback_data(
1327 subreq, struct tevent_req);
1328 struct collect_highseqnum_db_state *state = tevent_req_data(
1329 req, struct collect_highseqnum_db_state);
1330 int ret;
1331 bool status;
1333 status = pull_database_recv(subreq, &ret);
1334 TALLOC_FREE(subreq);
1335 if (! status) {
1336 node_list_ban_credits(state->nlist, state->max_pnn);
1337 tevent_req_error(req, ret);
1338 return;
1341 tevent_req_done(req);
/*
 * Receive the result of collect_highseqnum_db_send().
 *
 * Returns true on success. On failure returns false and, when perr is not
 * NULL, stores the error code in *perr.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1350 * Collect all databases
1353 struct collect_all_db_state {
1354 struct tevent_context *ev;
1355 struct ctdb_client_context *client;
1356 struct node_list *nlist;
1357 uint32_t db_id;
1358 struct recdb_context *recdb;
1360 struct ctdb_pulldb pulldb;
1361 unsigned int index;
1364 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1366 static struct tevent_req *collect_all_db_send(
1367 TALLOC_CTX *mem_ctx,
1368 struct tevent_context *ev,
1369 struct ctdb_client_context *client,
1370 struct node_list *nlist,
1371 uint32_t db_id,
1372 struct recdb_context *recdb)
1374 struct tevent_req *req, *subreq;
1375 struct collect_all_db_state *state;
1377 req = tevent_req_create(mem_ctx, &state,
1378 struct collect_all_db_state);
1379 if (req == NULL) {
1380 return NULL;
1383 state->ev = ev;
1384 state->client = client;
1385 state->nlist = nlist;
1386 state->db_id = db_id;
1387 state->recdb = recdb;
1388 state->index = 0;
1390 subreq = pull_database_send(state,
1392 client,
1393 nlist->pnn_list[state->index],
1394 recdb);
1395 if (tevent_req_nomem(subreq, req)) {
1396 return tevent_req_post(req, ev);
1398 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1400 return req;
1403 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1405 struct tevent_req *req = tevent_req_callback_data(
1406 subreq, struct tevent_req);
1407 struct collect_all_db_state *state = tevent_req_data(
1408 req, struct collect_all_db_state);
1409 int ret;
1410 bool status;
1412 status = pull_database_recv(subreq, &ret);
1413 TALLOC_FREE(subreq);
1414 if (! status) {
1415 node_list_ban_credits(state->nlist,
1416 state->nlist->pnn_list[state->index]);
1417 tevent_req_error(req, ret);
1418 return;
1421 state->index += 1;
1422 if (state->index == state->nlist->count) {
1423 tevent_req_done(req);
1424 return;
1427 subreq = pull_database_send(state,
1428 state->ev,
1429 state->client,
1430 state->nlist->pnn_list[state->index],
1431 state->recdb);
1432 if (tevent_req_nomem(subreq, req)) {
1433 return;
1435 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Receive the result of collect_all_db_send().
 *
 * Returns true on success. On failure returns false and, when perr is not
 * NULL, stores the error code in *perr.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1445 * For each database do the following:
1446 * - Get DB name from all nodes
1447 * - Attach database on missing nodes
1448 * - Get DB path
1449 * - Freeze database on all nodes
1450 * - Start transaction on all nodes
1451 * - Collect database from all nodes
1452 * - Wipe database on all nodes
1453 * - Push database to all nodes
1454 * - Commit transaction on all nodes
1455 * - Thaw database on all nodes
1458 struct recover_db_state {
1459 struct tevent_context *ev;
1460 struct ctdb_client_context *client;
1461 struct ctdb_tunable_list *tun_list;
1462 struct node_list *nlist;
1463 struct db *db;
1465 uint32_t destnode;
1466 struct ctdb_transdb transdb;
1468 const char *db_name, *db_path;
1469 struct recdb_context *recdb;
1472 static void recover_db_name_done(struct tevent_req *subreq);
1473 static void recover_db_create_missing_done(struct tevent_req *subreq);
1474 static void recover_db_path_done(struct tevent_req *subreq);
1475 static void recover_db_freeze_done(struct tevent_req *subreq);
1476 static void recover_db_transaction_started(struct tevent_req *subreq);
1477 static void recover_db_collect_done(struct tevent_req *subreq);
1478 static void recover_db_wipedb_done(struct tevent_req *subreq);
1479 static void recover_db_pushdb_done(struct tevent_req *subreq);
1480 static void recover_db_transaction_committed(struct tevent_req *subreq);
1481 static void recover_db_thaw_done(struct tevent_req *subreq);
1483 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1484 struct tevent_context *ev,
1485 struct ctdb_client_context *client,
1486 struct ctdb_tunable_list *tun_list,
1487 struct node_list *nlist,
1488 uint32_t generation,
1489 struct db *db)
1491 struct tevent_req *req, *subreq;
1492 struct recover_db_state *state;
1493 struct ctdb_req_control request;
1495 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1496 if (req == NULL) {
1497 return NULL;
1500 state->ev = ev;
1501 state->client = client;
1502 state->tun_list = tun_list;
1503 state->nlist = nlist;
1504 state->db = db;
1506 state->destnode = ctdb_client_pnn(client);
1507 state->transdb.db_id = db->db_id;
1508 state->transdb.tid = generation;
1510 ctdb_req_control_get_dbname(&request, db->db_id);
1511 subreq = ctdb_client_control_multi_send(state,
1513 client,
1514 state->db->pnn_list,
1515 state->db->num_nodes,
1516 TIMEOUT(),
1517 &request);
1518 if (tevent_req_nomem(subreq, req)) {
1519 return tevent_req_post(req, ev);
1521 tevent_req_set_callback(subreq, recover_db_name_done, req);
1523 return req;
1526 static void recover_db_name_done(struct tevent_req *subreq)
1528 struct tevent_req *req = tevent_req_callback_data(
1529 subreq, struct tevent_req);
1530 struct recover_db_state *state = tevent_req_data(
1531 req, struct recover_db_state);
1532 struct ctdb_reply_control **reply;
1533 int *err_list;
1534 unsigned int i;
1535 int ret;
1536 bool status;
1538 status = ctdb_client_control_multi_recv(subreq,
1539 &ret,
1540 state,
1541 &err_list,
1542 &reply);
1543 TALLOC_FREE(subreq);
1544 if (! status) {
1545 int ret2;
1546 uint32_t pnn;
1548 ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
1549 state->db->num_nodes,
1550 err_list,
1551 &pnn);
1552 if (ret2 != 0) {
1553 D_ERR("control GET_DBNAME failed on node %u,"
1554 " ret=%d\n",
1555 pnn,
1556 ret2);
1557 } else {
1558 D_ERR("control GET_DBNAME failed, ret=%d\n",
1559 ret);
1561 tevent_req_error(req, ret);
1562 return;
1565 for (i = 0; i < state->db->num_nodes; i++) {
1566 const char *db_name;
1567 uint32_t pnn;
1569 pnn = state->nlist->pnn_list[i];
1571 ret = ctdb_reply_control_get_dbname(reply[i],
1572 state,
1573 &db_name);
1574 if (ret != 0) {
1575 D_ERR("control GET_DBNAME failed on node %u "
1576 "for db=0x%x, ret=%d\n",
1577 pnn,
1578 state->db->db_id,
1579 ret);
1580 tevent_req_error(req, EPROTO);
1581 return;
1584 if (state->db_name == NULL) {
1585 state->db_name = db_name;
1586 continue;
1589 if (strcmp(state->db_name, db_name) != 0) {
1590 D_ERR("Incompatible database name for 0x%"PRIx32" "
1591 "(%s != %s) on node %"PRIu32"\n",
1592 state->db->db_id,
1593 db_name,
1594 state->db_name,
1595 pnn);
1596 node_list_ban_credits(state->nlist, pnn);
1597 tevent_req_error(req, ret);
1598 return;
1602 talloc_free(reply);
1604 subreq = db_create_missing_send(state,
1605 state->ev,
1606 state->client,
1607 state->nlist,
1608 state->db_name,
1609 state->db);
1611 if (tevent_req_nomem(subreq, req)) {
1612 return;
1614 tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
1617 static void recover_db_create_missing_done(struct tevent_req *subreq)
1619 struct tevent_req *req = tevent_req_callback_data(
1620 subreq, struct tevent_req);
1621 struct recover_db_state *state = tevent_req_data(
1622 req, struct recover_db_state);
1623 struct ctdb_req_control request;
1624 int ret;
1625 bool status;
1627 /* Could sanity check the db_id here */
1628 status = db_create_missing_recv(subreq, &ret);
1629 TALLOC_FREE(subreq);
1630 if (! status) {
1631 tevent_req_error(req, ret);
1632 return;
1635 ctdb_req_control_getdbpath(&request, state->db->db_id);
1636 subreq = ctdb_client_control_send(state, state->ev, state->client,
1637 state->destnode, TIMEOUT(),
1638 &request);
1639 if (tevent_req_nomem(subreq, req)) {
1640 return;
1642 tevent_req_set_callback(subreq, recover_db_path_done, req);
1645 static void recover_db_path_done(struct tevent_req *subreq)
1647 struct tevent_req *req = tevent_req_callback_data(
1648 subreq, struct tevent_req);
1649 struct recover_db_state *state = tevent_req_data(
1650 req, struct recover_db_state);
1651 struct ctdb_reply_control *reply;
1652 struct ctdb_req_control request;
1653 int ret;
1654 bool status;
1656 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1657 TALLOC_FREE(subreq);
1658 if (! status) {
1659 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1660 state->db_name, ret);
1661 tevent_req_error(req, ret);
1662 return;
1665 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1666 if (ret != 0) {
1667 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1668 state->db_name, ret);
1669 tevent_req_error(req, EPROTO);
1670 return;
1673 talloc_free(reply);
1675 ctdb_req_control_db_freeze(&request, state->db->db_id);
1676 subreq = ctdb_client_control_multi_send(state,
1677 state->ev,
1678 state->client,
1679 state->nlist->pnn_list,
1680 state->nlist->count,
1681 TIMEOUT(),
1682 &request);
1683 if (tevent_req_nomem(subreq, req)) {
1684 return;
1686 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1689 static void recover_db_freeze_done(struct tevent_req *subreq)
1691 struct tevent_req *req = tevent_req_callback_data(
1692 subreq, struct tevent_req);
1693 struct recover_db_state *state = tevent_req_data(
1694 req, struct recover_db_state);
1695 struct ctdb_req_control request;
1696 int *err_list;
1697 int ret;
1698 bool status;
1700 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1701 NULL);
1702 TALLOC_FREE(subreq);
1703 if (! status) {
1704 int ret2;
1705 uint32_t pnn;
1707 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1708 state->nlist->count,
1709 err_list,
1710 &pnn);
1711 if (ret2 != 0) {
1712 D_ERR("control FREEZE_DB failed for db %s"
1713 " on node %u, ret=%d\n",
1714 state->db_name, pnn, ret2);
1716 node_list_ban_credits(state->nlist, pnn);
1717 } else {
1718 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1719 state->db_name, ret);
1721 tevent_req_error(req, ret);
1722 return;
1725 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1726 subreq = ctdb_client_control_multi_send(state,
1727 state->ev,
1728 state->client,
1729 state->nlist->pnn_list,
1730 state->nlist->count,
1731 TIMEOUT(),
1732 &request);
1733 if (tevent_req_nomem(subreq, req)) {
1734 return;
1736 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1739 static void recover_db_transaction_started(struct tevent_req *subreq)
1741 struct tevent_req *req = tevent_req_callback_data(
1742 subreq, struct tevent_req);
1743 struct recover_db_state *state = tevent_req_data(
1744 req, struct recover_db_state);
1745 int *err_list;
1746 uint32_t flags;
1747 int ret;
1748 bool status;
1750 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1751 NULL);
1752 TALLOC_FREE(subreq);
1753 if (! status) {
1754 int ret2;
1755 uint32_t pnn;
1757 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1758 state->nlist->count,
1759 err_list,
1760 &pnn);
1761 if (ret2 != 0) {
1762 D_ERR("control TRANSACTION_DB failed for db=%s"
1763 " on node %u, ret=%d\n",
1764 state->db_name, pnn, ret2);
1765 } else {
1766 D_ERR("control TRANSACTION_DB failed for db=%s,"
1767 " ret=%d\n", state->db_name, ret);
1769 tevent_req_error(req, ret);
1770 return;
1773 flags = state->db->db_flags;
1774 state->recdb = recdb_create(state,
1775 state->db->db_id,
1776 state->db_name,
1777 state->db_path,
1778 state->tun_list->database_hash_size,
1779 flags & CTDB_DB_FLAGS_PERSISTENT);
1780 if (tevent_req_nomem(state->recdb, req)) {
1781 return;
1784 if ((flags & CTDB_DB_FLAGS_PERSISTENT) ||
1785 (flags & CTDB_DB_FLAGS_REPLICATED)) {
1786 subreq = collect_highseqnum_db_send(state,
1787 state->ev,
1788 state->client,
1789 state->nlist,
1790 state->db->db_id,
1791 state->recdb);
1792 } else {
1793 subreq = collect_all_db_send(state,
1794 state->ev,
1795 state->client,
1796 state->nlist,
1797 state->db->db_id,
1798 state->recdb);
1800 if (tevent_req_nomem(subreq, req)) {
1801 return;
1803 tevent_req_set_callback(subreq, recover_db_collect_done, req);
1806 static void recover_db_collect_done(struct tevent_req *subreq)
1808 struct tevent_req *req = tevent_req_callback_data(
1809 subreq, struct tevent_req);
1810 struct recover_db_state *state = tevent_req_data(
1811 req, struct recover_db_state);
1812 struct ctdb_req_control request;
1813 int ret;
1814 bool status;
1816 if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1817 (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1818 status = collect_highseqnum_db_recv(subreq, &ret);
1819 } else {
1820 status = collect_all_db_recv(subreq, &ret);
1822 TALLOC_FREE(subreq);
1823 if (! status) {
1824 tevent_req_error(req, ret);
1825 return;
1828 ctdb_req_control_wipe_database(&request, &state->transdb);
1829 subreq = ctdb_client_control_multi_send(state,
1830 state->ev,
1831 state->client,
1832 state->nlist->pnn_list,
1833 state->nlist->count,
1834 TIMEOUT(),
1835 &request);
1836 if (tevent_req_nomem(subreq, req)) {
1837 return;
1839 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1842 static void recover_db_wipedb_done(struct tevent_req *subreq)
1844 struct tevent_req *req = tevent_req_callback_data(
1845 subreq, struct tevent_req);
1846 struct recover_db_state *state = tevent_req_data(
1847 req, struct recover_db_state);
1848 int *err_list;
1849 int ret;
1850 bool status;
1852 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1853 NULL);
1854 TALLOC_FREE(subreq);
1855 if (! status) {
1856 int ret2;
1857 uint32_t pnn;
1859 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1860 state->nlist->count,
1861 err_list,
1862 &pnn);
1863 if (ret2 != 0) {
1864 D_ERR("control WIPEDB failed for db %s on node %u,"
1865 " ret=%d\n", state->db_name, pnn, ret2);
1866 } else {
1867 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1868 state->db_name, ret);
1870 tevent_req_error(req, ret);
1871 return;
1874 subreq = push_database_send(state,
1875 state->ev,
1876 state->client,
1877 state->nlist->pnn_list,
1878 state->nlist->count,
1879 state->recdb,
1880 state->tun_list->rec_buffer_size_limit);
1881 if (tevent_req_nomem(subreq, req)) {
1882 return;
1884 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1887 static void recover_db_pushdb_done(struct tevent_req *subreq)
1889 struct tevent_req *req = tevent_req_callback_data(
1890 subreq, struct tevent_req);
1891 struct recover_db_state *state = tevent_req_data(
1892 req, struct recover_db_state);
1893 struct ctdb_req_control request;
1894 int ret;
1895 bool status;
1897 status = push_database_recv(subreq, &ret);
1898 TALLOC_FREE(subreq);
1899 if (! status) {
1900 tevent_req_error(req, ret);
1901 return;
1904 TALLOC_FREE(state->recdb);
1906 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1907 subreq = ctdb_client_control_multi_send(state,
1908 state->ev,
1909 state->client,
1910 state->nlist->pnn_list,
1911 state->nlist->count,
1912 TIMEOUT(),
1913 &request);
1914 if (tevent_req_nomem(subreq, req)) {
1915 return;
1917 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1920 static void recover_db_transaction_committed(struct tevent_req *subreq)
1922 struct tevent_req *req = tevent_req_callback_data(
1923 subreq, struct tevent_req);
1924 struct recover_db_state *state = tevent_req_data(
1925 req, struct recover_db_state);
1926 struct ctdb_req_control request;
1927 int *err_list;
1928 int ret;
1929 bool status;
1931 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1932 NULL);
1933 TALLOC_FREE(subreq);
1934 if (! status) {
1935 int ret2;
1936 uint32_t pnn;
1938 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1939 state->nlist->count,
1940 err_list,
1941 &pnn);
1942 if (ret2 != 0) {
1943 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1944 " on node %u, ret=%d\n",
1945 state->db_name, pnn, ret2);
1946 } else {
1947 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1948 " ret=%d\n", state->db_name, ret);
1950 tevent_req_error(req, ret);
1951 return;
1954 ctdb_req_control_db_thaw(&request, state->db->db_id);
1955 subreq = ctdb_client_control_multi_send(state,
1956 state->ev,
1957 state->client,
1958 state->nlist->pnn_list,
1959 state->nlist->count,
1960 TIMEOUT(),
1961 &request);
1962 if (tevent_req_nomem(subreq, req)) {
1963 return;
1965 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1968 static void recover_db_thaw_done(struct tevent_req *subreq)
1970 struct tevent_req *req = tevent_req_callback_data(
1971 subreq, struct tevent_req);
1972 struct recover_db_state *state = tevent_req_data(
1973 req, struct recover_db_state);
1974 int *err_list;
1975 int ret;
1976 bool status;
1978 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1979 NULL);
1980 TALLOC_FREE(subreq);
1981 if (! status) {
1982 int ret2;
1983 uint32_t pnn;
1985 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1986 state->nlist->count,
1987 err_list,
1988 &pnn);
1989 if (ret2 != 0) {
1990 D_ERR("control DB_THAW failed for db %s on node %u,"
1991 " ret=%d\n", state->db_name, pnn, ret2);
1992 } else {
1993 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1994 state->db_name, ret);
1996 tevent_req_error(req, ret);
1997 return;
2000 tevent_req_done(req);
/*
 * Receive the result of recover_db_send().
 *
 * Returns true if the database was recovered, false otherwise.  The error
 * code is not reported to the caller.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
2010 * Start database recovery for each database
2012 * Try to recover each database up to NUM_RETRIES (3) times before failing recovery.
/* State shared by the recoveries of all databases */
struct db_recovery_state {
	struct tevent_context *ev;
	struct db_list *dblist;
	unsigned int num_replies;	/* databases that have finished,
					 * successfully or not */
	unsigned int num_failed;	/* databases that could not be
					 * recovered */
};
/*
 * Per-database state: keeps the arguments needed to restart
 * recover_db_send() for the database after a failed attempt.
 */
struct db_recovery_one_state {
	struct tevent_req *req;		/* the overall db_recovery request */
	struct ctdb_client_context *client;
	struct db_list *dblist;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	uint32_t generation;
	struct db *db;
	int num_fails;			/* failed attempts so far */
};

static void db_recovery_one_done(struct tevent_req *subreq);
2035 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
2036 struct tevent_context *ev,
2037 struct ctdb_client_context *client,
2038 struct db_list *dblist,
2039 struct ctdb_tunable_list *tun_list,
2040 struct node_list *nlist,
2041 uint32_t generation)
2043 struct tevent_req *req, *subreq;
2044 struct db_recovery_state *state;
2045 struct db *db;
2047 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
2048 if (req == NULL) {
2049 return NULL;
2052 state->ev = ev;
2053 state->dblist = dblist;
2054 state->num_replies = 0;
2055 state->num_failed = 0;
2057 if (dblist->num_dbs == 0) {
2058 tevent_req_done(req);
2059 return tevent_req_post(req, ev);
2062 for (db = dblist->db; db != NULL; db = db->next) {
2063 struct db_recovery_one_state *substate;
2065 substate = talloc_zero(state, struct db_recovery_one_state);
2066 if (tevent_req_nomem(substate, req)) {
2067 return tevent_req_post(req, ev);
2070 substate->req = req;
2071 substate->client = client;
2072 substate->dblist = dblist;
2073 substate->tun_list = tun_list;
2074 substate->nlist = nlist;
2075 substate->generation = generation;
2076 substate->db = db;
2078 subreq = recover_db_send(state,
2080 client,
2081 tun_list,
2082 nlist,
2083 generation,
2084 substate->db);
2085 if (tevent_req_nomem(subreq, req)) {
2086 return tevent_req_post(req, ev);
2088 tevent_req_set_callback(subreq, db_recovery_one_done,
2089 substate);
2090 D_NOTICE("recover database 0x%08x\n", substate->db->db_id);
2093 return req;
2096 static void db_recovery_one_done(struct tevent_req *subreq)
2098 struct db_recovery_one_state *substate = tevent_req_callback_data(
2099 subreq, struct db_recovery_one_state);
2100 struct tevent_req *req = substate->req;
2101 struct db_recovery_state *state = tevent_req_data(
2102 req, struct db_recovery_state);
2103 bool status;
2105 status = recover_db_recv(subreq);
2106 TALLOC_FREE(subreq);
2108 if (status) {
2109 talloc_free(substate);
2110 goto done;
2113 substate->num_fails += 1;
2114 if (substate->num_fails < NUM_RETRIES) {
2115 subreq = recover_db_send(state,
2116 state->ev,
2117 substate->client,
2118 substate->tun_list,
2119 substate->nlist,
2120 substate->generation,
2121 substate->db);
2122 if (tevent_req_nomem(subreq, req)) {
2123 goto failed;
2125 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2126 D_NOTICE("recover database 0x%08x, attempt %d\n",
2127 substate->db->db_id, substate->num_fails+1);
2128 return;
2131 failed:
2132 state->num_failed += 1;
2134 done:
2135 state->num_replies += 1;
2137 if (state->num_replies == state->dblist->num_dbs) {
2138 tevent_req_done(req);
2142 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2144 struct db_recovery_state *state = tevent_req_data(
2145 req, struct db_recovery_state);
2146 int err;
2148 if (tevent_req_is_unix_error(req, &err)) {
2149 *count = 0;
2150 return false;
2153 *count = state->num_replies - state->num_failed;
2155 if (state->num_failed > 0) {
2156 return false;
2159 return true;
/* State for banning the node that collected the most ban credits */
struct ban_node_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	uint32_t destnode;		/* value of ctdb_client_pnn(client) */

	uint32_t max_pnn;		/* current candidate for the ban */
};

static bool ban_node_check(struct tevent_req *req);
static void ban_node_check_done(struct tevent_req *subreq);
static void ban_node_done(struct tevent_req *subreq);
2176 static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
2177 struct tevent_context *ev,
2178 struct ctdb_client_context *client,
2179 struct ctdb_tunable_list *tun_list,
2180 struct node_list *nlist)
2182 struct tevent_req *req;
2183 struct ban_node_state *state;
2184 bool ok;
2186 req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
2187 if (req == NULL) {
2188 return NULL;
2191 state->ev = ev;
2192 state->client = client;
2193 state->tun_list = tun_list;
2194 state->nlist = nlist;
2195 state->destnode = ctdb_client_pnn(client);
2197 /* Bans are not enabled */
2198 if (state->tun_list->enable_bans == 0) {
2199 D_ERR("Bans are not enabled\n");
2200 tevent_req_done(req);
2201 return tevent_req_post(req, ev);
2204 ok = ban_node_check(req);
2205 if (!ok) {
2206 return tevent_req_post(req, ev);
2209 return req;
2212 static bool ban_node_check(struct tevent_req *req)
2214 struct tevent_req *subreq;
2215 struct ban_node_state *state = tevent_req_data(
2216 req, struct ban_node_state);
2217 struct ctdb_req_control request;
2218 unsigned max_credits = 0, i;
2220 for (i=0; i<state->nlist->count; i++) {
2221 if (state->nlist->ban_credits[i] > max_credits) {
2222 state->max_pnn = state->nlist->pnn_list[i];
2223 max_credits = state->nlist->ban_credits[i];
2227 if (max_credits < NUM_RETRIES) {
2228 tevent_req_done(req);
2229 return false;
2232 ctdb_req_control_get_nodemap(&request);
2233 subreq = ctdb_client_control_send(state,
2234 state->ev,
2235 state->client,
2236 state->max_pnn,
2237 TIMEOUT(),
2238 &request);
2239 if (tevent_req_nomem(subreq, req)) {
2240 return false;
2242 tevent_req_set_callback(subreq, ban_node_check_done, req);
2244 return true;
2247 static void ban_node_check_done(struct tevent_req *subreq)
2249 struct tevent_req *req = tevent_req_callback_data(
2250 subreq, struct tevent_req);
2251 struct ban_node_state *state = tevent_req_data(
2252 req, struct ban_node_state);
2253 struct ctdb_reply_control *reply;
2254 struct ctdb_node_map *nodemap;
2255 struct ctdb_req_control request;
2256 struct ctdb_ban_state ban;
2257 unsigned int i;
2258 int ret;
2259 bool ok;
2261 ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
2262 TALLOC_FREE(subreq);
2263 if (!ok) {
2264 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2265 state->max_pnn, ret);
2266 tevent_req_error(req, ret);
2267 return;
2270 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2271 if (ret != 0) {
2272 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2273 tevent_req_error(req, ret);
2274 return;
2277 for (i=0; i<nodemap->num; i++) {
2278 if (nodemap->node[i].pnn != state->max_pnn) {
2279 continue;
2282 /* If the node became inactive, reset ban_credits */
2283 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2284 unsigned int j;
2286 for (j=0; j<state->nlist->count; j++) {
2287 if (state->nlist->pnn_list[j] ==
2288 state->max_pnn) {
2289 state->nlist->ban_credits[j] = 0;
2290 break;
2293 state->max_pnn = CTDB_UNKNOWN_PNN;
2297 talloc_free(nodemap);
2298 talloc_free(reply);
2300 /* If node becames inactive during recovery, pick next */
2301 if (state->max_pnn == CTDB_UNKNOWN_PNN) {
2302 (void) ban_node_check(req);
2303 return;
2306 ban = (struct ctdb_ban_state) {
2307 .pnn = state->max_pnn,
2308 .time = state->tun_list->recovery_ban_period,
2311 D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
2313 ctdb_req_control_set_ban_state(&request, &ban);
2314 subreq = ctdb_client_control_send(state,
2315 state->ev,
2316 state->client,
2317 ban.pnn,
2318 TIMEOUT(),
2319 &request);
2320 if (tevent_req_nomem(subreq, req)) {
2321 return;
2323 tevent_req_set_callback(subreq, ban_node_done, req);
2326 static void ban_node_done(struct tevent_req *subreq)
2328 struct tevent_req *req = tevent_req_callback_data(
2329 subreq, struct tevent_req);
2330 struct node_ban_state *state = tevent_req_data(
2331 req, struct node_ban_state);
2332 struct ctdb_reply_control *reply;
2333 int ret;
2334 bool status;
2336 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2337 TALLOC_FREE(subreq);
2338 if (! status) {
2339 tevent_req_error(req, ret);
2340 return;
2343 ret = ctdb_reply_control_set_ban_state(reply);
2344 if (ret != 0) {
2345 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2346 tevent_req_error(req, ret);
2347 return;
2350 talloc_free(reply);
2351 tevent_req_done(req);
/*
 * Receive the result of ban_node_send().
 *
 * Returns true on success. On failure returns false and, when perr is not
 * NULL, stores the error code in *perr.  Uses the same helper as the other
 * receive functions in this file.
 */
static bool ban_node_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
2364 * Run the parallel database recovery
2366 * - Get tunables
2367 * - Get nodemap from all nodes
2368 * - Get capabilities from all nodes
2369 * - Get dbmap
2370 * - Set RECOVERY_ACTIVE
2371 * - Send START_RECOVERY
2372 * - Update vnnmap on all nodes
2373 * - Run database recovery
2374 * - Set RECOVERY_NORMAL
2375 * - Send END_RECOVERY
/* State for one complete run of the parallel database recovery */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t generation;
	uint32_t destnode;		/* value of ctdb_client_pnn(client) */
	struct node_list *nlist;	/* nodes taking part in the recovery */
	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct db_list *dblist;
};

static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_nodemap_verify(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2402 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2403 struct tevent_context *ev,
2404 struct ctdb_client_context *client,
2405 uint32_t generation)
2407 struct tevent_req *req, *subreq;
2408 struct recovery_state *state;
2409 struct ctdb_req_control request;
2411 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2412 if (req == NULL) {
2413 return NULL;
2416 state->ev = ev;
2417 state->client = client;
2418 state->generation = generation;
2419 state->destnode = ctdb_client_pnn(client);
2421 ctdb_req_control_get_all_tunables(&request);
2422 subreq = ctdb_client_control_send(state, state->ev, state->client,
2423 state->destnode, TIMEOUT(),
2424 &request);
2425 if (tevent_req_nomem(subreq, req)) {
2426 return tevent_req_post(req, ev);
2428 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2430 return req;
2433 static void recovery_tunables_done(struct tevent_req *subreq)
2435 struct tevent_req *req = tevent_req_callback_data(
2436 subreq, struct tevent_req);
2437 struct recovery_state *state = tevent_req_data(
2438 req, struct recovery_state);
2439 struct ctdb_reply_control *reply;
2440 struct ctdb_req_control request;
2441 int ret;
2442 bool status;
2444 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2445 TALLOC_FREE(subreq);
2446 if (! status) {
2447 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2448 tevent_req_error(req, ret);
2449 return;
2452 ret = ctdb_reply_control_get_all_tunables(reply, state,
2453 &state->tun_list);
2454 if (ret != 0) {
2455 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2456 tevent_req_error(req, EPROTO);
2457 return;
2460 talloc_free(reply);
2462 recover_timeout = state->tun_list->recover_timeout;
2464 ctdb_req_control_get_nodemap(&request);
2465 subreq = ctdb_client_control_send(state, state->ev, state->client,
2466 state->destnode, TIMEOUT(),
2467 &request);
2468 if (tevent_req_nomem(subreq, req)) {
2469 return;
2471 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2474 static void recovery_nodemap_done(struct tevent_req *subreq)
2476 struct tevent_req *req = tevent_req_callback_data(
2477 subreq, struct tevent_req);
2478 struct recovery_state *state = tevent_req_data(
2479 req, struct recovery_state);
2480 struct ctdb_reply_control *reply;
2481 struct ctdb_req_control request;
2482 struct ctdb_node_map *nodemap;
2483 unsigned int i;
2484 bool status;
2485 int ret;
2487 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2488 TALLOC_FREE(subreq);
2489 if (! status) {
2490 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2491 state->destnode, ret);
2492 tevent_req_error(req, ret);
2493 return;
2496 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2497 if (ret != 0) {
2498 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2499 tevent_req_error(req, ret);
2500 return;
2503 state->nlist = node_list_init(state, nodemap->num);
2504 if (tevent_req_nomem(state->nlist, req)) {
2505 return;
2508 for (i=0; i<nodemap->num; i++) {
2509 bool ok;
2511 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2512 continue;
2515 ok = node_list_add(state->nlist, nodemap->node[i].pnn);
2516 if (!ok) {
2517 tevent_req_error(req, EINVAL);
2518 return;
2522 talloc_free(nodemap);
2523 talloc_free(reply);
2525 /* Verify flags by getting local node information from each node */
2526 ctdb_req_control_get_nodemap(&request);
2527 subreq = ctdb_client_control_multi_send(state,
2528 state->ev,
2529 state->client,
2530 state->nlist->pnn_list,
2531 state->nlist->count,
2532 TIMEOUT(),
2533 &request);
2534 if (tevent_req_nomem(subreq, req)) {
2535 return;
2537 tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
2540 static void recovery_nodemap_verify(struct tevent_req *subreq)
2542 struct tevent_req *req = tevent_req_callback_data(
2543 subreq, struct tevent_req);
2544 struct recovery_state *state = tevent_req_data(
2545 req, struct recovery_state);
2546 struct ctdb_req_control request;
2547 struct ctdb_reply_control **reply;
2548 struct node_list *nlist;
2549 unsigned int i;
2550 int *err_list;
2551 int ret;
2552 bool status;
2554 status = ctdb_client_control_multi_recv(subreq,
2555 &ret,
2556 state,
2557 &err_list,
2558 &reply);
2559 TALLOC_FREE(subreq);
2560 if (! status) {
2561 int ret2;
2562 uint32_t pnn;
2564 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2565 state->nlist->count,
2566 err_list,
2567 &pnn);
2568 if (ret2 != 0) {
2569 D_ERR("control GET_NODEMAP failed on node %u,"
2570 " ret=%d\n", pnn, ret2);
2571 } else {
2572 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2574 tevent_req_error(req, ret);
2575 return;
2578 nlist = node_list_init(state, state->nlist->size);
2579 if (tevent_req_nomem(nlist, req)) {
2580 return;
2583 for (i=0; i<state->nlist->count; i++) {
2584 struct ctdb_node_map *nodemap = NULL;
2585 uint32_t pnn, flags;
2586 unsigned int j;
2587 bool ok;
2589 pnn = state->nlist->pnn_list[i];
2590 ret = ctdb_reply_control_get_nodemap(reply[i],
2591 state,
2592 &nodemap);
2593 if (ret != 0) {
2594 D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
2595 tevent_req_error(req, EPROTO);
2596 return;
2599 flags = NODE_FLAGS_DISCONNECTED;
2600 for (j=0; j<nodemap->num; j++) {
2601 if (nodemap->node[j].pnn == pnn) {
2602 flags = nodemap->node[j].flags;
2603 break;
2607 TALLOC_FREE(nodemap);
2609 if (flags & NODE_FLAGS_INACTIVE) {
2610 continue;
2613 ok = node_list_add(nlist, pnn);
2614 if (!ok) {
2615 tevent_req_error(req, EINVAL);
2616 return;
2620 talloc_free(reply);
2622 talloc_free(state->nlist);
2623 state->nlist = nlist;
2625 ctdb_req_control_get_capabilities(&request);
2626 subreq = ctdb_client_control_multi_send(state,
2627 state->ev,
2628 state->client,
2629 state->nlist->pnn_list,
2630 state->nlist->count,
2631 TIMEOUT(),
2632 &request);
2633 if (tevent_req_nomem(subreq, req)) {
2634 return;
2636 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2639 static void recovery_capabilities_done(struct tevent_req *subreq)
2641 struct tevent_req *req = tevent_req_callback_data(
2642 subreq, struct tevent_req);
2643 struct recovery_state *state = tevent_req_data(
2644 req, struct recovery_state);
2645 struct ctdb_reply_control **reply;
2646 struct ctdb_req_control request;
2647 int *err_list;
2648 unsigned int i;
2649 int ret;
2650 bool status;
2652 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2653 &reply);
2654 TALLOC_FREE(subreq);
2655 if (! status) {
2656 int ret2;
2657 uint32_t pnn;
2659 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2660 state->nlist->count,
2661 err_list,
2662 &pnn);
2663 if (ret2 != 0) {
2664 D_ERR("control GET_CAPABILITIES failed on node %u,"
2665 " ret=%d\n", pnn, ret2);
2666 } else {
2667 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2668 ret);
2670 tevent_req_error(req, ret);
2671 return;
2674 for (i=0; i<state->nlist->count; i++) {
2675 uint32_t caps;
2677 ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
2678 if (ret != 0) {
2679 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2680 state->nlist->pnn_list[i]);
2681 tevent_req_error(req, EPROTO);
2682 return;
2685 state->nlist->caps[i] = caps;
2688 talloc_free(reply);
2690 ctdb_req_control_get_dbmap(&request);
2691 subreq = ctdb_client_control_multi_send(state,
2692 state->ev,
2693 state->client,
2694 state->nlist->pnn_list,
2695 state->nlist->count,
2696 TIMEOUT(),
2697 &request);
2698 if (tevent_req_nomem(subreq, req)) {
2699 return;
2701 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2704 static void recovery_dbmap_done(struct tevent_req *subreq)
2706 struct tevent_req *req = tevent_req_callback_data(
2707 subreq, struct tevent_req);
2708 struct recovery_state *state = tevent_req_data(
2709 req, struct recovery_state);
2710 struct ctdb_reply_control **reply;
2711 struct ctdb_req_control request;
2712 int *err_list;
2713 unsigned int i, j;
2714 int ret;
2715 bool status;
2717 status = ctdb_client_control_multi_recv(subreq,
2718 &ret,
2719 state,
2720 &err_list,
2721 &reply);
2722 TALLOC_FREE(subreq);
2723 if (! status) {
2724 int ret2;
2725 uint32_t pnn;
2727 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2728 state->nlist->count,
2729 err_list,
2730 &pnn);
2731 if (ret2 != 0) {
2732 D_ERR("control GET_DBMAP failed on node %u,"
2733 " ret=%d\n", pnn, ret2);
2734 } else {
2735 D_ERR("control GET_DBMAP failed, ret=%d\n",
2736 ret);
2738 tevent_req_error(req, ret);
2739 return;
2742 state->dblist = db_list_init(state, state->nlist->count);
2743 if (tevent_req_nomem(state->dblist, req)) {
2744 D_ERR("memory allocation error\n");
2745 return;
2748 for (i = 0; i < state->nlist->count; i++) {
2749 struct ctdb_dbid_map *dbmap = NULL;
2750 uint32_t pnn;
2752 pnn = state->nlist->pnn_list[i];
2754 ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap);
2755 if (ret != 0) {
2756 D_ERR("control GET_DBMAP failed on node %u\n",
2757 pnn);
2758 tevent_req_error(req, EPROTO);
2759 return;
2762 for (j = 0; j < dbmap->num; j++) {
2763 ret = db_list_check_and_add(state->dblist,
2764 dbmap->dbs[j].db_id,
2765 dbmap->dbs[j].flags,
2766 pnn);
2767 if (ret != 0) {
2768 D_ERR("failed to add database list entry, "
2769 "ret=%d\n",
2770 ret);
2771 tevent_req_error(req, ret);
2772 return;
2776 TALLOC_FREE(dbmap);
2779 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2780 subreq = ctdb_client_control_multi_send(state,
2781 state->ev,
2782 state->client,
2783 state->nlist->pnn_list,
2784 state->nlist->count,
2785 TIMEOUT(),
2786 &request);
2787 if (tevent_req_nomem(subreq, req)) {
2788 return;
2790 tevent_req_set_callback(subreq, recovery_active_done, req);
2793 static void recovery_active_done(struct tevent_req *subreq)
2795 struct tevent_req *req = tevent_req_callback_data(
2796 subreq, struct tevent_req);
2797 struct recovery_state *state = tevent_req_data(
2798 req, struct recovery_state);
2799 struct ctdb_req_control request;
2800 struct ctdb_vnn_map *vnnmap;
2801 int *err_list;
2802 int ret;
2803 bool status;
2805 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2806 NULL);
2807 TALLOC_FREE(subreq);
2808 if (! status) {
2809 int ret2;
2810 uint32_t pnn;
2812 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2813 state->nlist->count,
2814 err_list,
2815 &pnn);
2816 if (ret2 != 0) {
2817 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2818 " ret=%d\n", pnn, ret2);
2819 } else {
2820 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2821 ret);
2823 tevent_req_error(req, ret);
2824 return;
2827 D_ERR("Set recovery mode to ACTIVE\n");
2829 /* Calculate new VNNMAP */
2830 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2831 if (tevent_req_nomem(vnnmap, req)) {
2832 return;
2835 vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
2836 if (tevent_req_nomem(vnnmap->map, req)) {
2837 return;
2840 if (vnnmap->size == 0) {
2841 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2842 vnnmap->map[0] = state->destnode;
2843 vnnmap->size = 1;
2846 vnnmap->generation = state->generation;
2848 state->vnnmap = vnnmap;
2850 ctdb_req_control_start_recovery(&request);
2851 subreq = ctdb_client_control_multi_send(state,
2852 state->ev,
2853 state->client,
2854 state->nlist->pnn_list,
2855 state->nlist->count,
2856 TIMEOUT(),
2857 &request);
2858 if (tevent_req_nomem(subreq, req)) {
2859 return;
2861 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2864 static void recovery_start_recovery_done(struct tevent_req *subreq)
2866 struct tevent_req *req = tevent_req_callback_data(
2867 subreq, struct tevent_req);
2868 struct recovery_state *state = tevent_req_data(
2869 req, struct recovery_state);
2870 struct ctdb_req_control request;
2871 int *err_list;
2872 int ret;
2873 bool status;
2875 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2876 NULL);
2877 TALLOC_FREE(subreq);
2878 if (! status) {
2879 int ret2;
2880 uint32_t pnn;
2882 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2883 state->nlist->count,
2884 err_list,
2885 &pnn);
2886 if (ret2 != 0) {
2887 D_ERR("failed to run start_recovery event on node %u,"
2888 " ret=%d\n", pnn, ret2);
2889 } else {
2890 D_ERR("failed to run start_recovery event, ret=%d\n",
2891 ret);
2893 tevent_req_error(req, ret);
2894 return;
2897 D_ERR("start_recovery event finished\n");
2899 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2900 subreq = ctdb_client_control_multi_send(state,
2901 state->ev,
2902 state->client,
2903 state->nlist->pnn_list,
2904 state->nlist->count,
2905 TIMEOUT(),
2906 &request);
2907 if (tevent_req_nomem(subreq, req)) {
2908 return;
2910 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2913 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2915 struct tevent_req *req = tevent_req_callback_data(
2916 subreq, struct tevent_req);
2917 struct recovery_state *state = tevent_req_data(
2918 req, struct recovery_state);
2919 int *err_list;
2920 int ret;
2921 bool status;
2923 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2924 NULL);
2925 TALLOC_FREE(subreq);
2926 if (! status) {
2927 int ret2;
2928 uint32_t pnn;
2930 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2931 state->nlist->count,
2932 err_list,
2933 &pnn);
2934 if (ret2 != 0) {
2935 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2936 pnn, ret2);
2937 } else {
2938 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2940 tevent_req_error(req, ret);
2941 return;
2944 D_NOTICE("updated VNNMAP\n");
2946 subreq = db_recovery_send(state,
2947 state->ev,
2948 state->client,
2949 state->dblist,
2950 state->tun_list,
2951 state->nlist,
2952 state->vnnmap->generation);
2953 if (tevent_req_nomem(subreq, req)) {
2954 return;
2956 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2959 static void recovery_db_recovery_done(struct tevent_req *subreq)
2961 struct tevent_req *req = tevent_req_callback_data(
2962 subreq, struct tevent_req);
2963 struct recovery_state *state = tevent_req_data(
2964 req, struct recovery_state);
2965 struct ctdb_req_control request;
2966 bool status;
2967 unsigned int count;
2969 status = db_recovery_recv(subreq, &count);
2970 TALLOC_FREE(subreq);
2972 D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
2974 if (! status) {
2975 subreq = ban_node_send(state,
2976 state->ev,
2977 state->client,
2978 state->tun_list,
2979 state->nlist);
2980 if (tevent_req_nomem(subreq, req)) {
2981 return;
2983 tevent_req_set_callback(subreq, recovery_failed_done, req);
2984 return;
2987 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2988 subreq = ctdb_client_control_multi_send(state,
2989 state->ev,
2990 state->client,
2991 state->nlist->pnn_list,
2992 state->nlist->count,
2993 TIMEOUT(),
2994 &request);
2995 if (tevent_req_nomem(subreq, req)) {
2996 return;
2998 tevent_req_set_callback(subreq, recovery_normal_done, req);
3001 static void recovery_failed_done(struct tevent_req *subreq)
3003 struct tevent_req *req = tevent_req_callback_data(
3004 subreq, struct tevent_req);
3005 int ret;
3006 bool status;
3008 status = ban_node_recv(subreq, &ret);
3009 TALLOC_FREE(subreq);
3010 if (! status) {
3011 D_ERR("failed to ban node, ret=%d\n", ret);
3014 tevent_req_error(req, EIO);
3017 static void recovery_normal_done(struct tevent_req *subreq)
3019 struct tevent_req *req = tevent_req_callback_data(
3020 subreq, struct tevent_req);
3021 struct recovery_state *state = tevent_req_data(
3022 req, struct recovery_state);
3023 struct ctdb_req_control request;
3024 int *err_list;
3025 int ret;
3026 bool status;
3028 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3029 NULL);
3030 TALLOC_FREE(subreq);
3031 if (! status) {
3032 int ret2;
3033 uint32_t pnn;
3035 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3036 state->nlist->count,
3037 err_list,
3038 &pnn);
3039 if (ret2 != 0) {
3040 D_ERR("failed to set recovery mode NORMAL on node %u,"
3041 " ret=%d\n", pnn, ret2);
3042 } else {
3043 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3044 ret);
3046 tevent_req_error(req, ret);
3047 return;
3050 D_ERR("Set recovery mode to NORMAL\n");
3052 ctdb_req_control_end_recovery(&request);
3053 subreq = ctdb_client_control_multi_send(state,
3054 state->ev,
3055 state->client,
3056 state->nlist->pnn_list,
3057 state->nlist->count,
3058 TIMEOUT(),
3059 &request);
3060 if (tevent_req_nomem(subreq, req)) {
3061 return;
3063 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
3066 static void recovery_end_recovery_done(struct tevent_req *subreq)
3068 struct tevent_req *req = tevent_req_callback_data(
3069 subreq, struct tevent_req);
3070 struct recovery_state *state = tevent_req_data(
3071 req, struct recovery_state);
3072 int *err_list;
3073 int ret;
3074 bool status;
3076 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3077 NULL);
3078 TALLOC_FREE(subreq);
3079 if (! status) {
3080 int ret2;
3081 uint32_t pnn;
3083 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3084 state->nlist->count,
3085 err_list,
3086 &pnn);
3087 if (ret2 != 0) {
3088 D_ERR("failed to run recovered event on node %u,"
3089 " ret=%d\n", pnn, ret2);
3090 } else {
3091 D_ERR("failed to run recovered event, ret=%d\n", ret);
3093 tevent_req_error(req, ret);
3094 return;
3097 D_ERR("recovered event finished\n");
3099 tevent_req_done(req);
/*
 * Collect the result of a recovery request.
 *
 * Delegates to generic_recv(); its boolean result is intentionally
 * discarded, so callers learn about a failure only through *perr.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void)generic_recv(req, perr);
}
/* Print the command line synopsis for this helper on stderr */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s "
		"<output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
3115 * Arguments - log fd, write fd, socket path, generation
3117 int main(int argc, char *argv[])
3119 int write_fd;
3120 const char *sockpath;
3121 TALLOC_CTX *mem_ctx = NULL;
3122 struct tevent_context *ev;
3123 struct ctdb_client_context *client;
3124 bool status;
3125 int ret = 0;
3126 struct tevent_req *req;
3127 uint32_t generation;
3129 if (argc != 4) {
3130 usage(argv[0]);
3131 exit(1);
3134 write_fd = atoi(argv[1]);
3135 sockpath = argv[2];
3136 generation = (uint32_t)smb_strtoul(argv[3],
3137 NULL,
3139 &ret,
3140 SMB_STR_STANDARD);
3141 if (ret != 0) {
3142 fprintf(stderr, "recovery: unable to initialize generation\n");
3143 goto failed;
3146 mem_ctx = talloc_new(NULL);
3147 if (mem_ctx == NULL) {
3148 fprintf(stderr, "recovery: talloc_new() failed\n");
3149 goto failed;
3152 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
3153 if (ret != 0) {
3154 fprintf(stderr, "recovery: Unable to initialize logging\n");
3155 goto failed;
3158 ev = tevent_context_init(mem_ctx);
3159 if (ev == NULL) {
3160 D_ERR("tevent_context_init() failed\n");
3161 goto failed;
3164 status = logging_setup_sighup_handler(ev, mem_ctx, NULL, NULL);
3165 if (!status) {
3166 D_ERR("logging_setup_sighup_handler() failed\n");
3167 goto failed;
3170 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
3171 if (ret != 0) {
3172 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
3173 goto failed;
3176 req = recovery_send(mem_ctx, ev, client, generation);
3177 if (req == NULL) {
3178 D_ERR("database_recover_send() failed\n");
3179 goto failed;
3182 if (! tevent_req_poll(req, ev)) {
3183 D_ERR("tevent_req_poll() failed\n");
3184 goto failed;
3187 recovery_recv(req, &ret);
3188 TALLOC_FREE(req);
3189 if (ret != 0) {
3190 D_ERR("database recovery failed, ret=%d\n", ret);
3191 goto failed;
3194 sys_write(write_fd, &ret, sizeof(ret));
3195 return 0;
3197 failed:
3198 TALLOC_FREE(mem_ctx);
3199 return 1;