ctdb-ipalloc: Use a cumulative timeout for takeover run stages
[Samba.git] / ctdb / server / ctdb_recovery_helper.c
blob086dd16ca2ee178582ef10782bb5c1d42eb786ad
/*
   ctdb parallel database recovery

   Copyright (C) Amitay Isaacs  2015

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/time.h"
31 #include "lib/util/tevent_unix.h"
33 #include "protocol/protocol.h"
34 #include "protocol/protocol_api.h"
35 #include "client/client.h"
/* Number of seconds handed to timeval_current_ofs() by TIMEOUT() */
static int recover_timeout = 30;

#define NUM_RETRIES 3

/* Timeout argument used for every control sent from this file */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * Write a printf-style formatted message to stderr.
 */
static void LOG(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	vfprintf(stderr, fmt, args);
	va_end(args);
}
/*
 * Utility functions
 */
/*
 * write(2) wrapper that retries for as long as the call fails with
 * EINTR, EAGAIN or (where defined) EWOULDBLOCK.
 *
 * Returns whatever the last write() call returned: the number of bytes
 * written, or -1 with errno set for any other error.
 */
static ssize_t sys_write(int fd, const void *buf, size_t count)
{
	for (;;) {
		ssize_t nwritten = write(fd, buf, count);

		if (nwritten != -1) {
			return nwritten;
		}
		if (errno == EINTR || errno == EAGAIN) {
			continue;
		}
#if defined(EWOULDBLOCK)
		if (errno == EWOULDBLOCK) {
			continue;
		}
#endif
		return nwritten;
	}
}
/*
 * Common receive function for the tevent requests in this file.
 *
 * Returns true if the request did not finish with a unix error.
 * Otherwise returns false and, if perr is not NULL, stores the error
 * code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;
	bool failed = tevent_req_is_unix_error(req, &unix_err);

	if (!failed) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
84 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
86 static uint64_t srvid_next(void)
88 rec_srvid += 1;
89 return rec_srvid;
/*
 * Recovery database functions
 */
/* Temporary recovery database built for a single ctdb database */
struct recdb_context {
	uint32_t db_id;		/* id of the database being recovered */
	const char *db_name;	/* name of the database being recovered */
	const char *db_path;	/* path of the recdb.<db_name> tdb file */
	struct tdb_wrap *db;	/* open handle on the recovery tdb */
	bool persistent;	/* flag supplied by the creator */
};
104 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
105 const char *db_name,
106 const char *db_path,
107 uint32_t hash_size, bool persistent)
109 static char *db_dir_state = NULL;
110 struct recdb_context *recdb;
111 unsigned int tdb_flags;
113 recdb = talloc(mem_ctx, struct recdb_context);
114 if (recdb == NULL) {
115 return NULL;
118 if (db_dir_state == NULL) {
119 db_dir_state = getenv("CTDB_DBDIR_STATE");
122 recdb->db_name = db_name;
123 recdb->db_id = db_id;
124 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
125 db_dir_state != NULL ?
126 db_dir_state :
127 dirname(discard_const(db_path)),
128 db_name);
129 if (recdb->db_path == NULL) {
130 talloc_free(recdb);
131 return NULL;
133 unlink(recdb->db_path);
135 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
136 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
137 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
138 if (recdb->db == NULL) {
139 talloc_free(recdb);
140 LOG("failed to create recovery db %s\n", recdb->db_path);
143 recdb->persistent = persistent;
145 return recdb;
148 static uint32_t recdb_id(struct recdb_context *recdb)
150 return recdb->db_id;
153 static const char *recdb_name(struct recdb_context *recdb)
155 return recdb->db_name;
158 static const char *recdb_path(struct recdb_context *recdb)
160 return recdb->db_path;
163 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
165 return recdb->db->tdb;
168 static bool recdb_persistent(struct recdb_context *recdb)
170 return recdb->persistent;
/* Private data handed to recdb_add_traverse() */
struct recdb_add_traverse_state {
	struct recdb_context *recdb;	/* recovery database being filled */
	int mypnn;			/* compared against stored dmaster */
};
178 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
179 TDB_DATA key, TDB_DATA data,
180 void *private_data)
182 struct recdb_add_traverse_state *state =
183 (struct recdb_add_traverse_state *)private_data;
184 struct ctdb_ltdb_header *hdr;
185 TDB_DATA prev_data;
186 int ret;
188 /* header is not marshalled separately in the pulldb control */
189 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
190 return -1;
193 hdr = (struct ctdb_ltdb_header *)data.dptr;
195 /* fetch the existing record, if any */
196 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
198 if (prev_data.dptr != NULL) {
199 struct ctdb_ltdb_header prev_hdr;
201 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
202 free(prev_data.dptr);
203 if (hdr->rsn < prev_hdr.rsn ||
204 (hdr->rsn == prev_hdr.rsn &&
205 prev_hdr.dmaster != state->mypnn)) {
206 return 0;
210 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
211 if (ret != 0) {
212 return -1;
214 return 0;
217 static bool recdb_add(struct recdb_context *recdb, int mypnn,
218 struct ctdb_rec_buffer *recbuf)
220 struct recdb_add_traverse_state state;
221 int ret;
223 state.recdb = recdb;
224 state.mypnn = mypnn;
226 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
227 if (ret != 0) {
228 return false;
231 return true;
234 /* This function decides which records from recdb are retained */
235 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
236 uint32_t reqid, uint32_t dmaster,
237 TDB_DATA key, TDB_DATA data)
239 struct ctdb_ltdb_header *header;
240 int ret;
243 * skip empty records - but NOT for persistent databases:
245 * The record-by-record mode of recovery deletes empty records.
246 * For persistent databases, this can lead to data corruption
247 * by deleting records that should be there:
249 * - Assume the cluster has been running for a while.
251 * - A record R in a persistent database has been created and
252 * deleted a couple of times, the last operation being deletion,
253 * leaving an empty record with a high RSN, say 10.
255 * - Now a node N is turned off.
257 * - This leaves the local database copy of D on N with the empty
258 * copy of R and RSN 10. On all other nodes, the recovery has deleted
259 * the copy of record R.
261 * - Now the record is created again while node N is turned off.
262 * This creates R with RSN = 1 on all nodes except for N.
264 * - Now node N is turned on again. The following recovery will chose
265 * the older empty copy of R due to RSN 10 > RSN 1.
267 * ==> Hence the record is gone after the recovery.
269 * On databases like Samba's registry, this can damage the higher-level
270 * data structures built from the various tdb-level records.
272 if (!persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
273 return 0;
276 /* update the dmaster field to point to us */
277 header = (struct ctdb_ltdb_header *)data.dptr;
278 if (!persistent) {
279 header->dmaster = dmaster;
280 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
283 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
284 if (ret != 0) {
285 return ret;
288 return 0;
/* Private data handed to recdb_records_traverse() */
struct recdb_records_traverse_state {
	struct ctdb_rec_buffer *recbuf;	/* buffer the records are added to */
	uint32_t dmaster;		/* dmaster passed to the filter */
	uint32_t reqid;			/* reqid passed to the filter */
	bool persistent;		/* persistent flag of the database */
	bool failed;			/* set when adding a record fails */
};
299 static int recdb_records_traverse(struct tdb_context *tdb,
300 TDB_DATA key, TDB_DATA data,
301 void *private_data)
303 struct recdb_records_traverse_state *state =
304 (struct recdb_records_traverse_state *)private_data;
305 int ret;
307 ret = recbuf_filter_add(state->recbuf, state->persistent,
308 state->reqid, state->dmaster, key, data);
309 if (ret != 0) {
310 state->failed = true;
311 return ret;
314 return 0;
317 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
318 TALLOC_CTX *mem_ctx,
319 uint32_t dmaster)
321 struct recdb_records_traverse_state state;
322 int ret;
324 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
325 if (state.recbuf == NULL) {
326 return NULL;
328 state.dmaster = dmaster;
329 state.reqid = 0;
330 state.persistent = recdb_persistent(recdb);
331 state.failed = false;
333 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
334 &state);
335 if (ret == -1 || state.failed) {
336 LOG("Failed to marshall recovery records for %s\n",
337 recdb_name(recdb));
338 TALLOC_FREE(state.recbuf);
339 return NULL;
342 return state.recbuf;
345 struct recdb_file_traverse_state {
346 struct ctdb_rec_buffer *recbuf;
347 struct recdb_context *recdb;
348 TALLOC_CTX *mem_ctx;
349 uint32_t dmaster;
350 uint32_t reqid;
351 bool persistent;
352 bool failed;
353 int fd;
354 int max_size;
355 int num_buffers;
358 static int recdb_file_traverse(struct tdb_context *tdb,
359 TDB_DATA key, TDB_DATA data,
360 void *private_data)
362 struct recdb_file_traverse_state *state =
363 (struct recdb_file_traverse_state *)private_data;
364 int ret;
366 ret = recbuf_filter_add(state->recbuf, state->persistent,
367 state->reqid, state->dmaster, key, data);
368 if (ret != 0) {
369 state->failed = true;
370 return ret;
373 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
374 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
375 if (ret != 0) {
376 LOG("Failed to collect recovery records for %s\n",
377 recdb_name(state->recdb));
378 state->failed = true;
379 return ret;
382 state->num_buffers += 1;
384 TALLOC_FREE(state->recbuf);
385 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
386 recdb_id(state->recdb));
387 if (state->recbuf == NULL) {
388 state->failed = true;
389 return ENOMEM;
393 return 0;
396 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
397 uint32_t dmaster, int fd, int max_size)
399 struct recdb_file_traverse_state state;
400 int ret;
402 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
403 if (state.recbuf == NULL) {
404 return -1;
406 state.recdb = recdb;
407 state.mem_ctx = mem_ctx;
408 state.dmaster = dmaster;
409 state.reqid = 0;
410 state.persistent = recdb_persistent(recdb);
411 state.failed = false;
412 state.fd = fd;
413 state.max_size = max_size;
414 state.num_buffers = 0;
416 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
417 if (ret == -1 || state.failed) {
418 TALLOC_FREE(state.recbuf);
419 return -1;
422 ret = ctdb_rec_buffer_write(state.recbuf, fd);
423 if (ret != 0) {
424 LOG("Failed to collect recovery records for %s\n",
425 recdb_name(recdb));
426 TALLOC_FREE(state.recbuf);
427 return -1;
429 state.num_buffers += 1;
431 LOG("Wrote %d buffers of recovery records for %s\n",
432 state.num_buffers, recdb_name(recdb));
434 return state.num_buffers;
/*
 * Pull database from a single node
 */
/* State of one pull_database request */
struct pull_database_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* destination recovery database */
	uint32_t pnn;			/* node the database is pulled from */
	uint64_t srvid;			/* srvid of the DB_PULL messages */
	int num_records;		/* records received so far */
};
450 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
451 void *private_data);
452 static void pull_database_register_done(struct tevent_req *subreq);
453 static void pull_database_old_done(struct tevent_req *subreq);
454 static void pull_database_unregister_done(struct tevent_req *subreq);
455 static void pull_database_new_done(struct tevent_req *subreq);
457 static struct tevent_req *pull_database_send(
458 TALLOC_CTX *mem_ctx,
459 struct tevent_context *ev,
460 struct ctdb_client_context *client,
461 uint32_t pnn, uint32_t caps,
462 struct recdb_context *recdb)
464 struct tevent_req *req, *subreq;
465 struct pull_database_state *state;
466 struct ctdb_req_control request;
468 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
469 if (req == NULL) {
470 return NULL;
473 state->ev = ev;
474 state->client = client;
475 state->recdb = recdb;
476 state->pnn = pnn;
477 state->srvid = srvid_next();
479 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
480 subreq = ctdb_client_set_message_handler_send(
481 state, state->ev, state->client,
482 state->srvid, pull_database_handler,
483 req);
484 if (tevent_req_nomem(subreq, req)) {
485 return tevent_req_post(req, ev);
488 tevent_req_set_callback(subreq, pull_database_register_done,
489 req);
491 } else {
492 struct ctdb_pulldb pulldb;
494 pulldb.db_id = recdb_id(recdb);
495 pulldb.lmaster = CTDB_LMASTER_ANY;
497 ctdb_req_control_pull_db(&request, &pulldb);
498 subreq = ctdb_client_control_send(state, state->ev,
499 state->client,
500 pnn, TIMEOUT(),
501 &request);
502 if (tevent_req_nomem(subreq, req)) {
503 return tevent_req_post(req, ev);
505 tevent_req_set_callback(subreq, pull_database_old_done, req);
508 return req;
511 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
512 void *private_data)
514 struct tevent_req *req = talloc_get_type_abort(
515 private_data, struct tevent_req);
516 struct pull_database_state *state = tevent_req_data(
517 req, struct pull_database_state);
518 struct ctdb_rec_buffer *recbuf;
519 int ret;
520 bool status;
522 if (srvid != state->srvid) {
523 return;
526 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
527 if (ret != 0) {
528 LOG("Invalid data received for DB_PULL messages\n");
529 return;
532 if (recbuf->db_id != recdb_id(state->recdb)) {
533 talloc_free(recbuf);
534 LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
535 recbuf->db_id, recdb_name(state->recdb));
536 return;
539 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
540 recbuf);
541 if (! status) {
542 talloc_free(recbuf);
543 LOG("Failed to add records to recdb for %s\n",
544 recdb_name(state->recdb));
545 return;
548 state->num_records += recbuf->count;
549 talloc_free(recbuf);
552 static void pull_database_register_done(struct tevent_req *subreq)
554 struct tevent_req *req = tevent_req_callback_data(
555 subreq, struct tevent_req);
556 struct pull_database_state *state = tevent_req_data(
557 req, struct pull_database_state);
558 struct ctdb_req_control request;
559 struct ctdb_pulldb_ext pulldb_ext;
560 int ret;
561 bool status;
563 status = ctdb_client_set_message_handler_recv(subreq, &ret);
564 TALLOC_FREE(subreq);
565 if (! status) {
566 LOG("failed to set message handler for DB_PULL for %s\n",
567 recdb_name(state->recdb));
568 tevent_req_error(req, ret);
569 return;
572 pulldb_ext.db_id = recdb_id(state->recdb);
573 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
574 pulldb_ext.srvid = state->srvid;
576 ctdb_req_control_db_pull(&request, &pulldb_ext);
577 subreq = ctdb_client_control_send(state, state->ev, state->client,
578 state->pnn, TIMEOUT(), &request);
579 if (tevent_req_nomem(subreq, req)) {
580 return;
582 tevent_req_set_callback(subreq, pull_database_new_done, req);
585 static void pull_database_old_done(struct tevent_req *subreq)
587 struct tevent_req *req = tevent_req_callback_data(
588 subreq, struct tevent_req);
589 struct pull_database_state *state = tevent_req_data(
590 req, struct pull_database_state);
591 struct ctdb_reply_control *reply;
592 struct ctdb_rec_buffer *recbuf;
593 int ret;
594 bool status;
596 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
597 TALLOC_FREE(subreq);
598 if (! status) {
599 LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
600 recdb_name(state->recdb), state->pnn, ret);
601 tevent_req_error(req, ret);
602 return;
605 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
606 talloc_free(reply);
607 if (ret != 0) {
608 tevent_req_error(req, ret);
609 return;
612 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
613 recbuf);
614 if (! status) {
615 talloc_free(recbuf);
616 tevent_req_error(req, EIO);
617 return;
620 state->num_records = recbuf->count;
621 talloc_free(recbuf);
623 LOG("Pulled %d records for db %s from node %d\n",
624 state->num_records, recdb_name(state->recdb), state->pnn);
626 tevent_req_done(req);
629 static void pull_database_new_done(struct tevent_req *subreq)
631 struct tevent_req *req = tevent_req_callback_data(
632 subreq, struct tevent_req);
633 struct pull_database_state *state = tevent_req_data(
634 req, struct pull_database_state);
635 struct ctdb_reply_control *reply;
636 uint32_t num_records;
637 int ret;
638 bool status;
640 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
641 TALLOC_FREE(subreq);
642 if (! status) {
643 LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
644 recdb_name(state->recdb), state->pnn, ret);
645 tevent_req_error(req, ret);
646 return;
649 ret = ctdb_reply_control_db_pull(reply, &num_records);
650 talloc_free(reply);
651 if (num_records != state->num_records) {
652 LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
653 num_records, state->num_records, recdb_name(state->recdb));
654 tevent_req_error(req, EIO);
655 return;
658 LOG("Pulled %d records for db %s from node %d\n",
659 state->num_records, recdb_name(state->recdb), state->pnn);
661 subreq = ctdb_client_remove_message_handler_send(
662 state, state->ev, state->client,
663 state->srvid, req);
664 if (tevent_req_nomem(subreq, req)) {
665 return;
667 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
670 static void pull_database_unregister_done(struct tevent_req *subreq)
672 struct tevent_req *req = tevent_req_callback_data(
673 subreq, struct tevent_req);
674 struct pull_database_state *state = tevent_req_data(
675 req, struct pull_database_state);
676 int ret;
677 bool status;
679 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
680 TALLOC_FREE(subreq);
681 if (! status) {
682 LOG("failed to remove message handler for DB_PULL for %s\n",
683 recdb_name(state->recdb));
684 tevent_req_error(req, ret);
685 return;
688 tevent_req_done(req);
/* Collect the result of a pull_database request; see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (old style)
 */
/* State of one old-style push request */
struct push_database_old_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* recovery database being pushed */
	uint32_t *pnn_list;		/* nodes to push to */
	int count;			/* number of entries in pnn_list */
	struct ctdb_rec_buffer *recbuf;	/* all records, marshalled once */
	int index;			/* pnn_list entry being pushed to */
};
710 static void push_database_old_push_done(struct tevent_req *subreq);
712 static struct tevent_req *push_database_old_send(
713 TALLOC_CTX *mem_ctx,
714 struct tevent_context *ev,
715 struct ctdb_client_context *client,
716 uint32_t *pnn_list, int count,
717 struct recdb_context *recdb)
719 struct tevent_req *req, *subreq;
720 struct push_database_old_state *state;
721 struct ctdb_req_control request;
722 uint32_t pnn;
724 req = tevent_req_create(mem_ctx, &state,
725 struct push_database_old_state);
726 if (req == NULL) {
727 return NULL;
730 state->ev = ev;
731 state->client = client;
732 state->recdb = recdb;
733 state->pnn_list = pnn_list;
734 state->count = count;
735 state->index = 0;
737 state->recbuf = recdb_records(recdb, state,
738 ctdb_client_pnn(client));
739 if (tevent_req_nomem(state->recbuf, req)) {
740 return tevent_req_post(req, ev);
743 pnn = state->pnn_list[state->index];
745 ctdb_req_control_push_db(&request, state->recbuf);
746 subreq = ctdb_client_control_send(state, ev, client, pnn,
747 TIMEOUT(), &request);
748 if (tevent_req_nomem(subreq, req)) {
749 return tevent_req_post(req, ev);
751 tevent_req_set_callback(subreq, push_database_old_push_done, req);
753 return req;
756 static void push_database_old_push_done(struct tevent_req *subreq)
758 struct tevent_req *req = tevent_req_callback_data(
759 subreq, struct tevent_req);
760 struct push_database_old_state *state = tevent_req_data(
761 req, struct push_database_old_state);
762 struct ctdb_req_control request;
763 uint32_t pnn;
764 int ret;
765 bool status;
767 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
768 TALLOC_FREE(subreq);
769 if (! status) {
770 LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
771 recdb_name(state->recdb), state->pnn_list[state->index],
772 ret);
773 tevent_req_error(req, ret);
774 return;
777 state->index += 1;
778 if (state->index == state->count) {
779 TALLOC_FREE(state->recbuf);
780 tevent_req_done(req);
781 return;
784 pnn = state->pnn_list[state->index];
786 ctdb_req_control_push_db(&request, state->recbuf);
787 subreq = ctdb_client_control_send(state, state->ev, state->client,
788 pnn, TIMEOUT(), &request);
789 if (tevent_req_nomem(subreq, req)) {
790 return;
792 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Collect the result of an old-style push; see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (new style)
 */
/* State of one new-style push request */
struct push_database_new_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;	/* recovery database being pushed */
	uint32_t *pnn_list;		/* nodes to push to */
	int count;			/* number of entries in pnn_list */
	uint64_t srvid;			/* srvid the record messages use */
	uint32_t dmaster;		/* pnn of this client */
	int fd;				/* file holding the record buffers */
	int num_buffers;		/* buffers written to the file */
	int num_buffers_sent;		/* buffers sent so far */
	int num_records;		/* records sent so far */
};
818 static void push_database_new_started(struct tevent_req *subreq);
819 static void push_database_new_send_msg(struct tevent_req *req);
820 static void push_database_new_send_done(struct tevent_req *subreq);
821 static void push_database_new_confirmed(struct tevent_req *subreq);
823 static struct tevent_req *push_database_new_send(
824 TALLOC_CTX *mem_ctx,
825 struct tevent_context *ev,
826 struct ctdb_client_context *client,
827 uint32_t *pnn_list, int count,
828 struct recdb_context *recdb,
829 int max_size)
831 struct tevent_req *req, *subreq;
832 struct push_database_new_state *state;
833 struct ctdb_req_control request;
834 struct ctdb_pulldb_ext pulldb_ext;
835 char *filename;
836 off_t offset;
838 req = tevent_req_create(mem_ctx, &state,
839 struct push_database_new_state);
840 if (req == NULL) {
841 return NULL;
844 state->ev = ev;
845 state->client = client;
846 state->recdb = recdb;
847 state->pnn_list = pnn_list;
848 state->count = count;
850 state->srvid = srvid_next();
851 state->dmaster = ctdb_client_pnn(client);
852 state->num_buffers_sent = 0;
853 state->num_records = 0;
855 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
856 if (tevent_req_nomem(filename, req)) {
857 return tevent_req_post(req, ev);
860 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
861 if (state->fd == -1) {
862 tevent_req_error(req, errno);
863 return tevent_req_post(req, ev);
865 unlink(filename);
866 talloc_free(filename);
868 state->num_buffers = recdb_file(recdb, state, state->dmaster,
869 state->fd, max_size);
870 if (state->num_buffers == -1) {
871 tevent_req_error(req, ENOMEM);
872 return tevent_req_post(req, ev);
875 offset = lseek(state->fd, 0, SEEK_SET);
876 if (offset != 0) {
877 tevent_req_error(req, EIO);
878 return tevent_req_post(req, ev);
881 pulldb_ext.db_id = recdb_id(recdb);
882 pulldb_ext.srvid = state->srvid;
884 ctdb_req_control_db_push_start(&request, &pulldb_ext);
885 subreq = ctdb_client_control_multi_send(state, ev, client,
886 pnn_list, count,
887 TIMEOUT(), &request);
888 if (tevent_req_nomem(subreq, req)) {
889 return tevent_req_post(req, ev);
891 tevent_req_set_callback(subreq, push_database_new_started, req);
893 return req;
896 static void push_database_new_started(struct tevent_req *subreq)
898 struct tevent_req *req = tevent_req_callback_data(
899 subreq, struct tevent_req);
900 struct push_database_new_state *state = tevent_req_data(
901 req, struct push_database_new_state);
902 int *err_list;
903 int ret;
904 bool status;
906 status = ctdb_client_control_multi_recv(subreq, &ret, state,
907 &err_list, NULL);
908 TALLOC_FREE(subreq);
909 if (! status) {
910 int ret2;
911 uint32_t pnn;
913 ret2 = ctdb_client_control_multi_error(state->pnn_list,
914 state->count,
915 err_list, &pnn);
916 if (ret2 != 0) {
917 LOG("control DB_PUSH_START failed for db %s "
918 "on node %u, ret=%d\n",
919 recdb_name(state->recdb), pnn, ret2);
920 } else {
921 LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
922 recdb_name(state->recdb), ret);
924 talloc_free(err_list);
926 tevent_req_error(req, ret);
927 return;
930 push_database_new_send_msg(req);
933 static void push_database_new_send_msg(struct tevent_req *req)
935 struct push_database_new_state *state = tevent_req_data(
936 req, struct push_database_new_state);
937 struct tevent_req *subreq;
938 struct ctdb_rec_buffer *recbuf;
939 struct ctdb_req_message message;
940 TDB_DATA data;
941 int ret;
943 if (state->num_buffers_sent == state->num_buffers) {
944 struct ctdb_req_control request;
946 ctdb_req_control_db_push_confirm(&request,
947 recdb_id(state->recdb));
948 subreq = ctdb_client_control_multi_send(state, state->ev,
949 state->client,
950 state->pnn_list,
951 state->count,
952 TIMEOUT(), &request);
953 if (tevent_req_nomem(subreq, req)) {
954 return;
956 tevent_req_set_callback(subreq, push_database_new_confirmed,
957 req);
958 return;
961 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
962 if (ret != 0) {
963 tevent_req_error(req, ret);
964 return;
967 data.dsize = ctdb_rec_buffer_len(recbuf);
968 data.dptr = talloc_size(state, data.dsize);
969 if (tevent_req_nomem(data.dptr, req)) {
970 return;
973 ctdb_rec_buffer_push(recbuf, data.dptr);
975 message.srvid = state->srvid;
976 message.data.data = data;
978 LOG("Pushing buffer %d with %d records for %s\n",
979 state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
981 subreq = ctdb_client_message_multi_send(state, state->ev,
982 state->client,
983 state->pnn_list, state->count,
984 &message);
985 if (tevent_req_nomem(subreq, req)) {
986 return;
988 tevent_req_set_callback(subreq, push_database_new_send_done, req);
990 state->num_records += recbuf->count;
992 talloc_free(data.dptr);
993 talloc_free(recbuf);
996 static void push_database_new_send_done(struct tevent_req *subreq)
998 struct tevent_req *req = tevent_req_callback_data(
999 subreq, struct tevent_req);
1000 struct push_database_new_state *state = tevent_req_data(
1001 req, struct push_database_new_state);
1002 bool status;
1003 int ret;
1005 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1006 TALLOC_FREE(subreq);
1007 if (! status) {
1008 LOG("Sending recovery records failed for %s\n",
1009 recdb_name(state->recdb));
1010 tevent_req_error(req, ret);
1011 return;
1014 state->num_buffers_sent += 1;
1016 push_database_new_send_msg(req);
1019 static void push_database_new_confirmed(struct tevent_req *subreq)
1021 struct tevent_req *req = tevent_req_callback_data(
1022 subreq, struct tevent_req);
1023 struct push_database_new_state *state = tevent_req_data(
1024 req, struct push_database_new_state);
1025 struct ctdb_reply_control **reply;
1026 int *err_list;
1027 bool status;
1028 int ret, i;
1029 uint32_t num_records;
1031 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1032 &err_list, &reply);
1033 TALLOC_FREE(subreq);
1034 if (! status) {
1035 int ret2;
1036 uint32_t pnn;
1038 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1039 state->count, err_list,
1040 &pnn);
1041 if (ret2 != 0) {
1042 LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
1043 " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1044 } else {
1045 LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
1046 recdb_name(state->recdb), ret);
1048 tevent_req_error(req, ret);
1049 return;
1052 for (i=0; i<state->count; i++) {
1053 ret = ctdb_reply_control_db_push_confirm(reply[i],
1054 &num_records);
1055 if (ret != 0) {
1056 tevent_req_error(req, EPROTO);
1057 return;
1060 if (num_records != state->num_records) {
1061 LOG("Node %u received %d of %d records for %s\n",
1062 state->pnn_list[i], num_records,
1063 state->num_records, recdb_name(state->recdb));
1064 tevent_req_error(req, EPROTO);
1065 return;
1069 talloc_free(reply);
1071 LOG("Pushed %d records for db %s\n",
1072 state->num_records, recdb_name(state->recdb));
1074 tevent_req_done(req);
/* Collect the result of a new-style push; see generic_recv() */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Wrapper for push_database_old and push_database_new
 */
/* Completion flags for the two kinds of push run by push_database */
struct push_database_state {
	bool old_done;	/* old-style push finished or not needed */
	bool new_done;	/* new-style push finished or not needed */
};
1090 static void push_database_old_done(struct tevent_req *subreq);
1091 static void push_database_new_done(struct tevent_req *subreq);
1093 static struct tevent_req *push_database_send(
1094 TALLOC_CTX *mem_ctx,
1095 struct tevent_context *ev,
1096 struct ctdb_client_context *client,
1097 uint32_t *pnn_list, int count, uint32_t *caps,
1098 struct ctdb_tunable_list *tun_list,
1099 struct recdb_context *recdb)
1101 struct tevent_req *req, *subreq;
1102 struct push_database_state *state;
1103 uint32_t *old_list, *new_list;
1104 int old_count, new_count;
1105 int i;
1107 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1108 if (req == NULL) {
1109 return NULL;
1112 state->old_done = false;
1113 state->new_done = false;
1115 old_count = 0;
1116 new_count = 0;
1117 old_list = talloc_array(state, uint32_t, count);
1118 new_list = talloc_array(state, uint32_t, count);
1119 if (tevent_req_nomem(old_list, req) ||
1120 tevent_req_nomem(new_list,req)) {
1121 return tevent_req_post(req, ev);
1124 for (i=0; i<count; i++) {
1125 uint32_t pnn = pnn_list[i];
1127 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1128 new_list[new_count] = pnn;
1129 new_count += 1;
1130 } else {
1131 old_list[old_count] = pnn;
1132 old_count += 1;
1136 if (old_count > 0) {
1137 subreq = push_database_old_send(state, ev, client,
1138 old_list, old_count, recdb);
1139 if (tevent_req_nomem(subreq, req)) {
1140 return tevent_req_post(req, ev);
1142 tevent_req_set_callback(subreq, push_database_old_done, req);
1143 } else {
1144 state->old_done = true;
1147 if (new_count > 0) {
1148 subreq = push_database_new_send(state, ev, client,
1149 new_list, new_count, recdb,
1150 tun_list->rec_buffer_size_limit);
1151 if (tevent_req_nomem(subreq, req)) {
1152 return tevent_req_post(req, ev);
1154 tevent_req_set_callback(subreq, push_database_new_done, req);
1155 } else {
1156 state->new_done = true;
1159 return req;
1162 static void push_database_old_done(struct tevent_req *subreq)
1164 struct tevent_req *req = tevent_req_callback_data(
1165 subreq, struct tevent_req);
1166 struct push_database_state *state = tevent_req_data(
1167 req, struct push_database_state);
1168 bool status;
1169 int ret;
1171 status = push_database_old_recv(subreq, &ret);
1172 if (! status) {
1173 tevent_req_error(req, ret);
1174 return;
1177 state->old_done = true;
1179 if (state->old_done && state->new_done) {
1180 tevent_req_done(req);
1184 static void push_database_new_done(struct tevent_req *subreq)
1186 struct tevent_req *req = tevent_req_callback_data(
1187 subreq, struct tevent_req);
1188 struct push_database_state *state = tevent_req_data(
1189 req, struct push_database_state);
1190 bool status;
1191 int ret;
1193 status = push_database_new_recv(subreq, &ret);
1194 if (! status) {
1195 tevent_req_error(req, ret);
1196 return;
1199 state->new_done = true;
1201 if (state->old_done && state->new_done) {
1202 tevent_req_done(req);
/* Collect the result of a push_database request; see generic_recv() */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect databases using highest sequence number
 */
/* State of one collect_highseqnum_db request */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t *pnn_list;		/* nodes asked for their seqnum */
	int count;			/* number of entries in pnn_list */
	uint32_t *caps;			/* capabilities given by the caller */
	uint32_t *ban_credits;		/* ban credits given by the caller */
	uint32_t db_id;			/* database being collected */
	struct recdb_context *recdb;	/* destination recovery database */
	uint32_t max_pnn;		/* node with the highest seqnum */
};
1227 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1228 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1230 static struct tevent_req *collect_highseqnum_db_send(
1231 TALLOC_CTX *mem_ctx,
1232 struct tevent_context *ev,
1233 struct ctdb_client_context *client,
1234 uint32_t *pnn_list, int count, uint32_t *caps,
1235 uint32_t *ban_credits, uint32_t db_id,
1236 struct recdb_context *recdb)
1238 struct tevent_req *req, *subreq;
1239 struct collect_highseqnum_db_state *state;
1240 struct ctdb_req_control request;
1242 req = tevent_req_create(mem_ctx, &state,
1243 struct collect_highseqnum_db_state);
1244 if (req == NULL) {
1245 return NULL;
1248 state->ev = ev;
1249 state->client = client;
1250 state->pnn_list = pnn_list;
1251 state->count = count;
1252 state->caps = caps;
1253 state->ban_credits = ban_credits;
1254 state->db_id = db_id;
1255 state->recdb = recdb;
1257 ctdb_req_control_get_db_seqnum(&request, db_id);
1258 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1259 state->pnn_list, state->count,
1260 TIMEOUT(), &request);
1261 if (tevent_req_nomem(subreq, req)) {
1262 return tevent_req_post(req, ev);
1264 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1265 req);
1267 return req;
1270 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1272 struct tevent_req *req = tevent_req_callback_data(
1273 subreq, struct tevent_req);
1274 struct collect_highseqnum_db_state *state = tevent_req_data(
1275 req, struct collect_highseqnum_db_state);
1276 struct ctdb_reply_control **reply;
1277 int *err_list;
1278 bool status;
1279 int ret, i;
1280 uint64_t seqnum, max_seqnum;
1282 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1283 &err_list, &reply);
1284 TALLOC_FREE(subreq);
1285 if (! status) {
1286 int ret2;
1287 uint32_t pnn;
1289 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1290 state->count, err_list,
1291 &pnn);
1292 if (ret2 != 0) {
1293 LOG("control GET_DB_SEQNUM failed for %s on node %u,"
1294 " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1295 } else {
1296 LOG("control GET_DB_SEQNUM failed for %s, ret=%d\n",
1297 recdb_name(state->recdb), ret);
1299 tevent_req_error(req, ret);
1300 return;
1303 max_seqnum = 0;
1304 state->max_pnn = state->pnn_list[0];
1305 for (i=0; i<state->count; i++) {
1306 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1307 if (ret != 0) {
1308 tevent_req_error(req, EPROTO);
1309 return;
1312 if (max_seqnum < seqnum) {
1313 max_seqnum = seqnum;
1314 state->max_pnn = state->pnn_list[i];
1318 talloc_free(reply);
1320 LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1321 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1323 subreq = pull_database_send(state, state->ev, state->client,
1324 state->max_pnn,
1325 state->caps[state->max_pnn],
1326 state->recdb);
1327 if (tevent_req_nomem(subreq, req)) {
1328 return;
1330 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1331 req);
1334 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1336 struct tevent_req *req = tevent_req_callback_data(
1337 subreq, struct tevent_req);
1338 struct collect_highseqnum_db_state *state = tevent_req_data(
1339 req, struct collect_highseqnum_db_state);
1340 int ret;
1341 bool status;
1343 status = pull_database_recv(subreq, &ret);
1344 TALLOC_FREE(subreq);
1345 if (! status) {
1346 state->ban_credits[state->max_pnn] += 1;
1347 tevent_req_error(req, ret);
1348 return;
1351 tevent_req_done(req);
/*
 * Collect the result of a collect_highseqnum_db request.
 *
 * Thin wrapper around generic_recv(); perr is handed through unchanged.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect all databases
 */
1363 struct collect_all_db_state {
1364 struct tevent_context *ev;
1365 struct ctdb_client_context *client;
1366 uint32_t *pnn_list;
1367 int count;
1368 uint32_t *caps;
1369 uint32_t *ban_credits;
1370 uint32_t db_id;
1371 struct recdb_context *recdb;
1372 struct ctdb_pulldb pulldb;
1373 int index;
1376 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1378 static struct tevent_req *collect_all_db_send(
1379 TALLOC_CTX *mem_ctx,
1380 struct tevent_context *ev,
1381 struct ctdb_client_context *client,
1382 uint32_t *pnn_list, int count, uint32_t *caps,
1383 uint32_t *ban_credits, uint32_t db_id,
1384 struct recdb_context *recdb)
1386 struct tevent_req *req, *subreq;
1387 struct collect_all_db_state *state;
1388 uint32_t pnn;
1390 req = tevent_req_create(mem_ctx, &state,
1391 struct collect_all_db_state);
1392 if (req == NULL) {
1393 return NULL;
1396 state->ev = ev;
1397 state->client = client;
1398 state->pnn_list = pnn_list;
1399 state->count = count;
1400 state->caps = caps;
1401 state->db_id = db_id;
1402 state->recdb = recdb;
1403 state->index = 0;
1405 pnn = state->pnn_list[state->index];
1407 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1408 if (tevent_req_nomem(subreq, req)) {
1409 return tevent_req_post(req, ev);
1411 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1413 return req;
1416 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1418 struct tevent_req *req = tevent_req_callback_data(
1419 subreq, struct tevent_req);
1420 struct collect_all_db_state *state = tevent_req_data(
1421 req, struct collect_all_db_state);
1422 uint32_t pnn;
1423 int ret;
1424 bool status;
1426 status = pull_database_recv(subreq, &ret);
1427 TALLOC_FREE(subreq);
1428 if (! status) {
1429 pnn = state->pnn_list[state->index];
1430 state->ban_credits[pnn] += 1;
1431 tevent_req_error(req, ret);
1432 return;
1435 state->index += 1;
1436 if (state->index == state->count) {
1437 tevent_req_done(req);
1438 return;
1441 pnn = state->pnn_list[state->index];
1442 subreq = pull_database_send(state, state->ev, state->client,
1443 pnn, state->caps[pnn], state->recdb);
1444 if (tevent_req_nomem(subreq, req)) {
1445 return;
1447 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Collect the result of a collect_all_db request.
 *
 * Thin wrapper around generic_recv(); perr is handed through unchanged.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * For each database do the following:
 *  - Get DB name
 *  - Get DB path
 *  - Freeze database on all nodes
 *  - Start transaction on all nodes
 *  - Collect database from all nodes
 *  - Wipe database on all nodes
 *  - Push database to all nodes
 *  - Commit transaction on all nodes
 *  - Thaw database on all nodes
 */
1469 struct recover_db_state {
1470 struct tevent_context *ev;
1471 struct ctdb_client_context *client;
1472 struct ctdb_tunable_list *tun_list;
1473 uint32_t *pnn_list;
1474 int count;
1475 uint32_t *caps;
1476 uint32_t *ban_credits;
1477 uint32_t db_id;
1478 bool persistent;
1480 uint32_t destnode;
1481 struct ctdb_transdb transdb;
1483 const char *db_name, *db_path;
1484 struct recdb_context *recdb;
1487 static void recover_db_name_done(struct tevent_req *subreq);
1488 static void recover_db_path_done(struct tevent_req *subreq);
1489 static void recover_db_freeze_done(struct tevent_req *subreq);
1490 static void recover_db_transaction_started(struct tevent_req *subreq);
1491 static void recover_db_collect_done(struct tevent_req *subreq);
1492 static void recover_db_wipedb_done(struct tevent_req *subreq);
1493 static void recover_db_pushdb_done(struct tevent_req *subreq);
1494 static void recover_db_transaction_committed(struct tevent_req *subreq);
1495 static void recover_db_thaw_done(struct tevent_req *subreq);
1497 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1498 struct tevent_context *ev,
1499 struct ctdb_client_context *client,
1500 struct ctdb_tunable_list *tun_list,
1501 uint32_t *pnn_list, int count,
1502 uint32_t *caps,
1503 uint32_t *ban_credits,
1504 uint32_t generation,
1505 uint32_t db_id, bool persistent)
1507 struct tevent_req *req, *subreq;
1508 struct recover_db_state *state;
1509 struct ctdb_req_control request;
1511 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1512 if (req == NULL) {
1513 return NULL;
1516 state->ev = ev;
1517 state->client = client;
1518 state->tun_list = tun_list;
1519 state->pnn_list = pnn_list;
1520 state->count = count;
1521 state->caps = caps;
1522 state->ban_credits = ban_credits;
1523 state->db_id = db_id;
1524 state->persistent = persistent;
1526 state->destnode = ctdb_client_pnn(client);
1527 state->transdb.db_id = db_id;
1528 state->transdb.tid = generation;
1530 ctdb_req_control_get_dbname(&request, db_id);
1531 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1532 TIMEOUT(), &request);
1533 if (tevent_req_nomem(subreq, req)) {
1534 return tevent_req_post(req, ev);
1536 tevent_req_set_callback(subreq, recover_db_name_done, req);
1538 return req;
1541 static void recover_db_name_done(struct tevent_req *subreq)
1543 struct tevent_req *req = tevent_req_callback_data(
1544 subreq, struct tevent_req);
1545 struct recover_db_state *state = tevent_req_data(
1546 req, struct recover_db_state);
1547 struct ctdb_reply_control *reply;
1548 struct ctdb_req_control request;
1549 int ret;
1550 bool status;
1552 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1553 TALLOC_FREE(subreq);
1554 if (! status) {
1555 LOG("control GET_DBNAME failed for db=0x%x\n, ret=%d",
1556 state->db_id, ret);
1557 tevent_req_error(req, ret);
1558 return;
1561 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1562 if (ret != 0) {
1563 LOG("control GET_DBNAME failed for db=0x%x\n, ret=%d\n",
1564 state->db_id, ret);
1565 tevent_req_error(req, EPROTO);
1566 return;
1569 talloc_free(reply);
1571 ctdb_req_control_getdbpath(&request, state->db_id);
1572 subreq = ctdb_client_control_send(state, state->ev, state->client,
1573 state->destnode, TIMEOUT(),
1574 &request);
1575 if (tevent_req_nomem(subreq, req)) {
1576 return;
1578 tevent_req_set_callback(subreq, recover_db_path_done, req);
1581 static void recover_db_path_done(struct tevent_req *subreq)
1583 struct tevent_req *req = tevent_req_callback_data(
1584 subreq, struct tevent_req);
1585 struct recover_db_state *state = tevent_req_data(
1586 req, struct recover_db_state);
1587 struct ctdb_reply_control *reply;
1588 struct ctdb_req_control request;
1589 int ret;
1590 bool status;
1592 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1593 TALLOC_FREE(subreq);
1594 if (! status) {
1595 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1596 state->db_name, ret);
1597 tevent_req_error(req, ret);
1598 return;
1601 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1602 if (ret != 0) {
1603 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1604 state->db_name, ret);
1605 tevent_req_error(req, EPROTO);
1606 return;
1609 talloc_free(reply);
1611 ctdb_req_control_db_freeze(&request, state->db_id);
1612 subreq = ctdb_client_control_multi_send(state, state->ev,
1613 state->client,
1614 state->pnn_list, state->count,
1615 TIMEOUT(), &request);
1616 if (tevent_req_nomem(subreq, req)) {
1617 return;
1619 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1622 static void recover_db_freeze_done(struct tevent_req *subreq)
1624 struct tevent_req *req = tevent_req_callback_data(
1625 subreq, struct tevent_req);
1626 struct recover_db_state *state = tevent_req_data(
1627 req, struct recover_db_state);
1628 struct ctdb_req_control request;
1629 int *err_list;
1630 int ret;
1631 bool status;
1633 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1634 NULL);
1635 TALLOC_FREE(subreq);
1636 if (! status) {
1637 int ret2;
1638 uint32_t pnn;
1640 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1641 state->count, err_list,
1642 &pnn);
1643 if (ret2 != 0) {
1644 LOG("control FREEZE_DB failed for db %s on node %u,"
1645 " ret=%d\n", state->db_name, pnn, ret2);
1646 } else {
1647 LOG("control FREEZE_DB failed for db %s, ret=%d\n",
1648 state->db_name, ret);
1650 tevent_req_error(req, ret);
1651 return;
1654 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1655 subreq = ctdb_client_control_multi_send(state, state->ev,
1656 state->client,
1657 state->pnn_list, state->count,
1658 TIMEOUT(), &request);
1659 if (tevent_req_nomem(subreq, req)) {
1660 return;
1662 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1665 static void recover_db_transaction_started(struct tevent_req *subreq)
1667 struct tevent_req *req = tevent_req_callback_data(
1668 subreq, struct tevent_req);
1669 struct recover_db_state *state = tevent_req_data(
1670 req, struct recover_db_state);
1671 int *err_list;
1672 int ret;
1673 bool status;
1675 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1676 NULL);
1677 TALLOC_FREE(subreq);
1678 if (! status) {
1679 int ret2;
1680 uint32_t pnn;
1682 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1683 state->count,
1684 err_list, &pnn);
1685 if (ret2 != 0) {
1686 LOG("control TRANSACTION_DB failed for db=%s,"
1687 " ret=%d\n", state->db_name, pnn, ret2);
1688 } else {
1689 LOG("control TRANSACTION_DB failed for db=%s,"
1690 " ret=%d\n", state->db_name, ret);
1692 tevent_req_error(req, ret);
1693 return;
1696 state->recdb = recdb_create(state, state->db_id, state->db_name,
1697 state->db_path,
1698 state->tun_list->database_hash_size,
1699 state->persistent);
1700 if (tevent_req_nomem(state->recdb, req)) {
1701 return;
1704 if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1705 subreq = collect_highseqnum_db_send(
1706 state, state->ev, state->client,
1707 state->pnn_list, state->count, state->caps,
1708 state->ban_credits, state->db_id,
1709 state->recdb);
1710 } else {
1711 subreq = collect_all_db_send(
1712 state, state->ev, state->client,
1713 state->pnn_list, state->count, state->caps,
1714 state->ban_credits, state->db_id,
1715 state->recdb);
1717 if (tevent_req_nomem(subreq, req)) {
1718 return;
1720 tevent_req_set_callback(subreq, recover_db_collect_done, req);
1723 static void recover_db_collect_done(struct tevent_req *subreq)
1725 struct tevent_req *req = tevent_req_callback_data(
1726 subreq, struct tevent_req);
1727 struct recover_db_state *state = tevent_req_data(
1728 req, struct recover_db_state);
1729 struct ctdb_req_control request;
1730 int ret;
1731 bool status;
1733 if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1734 status = collect_highseqnum_db_recv(subreq, &ret);
1735 } else {
1736 status = collect_all_db_recv(subreq, &ret);
1738 TALLOC_FREE(subreq);
1739 if (! status) {
1740 tevent_req_error(req, ret);
1741 return;
1744 ctdb_req_control_wipe_database(&request, &state->transdb);
1745 subreq = ctdb_client_control_multi_send(state, state->ev,
1746 state->client,
1747 state->pnn_list, state->count,
1748 TIMEOUT(), &request);
1749 if (tevent_req_nomem(subreq, req)) {
1750 return;
1752 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1755 static void recover_db_wipedb_done(struct tevent_req *subreq)
1757 struct tevent_req *req = tevent_req_callback_data(
1758 subreq, struct tevent_req);
1759 struct recover_db_state *state = tevent_req_data(
1760 req, struct recover_db_state);
1761 int *err_list;
1762 int ret;
1763 bool status;
1765 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1766 NULL);
1767 TALLOC_FREE(subreq);
1768 if (! status) {
1769 int ret2;
1770 uint32_t pnn;
1772 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1773 state->count,
1774 err_list, &pnn);
1775 if (ret2 != 0) {
1776 LOG("control WIPEDB failed for db %s on node %u,"
1777 " ret=%d\n", state->db_name, pnn, ret2);
1778 } else {
1779 LOG("control WIPEDB failed for db %s, ret=%d\n",
1780 state->db_name, pnn, ret);
1782 tevent_req_error(req, ret);
1783 return;
1786 subreq = push_database_send(state, state->ev, state->client,
1787 state->pnn_list, state->count,
1788 state->caps, state->tun_list,
1789 state->recdb);
1790 if (tevent_req_nomem(subreq, req)) {
1791 return;
1793 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1796 static void recover_db_pushdb_done(struct tevent_req *subreq)
1798 struct tevent_req *req = tevent_req_callback_data(
1799 subreq, struct tevent_req);
1800 struct recover_db_state *state = tevent_req_data(
1801 req, struct recover_db_state);
1802 struct ctdb_req_control request;
1803 int ret;
1804 bool status;
1806 status = push_database_recv(subreq, &ret);
1807 TALLOC_FREE(subreq);
1808 if (! status) {
1809 tevent_req_error(req, ret);
1810 return;
1813 TALLOC_FREE(state->recdb);
1815 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1816 subreq = ctdb_client_control_multi_send(state, state->ev,
1817 state->client,
1818 state->pnn_list, state->count,
1819 TIMEOUT(), &request);
1820 if (tevent_req_nomem(subreq, req)) {
1821 return;
1823 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1826 static void recover_db_transaction_committed(struct tevent_req *subreq)
1828 struct tevent_req *req = tevent_req_callback_data(
1829 subreq, struct tevent_req);
1830 struct recover_db_state *state = tevent_req_data(
1831 req, struct recover_db_state);
1832 struct ctdb_req_control request;
1833 int *err_list;
1834 int ret;
1835 bool status;
1837 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1838 NULL);
1839 TALLOC_FREE(subreq);
1840 if (! status) {
1841 int ret2;
1842 uint32_t pnn;
1844 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1845 state->count,
1846 err_list, &pnn);
1847 if (ret2 != 0) {
1848 LOG("control DB_TRANSACTION_COMMIT failed for db %s"
1849 " on node %u, ret=%d", state->db_name, pnn, ret2);
1850 } else {
1851 LOG("control DB_TRANSACTION_COMMIT failed for db %s\n,"
1852 " ret=%d", state->db_name, ret);
1854 tevent_req_error(req, ret);
1855 return;
1858 ctdb_req_control_db_thaw(&request, state->db_id);
1859 subreq = ctdb_client_control_multi_send(state, state->ev,
1860 state->client,
1861 state->pnn_list, state->count,
1862 TIMEOUT(), &request);
1863 if (tevent_req_nomem(subreq, req)) {
1864 return;
1866 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1869 static void recover_db_thaw_done(struct tevent_req *subreq)
1871 struct tevent_req *req = tevent_req_callback_data(
1872 subreq, struct tevent_req);
1873 struct recover_db_state *state = tevent_req_data(
1874 req, struct recover_db_state);
1875 int *err_list;
1876 int ret;
1877 bool status;
1879 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1880 NULL);
1881 TALLOC_FREE(subreq);
1882 if (! status) {
1883 int ret2;
1884 uint32_t pnn;
1886 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1887 state->count,
1888 err_list, &pnn);
1889 if (ret2 != 0) {
1890 LOG("control DB_THAW failed for db %s on node %u,"
1891 " ret=%d\n", state->db_name, pnn, ret2);
1892 } else {
1893 LOG("control DB_THAW failed for db %s, ret=%d\n",
1894 state->db_name, ret);
1896 tevent_req_error(req, ret);
1897 return;
1900 tevent_req_done(req);
/*
 * Collect the result of a recover_db request.
 *
 * Only success or failure is reported; no error code is requested from
 * generic_recv().
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
/*
 * Start database recovery for each database
 *
 * Try to recover each database NUM_RETRIES times before failing recovery.
 */
/*
 * Request state shared by the recovery of all databases.
 */
struct db_recovery_state {
	struct tevent_context *ev;
	struct ctdb_dbid_map *dbmap;	/* databases being recovered */
	int num_replies;		/* databases that have finished */
	int num_failed;			/* of those, how many failed */
};
/*
 * Per-database bookkeeping, used as callback data so that a failed
 * recovery can be retried with the same parameters.
 */
struct db_recovery_one_state {
	struct tevent_req *req;		/* the parent db_recovery request */
	struct ctdb_client_context *client;
	struct ctdb_dbid_map *dbmap;
	struct ctdb_tunable_list *tun_list;
	uint32_t *pnn_list;
	int count;			/* number of entries in pnn_list */
	uint32_t *caps;
	uint32_t *ban_credits;
	uint32_t generation;
	uint32_t db_id;
	bool persistent;
	int num_fails;			/* failed attempts so far */
};

/* Completion callback for one database recovery attempt */
static void db_recovery_one_done(struct tevent_req *subreq);
1939 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1940 struct tevent_context *ev,
1941 struct ctdb_client_context *client,
1942 struct ctdb_dbid_map *dbmap,
1943 struct ctdb_tunable_list *tun_list,
1944 uint32_t *pnn_list, int count,
1945 uint32_t *caps,
1946 uint32_t *ban_credits,
1947 uint32_t generation)
1949 struct tevent_req *req, *subreq;
1950 struct db_recovery_state *state;
1951 int i;
1953 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1954 if (req == NULL) {
1955 return NULL;
1958 state->ev = ev;
1959 state->dbmap = dbmap;
1960 state->num_replies = 0;
1961 state->num_failed = 0;
1963 if (dbmap->num == 0) {
1964 tevent_req_done(req);
1965 return tevent_req_post(req, ev);
1968 for (i=0; i<dbmap->num; i++) {
1969 struct db_recovery_one_state *substate;
1971 substate = talloc_zero(state, struct db_recovery_one_state);
1972 if (tevent_req_nomem(substate, req)) {
1973 return tevent_req_post(req, ev);
1976 substate->req = req;
1977 substate->client = client;
1978 substate->dbmap = dbmap;
1979 substate->tun_list = tun_list;
1980 substate->pnn_list = pnn_list;
1981 substate->count = count;
1982 substate->caps = caps;
1983 substate->ban_credits = ban_credits;
1984 substate->generation = generation;
1985 substate->db_id = dbmap->dbs[i].db_id;
1986 substate->persistent = dbmap->dbs[i].flags &
1987 CTDB_DB_FLAGS_PERSISTENT;
1989 subreq = recover_db_send(state, ev, client, tun_list,
1990 pnn_list, count, caps, ban_credits,
1991 generation, substate->db_id,
1992 substate->persistent);
1993 if (tevent_req_nomem(subreq, req)) {
1994 return tevent_req_post(req, ev);
1996 tevent_req_set_callback(subreq, db_recovery_one_done,
1997 substate);
1998 LOG("recover database 0x%08x\n", substate->db_id);
2001 return req;
2004 static void db_recovery_one_done(struct tevent_req *subreq)
2006 struct db_recovery_one_state *substate = tevent_req_callback_data(
2007 subreq, struct db_recovery_one_state);
2008 struct tevent_req *req = substate->req;
2009 struct db_recovery_state *state = tevent_req_data(
2010 req, struct db_recovery_state);
2011 bool status;
2013 status = recover_db_recv(subreq);
2014 TALLOC_FREE(subreq);
2016 if (status) {
2017 talloc_free(substate);
2018 goto done;
2021 substate->num_fails += 1;
2022 if (substate->num_fails < NUM_RETRIES) {
2023 subreq = recover_db_send(state, state->ev, substate->client,
2024 substate->tun_list,
2025 substate->pnn_list, substate->count,
2026 substate->caps, substate->ban_credits,
2027 substate->generation, substate->db_id,
2028 substate->persistent);
2029 if (tevent_req_nomem(subreq, req)) {
2030 goto failed;
2032 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2033 LOG("recover database 0x%08x, attempt %d\n", substate->db_id,
2034 substate->num_fails+1);
2035 return;
2038 failed:
2039 state->num_failed += 1;
2041 done:
2042 state->num_replies += 1;
2044 if (state->num_replies == state->dbmap->num) {
2045 tevent_req_done(req);
2049 static bool db_recovery_recv(struct tevent_req *req, int *count)
2051 struct db_recovery_state *state = tevent_req_data(
2052 req, struct db_recovery_state);
2053 int err;
2055 if (tevent_req_is_unix_error(req, &err)) {
2056 *count = 0;
2057 return false;
2060 *count = state->num_replies - state->num_failed;
2062 if (state->num_failed > 0) {
2063 return false;
2066 return true;
/*
 * Run the parallel database recovery
 *
 *  - Get tunables
 *  - Get nodemap
 *  - Get vnnmap
 *  - Get capabilities from all nodes
 *  - Get dbmap
 *  - Set RECOVERY_ACTIVE
 *  - Send START_RECOVERY
 *  - Update vnnmap on all nodes
 *  - Run database recovery
 *  - Send END_RECOVERY
 *  - Set RECOVERY_NORMAL
 */
/*
 * Request state for the overall recovery run.
 */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t generation;
	uint32_t *pnn_list;		/* from list_of_active_nodes() */
	int count;			/* number of entries in pnn_list */
	uint32_t destnode;		/* pnn returned by ctdb_client_pnn() */
	struct ctdb_node_map *nodemap;
	uint32_t *caps;			/* nodemap->num entries, set by pnn */
	uint32_t *ban_credits;		/* nodemap->num entries, zeroed */
	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_dbid_map *dbmap;
};

/* Completion callbacks for the individual recovery stages */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2114 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2115 struct tevent_context *ev,
2116 struct ctdb_client_context *client,
2117 uint32_t generation)
2119 struct tevent_req *req, *subreq;
2120 struct recovery_state *state;
2121 struct ctdb_req_control request;
2123 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2124 if (req == NULL) {
2125 return NULL;
2128 state->ev = ev;
2129 state->client = client;
2130 state->generation = generation;
2131 state->destnode = ctdb_client_pnn(client);
2133 ctdb_req_control_get_all_tunables(&request);
2134 subreq = ctdb_client_control_send(state, state->ev, state->client,
2135 state->destnode, TIMEOUT(),
2136 &request);
2137 if (tevent_req_nomem(subreq, req)) {
2138 return tevent_req_post(req, ev);
2140 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2142 return req;
2145 static void recovery_tunables_done(struct tevent_req *subreq)
2147 struct tevent_req *req = tevent_req_callback_data(
2148 subreq, struct tevent_req);
2149 struct recovery_state *state = tevent_req_data(
2150 req, struct recovery_state);
2151 struct ctdb_reply_control *reply;
2152 struct ctdb_req_control request;
2153 int ret;
2154 bool status;
2156 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2157 TALLOC_FREE(subreq);
2158 if (! status) {
2159 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2160 tevent_req_error(req, ret);
2161 return;
2164 ret = ctdb_reply_control_get_all_tunables(reply, state,
2165 &state->tun_list);
2166 if (ret != 0) {
2167 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2168 tevent_req_error(req, EPROTO);
2169 return;
2172 talloc_free(reply);
2174 recover_timeout = state->tun_list->recover_timeout;
2176 ctdb_req_control_get_nodemap(&request);
2177 subreq = ctdb_client_control_send(state, state->ev, state->client,
2178 state->destnode, TIMEOUT(),
2179 &request);
2180 if (tevent_req_nomem(subreq, req)) {
2181 return;
2183 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2186 static void recovery_nodemap_done(struct tevent_req *subreq)
2188 struct tevent_req *req = tevent_req_callback_data(
2189 subreq, struct tevent_req);
2190 struct recovery_state *state = tevent_req_data(
2191 req, struct recovery_state);
2192 struct ctdb_reply_control *reply;
2193 struct ctdb_req_control request;
2194 bool status;
2195 int ret;
2197 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2198 TALLOC_FREE(subreq);
2199 if (! status) {
2200 LOG("control GET_NODEMAP failed to node %u, ret=%d\n",
2201 state->destnode, ret);
2202 tevent_req_error(req, ret);
2203 return;
2206 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2207 if (ret != 0) {
2208 LOG("control GET_NODEMAP failed, ret=%d\n", ret);
2209 tevent_req_error(req, ret);
2210 return;
2213 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2214 state, &state->pnn_list);
2215 if (state->count <= 0) {
2216 tevent_req_error(req, ENOMEM);
2217 return;
2220 state->ban_credits = talloc_zero_array(state, uint32_t,
2221 state->nodemap->num);
2222 if (tevent_req_nomem(state->ban_credits, req)) {
2223 return;
2226 ctdb_req_control_getvnnmap(&request);
2227 subreq = ctdb_client_control_send(state, state->ev, state->client,
2228 state->destnode, TIMEOUT(),
2229 &request);
2230 if (tevent_req_nomem(subreq, req)) {
2231 return;
2233 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2236 static void recovery_vnnmap_done(struct tevent_req *subreq)
2238 struct tevent_req *req = tevent_req_callback_data(
2239 subreq, struct tevent_req);
2240 struct recovery_state *state = tevent_req_data(
2241 req, struct recovery_state);
2242 struct ctdb_reply_control *reply;
2243 struct ctdb_req_control request;
2244 bool status;
2245 int ret;
2247 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2248 TALLOC_FREE(subreq);
2249 if (! status) {
2250 LOG("control GETVNNMAP failed to node %u, ret=%d\n",
2251 state->destnode, ret);
2252 tevent_req_error(req, ret);
2253 return;
2256 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2257 if (ret != 0) {
2258 LOG("control GETVNNMAP failed, ret=%d\n", ret);
2259 tevent_req_error(req, ret);
2260 return;
2263 ctdb_req_control_get_capabilities(&request);
2264 subreq = ctdb_client_control_multi_send(state, state->ev,
2265 state->client,
2266 state->pnn_list, state->count,
2267 TIMEOUT(), &request);
2268 if (tevent_req_nomem(subreq, req)) {
2269 return;
2271 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2274 static void recovery_capabilities_done(struct tevent_req *subreq)
2276 struct tevent_req *req = tevent_req_callback_data(
2277 subreq, struct tevent_req);
2278 struct recovery_state *state = tevent_req_data(
2279 req, struct recovery_state);
2280 struct ctdb_reply_control **reply;
2281 struct ctdb_req_control request;
2282 int *err_list;
2283 int ret, i;
2284 bool status;
2286 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2287 &reply);
2288 TALLOC_FREE(subreq);
2289 if (! status) {
2290 int ret2;
2291 uint32_t pnn;
2293 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2294 state->count,
2295 err_list, &pnn);
2296 if (ret2 != 0) {
2297 LOG("control GET_CAPABILITIES failed on node %u,"
2298 " ret=%d\n", pnn, ret2);
2299 } else {
2300 LOG("control GET_CAPABILITIES failed, ret=%d\n", ret);
2302 tevent_req_error(req, ret);
2303 return;
2306 /* Make the array size same as nodemap */
2307 state->caps = talloc_zero_array(state, uint32_t,
2308 state->nodemap->num);
2309 if (tevent_req_nomem(state->caps, req)) {
2310 return;
2313 for (i=0; i<state->count; i++) {
2314 uint32_t pnn;
2316 pnn = state->pnn_list[i];
2317 ret = ctdb_reply_control_get_capabilities(reply[i],
2318 &state->caps[pnn]);
2319 if (ret != 0) {
2320 LOG("control GET_CAPABILITIES failed on node %u\n", pnn);
2321 tevent_req_error(req, EPROTO);
2322 return;
2326 talloc_free(reply);
2328 ctdb_req_control_get_dbmap(&request);
2329 subreq = ctdb_client_control_send(state, state->ev, state->client,
2330 state->destnode, TIMEOUT(),
2331 &request);
2332 if (tevent_req_nomem(subreq, req)) {
2333 return;
2335 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2338 static void recovery_dbmap_done(struct tevent_req *subreq)
2340 struct tevent_req *req = tevent_req_callback_data(
2341 subreq, struct tevent_req);
2342 struct recovery_state *state = tevent_req_data(
2343 req, struct recovery_state);
2344 struct ctdb_reply_control *reply;
2345 struct ctdb_req_control request;
2346 int ret;
2347 bool status;
2349 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2350 TALLOC_FREE(subreq);
2351 if (! status) {
2352 LOG("control GET_DBMAP failed to node %u, ret=%d\n",
2353 state->destnode, ret);
2354 tevent_req_error(req, ret);
2355 return;
2358 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2359 if (ret != 0) {
2360 LOG("control GET_DBMAP failed, ret=%d\n", ret);
2361 tevent_req_error(req, ret);
2362 return;
2365 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2366 subreq = ctdb_client_control_multi_send(state, state->ev,
2367 state->client,
2368 state->pnn_list, state->count,
2369 TIMEOUT(), &request);
2370 if (tevent_req_nomem(subreq, req)) {
2371 return;
2373 tevent_req_set_callback(subreq, recovery_active_done, req);
2376 static void recovery_active_done(struct tevent_req *subreq)
2378 struct tevent_req *req = tevent_req_callback_data(
2379 subreq, struct tevent_req);
2380 struct recovery_state *state = tevent_req_data(
2381 req, struct recovery_state);
2382 struct ctdb_req_control request;
2383 struct ctdb_vnn_map *vnnmap;
2384 int *err_list;
2385 int ret, count, i;
2386 bool status;
2388 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2389 NULL);
2390 TALLOC_FREE(subreq);
2391 if (! status) {
2392 int ret2;
2393 uint32_t pnn;
2395 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2396 state->count,
2397 err_list, &pnn);
2398 if (ret2 != 0) {
2399 LOG("failed to set recovery mode to ACTIVE on node %u,"
2400 " ret=%d\n", pnn, ret2);
2401 } else {
2402 LOG("failed to set recovery mode to ACTIVE, ret=%d\n",
2403 ret);
2405 tevent_req_error(req, ret);
2406 return;
2409 LOG("set recovery mode to ACTIVE\n");
2411 /* Calculate new VNNMAP */
2412 count = 0;
2413 for (i=0; i<state->nodemap->num; i++) {
2414 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2415 continue;
2417 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2418 continue;
2420 count += 1;
2423 if (count == 0) {
2424 LOG("no active lmasters found. Adding recmaster anyway\n");
2427 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2428 if (tevent_req_nomem(vnnmap, req)) {
2429 return;
2432 vnnmap->size = (count == 0 ? 1 : count);
2433 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2434 if (tevent_req_nomem(vnnmap->map, req)) {
2435 return;
2438 if (count == 0) {
2439 vnnmap->map[0] = state->destnode;
2440 } else {
2441 count = 0;
2442 for (i=0; i<state->nodemap->num; i++) {
2443 if (state->nodemap->node[i].flags &
2444 NODE_FLAGS_INACTIVE) {
2445 continue;
2447 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2448 continue;
2451 vnnmap->map[count] = state->nodemap->node[i].pnn;
2452 count += 1;
2456 vnnmap->generation = state->generation;
2458 talloc_free(state->vnnmap);
2459 state->vnnmap = vnnmap;
2461 ctdb_req_control_start_recovery(&request);
2462 subreq = ctdb_client_control_multi_send(state, state->ev,
2463 state->client,
2464 state->pnn_list, state->count,
2465 TIMEOUT(), &request);
2466 if (tevent_req_nomem(subreq, req)) {
2467 return;
2469 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2472 static void recovery_start_recovery_done(struct tevent_req *subreq)
2474 struct tevent_req *req = tevent_req_callback_data(
2475 subreq, struct tevent_req);
2476 struct recovery_state *state = tevent_req_data(
2477 req, struct recovery_state);
2478 struct ctdb_req_control request;
2479 int *err_list;
2480 int ret;
2481 bool status;
2483 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2484 NULL);
2485 TALLOC_FREE(subreq);
2486 if (! status) {
2487 int ret2;
2488 uint32_t pnn;
2490 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2491 state->count,
2492 err_list, &pnn);
2493 if (ret2 != 0) {
2494 LOG("failed to run start_recovery event on node %u,"
2495 " ret=%d\n", pnn, ret2);
2496 } else {
2497 LOG("failed to run start_recovery event, ret=%d\n",
2498 ret);
2500 tevent_req_error(req, ret);
2501 return;
2504 LOG("start_recovery event finished\n");
2506 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2507 subreq = ctdb_client_control_multi_send(state, state->ev,
2508 state->client,
2509 state->pnn_list, state->count,
2510 TIMEOUT(), &request);
2511 if (tevent_req_nomem(subreq, req)) {
2512 return;
2514 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2517 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2519 struct tevent_req *req = tevent_req_callback_data(
2520 subreq, struct tevent_req);
2521 struct recovery_state *state = tevent_req_data(
2522 req, struct recovery_state);
2523 int *err_list;
2524 int ret;
2525 bool status;
2527 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2528 NULL);
2529 TALLOC_FREE(subreq);
2530 if (! status) {
2531 int ret2;
2532 uint32_t pnn;
2534 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2535 state->count,
2536 err_list, &pnn);
2537 if (ret2 != 0) {
2538 LOG("failed to update VNNMAP on node %u, ret=%d\n",
2539 pnn, ret2);
2540 } else {
2541 LOG("failed to update VNNMAP, ret=%d\n", ret);
2543 tevent_req_error(req, ret);
2544 return;
2547 LOG("updated VNNMAP\n");
2549 subreq = db_recovery_send(state, state->ev, state->client,
2550 state->dbmap, state->tun_list,
2551 state->pnn_list, state->count,
2552 state->caps, state->ban_credits,
2553 state->vnnmap->generation);
2554 if (tevent_req_nomem(subreq, req)) {
2555 return;
2557 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2560 static void recovery_db_recovery_done(struct tevent_req *subreq)
2562 struct tevent_req *req = tevent_req_callback_data(
2563 subreq, struct tevent_req);
2564 struct recovery_state *state = tevent_req_data(
2565 req, struct recovery_state);
2566 struct ctdb_req_control request;
2567 bool status;
2568 int count;
2570 status = db_recovery_recv(subreq, &count);
2571 TALLOC_FREE(subreq);
2573 LOG("%d of %d databases recovered\n", count, state->dbmap->num);
2575 if (! status) {
2576 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2577 int i;
2579 /* Bans are not enabled */
2580 if (state->tun_list->enable_bans == 0) {
2581 tevent_req_error(req, EIO);
2582 return;
2585 for (i=0; i<state->count; i++) {
2586 uint32_t pnn;
2587 pnn = state->pnn_list[i];
2588 if (state->ban_credits[pnn] > max_credits) {
2589 max_pnn = pnn;
2590 max_credits = state->ban_credits[pnn];
2594 /* If pulling database fails multiple times */
2595 if (max_credits >= NUM_RETRIES) {
2596 struct ctdb_req_message message;
2598 LOG("Assigning banning credits to node %u\n", max_pnn);
2600 message.srvid = CTDB_SRVID_BANNING;
2601 message.data.pnn = max_pnn;
2603 subreq = ctdb_client_message_send(
2604 state, state->ev, state->client,
2605 ctdb_client_pnn(state->client),
2606 &message);
2607 if (tevent_req_nomem(subreq, req)) {
2608 return;
2610 tevent_req_set_callback(subreq, recovery_failed_done,
2611 req);
2612 } else {
2613 tevent_req_error(req, EIO);
2615 return;
2618 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2619 subreq = ctdb_client_control_multi_send(state, state->ev,
2620 state->client,
2621 state->pnn_list, state->count,
2622 TIMEOUT(), &request);
2623 if (tevent_req_nomem(subreq, req)) {
2624 return;
2626 tevent_req_set_callback(subreq, recovery_normal_done, req);
2629 static void recovery_failed_done(struct tevent_req *subreq)
2631 struct tevent_req *req = tevent_req_callback_data(
2632 subreq, struct tevent_req);
2633 int ret;
2634 bool status;
2636 status = ctdb_client_message_recv(subreq, &ret);
2637 TALLOC_FREE(subreq);
2638 if (! status) {
2639 LOG("failed to assign banning credits, ret=%d\n", ret);
2642 tevent_req_error(req, EIO);
2645 static void recovery_normal_done(struct tevent_req *subreq)
2647 struct tevent_req *req = tevent_req_callback_data(
2648 subreq, struct tevent_req);
2649 struct recovery_state *state = tevent_req_data(
2650 req, struct recovery_state);
2651 struct ctdb_req_control request;
2652 int *err_list;
2653 int ret;
2654 bool status;
2656 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2657 NULL);
2658 TALLOC_FREE(subreq);
2659 if (! status) {
2660 int ret2;
2661 uint32_t pnn;
2663 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2664 state->count,
2665 err_list, &pnn);
2666 if (ret2 != 0) {
2667 LOG("failed to set recovery mode to NORMAL on node %u,"
2668 " ret=%d\n", pnn, ret2);
2669 } else {
2670 LOG("failed to set recovery mode to NORMAL, ret=%d\n",
2671 ret);
2673 tevent_req_error(req, ret);
2674 return;
2677 LOG("set recovery mode to NORMAL\n");
2679 ctdb_req_control_end_recovery(&request);
2680 subreq = ctdb_client_control_multi_send(state, state->ev,
2681 state->client,
2682 state->pnn_list, state->count,
2683 TIMEOUT(), &request);
2684 if (tevent_req_nomem(subreq, req)) {
2685 return;
2687 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2690 static void recovery_end_recovery_done(struct tevent_req *subreq)
2692 struct tevent_req *req = tevent_req_callback_data(
2693 subreq, struct tevent_req);
2694 struct recovery_state *state = tevent_req_data(
2695 req, struct recovery_state);
2696 int *err_list;
2697 int ret;
2698 bool status;
2700 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2701 NULL);
2702 TALLOC_FREE(subreq);
2703 if (! status) {
2704 int ret2;
2705 uint32_t pnn;
2707 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2708 state->count,
2709 err_list, &pnn);
2710 if (ret2 != 0) {
2711 LOG("failed to run recovered event on node %u,"
2712 " ret=%d\n", pnn, ret2);
2713 } else {
2714 LOG("failed to run recovered event, ret=%d\n", ret);
2716 tevent_req_error(req, ret);
2717 return;
2720 LOG("recovered event finished\n");
2722 tevent_req_done(req);
/*
 * Collect the outcome of a request created by recovery_send().
 *
 * Thin wrapper around generic_recv(); its boolean result is
 * deliberately dropped, so the caller learns about a failure only
 * through the value stored in *perr.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void)generic_recv(req, perr);
}
/*
 * Print the command-line synopsis for this helper to stderr.
 *
 * progname - name to show as the program, normally argv[0]
 */
static void usage(const char *progname)
{
	static const char synopsis[] =
		"<log-fd> <output-fd> <ctdb-socket-path> <generation>";

	fprintf(stderr, "\nUsage: %s %s\n", progname, synopsis);
}
2738 * Arguments - log fd, write fd, socket path, generation
2740 int main(int argc, char *argv[])
2742 int log_fd, write_fd;
2743 const char *sockpath;
2744 TALLOC_CTX *mem_ctx;
2745 struct tevent_context *ev;
2746 struct ctdb_client_context *client;
2747 int ret;
2748 struct tevent_req *req;
2749 uint32_t generation;
2751 if (argc != 5) {
2752 usage(argv[0]);
2753 exit(1);
2756 log_fd = atoi(argv[1]);
2757 if (log_fd != STDOUT_FILENO && log_fd != STDERR_FILENO) {
2758 close(STDOUT_FILENO);
2759 close(STDERR_FILENO);
2760 dup2(log_fd, STDOUT_FILENO);
2761 dup2(log_fd, STDERR_FILENO);
2763 close(log_fd);
2765 write_fd = atoi(argv[2]);
2766 sockpath = argv[3];
2767 generation = (uint32_t)strtoul(argv[4], NULL, 0);
2769 mem_ctx = talloc_new(NULL);
2770 if (mem_ctx == NULL) {
2771 LOG("talloc_new() failed\n");
2772 goto failed;
2775 ev = tevent_context_init(mem_ctx);
2776 if (ev == NULL) {
2777 LOG("tevent_context_init() failed\n");
2778 goto failed;
2781 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2782 if (ret != 0) {
2783 LOG("ctdb_client_init() failed, ret=%d\n", ret);
2784 goto failed;
2787 req = recovery_send(mem_ctx, ev, client, generation);
2788 if (req == NULL) {
2789 LOG("database_recover_send() failed\n");
2790 goto failed;
2793 if (! tevent_req_poll(req, ev)) {
2794 LOG("tevent_req_poll() failed\n");
2795 goto failed;
2798 recovery_recv(req, &ret);
2799 TALLOC_FREE(req);
2800 if (ret != 0) {
2801 LOG("database recovery failed, ret=%d\n", ret);
2802 goto failed;
2805 sys_write(write_fd, &ret, sizeof(ret));
2806 return 0;
2808 failed:
2809 talloc_free(mem_ctx);
2810 return 1;