selftest:Samba4: avoid File::Path 'make_path' in setup_dns_hub_internal()
[Samba.git] / ctdb / server / ctdb_recovery_helper.c
blobf10e60104aeaa58db3246c2491fee6e90d1f3abf
/*
   ctdb parallel database recovery

   Copyright (C) Amitay Isaacs  2015

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "client/client.h"
40 #include "common/logging.h"
/* Timeout, in seconds, fed to TIMEOUT() below. */
static int recover_timeout = 30;

/* Retry budget; NOTE(review): its user is outside this part of the file. */
#define NUM_RETRIES	3

/*
 * Build the timeout argument passed to the ctdb client calls in this file
 * from recover_timeout seconds (and zero microseconds).
 */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * Utility functions
 */
/*
 * Common receive helper for the tevent requests in this file.
 *
 * Returns true if the request finished without a unix error.  Otherwise
 * returns false and, if perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;
	bool failed;

	failed = tevent_req_is_unix_error(req, &unix_err);
	if (!failed) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}

	return false;
}
66 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
68 static uint64_t srvid_next(void)
70 rec_srvid += 1;
71 return rec_srvid;
/*
 * Node related functions
 */
/*
 * Fixed-capacity list of nodes.  The three arrays are parallel: index i of
 * caps and ban_credits describes the node stored at pnn_list[i].
 */
struct node_list {
	uint32_t *pnn_list;	/* node numbers, one per used slot */
	uint32_t *caps;		/* capability bits for each node */
	uint32_t *ban_credits;	/* per-node counter bumped on failures */
	unsigned int size;	/* number of slots allocated */
	unsigned int count;	/* number of slots in use */
};
86 static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
88 struct node_list *nlist;
89 unsigned int i;
91 nlist = talloc_zero(mem_ctx, struct node_list);
92 if (nlist == NULL) {
93 return NULL;
96 nlist->pnn_list = talloc_array(nlist, uint32_t, size);
97 nlist->caps = talloc_zero_array(nlist, uint32_t, size);
98 nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
100 if (nlist->pnn_list == NULL ||
101 nlist->caps == NULL ||
102 nlist->ban_credits == NULL) {
103 talloc_free(nlist);
104 return NULL;
106 nlist->size = size;
108 for (i=0; i<nlist->size; i++) {
109 nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
112 return nlist;
115 static bool node_list_add(struct node_list *nlist, uint32_t pnn)
117 unsigned int i;
119 if (nlist->count == nlist->size) {
120 return false;
123 for (i=0; i<nlist->count; i++) {
124 if (nlist->pnn_list[i] == pnn) {
125 return false;
129 nlist->pnn_list[nlist->count] = pnn;
130 nlist->count += 1;
132 return true;
135 static uint32_t *node_list_lmaster(struct node_list *nlist,
136 TALLOC_CTX *mem_ctx,
137 unsigned int *pnn_count)
139 uint32_t *pnn_list;
140 unsigned int count, i;
142 pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
143 if (pnn_list == NULL) {
144 return NULL;
147 count = 0;
148 for (i=0; i<nlist->count; i++) {
149 if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
150 continue;
153 pnn_list[count] = nlist->pnn_list[i];
154 count += 1;
157 *pnn_count = count;
158 return pnn_list;
161 static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
163 unsigned int i;
165 for (i=0; i<nlist->count; i++) {
166 if (nlist->pnn_list[i] == pnn) {
167 nlist->ban_credits[i] += 1;
168 break;
/*
 * Database list functions
 *
 * Simple, naive implementation that could be updated to a db_hash or similar
 */
/* One database together with the nodes it has been seen on. */
struct db {
	/* Linkage for the list kept in struct db_list */
	struct db *prev;
	struct db *next;

	uint32_t db_id;		/* database identifier */
	uint32_t db_flags;	/* database flags (CTDB_DB_FLAGS_*) */
	uint32_t *pnn_list;	/* nodes that have this database */
	unsigned int num_nodes;	/* used entries in pnn_list */
};
/* List of databases. */
struct db_list {
	unsigned int num_dbs;	/* number of entries in the list */
	struct db *db;		/* head of the list */
	unsigned int num_nodes;	/* capacity of each entry's pnn_list */
};
194 static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
196 struct db_list *l;
198 l = talloc_zero(mem_ctx, struct db_list);
199 l->num_nodes = num_nodes;
201 return l;
204 static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
206 struct db *db;
208 if (dblist == NULL) {
209 return NULL;
212 db = dblist->db;
213 while (db != NULL && db->db_id != db_id) {
214 db = db->next;
217 return db;
220 static int db_list_add(struct db_list *dblist,
221 uint32_t db_id,
222 uint32_t db_flags,
223 uint32_t node)
225 struct db *db = NULL;
227 if (dblist == NULL) {
228 return EINVAL;
231 db = talloc_zero(dblist, struct db);
232 if (db == NULL) {
233 return ENOMEM;
236 db->db_id = db_id;
237 db->db_flags = db_flags;
238 db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
239 if (db->pnn_list == NULL) {
240 talloc_free(db);
241 return ENOMEM;
243 db->pnn_list[0] = node;
244 db->num_nodes = 1;
246 DLIST_ADD_END(dblist->db, db);
247 dblist->num_dbs++;
249 return 0;
252 static int db_list_check_and_add(struct db_list *dblist,
253 uint32_t db_id,
254 uint32_t db_flags,
255 uint32_t node)
257 struct db *db = NULL;
258 int ret;
261 * These flags are masked out because they are only set on a
262 * node when a client attaches to that node, so they might not
263 * be set yet. They can't be passed as part of the attch, so
264 * they're no use here.
266 db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
268 if (dblist == NULL) {
269 return EINVAL;
272 db = db_list_find(dblist, db_id);
273 if (db == NULL) {
274 ret = db_list_add(dblist, db_id, db_flags, node);
275 return ret;
278 if (db->db_flags != db_flags) {
279 D_ERR("Incompatible database flags for 0x%"PRIx32" "
280 "(0x%"PRIx32" != 0x%"PRIx32")\n",
281 db_id,
282 db_flags,
283 db->db_flags);
284 return EINVAL;
287 if (db->num_nodes >= dblist->num_nodes) {
288 return EINVAL;
291 db->pnn_list[db->num_nodes] = node;
292 db->num_nodes++;
294 return 0;
/*
 * Create database on nodes where it is missing
 */
/* State for the db_create_missing_send()/_recv() request. */
struct db_create_missing_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;

	/* Node list used to charge ban credits when an attach fails */
	struct node_list *nlist;

	const char *db_name;		/* database being attached */
	uint32_t *missing_pnn_list;	/* nodes that lack the database */
	int missing_num_nodes;		/* used entries in missing_pnn_list */
};

/* Completion callback for the attach control sent to the missing nodes */
static void db_create_missing_done(struct tevent_req *subreq);
314 static struct tevent_req *db_create_missing_send(
315 TALLOC_CTX *mem_ctx,
316 struct tevent_context *ev,
317 struct ctdb_client_context *client,
318 struct node_list *nlist,
319 const char *db_name,
320 struct db *db)
322 struct tevent_req *req, *subreq;
323 struct db_create_missing_state *state;
324 struct ctdb_req_control request;
325 unsigned int i, j;
327 req = tevent_req_create(mem_ctx,
328 &state,
329 struct db_create_missing_state);
330 if (req == NULL) {
331 return NULL;
334 state->ev = ev;
335 state->client = client;
336 state->nlist = nlist;
337 state->db_name = db_name;
339 if (nlist->count == db->num_nodes) {
340 tevent_req_done(req);
341 return tevent_req_post(req, ev);
344 state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count);
345 if (tevent_req_nomem(state->missing_pnn_list, req)) {
346 return tevent_req_post(req, ev);
349 for (i = 0; i < nlist->count; i++) {
350 uint32_t pnn = nlist->pnn_list[i] ;
352 for (j = 0; j < db->num_nodes; j++) {
353 if (pnn == db->pnn_list[j]) {
354 break;
358 if (j < db->num_nodes) {
359 continue;
362 DBG_INFO("Create database %s on node %u\n",
363 state->db_name,
364 pnn);
365 state->missing_pnn_list[state->missing_num_nodes] = pnn;
366 state->missing_num_nodes++;
369 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
370 ctdb_req_control_db_attach_persistent(&request, db_name);
371 } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
372 ctdb_req_control_db_attach_replicated(&request, db_name);
373 } else {
374 ctdb_req_control_db_attach(&request, db_name);
376 request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;
377 subreq = ctdb_client_control_multi_send(state,
378 state->ev,
379 state->client,
380 state->missing_pnn_list,
381 state->missing_num_nodes,
382 TIMEOUT(),
383 &request);
384 if (tevent_req_nomem(subreq, req)) {
385 return tevent_req_post(req, ev);
387 tevent_req_set_callback(subreq, db_create_missing_done, req);
389 return req;
392 static void db_create_missing_done(struct tevent_req *subreq)
394 struct tevent_req *req = tevent_req_callback_data(
395 subreq, struct tevent_req);
396 struct db_create_missing_state *state = tevent_req_data(
397 req, struct db_create_missing_state);
398 int *err_list;
399 int ret;
400 bool status;
402 status = ctdb_client_control_multi_recv(subreq,
403 &ret,
404 NULL,
405 &err_list,
406 NULL);
407 TALLOC_FREE(subreq);
408 if (! status) {
409 int ret2;
410 uint32_t pnn;
412 ret2 = ctdb_client_control_multi_error(
413 state->missing_pnn_list,
414 state->missing_num_nodes,
415 err_list,
416 &pnn);
417 if (ret2 != 0) {
418 D_ERR("control DB_ATTACH failed for db %s"
419 " on node %u, ret=%d\n",
420 state->db_name,
421 pnn,
422 ret2);
423 node_list_ban_credits(state->nlist, pnn);
424 } else {
425 D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
426 state->db_name,
427 ret);
429 tevent_req_error(req, ret);
430 return;
433 tevent_req_done(req);
/*
 * Collect the result of db_create_missing_send().  Returns true on
 * success; on failure returns false and fills *perr if perr is not NULL.
 */
static bool db_create_missing_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Recovery database functions
 */
/* Handle for the temporary recovery copy of one database. */
struct recdb_context {
	uint32_t db_id;		/* id of the database being recovered */
	const char *db_name;	/* name of the database being recovered */
	const char *db_path;	/* path of the recovery tdb file */
	struct tdb_wrap *db;	/* open recovery tdb */
	bool persistent;	/* value supplied to recdb_create() */
};
453 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
454 const char *db_name,
455 const char *db_path,
456 uint32_t hash_size, bool persistent)
458 static char *db_dir_state = NULL;
459 struct recdb_context *recdb;
460 unsigned int tdb_flags;
462 recdb = talloc(mem_ctx, struct recdb_context);
463 if (recdb == NULL) {
464 return NULL;
467 if (db_dir_state == NULL) {
468 db_dir_state = getenv("CTDB_DBDIR_STATE");
471 recdb->db_name = db_name;
472 recdb->db_id = db_id;
473 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
474 db_dir_state != NULL ?
475 db_dir_state :
476 dirname(discard_const(db_path)),
477 db_name);
478 if (recdb->db_path == NULL) {
479 talloc_free(recdb);
480 return NULL;
482 unlink(recdb->db_path);
484 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
485 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
486 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
487 if (recdb->db == NULL) {
488 talloc_free(recdb);
489 D_ERR("failed to create recovery db %s\n", recdb->db_path);
490 return NULL;
493 recdb->persistent = persistent;
495 return recdb;
498 static uint32_t recdb_id(struct recdb_context *recdb)
500 return recdb->db_id;
503 static const char *recdb_name(struct recdb_context *recdb)
505 return recdb->db_name;
508 static const char *recdb_path(struct recdb_context *recdb)
510 return recdb->db_path;
513 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
515 return recdb->db->tdb;
518 static bool recdb_persistent(struct recdb_context *recdb)
520 return recdb->persistent;
/* Context handed to recdb_add_traverse() for each record. */
struct recdb_add_traverse_state {
	struct recdb_context *recdb;	/* recovery db receiving records */
	uint32_t mypnn;			/* node number used in the RSN tie-break */
};
528 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
529 TDB_DATA key, TDB_DATA data,
530 void *private_data)
532 struct recdb_add_traverse_state *state =
533 (struct recdb_add_traverse_state *)private_data;
534 struct ctdb_ltdb_header *hdr;
535 TDB_DATA prev_data;
536 int ret;
538 /* header is not marshalled separately in the pulldb control */
539 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
540 return -1;
543 hdr = (struct ctdb_ltdb_header *)data.dptr;
545 /* fetch the existing record, if any */
546 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
548 if (prev_data.dptr != NULL) {
549 struct ctdb_ltdb_header prev_hdr;
551 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
552 free(prev_data.dptr);
553 if (hdr->rsn < prev_hdr.rsn ||
554 (hdr->rsn == prev_hdr.rsn &&
555 prev_hdr.dmaster != state->mypnn)) {
556 return 0;
560 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
561 if (ret != 0) {
562 return -1;
564 return 0;
567 static bool recdb_add(struct recdb_context *recdb, int mypnn,
568 struct ctdb_rec_buffer *recbuf)
570 struct recdb_add_traverse_state state;
571 int ret;
573 state.recdb = recdb;
574 state.mypnn = mypnn;
576 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
577 if (ret != 0) {
578 return false;
581 return true;
584 /* This function decides which records from recdb are retained */
585 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
586 uint32_t reqid, uint32_t dmaster,
587 TDB_DATA key, TDB_DATA data)
589 struct ctdb_ltdb_header *header;
590 int ret;
592 /* Skip empty records */
593 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
594 return 0;
597 /* update the dmaster field to point to us */
598 header = (struct ctdb_ltdb_header *)data.dptr;
599 if (!persistent) {
600 header->dmaster = dmaster;
601 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
604 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
605 if (ret != 0) {
606 return ret;
609 return 0;
/* Context handed to recdb_records_traverse() for each record. */
struct recdb_records_traverse_state {
	struct ctdb_rec_buffer *recbuf;	/* buffer collecting the records */
	uint32_t dmaster;		/* passed on to recbuf_filter_add() */
	uint32_t reqid;			/* passed on to recbuf_filter_add() */
	bool persistent;		/* passed on to recbuf_filter_add() */
	bool failed;			/* set once adding a record fails */
};
620 static int recdb_records_traverse(struct tdb_context *tdb,
621 TDB_DATA key, TDB_DATA data,
622 void *private_data)
624 struct recdb_records_traverse_state *state =
625 (struct recdb_records_traverse_state *)private_data;
626 int ret;
628 ret = recbuf_filter_add(state->recbuf, state->persistent,
629 state->reqid, state->dmaster, key, data);
630 if (ret != 0) {
631 state->failed = true;
632 return ret;
635 return 0;
638 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
639 TALLOC_CTX *mem_ctx,
640 uint32_t dmaster)
642 struct recdb_records_traverse_state state;
643 int ret;
645 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
646 if (state.recbuf == NULL) {
647 return NULL;
649 state.dmaster = dmaster;
650 state.reqid = 0;
651 state.persistent = recdb_persistent(recdb);
652 state.failed = false;
654 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
655 &state);
656 if (ret == -1 || state.failed) {
657 D_ERR("Failed to marshall recovery records for %s\n",
658 recdb_name(recdb));
659 TALLOC_FREE(state.recbuf);
660 return NULL;
663 return state.recbuf;
666 struct recdb_file_traverse_state {
667 struct ctdb_rec_buffer *recbuf;
668 struct recdb_context *recdb;
669 TALLOC_CTX *mem_ctx;
670 uint32_t dmaster;
671 uint32_t reqid;
672 bool persistent;
673 bool failed;
674 int fd;
675 size_t max_size;
676 unsigned int num_buffers;
679 static int recdb_file_traverse(struct tdb_context *tdb,
680 TDB_DATA key, TDB_DATA data,
681 void *private_data)
683 struct recdb_file_traverse_state *state =
684 (struct recdb_file_traverse_state *)private_data;
685 int ret;
687 ret = recbuf_filter_add(state->recbuf, state->persistent,
688 state->reqid, state->dmaster, key, data);
689 if (ret != 0) {
690 state->failed = true;
691 return ret;
694 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
695 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
696 if (ret != 0) {
697 D_ERR("Failed to collect recovery records for %s\n",
698 recdb_name(state->recdb));
699 state->failed = true;
700 return ret;
703 state->num_buffers += 1;
705 TALLOC_FREE(state->recbuf);
706 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
707 recdb_id(state->recdb));
708 if (state->recbuf == NULL) {
709 state->failed = true;
710 return ENOMEM;
714 return 0;
717 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
718 uint32_t dmaster, int fd, int max_size)
720 struct recdb_file_traverse_state state;
721 int ret;
723 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
724 if (state.recbuf == NULL) {
725 return -1;
727 state.recdb = recdb;
728 state.mem_ctx = mem_ctx;
729 state.dmaster = dmaster;
730 state.reqid = 0;
731 state.persistent = recdb_persistent(recdb);
732 state.failed = false;
733 state.fd = fd;
734 state.max_size = max_size;
735 state.num_buffers = 0;
737 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
738 if (ret == -1 || state.failed) {
739 TALLOC_FREE(state.recbuf);
740 return -1;
743 ret = ctdb_rec_buffer_write(state.recbuf, fd);
744 if (ret != 0) {
745 D_ERR("Failed to collect recovery records for %s\n",
746 recdb_name(recdb));
747 TALLOC_FREE(state.recbuf);
748 return -1;
750 state.num_buffers += 1;
752 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
753 state.num_buffers, recdb_name(recdb));
755 return state.num_buffers;
/*
 * Pull database from a single node
 */
762 struct pull_database_state {
763 struct tevent_context *ev;
764 struct ctdb_client_context *client;
765 struct recdb_context *recdb;
766 uint32_t pnn;
767 uint64_t srvid;
768 unsigned int num_records;
769 int result;
772 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
773 void *private_data);
774 static void pull_database_register_done(struct tevent_req *subreq);
775 static void pull_database_old_done(struct tevent_req *subreq);
776 static void pull_database_unregister_done(struct tevent_req *subreq);
777 static void pull_database_new_done(struct tevent_req *subreq);
779 static struct tevent_req *pull_database_send(
780 TALLOC_CTX *mem_ctx,
781 struct tevent_context *ev,
782 struct ctdb_client_context *client,
783 uint32_t pnn, uint32_t caps,
784 struct recdb_context *recdb)
786 struct tevent_req *req, *subreq;
787 struct pull_database_state *state;
788 struct ctdb_req_control request;
790 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
791 if (req == NULL) {
792 return NULL;
795 state->ev = ev;
796 state->client = client;
797 state->recdb = recdb;
798 state->pnn = pnn;
799 state->srvid = srvid_next();
801 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
802 subreq = ctdb_client_set_message_handler_send(
803 state, state->ev, state->client,
804 state->srvid, pull_database_handler,
805 req);
806 if (tevent_req_nomem(subreq, req)) {
807 return tevent_req_post(req, ev);
810 tevent_req_set_callback(subreq, pull_database_register_done,
811 req);
813 } else {
814 struct ctdb_pulldb pulldb;
816 pulldb.db_id = recdb_id(recdb);
817 pulldb.lmaster = CTDB_LMASTER_ANY;
819 ctdb_req_control_pull_db(&request, &pulldb);
820 subreq = ctdb_client_control_send(state, state->ev,
821 state->client,
822 pnn, TIMEOUT(),
823 &request);
824 if (tevent_req_nomem(subreq, req)) {
825 return tevent_req_post(req, ev);
827 tevent_req_set_callback(subreq, pull_database_old_done, req);
830 return req;
833 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
834 void *private_data)
836 struct tevent_req *req = talloc_get_type_abort(
837 private_data, struct tevent_req);
838 struct pull_database_state *state = tevent_req_data(
839 req, struct pull_database_state);
840 struct ctdb_rec_buffer *recbuf;
841 size_t np;
842 int ret;
843 bool status;
845 if (srvid != state->srvid) {
846 return;
849 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
850 if (ret != 0) {
851 D_ERR("Invalid data received for DB_PULL messages\n");
852 return;
855 if (recbuf->db_id != recdb_id(state->recdb)) {
856 talloc_free(recbuf);
857 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
858 recbuf->db_id, recdb_name(state->recdb));
859 return;
862 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
863 recbuf);
864 if (! status) {
865 talloc_free(recbuf);
866 D_ERR("Failed to add records to recdb for %s\n",
867 recdb_name(state->recdb));
868 return;
871 state->num_records += recbuf->count;
872 talloc_free(recbuf);
875 static void pull_database_register_done(struct tevent_req *subreq)
877 struct tevent_req *req = tevent_req_callback_data(
878 subreq, struct tevent_req);
879 struct pull_database_state *state = tevent_req_data(
880 req, struct pull_database_state);
881 struct ctdb_req_control request;
882 struct ctdb_pulldb_ext pulldb_ext;
883 int ret;
884 bool status;
886 status = ctdb_client_set_message_handler_recv(subreq, &ret);
887 TALLOC_FREE(subreq);
888 if (! status) {
889 D_ERR("Failed to set message handler for DB_PULL for %s\n",
890 recdb_name(state->recdb));
891 tevent_req_error(req, ret);
892 return;
895 pulldb_ext.db_id = recdb_id(state->recdb);
896 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
897 pulldb_ext.srvid = state->srvid;
899 ctdb_req_control_db_pull(&request, &pulldb_ext);
900 subreq = ctdb_client_control_send(state, state->ev, state->client,
901 state->pnn, TIMEOUT(), &request);
902 if (tevent_req_nomem(subreq, req)) {
903 return;
905 tevent_req_set_callback(subreq, pull_database_new_done, req);
908 static void pull_database_old_done(struct tevent_req *subreq)
910 struct tevent_req *req = tevent_req_callback_data(
911 subreq, struct tevent_req);
912 struct pull_database_state *state = tevent_req_data(
913 req, struct pull_database_state);
914 struct ctdb_reply_control *reply;
915 struct ctdb_rec_buffer *recbuf;
916 int ret;
917 bool status;
919 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
920 TALLOC_FREE(subreq);
921 if (! status) {
922 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
923 recdb_name(state->recdb), state->pnn, ret);
924 tevent_req_error(req, ret);
925 return;
928 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
929 talloc_free(reply);
930 if (ret != 0) {
931 tevent_req_error(req, ret);
932 return;
935 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
936 recbuf);
937 if (! status) {
938 talloc_free(recbuf);
939 tevent_req_error(req, EIO);
940 return;
943 state->num_records = recbuf->count;
944 talloc_free(recbuf);
946 D_INFO("Pulled %d records for db %s from node %d\n",
947 state->num_records, recdb_name(state->recdb), state->pnn);
949 tevent_req_done(req);
952 static void pull_database_new_done(struct tevent_req *subreq)
954 struct tevent_req *req = tevent_req_callback_data(
955 subreq, struct tevent_req);
956 struct pull_database_state *state = tevent_req_data(
957 req, struct pull_database_state);
958 struct ctdb_reply_control *reply;
959 uint32_t num_records;
960 int ret;
961 bool status;
963 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
964 TALLOC_FREE(subreq);
965 if (! status) {
966 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
967 recdb_name(state->recdb), state->pnn, ret);
968 state->result = ret;
969 goto unregister;
972 ret = ctdb_reply_control_db_pull(reply, &num_records);
973 talloc_free(reply);
974 if (num_records != state->num_records) {
975 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
976 num_records, state->num_records,
977 recdb_name(state->recdb));
978 state->result = EIO;
979 goto unregister;
982 D_INFO("Pulled %d records for db %s from node %d\n",
983 state->num_records, recdb_name(state->recdb), state->pnn);
985 unregister:
987 subreq = ctdb_client_remove_message_handler_send(
988 state, state->ev, state->client,
989 state->srvid, req);
990 if (tevent_req_nomem(subreq, req)) {
991 return;
993 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
996 static void pull_database_unregister_done(struct tevent_req *subreq)
998 struct tevent_req *req = tevent_req_callback_data(
999 subreq, struct tevent_req);
1000 struct pull_database_state *state = tevent_req_data(
1001 req, struct pull_database_state);
1002 int ret;
1003 bool status;
1005 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1006 TALLOC_FREE(subreq);
1007 if (! status) {
1008 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
1009 recdb_name(state->recdb));
1010 tevent_req_error(req, ret);
1011 return;
1014 if (state->result != 0) {
1015 tevent_req_error(req, state->result);
1016 return;
1019 tevent_req_done(req);
/*
 * Collect the result of pull_database_send().  Returns true on success;
 * on failure returns false and fills *perr if perr is not NULL.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (old style)
 */
/* State for the push_database_old_send()/_recv() request. */
struct push_database_old_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* recovery db being pushed */
	uint32_t *pnn_list;		/* nodes to push to */
	unsigned int count;		/* entries in pnn_list */
	struct ctdb_rec_buffer *recbuf;	/* marshalled records being sent */
	unsigned int index;		/* position in pnn_list being pushed */
};

/* Completion callback for each PUSH_DB control */
static void push_database_old_push_done(struct tevent_req *subreq);
1043 static struct tevent_req *push_database_old_send(
1044 TALLOC_CTX *mem_ctx,
1045 struct tevent_context *ev,
1046 struct ctdb_client_context *client,
1047 uint32_t *pnn_list,
1048 unsigned int count,
1049 struct recdb_context *recdb)
1051 struct tevent_req *req, *subreq;
1052 struct push_database_old_state *state;
1053 struct ctdb_req_control request;
1054 uint32_t pnn;
1056 req = tevent_req_create(mem_ctx, &state,
1057 struct push_database_old_state);
1058 if (req == NULL) {
1059 return NULL;
1062 state->ev = ev;
1063 state->client = client;
1064 state->recdb = recdb;
1065 state->pnn_list = pnn_list;
1066 state->count = count;
1067 state->index = 0;
1069 state->recbuf = recdb_records(recdb, state,
1070 ctdb_client_pnn(client));
1071 if (tevent_req_nomem(state->recbuf, req)) {
1072 return tevent_req_post(req, ev);
1075 pnn = state->pnn_list[state->index];
1077 ctdb_req_control_push_db(&request, state->recbuf);
1078 subreq = ctdb_client_control_send(state, ev, client, pnn,
1079 TIMEOUT(), &request);
1080 if (tevent_req_nomem(subreq, req)) {
1081 return tevent_req_post(req, ev);
1083 tevent_req_set_callback(subreq, push_database_old_push_done, req);
1085 return req;
1088 static void push_database_old_push_done(struct tevent_req *subreq)
1090 struct tevent_req *req = tevent_req_callback_data(
1091 subreq, struct tevent_req);
1092 struct push_database_old_state *state = tevent_req_data(
1093 req, struct push_database_old_state);
1094 struct ctdb_req_control request;
1095 uint32_t pnn;
1096 int ret;
1097 bool status;
1099 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1100 TALLOC_FREE(subreq);
1101 if (! status) {
1102 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
1103 recdb_name(state->recdb), state->pnn_list[state->index],
1104 ret);
1105 tevent_req_error(req, ret);
1106 return;
1109 state->index += 1;
1110 if (state->index == state->count) {
1111 TALLOC_FREE(state->recbuf);
1112 tevent_req_done(req);
1113 return;
1116 pnn = state->pnn_list[state->index];
1118 ctdb_req_control_push_db(&request, state->recbuf);
1119 subreq = ctdb_client_control_send(state, state->ev, state->client,
1120 pnn, TIMEOUT(), &request);
1121 if (tevent_req_nomem(subreq, req)) {
1122 return;
1124 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/*
 * Collect the result of push_database_old_send().  Returns true on
 * success; on failure returns false and fills *perr if perr is not NULL.
 */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (new style)
 */
/* State for the push_database_new_send() request. */
struct push_database_new_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* recovery db being pushed */
	uint32_t *pnn_list;		/* nodes to push to */
	unsigned int count;		/* entries in pnn_list */
	uint64_t srvid;			/* srvid the record messages use */
	uint32_t dmaster;		/* this node's PNN */
	int fd;				/* scratch file holding the buffers */
	int num_buffers;		/* buffers written to the file */
	int num_buffers_sent;		/* buffers sent so far */
	unsigned int num_records;	/* records sent so far */
};

/* Callbacks for the individual steps of the request */
static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
1155 static struct tevent_req *push_database_new_send(
1156 TALLOC_CTX *mem_ctx,
1157 struct tevent_context *ev,
1158 struct ctdb_client_context *client,
1159 uint32_t *pnn_list,
1160 unsigned int count,
1161 struct recdb_context *recdb,
1162 int max_size)
1164 struct tevent_req *req, *subreq;
1165 struct push_database_new_state *state;
1166 struct ctdb_req_control request;
1167 struct ctdb_pulldb_ext pulldb_ext;
1168 char *filename;
1169 off_t offset;
1171 req = tevent_req_create(mem_ctx, &state,
1172 struct push_database_new_state);
1173 if (req == NULL) {
1174 return NULL;
1177 state->ev = ev;
1178 state->client = client;
1179 state->recdb = recdb;
1180 state->pnn_list = pnn_list;
1181 state->count = count;
1183 state->srvid = srvid_next();
1184 state->dmaster = ctdb_client_pnn(client);
1185 state->num_buffers_sent = 0;
1186 state->num_records = 0;
1188 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
1189 if (tevent_req_nomem(filename, req)) {
1190 return tevent_req_post(req, ev);
1193 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
1194 if (state->fd == -1) {
1195 tevent_req_error(req, errno);
1196 return tevent_req_post(req, ev);
1198 unlink(filename);
1199 talloc_free(filename);
1201 state->num_buffers = recdb_file(recdb, state, state->dmaster,
1202 state->fd, max_size);
1203 if (state->num_buffers == -1) {
1204 tevent_req_error(req, ENOMEM);
1205 return tevent_req_post(req, ev);
1208 offset = lseek(state->fd, 0, SEEK_SET);
1209 if (offset != 0) {
1210 tevent_req_error(req, EIO);
1211 return tevent_req_post(req, ev);
1214 pulldb_ext.db_id = recdb_id(recdb);
1215 pulldb_ext.srvid = state->srvid;
1217 ctdb_req_control_db_push_start(&request, &pulldb_ext);
1218 subreq = ctdb_client_control_multi_send(state, ev, client,
1219 pnn_list, count,
1220 TIMEOUT(), &request);
1221 if (tevent_req_nomem(subreq, req)) {
1222 return tevent_req_post(req, ev);
1224 tevent_req_set_callback(subreq, push_database_new_started, req);
1226 return req;
1229 static void push_database_new_started(struct tevent_req *subreq)
1231 struct tevent_req *req = tevent_req_callback_data(
1232 subreq, struct tevent_req);
1233 struct push_database_new_state *state = tevent_req_data(
1234 req, struct push_database_new_state);
1235 int *err_list;
1236 int ret;
1237 bool status;
1239 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1240 &err_list, NULL);
1241 TALLOC_FREE(subreq);
1242 if (! status) {
1243 int ret2;
1244 uint32_t pnn;
1246 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1247 state->count,
1248 err_list, &pnn);
1249 if (ret2 != 0) {
1250 D_ERR("control DB_PUSH_START failed for db %s"
1251 " on node %u, ret=%d\n",
1252 recdb_name(state->recdb), pnn, ret2);
1253 } else {
1254 D_ERR("control DB_PUSH_START failed for db %s,"
1255 " ret=%d\n",
1256 recdb_name(state->recdb), ret);
1258 talloc_free(err_list);
1260 tevent_req_error(req, ret);
1261 return;
1264 push_database_new_send_msg(req);
1267 static void push_database_new_send_msg(struct tevent_req *req)
1269 struct push_database_new_state *state = tevent_req_data(
1270 req, struct push_database_new_state);
1271 struct tevent_req *subreq;
1272 struct ctdb_rec_buffer *recbuf;
1273 struct ctdb_req_message message;
1274 TDB_DATA data;
1275 size_t np;
1276 int ret;
1278 if (state->num_buffers_sent == state->num_buffers) {
1279 struct ctdb_req_control request;
1281 ctdb_req_control_db_push_confirm(&request,
1282 recdb_id(state->recdb));
1283 subreq = ctdb_client_control_multi_send(state, state->ev,
1284 state->client,
1285 state->pnn_list,
1286 state->count,
1287 TIMEOUT(), &request);
1288 if (tevent_req_nomem(subreq, req)) {
1289 return;
1291 tevent_req_set_callback(subreq, push_database_new_confirmed,
1292 req);
1293 return;
1296 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
1297 if (ret != 0) {
1298 tevent_req_error(req, ret);
1299 return;
1302 data.dsize = ctdb_rec_buffer_len(recbuf);
1303 data.dptr = talloc_size(state, data.dsize);
1304 if (tevent_req_nomem(data.dptr, req)) {
1305 return;
1308 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
1310 message.srvid = state->srvid;
1311 message.data.data = data;
1313 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1314 state->num_buffers_sent, recbuf->count,
1315 recdb_name(state->recdb));
1317 subreq = ctdb_client_message_multi_send(state, state->ev,
1318 state->client,
1319 state->pnn_list, state->count,
1320 &message);
1321 if (tevent_req_nomem(subreq, req)) {
1322 return;
1324 tevent_req_set_callback(subreq, push_database_new_send_done, req);
1326 state->num_records += recbuf->count;
1328 talloc_free(data.dptr);
1329 talloc_free(recbuf);
1332 static void push_database_new_send_done(struct tevent_req *subreq)
1334 struct tevent_req *req = tevent_req_callback_data(
1335 subreq, struct tevent_req);
1336 struct push_database_new_state *state = tevent_req_data(
1337 req, struct push_database_new_state);
1338 bool status;
1339 int ret;
1341 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1342 TALLOC_FREE(subreq);
1343 if (! status) {
1344 D_ERR("Sending recovery records failed for %s\n",
1345 recdb_name(state->recdb));
1346 tevent_req_error(req, ret);
1347 return;
1350 state->num_buffers_sent += 1;
1352 push_database_new_send_msg(req);
1355 static void push_database_new_confirmed(struct tevent_req *subreq)
1357 struct tevent_req *req = tevent_req_callback_data(
1358 subreq, struct tevent_req);
1359 struct push_database_new_state *state = tevent_req_data(
1360 req, struct push_database_new_state);
1361 struct ctdb_reply_control **reply;
1362 int *err_list;
1363 bool status;
1364 unsigned int i;
1365 int ret;
1366 uint32_t num_records;
1368 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1369 &err_list, &reply);
1370 TALLOC_FREE(subreq);
1371 if (! status) {
1372 int ret2;
1373 uint32_t pnn;
1375 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1376 state->count, err_list,
1377 &pnn);
1378 if (ret2 != 0) {
1379 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1380 " on node %u, ret=%d\n",
1381 recdb_name(state->recdb), pnn, ret2);
1382 } else {
1383 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1384 " ret=%d\n",
1385 recdb_name(state->recdb), ret);
1387 tevent_req_error(req, ret);
1388 return;
1391 for (i=0; i<state->count; i++) {
1392 ret = ctdb_reply_control_db_push_confirm(reply[i],
1393 &num_records);
1394 if (ret != 0) {
1395 tevent_req_error(req, EPROTO);
1396 return;
1399 if (num_records != state->num_records) {
1400 D_ERR("Node %u received %d of %d records for %s\n",
1401 state->pnn_list[i], num_records,
1402 state->num_records, recdb_name(state->recdb));
1403 tevent_req_error(req, EPROTO);
1404 return;
1408 talloc_free(reply);
1410 D_INFO("Pushed %d records for db %s\n",
1411 state->num_records, recdb_name(state->recdb));
1413 tevent_req_done(req);
/* Collect the result of push_database_new_send(); see generic_recv() */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * wrapper for push_database_old and push_database_new
 */

struct push_database_state {
	/* Push to nodes without CTDB_CAP_FRAGMENTED_CONTROLS is finished */
	bool old_done;
	/* Push to nodes with CTDB_CAP_FRAGMENTED_CONTROLS is finished */
	bool new_done;
};

static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1432 static struct tevent_req *push_database_send(
1433 TALLOC_CTX *mem_ctx,
1434 struct tevent_context *ev,
1435 struct ctdb_client_context *client,
1436 struct node_list *nlist,
1437 struct ctdb_tunable_list *tun_list,
1438 struct recdb_context *recdb)
1440 struct tevent_req *req, *subreq;
1441 struct push_database_state *state;
1442 uint32_t *old_list, *new_list;
1443 unsigned int old_count, new_count;
1444 unsigned int i;
1446 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1447 if (req == NULL) {
1448 return NULL;
1451 state->old_done = false;
1452 state->new_done = false;
1454 old_count = 0;
1455 new_count = 0;
1456 old_list = talloc_array(state, uint32_t, nlist->count);
1457 new_list = talloc_array(state, uint32_t, nlist->count);
1458 if (tevent_req_nomem(old_list, req) ||
1459 tevent_req_nomem(new_list,req)) {
1460 return tevent_req_post(req, ev);
1463 for (i=0; i<nlist->count; i++) {
1464 if (nlist->caps[i] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1465 new_list[new_count] = nlist->pnn_list[i];
1466 new_count += 1;
1467 } else {
1468 old_list[old_count] = nlist->pnn_list[i];
1469 old_count += 1;
1473 if (old_count > 0) {
1474 subreq = push_database_old_send(state, ev, client,
1475 old_list, old_count, recdb);
1476 if (tevent_req_nomem(subreq, req)) {
1477 return tevent_req_post(req, ev);
1479 tevent_req_set_callback(subreq, push_database_old_done, req);
1480 } else {
1481 state->old_done = true;
1484 if (new_count > 0) {
1485 subreq = push_database_new_send(state, ev, client,
1486 new_list, new_count, recdb,
1487 tun_list->rec_buffer_size_limit);
1488 if (tevent_req_nomem(subreq, req)) {
1489 return tevent_req_post(req, ev);
1491 tevent_req_set_callback(subreq, push_database_new_done, req);
1492 } else {
1493 state->new_done = true;
1496 return req;
1499 static void push_database_old_done(struct tevent_req *subreq)
1501 struct tevent_req *req = tevent_req_callback_data(
1502 subreq, struct tevent_req);
1503 struct push_database_state *state = tevent_req_data(
1504 req, struct push_database_state);
1505 bool status;
1506 int ret;
1508 status = push_database_old_recv(subreq, &ret);
1509 if (! status) {
1510 tevent_req_error(req, ret);
1511 return;
1514 state->old_done = true;
1516 if (state->old_done && state->new_done) {
1517 tevent_req_done(req);
1521 static void push_database_new_done(struct tevent_req *subreq)
1523 struct tevent_req *req = tevent_req_callback_data(
1524 subreq, struct tevent_req);
1525 struct push_database_state *state = tevent_req_data(
1526 req, struct push_database_state);
1527 bool status;
1528 int ret;
1530 status = push_database_new_recv(subreq, &ret);
1531 if (! status) {
1532 tevent_req_error(req, ret);
1533 return;
1536 state->new_done = true;
1538 if (state->old_done && state->new_done) {
1539 tevent_req_done(req);
/* Collect the result of push_database_send(); see generic_recv() */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect databases using highest sequence number
 */

struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct node_list *nlist;
	uint32_t db_id;
	struct recdb_context *recdb;

	/* Node that reported the highest sequence number */
	uint32_t max_pnn;
};

static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1565 static struct tevent_req *collect_highseqnum_db_send(
1566 TALLOC_CTX *mem_ctx,
1567 struct tevent_context *ev,
1568 struct ctdb_client_context *client,
1569 struct node_list *nlist,
1570 uint32_t db_id,
1571 struct recdb_context *recdb)
1573 struct tevent_req *req, *subreq;
1574 struct collect_highseqnum_db_state *state;
1575 struct ctdb_req_control request;
1577 req = tevent_req_create(mem_ctx, &state,
1578 struct collect_highseqnum_db_state);
1579 if (req == NULL) {
1580 return NULL;
1583 state->ev = ev;
1584 state->client = client;
1585 state->nlist = nlist;
1586 state->db_id = db_id;
1587 state->recdb = recdb;
1589 ctdb_req_control_get_db_seqnum(&request, db_id);
1590 subreq = ctdb_client_control_multi_send(mem_ctx,
1592 client,
1593 nlist->pnn_list,
1594 nlist->count,
1595 TIMEOUT(),
1596 &request);
1597 if (tevent_req_nomem(subreq, req)) {
1598 return tevent_req_post(req, ev);
1600 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1601 req);
1603 return req;
1606 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1608 struct tevent_req *req = tevent_req_callback_data(
1609 subreq, struct tevent_req);
1610 struct collect_highseqnum_db_state *state = tevent_req_data(
1611 req, struct collect_highseqnum_db_state);
1612 struct ctdb_reply_control **reply;
1613 int *err_list;
1614 bool status;
1615 unsigned int i;
1616 int ret;
1617 uint64_t seqnum, max_seqnum;
1618 uint32_t max_caps;
1620 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1621 &err_list, &reply);
1622 TALLOC_FREE(subreq);
1623 if (! status) {
1624 int ret2;
1625 uint32_t pnn;
1627 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
1628 state->nlist->count,
1629 err_list,
1630 &pnn);
1631 if (ret2 != 0) {
1632 D_ERR("control GET_DB_SEQNUM failed for db %s"
1633 " on node %u, ret=%d\n",
1634 recdb_name(state->recdb), pnn, ret2);
1635 } else {
1636 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1637 " ret=%d\n",
1638 recdb_name(state->recdb), ret);
1640 tevent_req_error(req, ret);
1641 return;
1644 max_seqnum = 0;
1645 state->max_pnn = state->nlist->pnn_list[0];
1646 max_caps = state->nlist->caps[0];
1647 for (i=0; i<state->nlist->count; i++) {
1648 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1649 if (ret != 0) {
1650 tevent_req_error(req, EPROTO);
1651 return;
1654 if (max_seqnum < seqnum) {
1655 max_seqnum = seqnum;
1656 state->max_pnn = state->nlist->pnn_list[i];
1657 max_caps = state->nlist->caps[i];
1661 talloc_free(reply);
1663 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1664 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1666 subreq = pull_database_send(state,
1667 state->ev,
1668 state->client,
1669 state->max_pnn,
1670 max_caps,
1671 state->recdb);
1672 if (tevent_req_nomem(subreq, req)) {
1673 return;
1675 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1676 req);
1679 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1681 struct tevent_req *req = tevent_req_callback_data(
1682 subreq, struct tevent_req);
1683 struct collect_highseqnum_db_state *state = tevent_req_data(
1684 req, struct collect_highseqnum_db_state);
1685 int ret;
1686 bool status;
1688 status = pull_database_recv(subreq, &ret);
1689 TALLOC_FREE(subreq);
1690 if (! status) {
1691 node_list_ban_credits(state->nlist, state->max_pnn);
1692 tevent_req_error(req, ret);
1693 return;
1696 tevent_req_done(req);
/* Collect the result of collect_highseqnum_db_send(); see generic_recv() */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1705 * Collect all databases
1708 struct collect_all_db_state {
1709 struct tevent_context *ev;
1710 struct ctdb_client_context *client;
1711 struct node_list *nlist;
1712 uint32_t db_id;
1713 struct recdb_context *recdb;
1715 struct ctdb_pulldb pulldb;
1716 unsigned int index;
1719 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1721 static struct tevent_req *collect_all_db_send(
1722 TALLOC_CTX *mem_ctx,
1723 struct tevent_context *ev,
1724 struct ctdb_client_context *client,
1725 struct node_list *nlist,
1726 uint32_t db_id,
1727 struct recdb_context *recdb)
1729 struct tevent_req *req, *subreq;
1730 struct collect_all_db_state *state;
1732 req = tevent_req_create(mem_ctx, &state,
1733 struct collect_all_db_state);
1734 if (req == NULL) {
1735 return NULL;
1738 state->ev = ev;
1739 state->client = client;
1740 state->nlist = nlist;
1741 state->db_id = db_id;
1742 state->recdb = recdb;
1743 state->index = 0;
1745 subreq = pull_database_send(state,
1747 client,
1748 nlist->pnn_list[state->index],
1749 nlist->caps[state->index],
1750 recdb);
1751 if (tevent_req_nomem(subreq, req)) {
1752 return tevent_req_post(req, ev);
1754 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1756 return req;
1759 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1761 struct tevent_req *req = tevent_req_callback_data(
1762 subreq, struct tevent_req);
1763 struct collect_all_db_state *state = tevent_req_data(
1764 req, struct collect_all_db_state);
1765 int ret;
1766 bool status;
1768 status = pull_database_recv(subreq, &ret);
1769 TALLOC_FREE(subreq);
1770 if (! status) {
1771 node_list_ban_credits(state->nlist,
1772 state->nlist->pnn_list[state->index]);
1773 tevent_req_error(req, ret);
1774 return;
1777 state->index += 1;
1778 if (state->index == state->nlist->count) {
1779 tevent_req_done(req);
1780 return;
1783 subreq = pull_database_send(state,
1784 state->ev,
1785 state->client,
1786 state->nlist->pnn_list[state->index],
1787 state->nlist->caps[state->index],
1788 state->recdb);
1789 if (tevent_req_nomem(subreq, req)) {
1790 return;
1792 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/* Collect the result of collect_all_db_send(); see generic_recv() */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1802 * For each database do the following:
1803 * - Get DB name from all nodes
1804 * - Attach database on missing nodes
1805 * - Get DB path
1806 * - Freeze database on all nodes
1807 * - Start transaction on all nodes
1808 * - Collect database from all nodes
1809 * - Wipe database on all nodes
1810 * - Push database to all nodes
1811 * - Commit transaction on all nodes
1812 * - Thaw database on all nodes
1815 struct recover_db_state {
1816 struct tevent_context *ev;
1817 struct ctdb_client_context *client;
1818 struct ctdb_tunable_list *tun_list;
1819 struct node_list *nlist;
1820 struct db *db;
1822 uint32_t destnode;
1823 struct ctdb_transdb transdb;
1825 const char *db_name, *db_path;
1826 struct recdb_context *recdb;
1829 static void recover_db_name_done(struct tevent_req *subreq);
1830 static void recover_db_create_missing_done(struct tevent_req *subreq);
1831 static void recover_db_path_done(struct tevent_req *subreq);
1832 static void recover_db_freeze_done(struct tevent_req *subreq);
1833 static void recover_db_transaction_started(struct tevent_req *subreq);
1834 static void recover_db_collect_done(struct tevent_req *subreq);
1835 static void recover_db_wipedb_done(struct tevent_req *subreq);
1836 static void recover_db_pushdb_done(struct tevent_req *subreq);
1837 static void recover_db_transaction_committed(struct tevent_req *subreq);
1838 static void recover_db_thaw_done(struct tevent_req *subreq);
1840 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1841 struct tevent_context *ev,
1842 struct ctdb_client_context *client,
1843 struct ctdb_tunable_list *tun_list,
1844 struct node_list *nlist,
1845 uint32_t generation,
1846 struct db *db)
1848 struct tevent_req *req, *subreq;
1849 struct recover_db_state *state;
1850 struct ctdb_req_control request;
1852 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1853 if (req == NULL) {
1854 return NULL;
1857 state->ev = ev;
1858 state->client = client;
1859 state->tun_list = tun_list;
1860 state->nlist = nlist;
1861 state->db = db;
1863 state->destnode = ctdb_client_pnn(client);
1864 state->transdb.db_id = db->db_id;
1865 state->transdb.tid = generation;
1867 ctdb_req_control_get_dbname(&request, db->db_id);
1868 subreq = ctdb_client_control_multi_send(state,
1870 client,
1871 state->db->pnn_list,
1872 state->db->num_nodes,
1873 TIMEOUT(),
1874 &request);
1875 if (tevent_req_nomem(subreq, req)) {
1876 return tevent_req_post(req, ev);
1878 tevent_req_set_callback(subreq, recover_db_name_done, req);
1880 return req;
1883 static void recover_db_name_done(struct tevent_req *subreq)
1885 struct tevent_req *req = tevent_req_callback_data(
1886 subreq, struct tevent_req);
1887 struct recover_db_state *state = tevent_req_data(
1888 req, struct recover_db_state);
1889 struct ctdb_reply_control **reply;
1890 int *err_list;
1891 unsigned int i;
1892 int ret;
1893 bool status;
1895 status = ctdb_client_control_multi_recv(subreq,
1896 &ret,
1897 state,
1898 &err_list,
1899 &reply);
1900 TALLOC_FREE(subreq);
1901 if (! status) {
1902 int ret2;
1903 uint32_t pnn;
1905 ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
1906 state->db->num_nodes,
1907 err_list,
1908 &pnn);
1909 if (ret2 != 0) {
1910 D_ERR("control GET_DBNAME failed on node %u,"
1911 " ret=%d\n",
1912 pnn,
1913 ret2);
1914 } else {
1915 D_ERR("control GET_DBNAME failed, ret=%d\n",
1916 ret);
1918 tevent_req_error(req, ret);
1919 return;
1922 for (i = 0; i < state->db->num_nodes; i++) {
1923 const char *db_name;
1924 uint32_t pnn;
1926 pnn = state->nlist->pnn_list[i];
1928 ret = ctdb_reply_control_get_dbname(reply[i],
1929 state,
1930 &db_name);
1931 if (ret != 0) {
1932 D_ERR("control GET_DBNAME failed on node %u "
1933 "for db=0x%x, ret=%d\n",
1934 pnn,
1935 state->db->db_id,
1936 ret);
1937 tevent_req_error(req, EPROTO);
1938 return;
1941 if (state->db_name == NULL) {
1942 state->db_name = db_name;
1943 continue;
1946 if (strcmp(state->db_name, db_name) != 0) {
1947 D_ERR("Incompatible database name for 0x%"PRIx32" "
1948 "(%s != %s) on node %"PRIu32"\n",
1949 state->db->db_id,
1950 db_name,
1951 state->db_name,
1952 pnn);
1953 node_list_ban_credits(state->nlist, pnn);
1954 tevent_req_error(req, ret);
1955 return;
1959 talloc_free(reply);
1961 subreq = db_create_missing_send(state,
1962 state->ev,
1963 state->client,
1964 state->nlist,
1965 state->db_name,
1966 state->db);
1968 if (tevent_req_nomem(subreq, req)) {
1969 return;
1971 tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
1974 static void recover_db_create_missing_done(struct tevent_req *subreq)
1976 struct tevent_req *req = tevent_req_callback_data(
1977 subreq, struct tevent_req);
1978 struct recover_db_state *state = tevent_req_data(
1979 req, struct recover_db_state);
1980 struct ctdb_req_control request;
1981 int ret;
1982 bool status;
1984 /* Could sanity check the db_id here */
1985 status = db_create_missing_recv(subreq, &ret);
1986 TALLOC_FREE(subreq);
1987 if (! status) {
1988 tevent_req_error(req, ret);
1989 return;
1992 ctdb_req_control_getdbpath(&request, state->db->db_id);
1993 subreq = ctdb_client_control_send(state, state->ev, state->client,
1994 state->destnode, TIMEOUT(),
1995 &request);
1996 if (tevent_req_nomem(subreq, req)) {
1997 return;
1999 tevent_req_set_callback(subreq, recover_db_path_done, req);
2002 static void recover_db_path_done(struct tevent_req *subreq)
2004 struct tevent_req *req = tevent_req_callback_data(
2005 subreq, struct tevent_req);
2006 struct recover_db_state *state = tevent_req_data(
2007 req, struct recover_db_state);
2008 struct ctdb_reply_control *reply;
2009 struct ctdb_req_control request;
2010 int ret;
2011 bool status;
2013 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2014 TALLOC_FREE(subreq);
2015 if (! status) {
2016 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
2017 state->db_name, ret);
2018 tevent_req_error(req, ret);
2019 return;
2022 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
2023 if (ret != 0) {
2024 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
2025 state->db_name, ret);
2026 tevent_req_error(req, EPROTO);
2027 return;
2030 talloc_free(reply);
2032 ctdb_req_control_db_freeze(&request, state->db->db_id);
2033 subreq = ctdb_client_control_multi_send(state,
2034 state->ev,
2035 state->client,
2036 state->nlist->pnn_list,
2037 state->nlist->count,
2038 TIMEOUT(),
2039 &request);
2040 if (tevent_req_nomem(subreq, req)) {
2041 return;
2043 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
2046 static void recover_db_freeze_done(struct tevent_req *subreq)
2048 struct tevent_req *req = tevent_req_callback_data(
2049 subreq, struct tevent_req);
2050 struct recover_db_state *state = tevent_req_data(
2051 req, struct recover_db_state);
2052 struct ctdb_req_control request;
2053 int *err_list;
2054 int ret;
2055 bool status;
2057 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2058 NULL);
2059 TALLOC_FREE(subreq);
2060 if (! status) {
2061 int ret2;
2062 uint32_t pnn;
2064 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2065 state->nlist->count,
2066 err_list,
2067 &pnn);
2068 if (ret2 != 0) {
2069 D_ERR("control FREEZE_DB failed for db %s"
2070 " on node %u, ret=%d\n",
2071 state->db_name, pnn, ret2);
2073 node_list_ban_credits(state->nlist, pnn);
2074 } else {
2075 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
2076 state->db_name, ret);
2078 tevent_req_error(req, ret);
2079 return;
2082 ctdb_req_control_db_transaction_start(&request, &state->transdb);
2083 subreq = ctdb_client_control_multi_send(state,
2084 state->ev,
2085 state->client,
2086 state->nlist->pnn_list,
2087 state->nlist->count,
2088 TIMEOUT(),
2089 &request);
2090 if (tevent_req_nomem(subreq, req)) {
2091 return;
2093 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
2096 static void recover_db_transaction_started(struct tevent_req *subreq)
2098 struct tevent_req *req = tevent_req_callback_data(
2099 subreq, struct tevent_req);
2100 struct recover_db_state *state = tevent_req_data(
2101 req, struct recover_db_state);
2102 int *err_list;
2103 uint32_t flags;
2104 int ret;
2105 bool status;
2107 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2108 NULL);
2109 TALLOC_FREE(subreq);
2110 if (! status) {
2111 int ret2;
2112 uint32_t pnn;
2114 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2115 state->nlist->count,
2116 err_list,
2117 &pnn);
2118 if (ret2 != 0) {
2119 D_ERR("control TRANSACTION_DB failed for db=%s"
2120 " on node %u, ret=%d\n",
2121 state->db_name, pnn, ret2);
2122 } else {
2123 D_ERR("control TRANSACTION_DB failed for db=%s,"
2124 " ret=%d\n", state->db_name, ret);
2126 tevent_req_error(req, ret);
2127 return;
2130 flags = state->db->db_flags;
2131 state->recdb = recdb_create(state,
2132 state->db->db_id,
2133 state->db_name,
2134 state->db_path,
2135 state->tun_list->database_hash_size,
2136 flags & CTDB_DB_FLAGS_PERSISTENT);
2137 if (tevent_req_nomem(state->recdb, req)) {
2138 return;
2141 if ((flags & CTDB_DB_FLAGS_PERSISTENT) ||
2142 (flags & CTDB_DB_FLAGS_REPLICATED)) {
2143 subreq = collect_highseqnum_db_send(state,
2144 state->ev,
2145 state->client,
2146 state->nlist,
2147 state->db->db_id,
2148 state->recdb);
2149 } else {
2150 subreq = collect_all_db_send(state,
2151 state->ev,
2152 state->client,
2153 state->nlist,
2154 state->db->db_id,
2155 state->recdb);
2157 if (tevent_req_nomem(subreq, req)) {
2158 return;
2160 tevent_req_set_callback(subreq, recover_db_collect_done, req);
2163 static void recover_db_collect_done(struct tevent_req *subreq)
2165 struct tevent_req *req = tevent_req_callback_data(
2166 subreq, struct tevent_req);
2167 struct recover_db_state *state = tevent_req_data(
2168 req, struct recover_db_state);
2169 struct ctdb_req_control request;
2170 int ret;
2171 bool status;
2173 if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
2174 (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
2175 status = collect_highseqnum_db_recv(subreq, &ret);
2176 } else {
2177 status = collect_all_db_recv(subreq, &ret);
2179 TALLOC_FREE(subreq);
2180 if (! status) {
2181 tevent_req_error(req, ret);
2182 return;
2185 ctdb_req_control_wipe_database(&request, &state->transdb);
2186 subreq = ctdb_client_control_multi_send(state,
2187 state->ev,
2188 state->client,
2189 state->nlist->pnn_list,
2190 state->nlist->count,
2191 TIMEOUT(),
2192 &request);
2193 if (tevent_req_nomem(subreq, req)) {
2194 return;
2196 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
2199 static void recover_db_wipedb_done(struct tevent_req *subreq)
2201 struct tevent_req *req = tevent_req_callback_data(
2202 subreq, struct tevent_req);
2203 struct recover_db_state *state = tevent_req_data(
2204 req, struct recover_db_state);
2205 int *err_list;
2206 int ret;
2207 bool status;
2209 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2210 NULL);
2211 TALLOC_FREE(subreq);
2212 if (! status) {
2213 int ret2;
2214 uint32_t pnn;
2216 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2217 state->nlist->count,
2218 err_list,
2219 &pnn);
2220 if (ret2 != 0) {
2221 D_ERR("control WIPEDB failed for db %s on node %u,"
2222 " ret=%d\n", state->db_name, pnn, ret2);
2223 } else {
2224 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
2225 state->db_name, ret);
2227 tevent_req_error(req, ret);
2228 return;
2231 subreq = push_database_send(state,
2232 state->ev,
2233 state->client,
2234 state->nlist,
2235 state->tun_list,
2236 state->recdb);
2237 if (tevent_req_nomem(subreq, req)) {
2238 return;
2240 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
2243 static void recover_db_pushdb_done(struct tevent_req *subreq)
2245 struct tevent_req *req = tevent_req_callback_data(
2246 subreq, struct tevent_req);
2247 struct recover_db_state *state = tevent_req_data(
2248 req, struct recover_db_state);
2249 struct ctdb_req_control request;
2250 int ret;
2251 bool status;
2253 status = push_database_recv(subreq, &ret);
2254 TALLOC_FREE(subreq);
2255 if (! status) {
2256 tevent_req_error(req, ret);
2257 return;
2260 TALLOC_FREE(state->recdb);
2262 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
2263 subreq = ctdb_client_control_multi_send(state,
2264 state->ev,
2265 state->client,
2266 state->nlist->pnn_list,
2267 state->nlist->count,
2268 TIMEOUT(),
2269 &request);
2270 if (tevent_req_nomem(subreq, req)) {
2271 return;
2273 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
2276 static void recover_db_transaction_committed(struct tevent_req *subreq)
2278 struct tevent_req *req = tevent_req_callback_data(
2279 subreq, struct tevent_req);
2280 struct recover_db_state *state = tevent_req_data(
2281 req, struct recover_db_state);
2282 struct ctdb_req_control request;
2283 int *err_list;
2284 int ret;
2285 bool status;
2287 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2288 NULL);
2289 TALLOC_FREE(subreq);
2290 if (! status) {
2291 int ret2;
2292 uint32_t pnn;
2294 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2295 state->nlist->count,
2296 err_list,
2297 &pnn);
2298 if (ret2 != 0) {
2299 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
2300 " on node %u, ret=%d\n",
2301 state->db_name, pnn, ret2);
2302 } else {
2303 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
2304 " ret=%d\n", state->db_name, ret);
2306 tevent_req_error(req, ret);
2307 return;
2310 ctdb_req_control_db_thaw(&request, state->db->db_id);
2311 subreq = ctdb_client_control_multi_send(state,
2312 state->ev,
2313 state->client,
2314 state->nlist->pnn_list,
2315 state->nlist->count,
2316 TIMEOUT(),
2317 &request);
2318 if (tevent_req_nomem(subreq, req)) {
2319 return;
2321 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
2324 static void recover_db_thaw_done(struct tevent_req *subreq)
2326 struct tevent_req *req = tevent_req_callback_data(
2327 subreq, struct tevent_req);
2328 struct recover_db_state *state = tevent_req_data(
2329 req, struct recover_db_state);
2330 int *err_list;
2331 int ret;
2332 bool status;
2334 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2335 NULL);
2336 TALLOC_FREE(subreq);
2337 if (! status) {
2338 int ret2;
2339 uint32_t pnn;
2341 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2342 state->nlist->count,
2343 err_list,
2344 &pnn);
2345 if (ret2 != 0) {
2346 D_ERR("control DB_THAW failed for db %s on node %u,"
2347 " ret=%d\n", state->db_name, pnn, ret2);
2348 } else {
2349 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
2350 state->db_name, ret);
2352 tevent_req_error(req, ret);
2353 return;
2356 tevent_req_done(req);
/* Collect the result of recover_db_send(); the error code is dropped */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
/*
 * Start database recovery for each database
 *
 * Try to recover each database NUM_RETRIES (3) times before failing recovery.
 */
struct db_recovery_state {
	struct tevent_context *ev;
	struct db_list *dblist;
	/* Databases whose recovery has finished, successfully or not */
	unsigned int num_replies;
	/* Databases whose recovery has failed for good */
	unsigned int num_failed;
};
/* Per-database context, kept so that a failed recovery can be retried */
struct db_recovery_one_state {
	/* The enclosing db_recovery request */
	struct tevent_req *req;
	struct ctdb_client_context *client;
	struct db_list *dblist;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	uint32_t generation;
	struct db *db;
	/* Number of failed recovery attempts for this database */
	int num_fails;
};

static void db_recovery_one_done(struct tevent_req *subreq);
2391 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
2392 struct tevent_context *ev,
2393 struct ctdb_client_context *client,
2394 struct db_list *dblist,
2395 struct ctdb_tunable_list *tun_list,
2396 struct node_list *nlist,
2397 uint32_t generation)
2399 struct tevent_req *req, *subreq;
2400 struct db_recovery_state *state;
2401 struct db *db;
2403 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
2404 if (req == NULL) {
2405 return NULL;
2408 state->ev = ev;
2409 state->dblist = dblist;
2410 state->num_replies = 0;
2411 state->num_failed = 0;
2413 if (dblist->num_dbs == 0) {
2414 tevent_req_done(req);
2415 return tevent_req_post(req, ev);
2418 for (db = dblist->db; db != NULL; db = db->next) {
2419 struct db_recovery_one_state *substate;
2421 substate = talloc_zero(state, struct db_recovery_one_state);
2422 if (tevent_req_nomem(substate, req)) {
2423 return tevent_req_post(req, ev);
2426 substate->req = req;
2427 substate->client = client;
2428 substate->dblist = dblist;
2429 substate->tun_list = tun_list;
2430 substate->nlist = nlist;
2431 substate->generation = generation;
2432 substate->db = db;
2434 subreq = recover_db_send(state,
2436 client,
2437 tun_list,
2438 nlist,
2439 generation,
2440 substate->db);
2441 if (tevent_req_nomem(subreq, req)) {
2442 return tevent_req_post(req, ev);
2444 tevent_req_set_callback(subreq, db_recovery_one_done,
2445 substate);
2446 D_NOTICE("recover database 0x%08x\n", substate->db->db_id);
2449 return req;
2452 static void db_recovery_one_done(struct tevent_req *subreq)
2454 struct db_recovery_one_state *substate = tevent_req_callback_data(
2455 subreq, struct db_recovery_one_state);
2456 struct tevent_req *req = substate->req;
2457 struct db_recovery_state *state = tevent_req_data(
2458 req, struct db_recovery_state);
2459 bool status;
2461 status = recover_db_recv(subreq);
2462 TALLOC_FREE(subreq);
2464 if (status) {
2465 talloc_free(substate);
2466 goto done;
2469 substate->num_fails += 1;
2470 if (substate->num_fails < NUM_RETRIES) {
2471 subreq = recover_db_send(state,
2472 state->ev,
2473 substate->client,
2474 substate->tun_list,
2475 substate->nlist,
2476 substate->generation,
2477 substate->db);
2478 if (tevent_req_nomem(subreq, req)) {
2479 goto failed;
2481 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2482 D_NOTICE("recover database 0x%08x, attempt %d\n",
2483 substate->db->db_id, substate->num_fails+1);
2484 return;
2487 failed:
2488 state->num_failed += 1;
2490 done:
2491 state->num_replies += 1;
2493 if (state->num_replies == state->dblist->num_dbs) {
2494 tevent_req_done(req);
2498 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2500 struct db_recovery_state *state = tevent_req_data(
2501 req, struct db_recovery_state);
2502 int err;
2504 if (tevent_req_is_unix_error(req, &err)) {
2505 *count = 0;
2506 return false;
2509 *count = state->num_replies - state->num_failed;
2511 if (state->num_failed > 0) {
2512 return false;
2515 return true;
struct ban_node_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	/* PNN of the client's own node */
	uint32_t destnode;

	/* Node currently holding the most ban credits */
	uint32_t max_pnn;
};

static bool ban_node_check(struct tevent_req *req);
static void ban_node_check_done(struct tevent_req *subreq);
static void ban_node_done(struct tevent_req *subreq);
2532 static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
2533 struct tevent_context *ev,
2534 struct ctdb_client_context *client,
2535 struct ctdb_tunable_list *tun_list,
2536 struct node_list *nlist)
2538 struct tevent_req *req;
2539 struct ban_node_state *state;
2540 bool ok;
2542 req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
2543 if (req == NULL) {
2544 return NULL;
2547 state->ev = ev;
2548 state->client = client;
2549 state->tun_list = tun_list;
2550 state->nlist = nlist;
2551 state->destnode = ctdb_client_pnn(client);
2553 /* Bans are not enabled */
2554 if (state->tun_list->enable_bans == 0) {
2555 D_ERR("Bans are not enabled\n");
2556 tevent_req_done(req);
2557 return tevent_req_post(req, ev);
2560 ok = ban_node_check(req);
2561 if (!ok) {
2562 return tevent_req_post(req, ev);
2565 return req;
2568 static bool ban_node_check(struct tevent_req *req)
2570 struct tevent_req *subreq;
2571 struct ban_node_state *state = tevent_req_data(
2572 req, struct ban_node_state);
2573 struct ctdb_req_control request;
2574 unsigned max_credits = 0, i;
2576 for (i=0; i<state->nlist->count; i++) {
2577 if (state->nlist->ban_credits[i] > max_credits) {
2578 state->max_pnn = state->nlist->pnn_list[i];
2579 max_credits = state->nlist->ban_credits[i];
2583 if (max_credits < NUM_RETRIES) {
2584 tevent_req_done(req);
2585 return false;
2588 ctdb_req_control_get_nodemap(&request);
2589 subreq = ctdb_client_control_send(state,
2590 state->ev,
2591 state->client,
2592 state->max_pnn,
2593 TIMEOUT(),
2594 &request);
2595 if (tevent_req_nomem(subreq, req)) {
2596 return false;
2598 tevent_req_set_callback(subreq, ban_node_check_done, req);
2600 return true;
2603 static void ban_node_check_done(struct tevent_req *subreq)
2605 struct tevent_req *req = tevent_req_callback_data(
2606 subreq, struct tevent_req);
2607 struct ban_node_state *state = tevent_req_data(
2608 req, struct ban_node_state);
2609 struct ctdb_reply_control *reply;
2610 struct ctdb_node_map *nodemap;
2611 struct ctdb_req_control request;
2612 struct ctdb_ban_state ban;
2613 unsigned int i;
2614 int ret;
2615 bool ok;
2617 ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
2618 TALLOC_FREE(subreq);
2619 if (!ok) {
2620 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2621 state->max_pnn, ret);
2622 tevent_req_error(req, ret);
2623 return;
2626 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2627 if (ret != 0) {
2628 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2629 tevent_req_error(req, ret);
2630 return;
2633 for (i=0; i<nodemap->num; i++) {
2634 if (nodemap->node[i].pnn != state->max_pnn) {
2635 continue;
2638 /* If the node became inactive, reset ban_credits */
2639 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2640 unsigned int j;
2642 for (j=0; j<state->nlist->count; j++) {
2643 if (state->nlist->pnn_list[j] ==
2644 state->max_pnn) {
2645 state->nlist->ban_credits[j] = 0;
2646 break;
2649 state->max_pnn = CTDB_UNKNOWN_PNN;
2653 talloc_free(nodemap);
2654 talloc_free(reply);
2656 /* If node becames inactive during recovery, pick next */
2657 if (state->max_pnn == CTDB_UNKNOWN_PNN) {
2658 (void) ban_node_check(req);
2659 return;
2662 ban = (struct ctdb_ban_state) {
2663 .pnn = state->max_pnn,
2664 .time = state->tun_list->recovery_ban_period,
2667 D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
2669 ctdb_req_control_set_ban_state(&request, &ban);
2670 subreq = ctdb_client_control_send(state,
2671 state->ev,
2672 state->client,
2673 ban.pnn,
2674 TIMEOUT(),
2675 &request);
2676 if (tevent_req_nomem(subreq, req)) {
2677 return;
2679 tevent_req_set_callback(subreq, ban_node_done, req);
2682 static void ban_node_done(struct tevent_req *subreq)
2684 struct tevent_req *req = tevent_req_callback_data(
2685 subreq, struct tevent_req);
2686 struct node_ban_state *state = tevent_req_data(
2687 req, struct node_ban_state);
2688 struct ctdb_reply_control *reply;
2689 int ret;
2690 bool status;
2692 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2693 TALLOC_FREE(subreq);
2694 if (! status) {
2695 tevent_req_error(req, ret);
2696 return;
2699 ret = ctdb_reply_control_set_ban_state(reply);
2700 if (ret != 0) {
2701 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2702 tevent_req_error(req, ret);
2703 return;
2706 talloc_free(reply);
2707 tevent_req_done(req);
/*
 * Collect the result of a ban_node request.
 *
 * Returns false if the request ended with a unix error, which
 * tevent_req_is_unix_error() stores through perr; true otherwise.
 */
static bool ban_node_recv(struct tevent_req *req, int *perr)
{
	return !tevent_req_is_unix_error(req, perr);
}
/*
 * Run the parallel database recovery
 *
 * - Get tunables
 * - Get nodemap from all nodes
 * - Get capabilities from all nodes
 * - Get dbmap
 * - Set RECOVERY_ACTIVE
 * - Send START_RECOVERY
 * - Update vnnmap on all nodes
 * - Run database recovery
 * - Set RECOVERY_NORMAL
 * - Send END_RECOVERY
 */

/* State shared by every step of the recovery request */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Generation to install in the new VNN map */
	uint32_t generation;
	/* PNN reported by ctdb_client_pnn() for this client */
	uint32_t destnode;
	/* Nodes taking part in the recovery */
	struct node_list *nlist;
	/* Tunables fetched in the first step */
	struct ctdb_tunable_list *tun_list;
	/* VNN map built once recovery mode is ACTIVE */
	struct ctdb_vnn_map *vnnmap;
	/* Databases collected from the GET_DBMAP replies */
	struct db_list *dblist;
};

static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_nodemap_verify(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2758 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2759 struct tevent_context *ev,
2760 struct ctdb_client_context *client,
2761 uint32_t generation)
2763 struct tevent_req *req, *subreq;
2764 struct recovery_state *state;
2765 struct ctdb_req_control request;
2767 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2768 if (req == NULL) {
2769 return NULL;
2772 state->ev = ev;
2773 state->client = client;
2774 state->generation = generation;
2775 state->destnode = ctdb_client_pnn(client);
2777 ctdb_req_control_get_all_tunables(&request);
2778 subreq = ctdb_client_control_send(state, state->ev, state->client,
2779 state->destnode, TIMEOUT(),
2780 &request);
2781 if (tevent_req_nomem(subreq, req)) {
2782 return tevent_req_post(req, ev);
2784 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2786 return req;
2789 static void recovery_tunables_done(struct tevent_req *subreq)
2791 struct tevent_req *req = tevent_req_callback_data(
2792 subreq, struct tevent_req);
2793 struct recovery_state *state = tevent_req_data(
2794 req, struct recovery_state);
2795 struct ctdb_reply_control *reply;
2796 struct ctdb_req_control request;
2797 int ret;
2798 bool status;
2800 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2801 TALLOC_FREE(subreq);
2802 if (! status) {
2803 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2804 tevent_req_error(req, ret);
2805 return;
2808 ret = ctdb_reply_control_get_all_tunables(reply, state,
2809 &state->tun_list);
2810 if (ret != 0) {
2811 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2812 tevent_req_error(req, EPROTO);
2813 return;
2816 talloc_free(reply);
2818 recover_timeout = state->tun_list->recover_timeout;
2820 ctdb_req_control_get_nodemap(&request);
2821 subreq = ctdb_client_control_send(state, state->ev, state->client,
2822 state->destnode, TIMEOUT(),
2823 &request);
2824 if (tevent_req_nomem(subreq, req)) {
2825 return;
2827 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2830 static void recovery_nodemap_done(struct tevent_req *subreq)
2832 struct tevent_req *req = tevent_req_callback_data(
2833 subreq, struct tevent_req);
2834 struct recovery_state *state = tevent_req_data(
2835 req, struct recovery_state);
2836 struct ctdb_reply_control *reply;
2837 struct ctdb_req_control request;
2838 struct ctdb_node_map *nodemap;
2839 unsigned int i;
2840 bool status;
2841 int ret;
2843 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2844 TALLOC_FREE(subreq);
2845 if (! status) {
2846 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2847 state->destnode, ret);
2848 tevent_req_error(req, ret);
2849 return;
2852 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
2853 if (ret != 0) {
2854 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2855 tevent_req_error(req, ret);
2856 return;
2859 state->nlist = node_list_init(state, nodemap->num);
2860 if (tevent_req_nomem(state->nlist, req)) {
2861 return;
2864 for (i=0; i<nodemap->num; i++) {
2865 bool ok;
2867 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2868 continue;
2871 ok = node_list_add(state->nlist, nodemap->node[i].pnn);
2872 if (!ok) {
2873 tevent_req_error(req, EINVAL);
2874 return;
2878 talloc_free(nodemap);
2879 talloc_free(reply);
2881 /* Verify flags by getting local node information from each node */
2882 ctdb_req_control_get_nodemap(&request);
2883 subreq = ctdb_client_control_multi_send(state,
2884 state->ev,
2885 state->client,
2886 state->nlist->pnn_list,
2887 state->nlist->count,
2888 TIMEOUT(),
2889 &request);
2890 if (tevent_req_nomem(subreq, req)) {
2891 return;
2893 tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
2896 static void recovery_nodemap_verify(struct tevent_req *subreq)
2898 struct tevent_req *req = tevent_req_callback_data(
2899 subreq, struct tevent_req);
2900 struct recovery_state *state = tevent_req_data(
2901 req, struct recovery_state);
2902 struct ctdb_req_control request;
2903 struct ctdb_reply_control **reply;
2904 struct node_list *nlist;
2905 unsigned int i;
2906 int *err_list;
2907 int ret;
2908 bool status;
2910 status = ctdb_client_control_multi_recv(subreq,
2911 &ret,
2912 state,
2913 &err_list,
2914 &reply);
2915 TALLOC_FREE(subreq);
2916 if (! status) {
2917 int ret2;
2918 uint32_t pnn;
2920 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
2921 state->nlist->count,
2922 err_list,
2923 &pnn);
2924 if (ret2 != 0) {
2925 D_ERR("control GET_NODEMAP failed on node %u,"
2926 " ret=%d\n", pnn, ret2);
2927 } else {
2928 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2930 tevent_req_error(req, ret);
2931 return;
2934 nlist = node_list_init(state, state->nlist->size);
2935 if (tevent_req_nomem(nlist, req)) {
2936 return;
2939 for (i=0; i<state->nlist->count; i++) {
2940 struct ctdb_node_map *nodemap = NULL;
2941 uint32_t pnn, flags;
2942 unsigned int j;
2943 bool ok;
2945 pnn = state->nlist->pnn_list[i];
2946 ret = ctdb_reply_control_get_nodemap(reply[i],
2947 state,
2948 &nodemap);
2949 if (ret != 0) {
2950 D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
2951 tevent_req_error(req, EPROTO);
2952 return;
2955 flags = NODE_FLAGS_DISCONNECTED;
2956 for (j=0; j<nodemap->num; j++) {
2957 if (nodemap->node[j].pnn == pnn) {
2958 flags = nodemap->node[j].flags;
2959 break;
2963 TALLOC_FREE(nodemap);
2965 if (flags & NODE_FLAGS_INACTIVE) {
2966 continue;
2969 ok = node_list_add(nlist, pnn);
2970 if (!ok) {
2971 tevent_req_error(req, EINVAL);
2972 return;
2976 talloc_free(reply);
2978 talloc_free(state->nlist);
2979 state->nlist = nlist;
2981 ctdb_req_control_get_capabilities(&request);
2982 subreq = ctdb_client_control_multi_send(state,
2983 state->ev,
2984 state->client,
2985 state->nlist->pnn_list,
2986 state->nlist->count,
2987 TIMEOUT(),
2988 &request);
2989 if (tevent_req_nomem(subreq, req)) {
2990 return;
2992 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2995 static void recovery_capabilities_done(struct tevent_req *subreq)
2997 struct tevent_req *req = tevent_req_callback_data(
2998 subreq, struct tevent_req);
2999 struct recovery_state *state = tevent_req_data(
3000 req, struct recovery_state);
3001 struct ctdb_reply_control **reply;
3002 struct ctdb_req_control request;
3003 int *err_list;
3004 unsigned int i;
3005 int ret;
3006 bool status;
3008 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3009 &reply);
3010 TALLOC_FREE(subreq);
3011 if (! status) {
3012 int ret2;
3013 uint32_t pnn;
3015 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3016 state->nlist->count,
3017 err_list,
3018 &pnn);
3019 if (ret2 != 0) {
3020 D_ERR("control GET_CAPABILITIES failed on node %u,"
3021 " ret=%d\n", pnn, ret2);
3022 } else {
3023 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
3024 ret);
3026 tevent_req_error(req, ret);
3027 return;
3030 for (i=0; i<state->nlist->count; i++) {
3031 uint32_t caps;
3033 ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
3034 if (ret != 0) {
3035 D_ERR("control GET_CAPABILITIES failed on node %u\n",
3036 state->nlist->pnn_list[i]);
3037 tevent_req_error(req, EPROTO);
3038 return;
3041 state->nlist->caps[i] = caps;
3044 talloc_free(reply);
3046 ctdb_req_control_get_dbmap(&request);
3047 subreq = ctdb_client_control_multi_send(state,
3048 state->ev,
3049 state->client,
3050 state->nlist->pnn_list,
3051 state->nlist->count,
3052 TIMEOUT(),
3053 &request);
3054 if (tevent_req_nomem(subreq, req)) {
3055 return;
3057 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
3060 static void recovery_dbmap_done(struct tevent_req *subreq)
3062 struct tevent_req *req = tevent_req_callback_data(
3063 subreq, struct tevent_req);
3064 struct recovery_state *state = tevent_req_data(
3065 req, struct recovery_state);
3066 struct ctdb_reply_control **reply;
3067 struct ctdb_req_control request;
3068 int *err_list;
3069 unsigned int i, j;
3070 int ret;
3071 bool status;
3073 status = ctdb_client_control_multi_recv(subreq,
3074 &ret,
3075 state,
3076 &err_list,
3077 &reply);
3078 TALLOC_FREE(subreq);
3079 if (! status) {
3080 int ret2;
3081 uint32_t pnn;
3083 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3084 state->nlist->count,
3085 err_list,
3086 &pnn);
3087 if (ret2 != 0) {
3088 D_ERR("control GET_DBMAP failed on node %u,"
3089 " ret=%d\n", pnn, ret2);
3090 } else {
3091 D_ERR("control GET_DBMAP failed, ret=%d\n",
3092 ret);
3094 tevent_req_error(req, ret);
3095 return;
3098 state->dblist = db_list_init(state, state->nlist->count);
3099 if (tevent_req_nomem(state->dblist, req)) {
3100 D_ERR("memory allocation error\n");
3101 return;
3104 for (i = 0; i < state->nlist->count; i++) {
3105 struct ctdb_dbid_map *dbmap = NULL;
3106 uint32_t pnn;
3108 pnn = state->nlist->pnn_list[i];
3110 ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap);
3111 if (ret != 0) {
3112 D_ERR("control GET_DBMAP failed on node %u\n",
3113 pnn);
3114 tevent_req_error(req, EPROTO);
3115 return;
3118 for (j = 0; j < dbmap->num; j++) {
3119 ret = db_list_check_and_add(state->dblist,
3120 dbmap->dbs[j].db_id,
3121 dbmap->dbs[j].flags,
3122 pnn);
3123 if (ret != 0) {
3124 D_ERR("failed to add database list entry, "
3125 "ret=%d\n",
3126 ret);
3127 tevent_req_error(req, ret);
3128 return;
3132 TALLOC_FREE(dbmap);
3135 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
3136 subreq = ctdb_client_control_multi_send(state,
3137 state->ev,
3138 state->client,
3139 state->nlist->pnn_list,
3140 state->nlist->count,
3141 TIMEOUT(),
3142 &request);
3143 if (tevent_req_nomem(subreq, req)) {
3144 return;
3146 tevent_req_set_callback(subreq, recovery_active_done, req);
3149 static void recovery_active_done(struct tevent_req *subreq)
3151 struct tevent_req *req = tevent_req_callback_data(
3152 subreq, struct tevent_req);
3153 struct recovery_state *state = tevent_req_data(
3154 req, struct recovery_state);
3155 struct ctdb_req_control request;
3156 struct ctdb_vnn_map *vnnmap;
3157 int *err_list;
3158 int ret;
3159 bool status;
3161 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3162 NULL);
3163 TALLOC_FREE(subreq);
3164 if (! status) {
3165 int ret2;
3166 uint32_t pnn;
3168 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3169 state->nlist->count,
3170 err_list,
3171 &pnn);
3172 if (ret2 != 0) {
3173 D_ERR("failed to set recovery mode ACTIVE on node %u,"
3174 " ret=%d\n", pnn, ret2);
3175 } else {
3176 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
3177 ret);
3179 tevent_req_error(req, ret);
3180 return;
3183 D_ERR("Set recovery mode to ACTIVE\n");
3185 /* Calculate new VNNMAP */
3186 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
3187 if (tevent_req_nomem(vnnmap, req)) {
3188 return;
3191 vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
3192 if (tevent_req_nomem(vnnmap->map, req)) {
3193 return;
3196 if (vnnmap->size == 0) {
3197 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
3198 vnnmap->map[0] = state->destnode;
3199 vnnmap->size = 1;
3202 vnnmap->generation = state->generation;
3204 state->vnnmap = vnnmap;
3206 ctdb_req_control_start_recovery(&request);
3207 subreq = ctdb_client_control_multi_send(state,
3208 state->ev,
3209 state->client,
3210 state->nlist->pnn_list,
3211 state->nlist->count,
3212 TIMEOUT(),
3213 &request);
3214 if (tevent_req_nomem(subreq, req)) {
3215 return;
3217 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
3220 static void recovery_start_recovery_done(struct tevent_req *subreq)
3222 struct tevent_req *req = tevent_req_callback_data(
3223 subreq, struct tevent_req);
3224 struct recovery_state *state = tevent_req_data(
3225 req, struct recovery_state);
3226 struct ctdb_req_control request;
3227 int *err_list;
3228 int ret;
3229 bool status;
3231 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3232 NULL);
3233 TALLOC_FREE(subreq);
3234 if (! status) {
3235 int ret2;
3236 uint32_t pnn;
3238 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3239 state->nlist->count,
3240 err_list,
3241 &pnn);
3242 if (ret2 != 0) {
3243 D_ERR("failed to run start_recovery event on node %u,"
3244 " ret=%d\n", pnn, ret2);
3245 } else {
3246 D_ERR("failed to run start_recovery event, ret=%d\n",
3247 ret);
3249 tevent_req_error(req, ret);
3250 return;
3253 D_ERR("start_recovery event finished\n");
3255 ctdb_req_control_setvnnmap(&request, state->vnnmap);
3256 subreq = ctdb_client_control_multi_send(state,
3257 state->ev,
3258 state->client,
3259 state->nlist->pnn_list,
3260 state->nlist->count,
3261 TIMEOUT(),
3262 &request);
3263 if (tevent_req_nomem(subreq, req)) {
3264 return;
3266 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
3269 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
3271 struct tevent_req *req = tevent_req_callback_data(
3272 subreq, struct tevent_req);
3273 struct recovery_state *state = tevent_req_data(
3274 req, struct recovery_state);
3275 int *err_list;
3276 int ret;
3277 bool status;
3279 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
3280 NULL);
3281 TALLOC_FREE(subreq);
3282 if (! status) {
3283 int ret2;
3284 uint32_t pnn;
3286 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3287 state->nlist->count,
3288 err_list,
3289 &pnn);
3290 if (ret2 != 0) {
3291 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
3292 pnn, ret2);
3293 } else {
3294 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
3296 tevent_req_error(req, ret);
3297 return;
3300 D_NOTICE("updated VNNMAP\n");
3302 subreq = db_recovery_send(state,
3303 state->ev,
3304 state->client,
3305 state->dblist,
3306 state->tun_list,
3307 state->nlist,
3308 state->vnnmap->generation);
3309 if (tevent_req_nomem(subreq, req)) {
3310 return;
3312 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
3315 static void recovery_db_recovery_done(struct tevent_req *subreq)
3317 struct tevent_req *req = tevent_req_callback_data(
3318 subreq, struct tevent_req);
3319 struct recovery_state *state = tevent_req_data(
3320 req, struct recovery_state);
3321 struct ctdb_req_control request;
3322 bool status;
3323 unsigned int count;
3325 status = db_recovery_recv(subreq, &count);
3326 TALLOC_FREE(subreq);
3328 D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
3330 if (! status) {
3331 subreq = ban_node_send(state,
3332 state->ev,
3333 state->client,
3334 state->tun_list,
3335 state->nlist);
3336 if (tevent_req_nomem(subreq, req)) {
3337 return;
3339 tevent_req_set_callback(subreq, recovery_failed_done, req);
3340 return;
3343 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
3344 subreq = ctdb_client_control_multi_send(state,
3345 state->ev,
3346 state->client,
3347 state->nlist->pnn_list,
3348 state->nlist->count,
3349 TIMEOUT(),
3350 &request);
3351 if (tevent_req_nomem(subreq, req)) {
3352 return;
3354 tevent_req_set_callback(subreq, recovery_normal_done, req);
3357 static void recovery_failed_done(struct tevent_req *subreq)
3359 struct tevent_req *req = tevent_req_callback_data(
3360 subreq, struct tevent_req);
3361 int ret;
3362 bool status;
3364 status = ban_node_recv(subreq, &ret);
3365 TALLOC_FREE(subreq);
3366 if (! status) {
3367 D_ERR("failed to ban node, ret=%d\n", ret);
3370 tevent_req_error(req, EIO);
3373 static void recovery_normal_done(struct tevent_req *subreq)
3375 struct tevent_req *req = tevent_req_callback_data(
3376 subreq, struct tevent_req);
3377 struct recovery_state *state = tevent_req_data(
3378 req, struct recovery_state);
3379 struct ctdb_req_control request;
3380 int *err_list;
3381 int ret;
3382 bool status;
3384 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3385 NULL);
3386 TALLOC_FREE(subreq);
3387 if (! status) {
3388 int ret2;
3389 uint32_t pnn;
3391 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3392 state->nlist->count,
3393 err_list,
3394 &pnn);
3395 if (ret2 != 0) {
3396 D_ERR("failed to set recovery mode NORMAL on node %u,"
3397 " ret=%d\n", pnn, ret2);
3398 } else {
3399 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3400 ret);
3402 tevent_req_error(req, ret);
3403 return;
3406 D_ERR("Set recovery mode to NORMAL\n");
3408 ctdb_req_control_end_recovery(&request);
3409 subreq = ctdb_client_control_multi_send(state,
3410 state->ev,
3411 state->client,
3412 state->nlist->pnn_list,
3413 state->nlist->count,
3414 TIMEOUT(),
3415 &request);
3416 if (tevent_req_nomem(subreq, req)) {
3417 return;
3419 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
3422 static void recovery_end_recovery_done(struct tevent_req *subreq)
3424 struct tevent_req *req = tevent_req_callback_data(
3425 subreq, struct tevent_req);
3426 struct recovery_state *state = tevent_req_data(
3427 req, struct recovery_state);
3428 int *err_list;
3429 int ret;
3430 bool status;
3432 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
3433 NULL);
3434 TALLOC_FREE(subreq);
3435 if (! status) {
3436 int ret2;
3437 uint32_t pnn;
3439 ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
3440 state->nlist->count,
3441 err_list,
3442 &pnn);
3443 if (ret2 != 0) {
3444 D_ERR("failed to run recovered event on node %u,"
3445 " ret=%d\n", pnn, ret2);
3446 } else {
3447 D_ERR("failed to run recovered event, ret=%d\n", ret);
3449 tevent_req_error(req, ret);
3450 return;
3453 D_ERR("recovered event finished\n");
3455 tevent_req_done(req);
/*
 * Collect the result of the recovery request.  Any unix error is
 * reported through perr by generic_recv(); the boolean result is not
 * needed by the caller and is discarded.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void) generic_recv(req, perr);
}
/* Print the command line synopsis for progname on stderr */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
3471 * Arguments - log fd, write fd, socket path, generation
3473 int main(int argc, char *argv[])
3475 int write_fd;
3476 const char *sockpath;
3477 TALLOC_CTX *mem_ctx = NULL;
3478 struct tevent_context *ev;
3479 struct ctdb_client_context *client;
3480 int ret = 0;
3481 struct tevent_req *req;
3482 uint32_t generation;
3484 if (argc != 4) {
3485 usage(argv[0]);
3486 exit(1);
3489 write_fd = atoi(argv[1]);
3490 sockpath = argv[2];
3491 generation = (uint32_t)smb_strtoul(argv[3],
3492 NULL,
3494 &ret,
3495 SMB_STR_STANDARD);
3496 if (ret != 0) {
3497 fprintf(stderr, "recovery: unable to initialize generation\n");
3498 goto failed;
3501 mem_ctx = talloc_new(NULL);
3502 if (mem_ctx == NULL) {
3503 fprintf(stderr, "recovery: talloc_new() failed\n");
3504 goto failed;
3507 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
3508 if (ret != 0) {
3509 fprintf(stderr, "recovery: Unable to initialize logging\n");
3510 goto failed;
3513 ev = tevent_context_init(mem_ctx);
3514 if (ev == NULL) {
3515 D_ERR("tevent_context_init() failed\n");
3516 goto failed;
3519 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
3520 if (ret != 0) {
3521 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
3522 goto failed;
3525 req = recovery_send(mem_ctx, ev, client, generation);
3526 if (req == NULL) {
3527 D_ERR("database_recover_send() failed\n");
3528 goto failed;
3531 if (! tevent_req_poll(req, ev)) {
3532 D_ERR("tevent_req_poll() failed\n");
3533 goto failed;
3536 recovery_recv(req, &ret);
3537 TALLOC_FREE(req);
3538 if (ret != 0) {
3539 D_ERR("database recovery failed, ret=%d\n", ret);
3540 goto failed;
3543 sys_write(write_fd, &ret, sizeof(ret));
3544 return 0;
3546 failed:
3547 TALLOC_FREE(mem_ctx);
3548 return 1;