ctdb-recovery: Ban a node that causes recovery failure
[Samba.git] / ctdb / server / ctdb_recovery_helper.c
blob7fdcc2e5a29ea0414bb8fe84a44e6fa9415b3cba
1 /*
2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
38 #include "common/logging.h"
/*
 * First argument handed to timeval_current_ofs() by TIMEOUT(), i.e. the
 * offset from "now" used as the deadline for controls sent in this file
 * (presumably seconds - confirm against timeval_current_ofs()).
 */
static int recover_timeout = 30;

/* NOTE(review): not referenced in the part of the file visible here */
#define NUM_RETRIES 3

/* Deadline for a single control, relative to the current time */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * Utility functions
 */
/*
 * Shared receive helper for the tevent requests defined in this file.
 *
 * Returns true if the request did not finish with a unix error.  Otherwise
 * returns false and, when perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (! tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
64 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
66 static uint64_t srvid_next(void)
68 rec_srvid += 1;
69 return rec_srvid;
/*
 * Recovery database functions
 */
/* Handle for the temporary database used to merge records during recovery */
struct recdb_context {
	uint32_t db_id;		/* id of the database being recovered */
	const char *db_name;	/* name as passed to recdb_create(), not copied */
	const char *db_path;	/* "<dir>/recdb.<db_name>", built by recdb_create() */
	struct tdb_wrap *db;	/* handle returned by tdb_wrap_open() */
	bool persistent;	/* when false, recbuf_filter_add() rewrites dmaster/flags */
};
84 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
85 const char *db_name,
86 const char *db_path,
87 uint32_t hash_size, bool persistent)
89 static char *db_dir_state = NULL;
90 struct recdb_context *recdb;
91 unsigned int tdb_flags;
93 recdb = talloc(mem_ctx, struct recdb_context);
94 if (recdb == NULL) {
95 return NULL;
98 if (db_dir_state == NULL) {
99 db_dir_state = getenv("CTDB_DBDIR_STATE");
102 recdb->db_name = db_name;
103 recdb->db_id = db_id;
104 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
105 db_dir_state != NULL ?
106 db_dir_state :
107 dirname(discard_const(db_path)),
108 db_name);
109 if (recdb->db_path == NULL) {
110 talloc_free(recdb);
111 return NULL;
113 unlink(recdb->db_path);
115 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
116 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
117 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
118 if (recdb->db == NULL) {
119 talloc_free(recdb);
120 D_ERR("failed to create recovery db %s\n", recdb->db_path);
121 return NULL;
124 recdb->persistent = persistent;
126 return recdb;
129 static uint32_t recdb_id(struct recdb_context *recdb)
131 return recdb->db_id;
134 static const char *recdb_name(struct recdb_context *recdb)
136 return recdb->db_name;
139 static const char *recdb_path(struct recdb_context *recdb)
141 return recdb->db_path;
144 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
146 return recdb->db->tdb;
149 static bool recdb_persistent(struct recdb_context *recdb)
151 return recdb->persistent;
/* Private data handed to recdb_add_traverse() by recdb_add() */
struct recdb_add_traverse_state {
	struct recdb_context *recdb;	/* database the records are merged into */
	int mypnn;			/* compared against the stored record's dmaster */
};
159 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
160 TDB_DATA key, TDB_DATA data,
161 void *private_data)
163 struct recdb_add_traverse_state *state =
164 (struct recdb_add_traverse_state *)private_data;
165 struct ctdb_ltdb_header *hdr;
166 TDB_DATA prev_data;
167 int ret;
169 /* header is not marshalled separately in the pulldb control */
170 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
171 return -1;
174 hdr = (struct ctdb_ltdb_header *)data.dptr;
176 /* fetch the existing record, if any */
177 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
179 if (prev_data.dptr != NULL) {
180 struct ctdb_ltdb_header prev_hdr;
182 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
183 free(prev_data.dptr);
184 if (hdr->rsn < prev_hdr.rsn ||
185 (hdr->rsn == prev_hdr.rsn &&
186 prev_hdr.dmaster != state->mypnn)) {
187 return 0;
191 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
192 if (ret != 0) {
193 return -1;
195 return 0;
198 static bool recdb_add(struct recdb_context *recdb, int mypnn,
199 struct ctdb_rec_buffer *recbuf)
201 struct recdb_add_traverse_state state;
202 int ret;
204 state.recdb = recdb;
205 state.mypnn = mypnn;
207 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
208 if (ret != 0) {
209 return false;
212 return true;
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
217 uint32_t reqid, uint32_t dmaster,
218 TDB_DATA key, TDB_DATA data)
220 struct ctdb_ltdb_header *header;
221 int ret;
223 /* Skip empty records */
224 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
225 return 0;
228 /* update the dmaster field to point to us */
229 header = (struct ctdb_ltdb_header *)data.dptr;
230 if (!persistent) {
231 header->dmaster = dmaster;
232 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
235 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
236 if (ret != 0) {
237 return ret;
240 return 0;
/* Private data handed to recdb_records_traverse() by recdb_records() */
struct recdb_records_traverse_state {
	struct ctdb_rec_buffer *recbuf;	/* buffer the retained records go into */
	uint32_t dmaster;		/* forwarded to recbuf_filter_add() */
	uint32_t reqid;			/* forwarded to recbuf_filter_add() */
	bool persistent;		/* forwarded to recbuf_filter_add() */
	bool failed;			/* set when recbuf_filter_add() fails */
};
251 static int recdb_records_traverse(struct tdb_context *tdb,
252 TDB_DATA key, TDB_DATA data,
253 void *private_data)
255 struct recdb_records_traverse_state *state =
256 (struct recdb_records_traverse_state *)private_data;
257 int ret;
259 ret = recbuf_filter_add(state->recbuf, state->persistent,
260 state->reqid, state->dmaster, key, data);
261 if (ret != 0) {
262 state->failed = true;
263 return ret;
266 return 0;
269 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
270 TALLOC_CTX *mem_ctx,
271 uint32_t dmaster)
273 struct recdb_records_traverse_state state;
274 int ret;
276 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
277 if (state.recbuf == NULL) {
278 return NULL;
280 state.dmaster = dmaster;
281 state.reqid = 0;
282 state.persistent = recdb_persistent(recdb);
283 state.failed = false;
285 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
286 &state);
287 if (ret == -1 || state.failed) {
288 D_ERR("Failed to marshall recovery records for %s\n",
289 recdb_name(recdb));
290 TALLOC_FREE(state.recbuf);
291 return NULL;
294 return state.recbuf;
/* Private data handed to recdb_file_traverse() by recdb_file() */
struct recdb_file_traverse_state {
	struct ctdb_rec_buffer *recbuf;	/* buffer currently being filled */
	struct recdb_context *recdb;	/* source database (id and name are read) */
	TALLOC_CTX *mem_ctx;		/* parent for replacement buffers */
	uint32_t dmaster;		/* forwarded to recbuf_filter_add() */
	uint32_t reqid;			/* forwarded to recbuf_filter_add() */
	bool persistent;		/* forwarded to recbuf_filter_add() */
	bool failed;			/* set on any error during traversal */
	int fd;				/* file the full buffers are written to */
	int max_size;			/* buffer length above which it is flushed */
	int num_buffers;		/* number of buffers written so far */
};
310 static int recdb_file_traverse(struct tdb_context *tdb,
311 TDB_DATA key, TDB_DATA data,
312 void *private_data)
314 struct recdb_file_traverse_state *state =
315 (struct recdb_file_traverse_state *)private_data;
316 int ret;
318 ret = recbuf_filter_add(state->recbuf, state->persistent,
319 state->reqid, state->dmaster, key, data);
320 if (ret != 0) {
321 state->failed = true;
322 return ret;
325 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
326 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
327 if (ret != 0) {
328 D_ERR("Failed to collect recovery records for %s\n",
329 recdb_name(state->recdb));
330 state->failed = true;
331 return ret;
334 state->num_buffers += 1;
336 TALLOC_FREE(state->recbuf);
337 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
338 recdb_id(state->recdb));
339 if (state->recbuf == NULL) {
340 state->failed = true;
341 return ENOMEM;
345 return 0;
348 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
349 uint32_t dmaster, int fd, int max_size)
351 struct recdb_file_traverse_state state;
352 int ret;
354 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
355 if (state.recbuf == NULL) {
356 return -1;
358 state.recdb = recdb;
359 state.mem_ctx = mem_ctx;
360 state.dmaster = dmaster;
361 state.reqid = 0;
362 state.persistent = recdb_persistent(recdb);
363 state.failed = false;
364 state.fd = fd;
365 state.max_size = max_size;
366 state.num_buffers = 0;
368 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
369 if (ret == -1 || state.failed) {
370 TALLOC_FREE(state.recbuf);
371 return -1;
374 ret = ctdb_rec_buffer_write(state.recbuf, fd);
375 if (ret != 0) {
376 D_ERR("Failed to collect recovery records for %s\n",
377 recdb_name(recdb));
378 TALLOC_FREE(state.recbuf);
379 return -1;
381 state.num_buffers += 1;
383 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384 state.num_buffers, recdb_name(recdb));
386 return state.num_buffers;
/*
 * Pull database from a single node
 */
/* State of one pull_database_send() request */
struct pull_database_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* records pulled are merged into this */
	uint32_t pnn;			/* node the controls are sent to */
	uint64_t srvid;			/* from srvid_next(); used for DB_PULL messages */
	int num_records;		/* records received so far */
	int result;			/* error saved until the handler is removed */
};
403 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
404 void *private_data);
405 static void pull_database_register_done(struct tevent_req *subreq);
406 static void pull_database_old_done(struct tevent_req *subreq);
407 static void pull_database_unregister_done(struct tevent_req *subreq);
408 static void pull_database_new_done(struct tevent_req *subreq);
410 static struct tevent_req *pull_database_send(
411 TALLOC_CTX *mem_ctx,
412 struct tevent_context *ev,
413 struct ctdb_client_context *client,
414 uint32_t pnn, uint32_t caps,
415 struct recdb_context *recdb)
417 struct tevent_req *req, *subreq;
418 struct pull_database_state *state;
419 struct ctdb_req_control request;
421 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
422 if (req == NULL) {
423 return NULL;
426 state->ev = ev;
427 state->client = client;
428 state->recdb = recdb;
429 state->pnn = pnn;
430 state->srvid = srvid_next();
432 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
433 subreq = ctdb_client_set_message_handler_send(
434 state, state->ev, state->client,
435 state->srvid, pull_database_handler,
436 req);
437 if (tevent_req_nomem(subreq, req)) {
438 return tevent_req_post(req, ev);
441 tevent_req_set_callback(subreq, pull_database_register_done,
442 req);
444 } else {
445 struct ctdb_pulldb pulldb;
447 pulldb.db_id = recdb_id(recdb);
448 pulldb.lmaster = CTDB_LMASTER_ANY;
450 ctdb_req_control_pull_db(&request, &pulldb);
451 subreq = ctdb_client_control_send(state, state->ev,
452 state->client,
453 pnn, TIMEOUT(),
454 &request);
455 if (tevent_req_nomem(subreq, req)) {
456 return tevent_req_post(req, ev);
458 tevent_req_set_callback(subreq, pull_database_old_done, req);
461 return req;
464 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
465 void *private_data)
467 struct tevent_req *req = talloc_get_type_abort(
468 private_data, struct tevent_req);
469 struct pull_database_state *state = tevent_req_data(
470 req, struct pull_database_state);
471 struct ctdb_rec_buffer *recbuf;
472 size_t np;
473 int ret;
474 bool status;
476 if (srvid != state->srvid) {
477 return;
480 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
481 if (ret != 0) {
482 D_ERR("Invalid data received for DB_PULL messages\n");
483 return;
486 if (recbuf->db_id != recdb_id(state->recdb)) {
487 talloc_free(recbuf);
488 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
489 recbuf->db_id, recdb_name(state->recdb));
490 return;
493 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
494 recbuf);
495 if (! status) {
496 talloc_free(recbuf);
497 D_ERR("Failed to add records to recdb for %s\n",
498 recdb_name(state->recdb));
499 return;
502 state->num_records += recbuf->count;
503 talloc_free(recbuf);
506 static void pull_database_register_done(struct tevent_req *subreq)
508 struct tevent_req *req = tevent_req_callback_data(
509 subreq, struct tevent_req);
510 struct pull_database_state *state = tevent_req_data(
511 req, struct pull_database_state);
512 struct ctdb_req_control request;
513 struct ctdb_pulldb_ext pulldb_ext;
514 int ret;
515 bool status;
517 status = ctdb_client_set_message_handler_recv(subreq, &ret);
518 TALLOC_FREE(subreq);
519 if (! status) {
520 D_ERR("Failed to set message handler for DB_PULL for %s\n",
521 recdb_name(state->recdb));
522 tevent_req_error(req, ret);
523 return;
526 pulldb_ext.db_id = recdb_id(state->recdb);
527 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
528 pulldb_ext.srvid = state->srvid;
530 ctdb_req_control_db_pull(&request, &pulldb_ext);
531 subreq = ctdb_client_control_send(state, state->ev, state->client,
532 state->pnn, TIMEOUT(), &request);
533 if (tevent_req_nomem(subreq, req)) {
534 return;
536 tevent_req_set_callback(subreq, pull_database_new_done, req);
539 static void pull_database_old_done(struct tevent_req *subreq)
541 struct tevent_req *req = tevent_req_callback_data(
542 subreq, struct tevent_req);
543 struct pull_database_state *state = tevent_req_data(
544 req, struct pull_database_state);
545 struct ctdb_reply_control *reply;
546 struct ctdb_rec_buffer *recbuf;
547 int ret;
548 bool status;
550 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
551 TALLOC_FREE(subreq);
552 if (! status) {
553 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
554 recdb_name(state->recdb), state->pnn, ret);
555 tevent_req_error(req, ret);
556 return;
559 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
560 talloc_free(reply);
561 if (ret != 0) {
562 tevent_req_error(req, ret);
563 return;
566 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
567 recbuf);
568 if (! status) {
569 talloc_free(recbuf);
570 tevent_req_error(req, EIO);
571 return;
574 state->num_records = recbuf->count;
575 talloc_free(recbuf);
577 D_INFO("Pulled %d records for db %s from node %d\n",
578 state->num_records, recdb_name(state->recdb), state->pnn);
580 tevent_req_done(req);
583 static void pull_database_new_done(struct tevent_req *subreq)
585 struct tevent_req *req = tevent_req_callback_data(
586 subreq, struct tevent_req);
587 struct pull_database_state *state = tevent_req_data(
588 req, struct pull_database_state);
589 struct ctdb_reply_control *reply;
590 uint32_t num_records;
591 int ret;
592 bool status;
594 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
595 TALLOC_FREE(subreq);
596 if (! status) {
597 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
598 recdb_name(state->recdb), state->pnn, ret);
599 state->result = ret;
600 goto unregister;
603 ret = ctdb_reply_control_db_pull(reply, &num_records);
604 talloc_free(reply);
605 if (num_records != state->num_records) {
606 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
607 num_records, state->num_records,
608 recdb_name(state->recdb));
609 state->result = EIO;
610 goto unregister;
613 D_INFO("Pulled %d records for db %s from node %d\n",
614 state->num_records, recdb_name(state->recdb), state->pnn);
616 unregister:
618 subreq = ctdb_client_remove_message_handler_send(
619 state, state->ev, state->client,
620 state->srvid, req);
621 if (tevent_req_nomem(subreq, req)) {
622 return;
624 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
627 static void pull_database_unregister_done(struct tevent_req *subreq)
629 struct tevent_req *req = tevent_req_callback_data(
630 subreq, struct tevent_req);
631 struct pull_database_state *state = tevent_req_data(
632 req, struct pull_database_state);
633 int ret;
634 bool status;
636 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
637 TALLOC_FREE(subreq);
638 if (! status) {
639 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
640 recdb_name(state->recdb));
641 tevent_req_error(req, ret);
642 return;
645 if (state->result != 0) {
646 tevent_req_error(req, state->result);
647 return;
650 tevent_req_done(req);
/* Collect the result of pull_database_send(); see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (old style)
 */
/* State of one push_database_old_send() request */
struct push_database_old_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;
	uint32_t *pnn_list;		/* nodes to push to, one after another */
	int count;			/* number of entries in pnn_list */
	struct ctdb_rec_buffer *recbuf;	/* all records, from recdb_records() */
	int index;			/* position in pnn_list of the push in flight */
};
672 static void push_database_old_push_done(struct tevent_req *subreq);
674 static struct tevent_req *push_database_old_send(
675 TALLOC_CTX *mem_ctx,
676 struct tevent_context *ev,
677 struct ctdb_client_context *client,
678 uint32_t *pnn_list, int count,
679 struct recdb_context *recdb)
681 struct tevent_req *req, *subreq;
682 struct push_database_old_state *state;
683 struct ctdb_req_control request;
684 uint32_t pnn;
686 req = tevent_req_create(mem_ctx, &state,
687 struct push_database_old_state);
688 if (req == NULL) {
689 return NULL;
692 state->ev = ev;
693 state->client = client;
694 state->recdb = recdb;
695 state->pnn_list = pnn_list;
696 state->count = count;
697 state->index = 0;
699 state->recbuf = recdb_records(recdb, state,
700 ctdb_client_pnn(client));
701 if (tevent_req_nomem(state->recbuf, req)) {
702 return tevent_req_post(req, ev);
705 pnn = state->pnn_list[state->index];
707 ctdb_req_control_push_db(&request, state->recbuf);
708 subreq = ctdb_client_control_send(state, ev, client, pnn,
709 TIMEOUT(), &request);
710 if (tevent_req_nomem(subreq, req)) {
711 return tevent_req_post(req, ev);
713 tevent_req_set_callback(subreq, push_database_old_push_done, req);
715 return req;
718 static void push_database_old_push_done(struct tevent_req *subreq)
720 struct tevent_req *req = tevent_req_callback_data(
721 subreq, struct tevent_req);
722 struct push_database_old_state *state = tevent_req_data(
723 req, struct push_database_old_state);
724 struct ctdb_req_control request;
725 uint32_t pnn;
726 int ret;
727 bool status;
729 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
730 TALLOC_FREE(subreq);
731 if (! status) {
732 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
733 recdb_name(state->recdb), state->pnn_list[state->index],
734 ret);
735 tevent_req_error(req, ret);
736 return;
739 state->index += 1;
740 if (state->index == state->count) {
741 TALLOC_FREE(state->recbuf);
742 tevent_req_done(req);
743 return;
746 pnn = state->pnn_list[state->index];
748 ctdb_req_control_push_db(&request, state->recbuf);
749 subreq = ctdb_client_control_send(state, state->ev, state->client,
750 pnn, TIMEOUT(), &request);
751 if (tevent_req_nomem(subreq, req)) {
752 return;
754 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Collect the result of push_database_old_send(); see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (new style)
 */
/* State of one push_database_new_send() request */
struct push_database_new_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;
	uint32_t *pnn_list;	/* nodes the records are pushed to */
	int count;		/* number of entries in pnn_list */
	uint64_t srvid;		/* from srvid_next(); used for the record messages */
	uint32_t dmaster;	/* ctdb_client_pnn() of the local client */
	int fd;			/* file holding the buffers written by recdb_file() */
	int num_buffers;	/* buffers in the file, as returned by recdb_file() */
	int num_buffers_sent;	/* buffers sent so far */
	int num_records;	/* records sent so far; checked against the confirm replies */
};
780 static void push_database_new_started(struct tevent_req *subreq);
781 static void push_database_new_send_msg(struct tevent_req *req);
782 static void push_database_new_send_done(struct tevent_req *subreq);
783 static void push_database_new_confirmed(struct tevent_req *subreq);
785 static struct tevent_req *push_database_new_send(
786 TALLOC_CTX *mem_ctx,
787 struct tevent_context *ev,
788 struct ctdb_client_context *client,
789 uint32_t *pnn_list, int count,
790 struct recdb_context *recdb,
791 int max_size)
793 struct tevent_req *req, *subreq;
794 struct push_database_new_state *state;
795 struct ctdb_req_control request;
796 struct ctdb_pulldb_ext pulldb_ext;
797 char *filename;
798 off_t offset;
800 req = tevent_req_create(mem_ctx, &state,
801 struct push_database_new_state);
802 if (req == NULL) {
803 return NULL;
806 state->ev = ev;
807 state->client = client;
808 state->recdb = recdb;
809 state->pnn_list = pnn_list;
810 state->count = count;
812 state->srvid = srvid_next();
813 state->dmaster = ctdb_client_pnn(client);
814 state->num_buffers_sent = 0;
815 state->num_records = 0;
817 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
818 if (tevent_req_nomem(filename, req)) {
819 return tevent_req_post(req, ev);
822 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
823 if (state->fd == -1) {
824 tevent_req_error(req, errno);
825 return tevent_req_post(req, ev);
827 unlink(filename);
828 talloc_free(filename);
830 state->num_buffers = recdb_file(recdb, state, state->dmaster,
831 state->fd, max_size);
832 if (state->num_buffers == -1) {
833 tevent_req_error(req, ENOMEM);
834 return tevent_req_post(req, ev);
837 offset = lseek(state->fd, 0, SEEK_SET);
838 if (offset != 0) {
839 tevent_req_error(req, EIO);
840 return tevent_req_post(req, ev);
843 pulldb_ext.db_id = recdb_id(recdb);
844 pulldb_ext.srvid = state->srvid;
846 ctdb_req_control_db_push_start(&request, &pulldb_ext);
847 subreq = ctdb_client_control_multi_send(state, ev, client,
848 pnn_list, count,
849 TIMEOUT(), &request);
850 if (tevent_req_nomem(subreq, req)) {
851 return tevent_req_post(req, ev);
853 tevent_req_set_callback(subreq, push_database_new_started, req);
855 return req;
858 static void push_database_new_started(struct tevent_req *subreq)
860 struct tevent_req *req = tevent_req_callback_data(
861 subreq, struct tevent_req);
862 struct push_database_new_state *state = tevent_req_data(
863 req, struct push_database_new_state);
864 int *err_list;
865 int ret;
866 bool status;
868 status = ctdb_client_control_multi_recv(subreq, &ret, state,
869 &err_list, NULL);
870 TALLOC_FREE(subreq);
871 if (! status) {
872 int ret2;
873 uint32_t pnn;
875 ret2 = ctdb_client_control_multi_error(state->pnn_list,
876 state->count,
877 err_list, &pnn);
878 if (ret2 != 0) {
879 D_ERR("control DB_PUSH_START failed for db %s"
880 " on node %u, ret=%d\n",
881 recdb_name(state->recdb), pnn, ret2);
882 } else {
883 D_ERR("control DB_PUSH_START failed for db %s,"
884 " ret=%d\n",
885 recdb_name(state->recdb), ret);
887 talloc_free(err_list);
889 tevent_req_error(req, ret);
890 return;
893 push_database_new_send_msg(req);
896 static void push_database_new_send_msg(struct tevent_req *req)
898 struct push_database_new_state *state = tevent_req_data(
899 req, struct push_database_new_state);
900 struct tevent_req *subreq;
901 struct ctdb_rec_buffer *recbuf;
902 struct ctdb_req_message message;
903 TDB_DATA data;
904 size_t np;
905 int ret;
907 if (state->num_buffers_sent == state->num_buffers) {
908 struct ctdb_req_control request;
910 ctdb_req_control_db_push_confirm(&request,
911 recdb_id(state->recdb));
912 subreq = ctdb_client_control_multi_send(state, state->ev,
913 state->client,
914 state->pnn_list,
915 state->count,
916 TIMEOUT(), &request);
917 if (tevent_req_nomem(subreq, req)) {
918 return;
920 tevent_req_set_callback(subreq, push_database_new_confirmed,
921 req);
922 return;
925 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
926 if (ret != 0) {
927 tevent_req_error(req, ret);
928 return;
931 data.dsize = ctdb_rec_buffer_len(recbuf);
932 data.dptr = talloc_size(state, data.dsize);
933 if (tevent_req_nomem(data.dptr, req)) {
934 return;
937 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
939 message.srvid = state->srvid;
940 message.data.data = data;
942 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
943 state->num_buffers_sent, recbuf->count,
944 recdb_name(state->recdb));
946 subreq = ctdb_client_message_multi_send(state, state->ev,
947 state->client,
948 state->pnn_list, state->count,
949 &message);
950 if (tevent_req_nomem(subreq, req)) {
951 return;
953 tevent_req_set_callback(subreq, push_database_new_send_done, req);
955 state->num_records += recbuf->count;
957 talloc_free(data.dptr);
958 talloc_free(recbuf);
961 static void push_database_new_send_done(struct tevent_req *subreq)
963 struct tevent_req *req = tevent_req_callback_data(
964 subreq, struct tevent_req);
965 struct push_database_new_state *state = tevent_req_data(
966 req, struct push_database_new_state);
967 bool status;
968 int ret;
970 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
971 TALLOC_FREE(subreq);
972 if (! status) {
973 D_ERR("Sending recovery records failed for %s\n",
974 recdb_name(state->recdb));
975 tevent_req_error(req, ret);
976 return;
979 state->num_buffers_sent += 1;
981 push_database_new_send_msg(req);
984 static void push_database_new_confirmed(struct tevent_req *subreq)
986 struct tevent_req *req = tevent_req_callback_data(
987 subreq, struct tevent_req);
988 struct push_database_new_state *state = tevent_req_data(
989 req, struct push_database_new_state);
990 struct ctdb_reply_control **reply;
991 int *err_list;
992 bool status;
993 int ret, i;
994 uint32_t num_records;
996 status = ctdb_client_control_multi_recv(subreq, &ret, state,
997 &err_list, &reply);
998 TALLOC_FREE(subreq);
999 if (! status) {
1000 int ret2;
1001 uint32_t pnn;
1003 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1004 state->count, err_list,
1005 &pnn);
1006 if (ret2 != 0) {
1007 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1008 " on node %u, ret=%d\n",
1009 recdb_name(state->recdb), pnn, ret2);
1010 } else {
1011 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1012 " ret=%d\n",
1013 recdb_name(state->recdb), ret);
1015 tevent_req_error(req, ret);
1016 return;
1019 for (i=0; i<state->count; i++) {
1020 ret = ctdb_reply_control_db_push_confirm(reply[i],
1021 &num_records);
1022 if (ret != 0) {
1023 tevent_req_error(req, EPROTO);
1024 return;
1027 if (num_records != state->num_records) {
1028 D_ERR("Node %u received %d of %d records for %s\n",
1029 state->pnn_list[i], num_records,
1030 state->num_records, recdb_name(state->recdb));
1031 tevent_req_error(req, EPROTO);
1032 return;
1036 talloc_free(reply);
1038 D_INFO("Pushed %d records for db %s\n",
1039 state->num_records, recdb_name(state->recdb));
1041 tevent_req_done(req);
/* Collect the result of push_database_new_send(); see generic_recv() */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Wrapper for push_database_old and push_database_new
 */
/*
 * State of one push_database_send() request: each flag is true once the
 * corresponding sub-request has finished or was not needed at all.
 */
struct push_database_state {
	bool old_done, new_done;
};
1057 static void push_database_old_done(struct tevent_req *subreq);
1058 static void push_database_new_done(struct tevent_req *subreq);
1060 static struct tevent_req *push_database_send(
1061 TALLOC_CTX *mem_ctx,
1062 struct tevent_context *ev,
1063 struct ctdb_client_context *client,
1064 uint32_t *pnn_list, int count, uint32_t *caps,
1065 struct ctdb_tunable_list *tun_list,
1066 struct recdb_context *recdb)
1068 struct tevent_req *req, *subreq;
1069 struct push_database_state *state;
1070 uint32_t *old_list, *new_list;
1071 unsigned int old_count, new_count;
1072 int i;
1074 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1075 if (req == NULL) {
1076 return NULL;
1079 state->old_done = false;
1080 state->new_done = false;
1082 old_count = 0;
1083 new_count = 0;
1084 old_list = talloc_array(state, uint32_t, count);
1085 new_list = talloc_array(state, uint32_t, count);
1086 if (tevent_req_nomem(old_list, req) ||
1087 tevent_req_nomem(new_list,req)) {
1088 return tevent_req_post(req, ev);
1091 for (i=0; i<count; i++) {
1092 uint32_t pnn = pnn_list[i];
1094 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1095 new_list[new_count] = pnn;
1096 new_count += 1;
1097 } else {
1098 old_list[old_count] = pnn;
1099 old_count += 1;
1103 if (old_count > 0) {
1104 subreq = push_database_old_send(state, ev, client,
1105 old_list, old_count, recdb);
1106 if (tevent_req_nomem(subreq, req)) {
1107 return tevent_req_post(req, ev);
1109 tevent_req_set_callback(subreq, push_database_old_done, req);
1110 } else {
1111 state->old_done = true;
1114 if (new_count > 0) {
1115 subreq = push_database_new_send(state, ev, client,
1116 new_list, new_count, recdb,
1117 tun_list->rec_buffer_size_limit);
1118 if (tevent_req_nomem(subreq, req)) {
1119 return tevent_req_post(req, ev);
1121 tevent_req_set_callback(subreq, push_database_new_done, req);
1122 } else {
1123 state->new_done = true;
1126 return req;
1129 static void push_database_old_done(struct tevent_req *subreq)
1131 struct tevent_req *req = tevent_req_callback_data(
1132 subreq, struct tevent_req);
1133 struct push_database_state *state = tevent_req_data(
1134 req, struct push_database_state);
1135 bool status;
1136 int ret;
1138 status = push_database_old_recv(subreq, &ret);
1139 if (! status) {
1140 tevent_req_error(req, ret);
1141 return;
1144 state->old_done = true;
1146 if (state->old_done && state->new_done) {
1147 tevent_req_done(req);
1151 static void push_database_new_done(struct tevent_req *subreq)
1153 struct tevent_req *req = tevent_req_callback_data(
1154 subreq, struct tevent_req);
1155 struct push_database_state *state = tevent_req_data(
1156 req, struct push_database_state);
1157 bool status;
1158 int ret;
1160 status = push_database_new_recv(subreq, &ret);
1161 if (! status) {
1162 tevent_req_error(req, ret);
1163 return;
1166 state->new_done = true;
1168 if (state->old_done && state->new_done) {
1169 tevent_req_done(req);
/* Collect the result of push_database_send(); see generic_recv() */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect databases using highest sequence number
 */
/* State of one collect_highseqnum_db_send() request */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t *pnn_list;		/* nodes asked for their sequence number */
	int count;			/* number of entries in pnn_list */
	uint32_t *caps;			/* capabilities, indexed by pnn */
	uint32_t *ban_credits;		/* indexed by pnn; bumped when the pull fails */
	uint32_t db_id;
	struct recdb_context *recdb;
	uint32_t max_pnn;		/* node with the highest sequence number */
};
1194 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1195 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1197 static struct tevent_req *collect_highseqnum_db_send(
1198 TALLOC_CTX *mem_ctx,
1199 struct tevent_context *ev,
1200 struct ctdb_client_context *client,
1201 uint32_t *pnn_list, int count, uint32_t *caps,
1202 uint32_t *ban_credits, uint32_t db_id,
1203 struct recdb_context *recdb)
1205 struct tevent_req *req, *subreq;
1206 struct collect_highseqnum_db_state *state;
1207 struct ctdb_req_control request;
1209 req = tevent_req_create(mem_ctx, &state,
1210 struct collect_highseqnum_db_state);
1211 if (req == NULL) {
1212 return NULL;
1215 state->ev = ev;
1216 state->client = client;
1217 state->pnn_list = pnn_list;
1218 state->count = count;
1219 state->caps = caps;
1220 state->ban_credits = ban_credits;
1221 state->db_id = db_id;
1222 state->recdb = recdb;
1224 ctdb_req_control_get_db_seqnum(&request, db_id);
1225 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1226 state->pnn_list, state->count,
1227 TIMEOUT(), &request);
1228 if (tevent_req_nomem(subreq, req)) {
1229 return tevent_req_post(req, ev);
1231 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1232 req);
1234 return req;
1237 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1239 struct tevent_req *req = tevent_req_callback_data(
1240 subreq, struct tevent_req);
1241 struct collect_highseqnum_db_state *state = tevent_req_data(
1242 req, struct collect_highseqnum_db_state);
1243 struct ctdb_reply_control **reply;
1244 int *err_list;
1245 bool status;
1246 int ret, i;
1247 uint64_t seqnum, max_seqnum;
1249 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1250 &err_list, &reply);
1251 TALLOC_FREE(subreq);
1252 if (! status) {
1253 int ret2;
1254 uint32_t pnn;
1256 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1257 state->count, err_list,
1258 &pnn);
1259 if (ret2 != 0) {
1260 D_ERR("control GET_DB_SEQNUM failed for db %s"
1261 " on node %u, ret=%d\n",
1262 recdb_name(state->recdb), pnn, ret2);
1263 } else {
1264 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1265 " ret=%d\n",
1266 recdb_name(state->recdb), ret);
1268 tevent_req_error(req, ret);
1269 return;
1272 max_seqnum = 0;
1273 state->max_pnn = state->pnn_list[0];
1274 for (i=0; i<state->count; i++) {
1275 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1276 if (ret != 0) {
1277 tevent_req_error(req, EPROTO);
1278 return;
1281 if (max_seqnum < seqnum) {
1282 max_seqnum = seqnum;
1283 state->max_pnn = state->pnn_list[i];
1287 talloc_free(reply);
1289 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1290 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1292 subreq = pull_database_send(state, state->ev, state->client,
1293 state->max_pnn,
1294 state->caps[state->max_pnn],
1295 state->recdb);
1296 if (tevent_req_nomem(subreq, req)) {
1297 return;
1299 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1300 req);
1303 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1305 struct tevent_req *req = tevent_req_callback_data(
1306 subreq, struct tevent_req);
1307 struct collect_highseqnum_db_state *state = tevent_req_data(
1308 req, struct collect_highseqnum_db_state);
1309 int ret;
1310 bool status;
1312 status = pull_database_recv(subreq, &ret);
1313 TALLOC_FREE(subreq);
1314 if (! status) {
1315 state->ban_credits[state->max_pnn] += 1;
1316 tevent_req_error(req, ret);
1317 return;
1320 tevent_req_done(req);
/*
 * Fetch the result of collect_highseqnum_db_send().
 *
 * Returns false and stores the error in *perr (if perr is not NULL)
 * when the request failed, true otherwise.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1329 * Collect all databases
1332 struct collect_all_db_state {
1333 struct tevent_context *ev;
1334 struct ctdb_client_context *client;
1335 uint32_t *pnn_list;
1336 int count;
1337 uint32_t *caps;
1338 uint32_t *ban_credits;
1339 uint32_t db_id;
1340 struct recdb_context *recdb;
1341 struct ctdb_pulldb pulldb;
1342 int index;
/* Completion callback for each per-node pull of collect_all_db_send() */
static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1347 static struct tevent_req *collect_all_db_send(
1348 TALLOC_CTX *mem_ctx,
1349 struct tevent_context *ev,
1350 struct ctdb_client_context *client,
1351 uint32_t *pnn_list, int count, uint32_t *caps,
1352 uint32_t *ban_credits, uint32_t db_id,
1353 struct recdb_context *recdb)
1355 struct tevent_req *req, *subreq;
1356 struct collect_all_db_state *state;
1357 uint32_t pnn;
1359 req = tevent_req_create(mem_ctx, &state,
1360 struct collect_all_db_state);
1361 if (req == NULL) {
1362 return NULL;
1365 state->ev = ev;
1366 state->client = client;
1367 state->pnn_list = pnn_list;
1368 state->count = count;
1369 state->caps = caps;
1370 state->ban_credits = ban_credits;
1371 state->db_id = db_id;
1372 state->recdb = recdb;
1373 state->index = 0;
1375 pnn = state->pnn_list[state->index];
1377 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1378 if (tevent_req_nomem(subreq, req)) {
1379 return tevent_req_post(req, ev);
1381 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1383 return req;
1386 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1388 struct tevent_req *req = tevent_req_callback_data(
1389 subreq, struct tevent_req);
1390 struct collect_all_db_state *state = tevent_req_data(
1391 req, struct collect_all_db_state);
1392 uint32_t pnn;
1393 int ret;
1394 bool status;
1396 status = pull_database_recv(subreq, &ret);
1397 TALLOC_FREE(subreq);
1398 if (! status) {
1399 pnn = state->pnn_list[state->index];
1400 state->ban_credits[pnn] += 1;
1401 tevent_req_error(req, ret);
1402 return;
1405 state->index += 1;
1406 if (state->index == state->count) {
1407 tevent_req_done(req);
1408 return;
1411 pnn = state->pnn_list[state->index];
1412 subreq = pull_database_send(state, state->ev, state->client,
1413 pnn, state->caps[pnn], state->recdb);
1414 if (tevent_req_nomem(subreq, req)) {
1415 return;
1417 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Fetch the result of collect_all_db_send().
 *
 * Returns false and stores the error in *perr (if perr is not NULL)
 * when the request failed, true otherwise.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1427 * For each database do the following:
1428 * - Get DB name
1429 * - Get DB path
1430 * - Freeze database on all nodes
1431 * - Start transaction on all nodes
1432 * - Collect database from all nodes
1433 * - Wipe database on all nodes
1434 * - Push database to all nodes
1435 * - Commit transaction on all nodes
1436 * - Thaw database on all nodes
1439 struct recover_db_state {
1440 struct tevent_context *ev;
1441 struct ctdb_client_context *client;
1442 struct ctdb_tunable_list *tun_list;
1443 uint32_t *pnn_list;
1444 int count;
1445 uint32_t *caps;
1446 uint32_t *ban_credits;
1447 uint32_t db_id;
1448 uint8_t db_flags;
1450 uint32_t destnode;
1451 struct ctdb_transdb transdb;
1453 const char *db_name, *db_path;
1454 struct recdb_context *recdb;
/* Completion callbacks of recover_db_send(), in order of execution */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);
1467 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1468 struct tevent_context *ev,
1469 struct ctdb_client_context *client,
1470 struct ctdb_tunable_list *tun_list,
1471 uint32_t *pnn_list, int count,
1472 uint32_t *caps,
1473 uint32_t *ban_credits,
1474 uint32_t generation,
1475 uint32_t db_id, uint8_t db_flags)
1477 struct tevent_req *req, *subreq;
1478 struct recover_db_state *state;
1479 struct ctdb_req_control request;
1481 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1482 if (req == NULL) {
1483 return NULL;
1486 state->ev = ev;
1487 state->client = client;
1488 state->tun_list = tun_list;
1489 state->pnn_list = pnn_list;
1490 state->count = count;
1491 state->caps = caps;
1492 state->ban_credits = ban_credits;
1493 state->db_id = db_id;
1494 state->db_flags = db_flags;
1496 state->destnode = ctdb_client_pnn(client);
1497 state->transdb.db_id = db_id;
1498 state->transdb.tid = generation;
1500 ctdb_req_control_get_dbname(&request, db_id);
1501 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1502 TIMEOUT(), &request);
1503 if (tevent_req_nomem(subreq, req)) {
1504 return tevent_req_post(req, ev);
1506 tevent_req_set_callback(subreq, recover_db_name_done, req);
1508 return req;
1511 static void recover_db_name_done(struct tevent_req *subreq)
1513 struct tevent_req *req = tevent_req_callback_data(
1514 subreq, struct tevent_req);
1515 struct recover_db_state *state = tevent_req_data(
1516 req, struct recover_db_state);
1517 struct ctdb_reply_control *reply;
1518 struct ctdb_req_control request;
1519 int ret;
1520 bool status;
1522 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1523 TALLOC_FREE(subreq);
1524 if (! status) {
1525 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1526 state->db_id, ret);
1527 tevent_req_error(req, ret);
1528 return;
1531 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1532 if (ret != 0) {
1533 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1534 state->db_id, ret);
1535 tevent_req_error(req, EPROTO);
1536 return;
1539 talloc_free(reply);
1541 ctdb_req_control_getdbpath(&request, state->db_id);
1542 subreq = ctdb_client_control_send(state, state->ev, state->client,
1543 state->destnode, TIMEOUT(),
1544 &request);
1545 if (tevent_req_nomem(subreq, req)) {
1546 return;
1548 tevent_req_set_callback(subreq, recover_db_path_done, req);
1551 static void recover_db_path_done(struct tevent_req *subreq)
1553 struct tevent_req *req = tevent_req_callback_data(
1554 subreq, struct tevent_req);
1555 struct recover_db_state *state = tevent_req_data(
1556 req, struct recover_db_state);
1557 struct ctdb_reply_control *reply;
1558 struct ctdb_req_control request;
1559 int ret;
1560 bool status;
1562 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1563 TALLOC_FREE(subreq);
1564 if (! status) {
1565 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1566 state->db_name, ret);
1567 tevent_req_error(req, ret);
1568 return;
1571 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1572 if (ret != 0) {
1573 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1574 state->db_name, ret);
1575 tevent_req_error(req, EPROTO);
1576 return;
1579 talloc_free(reply);
1581 ctdb_req_control_db_freeze(&request, state->db_id);
1582 subreq = ctdb_client_control_multi_send(state, state->ev,
1583 state->client,
1584 state->pnn_list, state->count,
1585 TIMEOUT(), &request);
1586 if (tevent_req_nomem(subreq, req)) {
1587 return;
1589 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1592 static void recover_db_freeze_done(struct tevent_req *subreq)
1594 struct tevent_req *req = tevent_req_callback_data(
1595 subreq, struct tevent_req);
1596 struct recover_db_state *state = tevent_req_data(
1597 req, struct recover_db_state);
1598 struct ctdb_req_control request;
1599 int *err_list;
1600 int ret;
1601 bool status;
1603 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1604 NULL);
1605 TALLOC_FREE(subreq);
1606 if (! status) {
1607 int ret2;
1608 uint32_t pnn;
1610 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1611 state->count, err_list,
1612 &pnn);
1613 if (ret2 != 0) {
1614 D_ERR("control FREEZE_DB failed for db %s"
1615 " on node %u, ret=%d\n",
1616 state->db_name, pnn, ret2);
1617 state->ban_credits[pnn] += 1;
1618 } else {
1619 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1620 state->db_name, ret);
1622 tevent_req_error(req, ret);
1623 return;
1626 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1627 subreq = ctdb_client_control_multi_send(state, state->ev,
1628 state->client,
1629 state->pnn_list, state->count,
1630 TIMEOUT(), &request);
1631 if (tevent_req_nomem(subreq, req)) {
1632 return;
1634 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1637 static void recover_db_transaction_started(struct tevent_req *subreq)
1639 struct tevent_req *req = tevent_req_callback_data(
1640 subreq, struct tevent_req);
1641 struct recover_db_state *state = tevent_req_data(
1642 req, struct recover_db_state);
1643 int *err_list;
1644 int ret;
1645 bool status;
1647 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1648 NULL);
1649 TALLOC_FREE(subreq);
1650 if (! status) {
1651 int ret2;
1652 uint32_t pnn;
1654 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1655 state->count,
1656 err_list, &pnn);
1657 if (ret2 != 0) {
1658 D_ERR("control TRANSACTION_DB failed for db=%s"
1659 " on node %u, ret=%d\n",
1660 state->db_name, pnn, ret2);
1661 } else {
1662 D_ERR("control TRANSACTION_DB failed for db=%s,"
1663 " ret=%d\n", state->db_name, ret);
1665 tevent_req_error(req, ret);
1666 return;
1669 state->recdb = recdb_create(state, state->db_id, state->db_name,
1670 state->db_path,
1671 state->tun_list->database_hash_size,
1672 state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1673 if (tevent_req_nomem(state->recdb, req)) {
1674 return;
1677 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1678 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1679 subreq = collect_highseqnum_db_send(
1680 state, state->ev, state->client,
1681 state->pnn_list, state->count, state->caps,
1682 state->ban_credits, state->db_id,
1683 state->recdb);
1684 } else {
1685 subreq = collect_all_db_send(
1686 state, state->ev, state->client,
1687 state->pnn_list, state->count, state->caps,
1688 state->ban_credits, state->db_id,
1689 state->recdb);
1691 if (tevent_req_nomem(subreq, req)) {
1692 return;
1694 tevent_req_set_callback(subreq, recover_db_collect_done, req);
1697 static void recover_db_collect_done(struct tevent_req *subreq)
1699 struct tevent_req *req = tevent_req_callback_data(
1700 subreq, struct tevent_req);
1701 struct recover_db_state *state = tevent_req_data(
1702 req, struct recover_db_state);
1703 struct ctdb_req_control request;
1704 int ret;
1705 bool status;
1707 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1708 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1709 status = collect_highseqnum_db_recv(subreq, &ret);
1710 } else {
1711 status = collect_all_db_recv(subreq, &ret);
1713 TALLOC_FREE(subreq);
1714 if (! status) {
1715 tevent_req_error(req, ret);
1716 return;
1719 ctdb_req_control_wipe_database(&request, &state->transdb);
1720 subreq = ctdb_client_control_multi_send(state, state->ev,
1721 state->client,
1722 state->pnn_list, state->count,
1723 TIMEOUT(), &request);
1724 if (tevent_req_nomem(subreq, req)) {
1725 return;
1727 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1730 static void recover_db_wipedb_done(struct tevent_req *subreq)
1732 struct tevent_req *req = tevent_req_callback_data(
1733 subreq, struct tevent_req);
1734 struct recover_db_state *state = tevent_req_data(
1735 req, struct recover_db_state);
1736 int *err_list;
1737 int ret;
1738 bool status;
1740 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1741 NULL);
1742 TALLOC_FREE(subreq);
1743 if (! status) {
1744 int ret2;
1745 uint32_t pnn;
1747 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1748 state->count,
1749 err_list, &pnn);
1750 if (ret2 != 0) {
1751 D_ERR("control WIPEDB failed for db %s on node %u,"
1752 " ret=%d\n", state->db_name, pnn, ret2);
1753 } else {
1754 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1755 state->db_name, ret);
1757 tevent_req_error(req, ret);
1758 return;
1761 subreq = push_database_send(state, state->ev, state->client,
1762 state->pnn_list, state->count,
1763 state->caps, state->tun_list,
1764 state->recdb);
1765 if (tevent_req_nomem(subreq, req)) {
1766 return;
1768 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1771 static void recover_db_pushdb_done(struct tevent_req *subreq)
1773 struct tevent_req *req = tevent_req_callback_data(
1774 subreq, struct tevent_req);
1775 struct recover_db_state *state = tevent_req_data(
1776 req, struct recover_db_state);
1777 struct ctdb_req_control request;
1778 int ret;
1779 bool status;
1781 status = push_database_recv(subreq, &ret);
1782 TALLOC_FREE(subreq);
1783 if (! status) {
1784 tevent_req_error(req, ret);
1785 return;
1788 TALLOC_FREE(state->recdb);
1790 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1791 subreq = ctdb_client_control_multi_send(state, state->ev,
1792 state->client,
1793 state->pnn_list, state->count,
1794 TIMEOUT(), &request);
1795 if (tevent_req_nomem(subreq, req)) {
1796 return;
1798 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1801 static void recover_db_transaction_committed(struct tevent_req *subreq)
1803 struct tevent_req *req = tevent_req_callback_data(
1804 subreq, struct tevent_req);
1805 struct recover_db_state *state = tevent_req_data(
1806 req, struct recover_db_state);
1807 struct ctdb_req_control request;
1808 int *err_list;
1809 int ret;
1810 bool status;
1812 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1813 NULL);
1814 TALLOC_FREE(subreq);
1815 if (! status) {
1816 int ret2;
1817 uint32_t pnn;
1819 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1820 state->count,
1821 err_list, &pnn);
1822 if (ret2 != 0) {
1823 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1824 " on node %u, ret=%d\n",
1825 state->db_name, pnn, ret2);
1826 } else {
1827 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1828 " ret=%d\n", state->db_name, ret);
1830 tevent_req_error(req, ret);
1831 return;
1834 ctdb_req_control_db_thaw(&request, state->db_id);
1835 subreq = ctdb_client_control_multi_send(state, state->ev,
1836 state->client,
1837 state->pnn_list, state->count,
1838 TIMEOUT(), &request);
1839 if (tevent_req_nomem(subreq, req)) {
1840 return;
1842 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1845 static void recover_db_thaw_done(struct tevent_req *subreq)
1847 struct tevent_req *req = tevent_req_callback_data(
1848 subreq, struct tevent_req);
1849 struct recover_db_state *state = tevent_req_data(
1850 req, struct recover_db_state);
1851 int *err_list;
1852 int ret;
1853 bool status;
1855 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1856 NULL);
1857 TALLOC_FREE(subreq);
1858 if (! status) {
1859 int ret2;
1860 uint32_t pnn;
1862 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1863 state->count,
1864 err_list, &pnn);
1865 if (ret2 != 0) {
1866 D_ERR("control DB_THAW failed for db %s on node %u,"
1867 " ret=%d\n", state->db_name, pnn, ret2);
1868 } else {
1869 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1870 state->db_name, ret);
1872 tevent_req_error(req, ret);
1873 return;
1876 tevent_req_done(req);
/*
 * Fetch the result of recover_db_send().
 *
 * Only success or failure is reported; the error code is discarded.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
1886 * Start database recovery for each database
1888 * Try to recover each database up to NUM_RETRIES (3) times before failing recovery.
/* Shared state for recovering all databases in parallel */
struct db_recovery_state {
	struct tevent_context *ev;

	/* Databases to recover; dbmap->num is the number of replies awaited */
	struct ctdb_dbid_map *dbmap;

	/* Databases finished so far (successfully or not) */
	int num_replies;

	/* Databases that still failed after all retries */
	int num_failed;
};
/*
 * Per-database state; it keeps every argument of recover_db_send() so
 * that a failed recovery can be restarted.
 */
struct db_recovery_one_state {
	/* The overall db_recovery request this database belongs to */
	struct tevent_req *req;

	struct ctdb_client_context *client;
	struct ctdb_dbid_map *dbmap;
	struct ctdb_tunable_list *tun_list;

	/* Participating nodes and the length of pnn_list */
	uint32_t *pnn_list;
	int count;

	uint32_t *caps;
	uint32_t *ban_credits;
	uint32_t generation;

	/* Database id and flags taken from the dbmap entry */
	uint32_t db_id;
	uint8_t db_flags;

	/* Failed attempts so far; zero-initialised by talloc_zero() */
	int num_fails;
};
/* Completion callback for each per-database recover_db request */
static void db_recovery_one_done(struct tevent_req *subreq);
1915 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1916 struct tevent_context *ev,
1917 struct ctdb_client_context *client,
1918 struct ctdb_dbid_map *dbmap,
1919 struct ctdb_tunable_list *tun_list,
1920 uint32_t *pnn_list, int count,
1921 uint32_t *caps,
1922 uint32_t *ban_credits,
1923 uint32_t generation)
1925 struct tevent_req *req, *subreq;
1926 struct db_recovery_state *state;
1927 int i;
1929 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1930 if (req == NULL) {
1931 return NULL;
1934 state->ev = ev;
1935 state->dbmap = dbmap;
1936 state->num_replies = 0;
1937 state->num_failed = 0;
1939 if (dbmap->num == 0) {
1940 tevent_req_done(req);
1941 return tevent_req_post(req, ev);
1944 for (i=0; i<dbmap->num; i++) {
1945 struct db_recovery_one_state *substate;
1947 substate = talloc_zero(state, struct db_recovery_one_state);
1948 if (tevent_req_nomem(substate, req)) {
1949 return tevent_req_post(req, ev);
1952 substate->req = req;
1953 substate->client = client;
1954 substate->dbmap = dbmap;
1955 substate->tun_list = tun_list;
1956 substate->pnn_list = pnn_list;
1957 substate->count = count;
1958 substate->caps = caps;
1959 substate->ban_credits = ban_credits;
1960 substate->generation = generation;
1961 substate->db_id = dbmap->dbs[i].db_id;
1962 substate->db_flags = dbmap->dbs[i].flags;
1964 subreq = recover_db_send(state, ev, client, tun_list,
1965 pnn_list, count, caps, ban_credits,
1966 generation, substate->db_id,
1967 substate->db_flags);
1968 if (tevent_req_nomem(subreq, req)) {
1969 return tevent_req_post(req, ev);
1971 tevent_req_set_callback(subreq, db_recovery_one_done,
1972 substate);
1973 D_NOTICE("recover database 0x%08x\n", substate->db_id);
1976 return req;
1979 static void db_recovery_one_done(struct tevent_req *subreq)
1981 struct db_recovery_one_state *substate = tevent_req_callback_data(
1982 subreq, struct db_recovery_one_state);
1983 struct tevent_req *req = substate->req;
1984 struct db_recovery_state *state = tevent_req_data(
1985 req, struct db_recovery_state);
1986 bool status;
1988 status = recover_db_recv(subreq);
1989 TALLOC_FREE(subreq);
1991 if (status) {
1992 talloc_free(substate);
1993 goto done;
1996 substate->num_fails += 1;
1997 if (substate->num_fails < NUM_RETRIES) {
1998 subreq = recover_db_send(state, state->ev, substate->client,
1999 substate->tun_list,
2000 substate->pnn_list, substate->count,
2001 substate->caps, substate->ban_credits,
2002 substate->generation, substate->db_id,
2003 substate->db_flags);
2004 if (tevent_req_nomem(subreq, req)) {
2005 goto failed;
2007 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2008 D_NOTICE("recover database 0x%08x, attempt %d\n",
2009 substate->db_id, substate->num_fails+1);
2010 return;
2013 failed:
2014 state->num_failed += 1;
2016 done:
2017 state->num_replies += 1;
2019 if (state->num_replies == state->dbmap->num) {
2020 tevent_req_done(req);
2024 static bool db_recovery_recv(struct tevent_req *req, int *count)
2026 struct db_recovery_state *state = tevent_req_data(
2027 req, struct db_recovery_state);
2028 int err;
2030 if (tevent_req_is_unix_error(req, &err)) {
2031 *count = 0;
2032 return false;
2035 *count = state->num_replies - state->num_failed;
2037 if (state->num_failed > 0) {
2038 return false;
2041 return true;
2046 * Run the parallel database recovery
2048 * - Get tunables
2049 * - Get nodemap
2050 * - Get vnnmap
2051 * - Get capabilities from all nodes
2052 * - Get dbmap
2053 * - Set RECOVERY_ACTIVE
2054 * - Send START_RECOVERY
2055 * - Update vnnmap on all nodes
2056 * - Run database recovery
2057 * - Set RECOVERY_NORMAL
2058 * - Send END_RECOVERY
/* State for one complete parallel recovery run */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;

	/* Generation for this recovery; stored in the new vnnmap */
	uint32_t generation;

	/* Active nodes and the length of pnn_list */
	uint32_t *pnn_list;
	int count;

	/* This client's own pnn; target of the single-node controls */
	uint32_t destnode;

	struct ctdb_node_map *nodemap;

	/* Both arrays have nodemap->num entries */
	uint32_t *caps;
	uint32_t *ban_credits;

	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_dbid_map *dbmap;
};
/* Completion callbacks for the individual steps of recovery_send() */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2089 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2090 struct tevent_context *ev,
2091 struct ctdb_client_context *client,
2092 uint32_t generation)
2094 struct tevent_req *req, *subreq;
2095 struct recovery_state *state;
2096 struct ctdb_req_control request;
2098 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2099 if (req == NULL) {
2100 return NULL;
2103 state->ev = ev;
2104 state->client = client;
2105 state->generation = generation;
2106 state->destnode = ctdb_client_pnn(client);
2108 ctdb_req_control_get_all_tunables(&request);
2109 subreq = ctdb_client_control_send(state, state->ev, state->client,
2110 state->destnode, TIMEOUT(),
2111 &request);
2112 if (tevent_req_nomem(subreq, req)) {
2113 return tevent_req_post(req, ev);
2115 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2117 return req;
2120 static void recovery_tunables_done(struct tevent_req *subreq)
2122 struct tevent_req *req = tevent_req_callback_data(
2123 subreq, struct tevent_req);
2124 struct recovery_state *state = tevent_req_data(
2125 req, struct recovery_state);
2126 struct ctdb_reply_control *reply;
2127 struct ctdb_req_control request;
2128 int ret;
2129 bool status;
2131 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2132 TALLOC_FREE(subreq);
2133 if (! status) {
2134 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2135 tevent_req_error(req, ret);
2136 return;
2139 ret = ctdb_reply_control_get_all_tunables(reply, state,
2140 &state->tun_list);
2141 if (ret != 0) {
2142 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2143 tevent_req_error(req, EPROTO);
2144 return;
2147 talloc_free(reply);
2149 recover_timeout = state->tun_list->recover_timeout;
2151 ctdb_req_control_get_nodemap(&request);
2152 subreq = ctdb_client_control_send(state, state->ev, state->client,
2153 state->destnode, TIMEOUT(),
2154 &request);
2155 if (tevent_req_nomem(subreq, req)) {
2156 return;
2158 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2161 static void recovery_nodemap_done(struct tevent_req *subreq)
2163 struct tevent_req *req = tevent_req_callback_data(
2164 subreq, struct tevent_req);
2165 struct recovery_state *state = tevent_req_data(
2166 req, struct recovery_state);
2167 struct ctdb_reply_control *reply;
2168 struct ctdb_req_control request;
2169 bool status;
2170 int ret;
2172 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2173 TALLOC_FREE(subreq);
2174 if (! status) {
2175 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2176 state->destnode, ret);
2177 tevent_req_error(req, ret);
2178 return;
2181 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2182 if (ret != 0) {
2183 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2184 tevent_req_error(req, ret);
2185 return;
2188 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2189 state, &state->pnn_list);
2190 if (state->count <= 0) {
2191 tevent_req_error(req, ENOMEM);
2192 return;
2195 state->ban_credits = talloc_zero_array(state, uint32_t,
2196 state->nodemap->num);
2197 if (tevent_req_nomem(state->ban_credits, req)) {
2198 return;
2201 ctdb_req_control_getvnnmap(&request);
2202 subreq = ctdb_client_control_send(state, state->ev, state->client,
2203 state->destnode, TIMEOUT(),
2204 &request);
2205 if (tevent_req_nomem(subreq, req)) {
2206 return;
2208 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2211 static void recovery_vnnmap_done(struct tevent_req *subreq)
2213 struct tevent_req *req = tevent_req_callback_data(
2214 subreq, struct tevent_req);
2215 struct recovery_state *state = tevent_req_data(
2216 req, struct recovery_state);
2217 struct ctdb_reply_control *reply;
2218 struct ctdb_req_control request;
2219 bool status;
2220 int ret;
2222 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2223 TALLOC_FREE(subreq);
2224 if (! status) {
2225 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2226 state->destnode, ret);
2227 tevent_req_error(req, ret);
2228 return;
2231 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2232 if (ret != 0) {
2233 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2234 tevent_req_error(req, ret);
2235 return;
2238 ctdb_req_control_get_capabilities(&request);
2239 subreq = ctdb_client_control_multi_send(state, state->ev,
2240 state->client,
2241 state->pnn_list, state->count,
2242 TIMEOUT(), &request);
2243 if (tevent_req_nomem(subreq, req)) {
2244 return;
2246 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2249 static void recovery_capabilities_done(struct tevent_req *subreq)
2251 struct tevent_req *req = tevent_req_callback_data(
2252 subreq, struct tevent_req);
2253 struct recovery_state *state = tevent_req_data(
2254 req, struct recovery_state);
2255 struct ctdb_reply_control **reply;
2256 struct ctdb_req_control request;
2257 int *err_list;
2258 int ret, i;
2259 bool status;
2261 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2262 &reply);
2263 TALLOC_FREE(subreq);
2264 if (! status) {
2265 int ret2;
2266 uint32_t pnn;
2268 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2269 state->count,
2270 err_list, &pnn);
2271 if (ret2 != 0) {
2272 D_ERR("control GET_CAPABILITIES failed on node %u,"
2273 " ret=%d\n", pnn, ret2);
2274 } else {
2275 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2276 ret);
2278 tevent_req_error(req, ret);
2279 return;
2282 /* Make the array size same as nodemap */
2283 state->caps = talloc_zero_array(state, uint32_t,
2284 state->nodemap->num);
2285 if (tevent_req_nomem(state->caps, req)) {
2286 return;
2289 for (i=0; i<state->count; i++) {
2290 uint32_t pnn;
2292 pnn = state->pnn_list[i];
2293 ret = ctdb_reply_control_get_capabilities(reply[i],
2294 &state->caps[pnn]);
2295 if (ret != 0) {
2296 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2297 pnn);
2298 tevent_req_error(req, EPROTO);
2299 return;
2303 talloc_free(reply);
2305 ctdb_req_control_get_dbmap(&request);
2306 subreq = ctdb_client_control_send(state, state->ev, state->client,
2307 state->destnode, TIMEOUT(),
2308 &request);
2309 if (tevent_req_nomem(subreq, req)) {
2310 return;
2312 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2315 static void recovery_dbmap_done(struct tevent_req *subreq)
2317 struct tevent_req *req = tevent_req_callback_data(
2318 subreq, struct tevent_req);
2319 struct recovery_state *state = tevent_req_data(
2320 req, struct recovery_state);
2321 struct ctdb_reply_control *reply;
2322 struct ctdb_req_control request;
2323 int ret;
2324 bool status;
2326 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2327 TALLOC_FREE(subreq);
2328 if (! status) {
2329 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2330 state->destnode, ret);
2331 tevent_req_error(req, ret);
2332 return;
2335 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2336 if (ret != 0) {
2337 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2338 tevent_req_error(req, ret);
2339 return;
2342 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2343 subreq = ctdb_client_control_multi_send(state, state->ev,
2344 state->client,
2345 state->pnn_list, state->count,
2346 TIMEOUT(), &request);
2347 if (tevent_req_nomem(subreq, req)) {
2348 return;
2350 tevent_req_set_callback(subreq, recovery_active_done, req);
2353 static void recovery_active_done(struct tevent_req *subreq)
2355 struct tevent_req *req = tevent_req_callback_data(
2356 subreq, struct tevent_req);
2357 struct recovery_state *state = tevent_req_data(
2358 req, struct recovery_state);
2359 struct ctdb_req_control request;
2360 struct ctdb_vnn_map *vnnmap;
2361 int *err_list;
2362 int ret, i;
2363 unsigned int count;
2364 bool status;
2366 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2367 NULL);
2368 TALLOC_FREE(subreq);
2369 if (! status) {
2370 int ret2;
2371 uint32_t pnn;
2373 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2374 state->count,
2375 err_list, &pnn);
2376 if (ret2 != 0) {
2377 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2378 " ret=%d\n", pnn, ret2);
2379 } else {
2380 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2381 ret);
2383 tevent_req_error(req, ret);
2384 return;
2387 D_ERR("Set recovery mode to ACTIVE\n");
2389 /* Calculate new VNNMAP */
2390 count = 0;
2391 for (i=0; i<state->nodemap->num; i++) {
2392 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2393 continue;
2395 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2396 continue;
2398 count += 1;
2401 if (count == 0) {
2402 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2405 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2406 if (tevent_req_nomem(vnnmap, req)) {
2407 return;
2410 vnnmap->size = (count == 0 ? 1 : count);
2411 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2412 if (tevent_req_nomem(vnnmap->map, req)) {
2413 return;
2416 if (count == 0) {
2417 vnnmap->map[0] = state->destnode;
2418 } else {
2419 count = 0;
2420 for (i=0; i<state->nodemap->num; i++) {
2421 if (state->nodemap->node[i].flags &
2422 NODE_FLAGS_INACTIVE) {
2423 continue;
2425 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2426 continue;
2429 vnnmap->map[count] = state->nodemap->node[i].pnn;
2430 count += 1;
2434 vnnmap->generation = state->generation;
2436 talloc_free(state->vnnmap);
2437 state->vnnmap = vnnmap;
2439 ctdb_req_control_start_recovery(&request);
2440 subreq = ctdb_client_control_multi_send(state, state->ev,
2441 state->client,
2442 state->pnn_list, state->count,
2443 TIMEOUT(), &request);
2444 if (tevent_req_nomem(subreq, req)) {
2445 return;
2447 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2450 static void recovery_start_recovery_done(struct tevent_req *subreq)
2452 struct tevent_req *req = tevent_req_callback_data(
2453 subreq, struct tevent_req);
2454 struct recovery_state *state = tevent_req_data(
2455 req, struct recovery_state);
2456 struct ctdb_req_control request;
2457 int *err_list;
2458 int ret;
2459 bool status;
2461 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2462 NULL);
2463 TALLOC_FREE(subreq);
2464 if (! status) {
2465 int ret2;
2466 uint32_t pnn;
2468 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2469 state->count,
2470 err_list, &pnn);
2471 if (ret2 != 0) {
2472 D_ERR("failed to run start_recovery event on node %u,"
2473 " ret=%d\n", pnn, ret2);
2474 } else {
2475 D_ERR("failed to run start_recovery event, ret=%d\n",
2476 ret);
2478 tevent_req_error(req, ret);
2479 return;
2482 D_ERR("start_recovery event finished\n");
2484 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2485 subreq = ctdb_client_control_multi_send(state, state->ev,
2486 state->client,
2487 state->pnn_list, state->count,
2488 TIMEOUT(), &request);
2489 if (tevent_req_nomem(subreq, req)) {
2490 return;
2492 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2495 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2497 struct tevent_req *req = tevent_req_callback_data(
2498 subreq, struct tevent_req);
2499 struct recovery_state *state = tevent_req_data(
2500 req, struct recovery_state);
2501 int *err_list;
2502 int ret;
2503 bool status;
2505 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2506 NULL);
2507 TALLOC_FREE(subreq);
2508 if (! status) {
2509 int ret2;
2510 uint32_t pnn;
2512 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2513 state->count,
2514 err_list, &pnn);
2515 if (ret2 != 0) {
2516 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2517 pnn, ret2);
2518 } else {
2519 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2521 tevent_req_error(req, ret);
2522 return;
2525 D_NOTICE("updated VNNMAP\n");
2527 subreq = db_recovery_send(state, state->ev, state->client,
2528 state->dbmap, state->tun_list,
2529 state->pnn_list, state->count,
2530 state->caps, state->ban_credits,
2531 state->vnnmap->generation);
2532 if (tevent_req_nomem(subreq, req)) {
2533 return;
2535 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2538 static void recovery_db_recovery_done(struct tevent_req *subreq)
2540 struct tevent_req *req = tevent_req_callback_data(
2541 subreq, struct tevent_req);
2542 struct recovery_state *state = tevent_req_data(
2543 req, struct recovery_state);
2544 struct ctdb_req_control request;
2545 bool status;
2546 int count;
2548 status = db_recovery_recv(subreq, &count);
2549 TALLOC_FREE(subreq);
2551 D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2553 if (! status) {
2554 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2555 int i;
2557 /* Bans are not enabled */
2558 if (state->tun_list->enable_bans == 0) {
2559 tevent_req_error(req, EIO);
2560 return;
2563 for (i=0; i<state->count; i++) {
2564 uint32_t pnn;
2565 pnn = state->pnn_list[i];
2566 if (state->ban_credits[pnn] > max_credits) {
2567 max_pnn = pnn;
2568 max_credits = state->ban_credits[pnn];
2572 /* If pulling database fails multiple times */
2573 if (max_credits >= NUM_RETRIES) {
2574 struct ctdb_ban_state ban_state = {
2575 .pnn = max_pnn,
2576 .time = state->tun_list->recovery_ban_period,
2579 D_ERR("Banning node %u for %u seconds\n",
2580 ban_state.pnn,
2581 ban_state.time);
2583 ctdb_req_control_set_ban_state(&request,
2584 &ban_state);
2585 subreq = ctdb_client_control_send(state,
2586 state->ev,
2587 state->client,
2588 ban_state.pnn,
2589 TIMEOUT(),
2590 &request);
2591 if (tevent_req_nomem(subreq, req)) {
2592 return;
2594 tevent_req_set_callback(subreq,
2595 recovery_failed_done,
2596 req);
2597 } else {
2598 tevent_req_error(req, EIO);
2600 return;
2603 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2604 subreq = ctdb_client_control_multi_send(state, state->ev,
2605 state->client,
2606 state->pnn_list, state->count,
2607 TIMEOUT(), &request);
2608 if (tevent_req_nomem(subreq, req)) {
2609 return;
2611 tevent_req_set_callback(subreq, recovery_normal_done, req);
2614 static void recovery_failed_done(struct tevent_req *subreq)
2616 struct tevent_req *req = tevent_req_callback_data(
2617 subreq, struct tevent_req);
2618 struct recovery_state *state = tevent_req_data(
2619 req, struct recovery_state);
2620 struct ctdb_reply_control *reply;
2621 int ret;
2622 bool status;
2624 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2625 TALLOC_FREE(subreq);
2626 if (! status) {
2627 D_ERR("failed to ban node, ret=%d\n", ret);
2628 goto done;
2631 ret = ctdb_reply_control_set_ban_state(reply);
2632 if (ret != 0) {
2633 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2636 done:
2637 tevent_req_error(req, EIO);
2640 static void recovery_normal_done(struct tevent_req *subreq)
2642 struct tevent_req *req = tevent_req_callback_data(
2643 subreq, struct tevent_req);
2644 struct recovery_state *state = tevent_req_data(
2645 req, struct recovery_state);
2646 struct ctdb_req_control request;
2647 int *err_list;
2648 int ret;
2649 bool status;
2651 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2652 NULL);
2653 TALLOC_FREE(subreq);
2654 if (! status) {
2655 int ret2;
2656 uint32_t pnn;
2658 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2659 state->count,
2660 err_list, &pnn);
2661 if (ret2 != 0) {
2662 D_ERR("failed to set recovery mode NORMAL on node %u,"
2663 " ret=%d\n", pnn, ret2);
2664 } else {
2665 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2666 ret);
2668 tevent_req_error(req, ret);
2669 return;
2672 D_ERR("Set recovery mode to NORMAL\n");
2674 ctdb_req_control_end_recovery(&request);
2675 subreq = ctdb_client_control_multi_send(state, state->ev,
2676 state->client,
2677 state->pnn_list, state->count,
2678 TIMEOUT(), &request);
2679 if (tevent_req_nomem(subreq, req)) {
2680 return;
2682 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2685 static void recovery_end_recovery_done(struct tevent_req *subreq)
2687 struct tevent_req *req = tevent_req_callback_data(
2688 subreq, struct tevent_req);
2689 struct recovery_state *state = tevent_req_data(
2690 req, struct recovery_state);
2691 int *err_list;
2692 int ret;
2693 bool status;
2695 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2696 NULL);
2697 TALLOC_FREE(subreq);
2698 if (! status) {
2699 int ret2;
2700 uint32_t pnn;
2702 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2703 state->count,
2704 err_list, &pnn);
2705 if (ret2 != 0) {
2706 D_ERR("failed to run recovered event on node %u,"
2707 " ret=%d\n", pnn, ret2);
2708 } else {
2709 D_ERR("failed to run recovered event, ret=%d\n", ret);
2711 tevent_req_error(req, ret);
2712 return;
2715 D_ERR("recovered event finished\n");
2717 tevent_req_done(req);
/*
 * Collect the result of a recovery request.
 *
 * req  - the request created by recovery_send()
 * perr - if not NULL, receives the error code on failure and 0 on success
 *
 * generic_recv() writes *perr only when the request failed.  Storing 0 on
 * success gives callers a well-defined value to test instead of whatever
 * their variable happened to hold beforehand.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	bool ok;

	ok = generic_recv(req, perr);
	if (ok && perr != NULL) {
		*perr = 0;
	}
}
/*
 * Print the command line synopsis for this helper to stderr.
 *
 * progname - name to show as the program (normally argv[0])
 */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
2733 * Arguments - log fd, write fd, socket path, generation
2735 int main(int argc, char *argv[])
2737 int write_fd;
2738 const char *sockpath;
2739 TALLOC_CTX *mem_ctx;
2740 struct tevent_context *ev;
2741 struct ctdb_client_context *client;
2742 int ret;
2743 struct tevent_req *req;
2744 uint32_t generation;
2746 if (argc != 4) {
2747 usage(argv[0]);
2748 exit(1);
2751 write_fd = atoi(argv[1]);
2752 sockpath = argv[2];
2753 generation = (uint32_t)strtoul(argv[3], NULL, 0);
2755 mem_ctx = talloc_new(NULL);
2756 if (mem_ctx == NULL) {
2757 fprintf(stderr, "recovery: talloc_new() failed\n");
2758 goto failed;
2761 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2762 if (ret != 0) {
2763 fprintf(stderr, "recovery: Unable to initialize logging\n");
2764 goto failed;
2767 ev = tevent_context_init(mem_ctx);
2768 if (ev == NULL) {
2769 D_ERR("tevent_context_init() failed\n");
2770 goto failed;
2773 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2774 if (ret != 0) {
2775 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2776 goto failed;
2779 req = recovery_send(mem_ctx, ev, client, generation);
2780 if (req == NULL) {
2781 D_ERR("database_recover_send() failed\n");
2782 goto failed;
2785 if (! tevent_req_poll(req, ev)) {
2786 D_ERR("tevent_req_poll() failed\n");
2787 goto failed;
2790 recovery_recv(req, &ret);
2791 TALLOC_FREE(req);
2792 if (ret != 0) {
2793 D_ERR("database recovery failed, ret=%d\n", ret);
2794 goto failed;
2797 sys_write(write_fd, &ret, sizeof(ret));
2798 return 0;
2800 failed:
2801 TALLOC_FREE(mem_ctx);
2802 return 1;