ldb: Release ldb 1.3.0
[Samba.git] / ctdb / server / ctdb_recovery_helper.c
blobab9ab4152b099f331c62e32b73af3143e38a0962
1 /*
2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
38 #include "common/logging.h"
/* Number of seconds added to the current time by TIMEOUT() */
static int recover_timeout = 30;

/* Not referenced in this part of the file */
#define NUM_RETRIES	3

/* Deadline passed to the client control calls in this file */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
47 * Utility functions
/*
 * Common receive helper for the tevent requests in this file.
 *
 * Returns true when the request finished without a unix error.  Otherwise
 * returns false and, if perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;
	bool failed;

	failed = tevent_req_is_unix_error(req, &unix_err);
	if (! failed) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
64 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
66 static uint64_t srvid_next(void)
68 rec_srvid += 1;
69 return rec_srvid;
73 * Recovery database functions
/* Handle for one temporary recovery database (see recdb_create()) */
struct recdb_context {
	uint32_t db_id;		/* id of the database being recovered */
	const char *db_name;	/* caller's name string, not copied */
	const char *db_path;	/* path of the recdb file, owned by this struct */
	struct tdb_wrap *db;	/* open tdb backing the recovery database */
	bool persistent;	/* flag supplied by the creator */
};
84 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
85 const char *db_name,
86 const char *db_path,
87 uint32_t hash_size, bool persistent)
89 static char *db_dir_state = NULL;
90 struct recdb_context *recdb;
91 unsigned int tdb_flags;
93 recdb = talloc(mem_ctx, struct recdb_context);
94 if (recdb == NULL) {
95 return NULL;
98 if (db_dir_state == NULL) {
99 db_dir_state = getenv("CTDB_DBDIR_STATE");
102 recdb->db_name = db_name;
103 recdb->db_id = db_id;
104 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
105 db_dir_state != NULL ?
106 db_dir_state :
107 dirname(discard_const(db_path)),
108 db_name);
109 if (recdb->db_path == NULL) {
110 talloc_free(recdb);
111 return NULL;
113 unlink(recdb->db_path);
115 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
116 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
117 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
118 if (recdb->db == NULL) {
119 talloc_free(recdb);
120 D_ERR("failed to create recovery db %s\n", recdb->db_path);
121 return NULL;
124 recdb->persistent = persistent;
126 return recdb;
129 static uint32_t recdb_id(struct recdb_context *recdb)
131 return recdb->db_id;
134 static const char *recdb_name(struct recdb_context *recdb)
136 return recdb->db_name;
139 static const char *recdb_path(struct recdb_context *recdb)
141 return recdb->db_path;
144 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
146 return recdb->db->tdb;
149 static bool recdb_persistent(struct recdb_context *recdb)
151 return recdb->persistent;
/* Private data handed to recdb_add_traverse() */
struct recdb_add_traverse_state {
	struct recdb_context *recdb;	/* recovery database being filled */
	int mypnn;			/* compared against the stored dmaster */
};
159 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
160 TDB_DATA key, TDB_DATA data,
161 void *private_data)
163 struct recdb_add_traverse_state *state =
164 (struct recdb_add_traverse_state *)private_data;
165 struct ctdb_ltdb_header *hdr;
166 TDB_DATA prev_data;
167 int ret;
169 /* header is not marshalled separately in the pulldb control */
170 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
171 return -1;
174 hdr = (struct ctdb_ltdb_header *)data.dptr;
176 /* fetch the existing record, if any */
177 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
179 if (prev_data.dptr != NULL) {
180 struct ctdb_ltdb_header prev_hdr;
182 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
183 free(prev_data.dptr);
184 if (hdr->rsn < prev_hdr.rsn ||
185 (hdr->rsn == prev_hdr.rsn &&
186 prev_hdr.dmaster != state->mypnn)) {
187 return 0;
191 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
192 if (ret != 0) {
193 return -1;
195 return 0;
198 static bool recdb_add(struct recdb_context *recdb, int mypnn,
199 struct ctdb_rec_buffer *recbuf)
201 struct recdb_add_traverse_state state;
202 int ret;
204 state.recdb = recdb;
205 state.mypnn = mypnn;
207 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
208 if (ret != 0) {
209 return false;
212 return true;
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
217 uint32_t reqid, uint32_t dmaster,
218 TDB_DATA key, TDB_DATA data)
220 struct ctdb_ltdb_header *header;
221 int ret;
223 /* Skip empty records */
224 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
225 return 0;
228 /* update the dmaster field to point to us */
229 header = (struct ctdb_ltdb_header *)data.dptr;
230 if (!persistent) {
231 header->dmaster = dmaster;
232 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
235 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
236 if (ret != 0) {
237 return ret;
240 return 0;
/* Private data handed to recdb_records_traverse() */
struct recdb_records_traverse_state {
	struct ctdb_rec_buffer *recbuf;	/* buffer the records are added to */
	uint32_t dmaster;		/* dmaster passed to recbuf_filter_add() */
	uint32_t reqid;			/* reqid passed to recbuf_filter_add() */
	bool persistent;		/* persistent flag of the recdb */
	bool failed;			/* set when adding a record failed */
};
251 static int recdb_records_traverse(struct tdb_context *tdb,
252 TDB_DATA key, TDB_DATA data,
253 void *private_data)
255 struct recdb_records_traverse_state *state =
256 (struct recdb_records_traverse_state *)private_data;
257 int ret;
259 ret = recbuf_filter_add(state->recbuf, state->persistent,
260 state->reqid, state->dmaster, key, data);
261 if (ret != 0) {
262 state->failed = true;
263 return ret;
266 return 0;
269 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
270 TALLOC_CTX *mem_ctx,
271 uint32_t dmaster)
273 struct recdb_records_traverse_state state;
274 int ret;
276 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
277 if (state.recbuf == NULL) {
278 return NULL;
280 state.dmaster = dmaster;
281 state.reqid = 0;
282 state.persistent = recdb_persistent(recdb);
283 state.failed = false;
285 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
286 &state);
287 if (ret == -1 || state.failed) {
288 D_ERR("Failed to marshall recovery records for %s\n",
289 recdb_name(recdb));
290 TALLOC_FREE(state.recbuf);
291 return NULL;
294 return state.recbuf;
297 struct recdb_file_traverse_state {
298 struct ctdb_rec_buffer *recbuf;
299 struct recdb_context *recdb;
300 TALLOC_CTX *mem_ctx;
301 uint32_t dmaster;
302 uint32_t reqid;
303 bool persistent;
304 bool failed;
305 int fd;
306 int max_size;
307 int num_buffers;
310 static int recdb_file_traverse(struct tdb_context *tdb,
311 TDB_DATA key, TDB_DATA data,
312 void *private_data)
314 struct recdb_file_traverse_state *state =
315 (struct recdb_file_traverse_state *)private_data;
316 int ret;
318 ret = recbuf_filter_add(state->recbuf, state->persistent,
319 state->reqid, state->dmaster, key, data);
320 if (ret != 0) {
321 state->failed = true;
322 return ret;
325 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
326 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
327 if (ret != 0) {
328 D_ERR("Failed to collect recovery records for %s\n",
329 recdb_name(state->recdb));
330 state->failed = true;
331 return ret;
334 state->num_buffers += 1;
336 TALLOC_FREE(state->recbuf);
337 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
338 recdb_id(state->recdb));
339 if (state->recbuf == NULL) {
340 state->failed = true;
341 return ENOMEM;
345 return 0;
348 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
349 uint32_t dmaster, int fd, int max_size)
351 struct recdb_file_traverse_state state;
352 int ret;
354 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
355 if (state.recbuf == NULL) {
356 return -1;
358 state.recdb = recdb;
359 state.mem_ctx = mem_ctx;
360 state.dmaster = dmaster;
361 state.reqid = 0;
362 state.persistent = recdb_persistent(recdb);
363 state.failed = false;
364 state.fd = fd;
365 state.max_size = max_size;
366 state.num_buffers = 0;
368 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
369 if (ret == -1 || state.failed) {
370 TALLOC_FREE(state.recbuf);
371 return -1;
374 ret = ctdb_rec_buffer_write(state.recbuf, fd);
375 if (ret != 0) {
376 D_ERR("Failed to collect recovery records for %s\n",
377 recdb_name(recdb));
378 TALLOC_FREE(state.recbuf);
379 return -1;
381 state.num_buffers += 1;
383 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384 state.num_buffers, recdb_name(recdb));
386 return state.num_buffers;
390 * Pull database from a single node
393 struct pull_database_state {
394 struct tevent_context *ev;
395 struct ctdb_client_context *client;
396 struct recdb_context *recdb;
397 uint32_t pnn;
398 uint64_t srvid;
399 int num_records;
402 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
403 void *private_data);
404 static void pull_database_register_done(struct tevent_req *subreq);
405 static void pull_database_old_done(struct tevent_req *subreq);
406 static void pull_database_unregister_done(struct tevent_req *subreq);
407 static void pull_database_new_done(struct tevent_req *subreq);
409 static struct tevent_req *pull_database_send(
410 TALLOC_CTX *mem_ctx,
411 struct tevent_context *ev,
412 struct ctdb_client_context *client,
413 uint32_t pnn, uint32_t caps,
414 struct recdb_context *recdb)
416 struct tevent_req *req, *subreq;
417 struct pull_database_state *state;
418 struct ctdb_req_control request;
420 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
421 if (req == NULL) {
422 return NULL;
425 state->ev = ev;
426 state->client = client;
427 state->recdb = recdb;
428 state->pnn = pnn;
429 state->srvid = srvid_next();
431 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
432 subreq = ctdb_client_set_message_handler_send(
433 state, state->ev, state->client,
434 state->srvid, pull_database_handler,
435 req);
436 if (tevent_req_nomem(subreq, req)) {
437 return tevent_req_post(req, ev);
440 tevent_req_set_callback(subreq, pull_database_register_done,
441 req);
443 } else {
444 struct ctdb_pulldb pulldb;
446 pulldb.db_id = recdb_id(recdb);
447 pulldb.lmaster = CTDB_LMASTER_ANY;
449 ctdb_req_control_pull_db(&request, &pulldb);
450 subreq = ctdb_client_control_send(state, state->ev,
451 state->client,
452 pnn, TIMEOUT(),
453 &request);
454 if (tevent_req_nomem(subreq, req)) {
455 return tevent_req_post(req, ev);
457 tevent_req_set_callback(subreq, pull_database_old_done, req);
460 return req;
463 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
464 void *private_data)
466 struct tevent_req *req = talloc_get_type_abort(
467 private_data, struct tevent_req);
468 struct pull_database_state *state = tevent_req_data(
469 req, struct pull_database_state);
470 struct ctdb_rec_buffer *recbuf;
471 size_t np;
472 int ret;
473 bool status;
475 if (srvid != state->srvid) {
476 return;
479 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
480 if (ret != 0) {
481 D_ERR("Invalid data received for DB_PULL messages\n");
482 return;
485 if (recbuf->db_id != recdb_id(state->recdb)) {
486 talloc_free(recbuf);
487 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
488 recbuf->db_id, recdb_name(state->recdb));
489 return;
492 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
493 recbuf);
494 if (! status) {
495 talloc_free(recbuf);
496 D_ERR("Failed to add records to recdb for %s\n",
497 recdb_name(state->recdb));
498 return;
501 state->num_records += recbuf->count;
502 talloc_free(recbuf);
505 static void pull_database_register_done(struct tevent_req *subreq)
507 struct tevent_req *req = tevent_req_callback_data(
508 subreq, struct tevent_req);
509 struct pull_database_state *state = tevent_req_data(
510 req, struct pull_database_state);
511 struct ctdb_req_control request;
512 struct ctdb_pulldb_ext pulldb_ext;
513 int ret;
514 bool status;
516 status = ctdb_client_set_message_handler_recv(subreq, &ret);
517 TALLOC_FREE(subreq);
518 if (! status) {
519 D_ERR("Failed to set message handler for DB_PULL for %s\n",
520 recdb_name(state->recdb));
521 tevent_req_error(req, ret);
522 return;
525 pulldb_ext.db_id = recdb_id(state->recdb);
526 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
527 pulldb_ext.srvid = state->srvid;
529 ctdb_req_control_db_pull(&request, &pulldb_ext);
530 subreq = ctdb_client_control_send(state, state->ev, state->client,
531 state->pnn, TIMEOUT(), &request);
532 if (tevent_req_nomem(subreq, req)) {
533 return;
535 tevent_req_set_callback(subreq, pull_database_new_done, req);
538 static void pull_database_old_done(struct tevent_req *subreq)
540 struct tevent_req *req = tevent_req_callback_data(
541 subreq, struct tevent_req);
542 struct pull_database_state *state = tevent_req_data(
543 req, struct pull_database_state);
544 struct ctdb_reply_control *reply;
545 struct ctdb_rec_buffer *recbuf;
546 int ret;
547 bool status;
549 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
550 TALLOC_FREE(subreq);
551 if (! status) {
552 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
553 recdb_name(state->recdb), state->pnn, ret);
554 tevent_req_error(req, ret);
555 return;
558 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
559 talloc_free(reply);
560 if (ret != 0) {
561 tevent_req_error(req, ret);
562 return;
565 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
566 recbuf);
567 if (! status) {
568 talloc_free(recbuf);
569 tevent_req_error(req, EIO);
570 return;
573 state->num_records = recbuf->count;
574 talloc_free(recbuf);
576 D_INFO("Pulled %d records for db %s from node %d\n",
577 state->num_records, recdb_name(state->recdb), state->pnn);
579 tevent_req_done(req);
582 static void pull_database_new_done(struct tevent_req *subreq)
584 struct tevent_req *req = tevent_req_callback_data(
585 subreq, struct tevent_req);
586 struct pull_database_state *state = tevent_req_data(
587 req, struct pull_database_state);
588 struct ctdb_reply_control *reply;
589 uint32_t num_records;
590 int ret;
591 bool status;
593 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
594 TALLOC_FREE(subreq);
595 if (! status) {
596 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
597 recdb_name(state->recdb), state->pnn, ret);
598 tevent_req_error(req, ret);
599 return;
602 ret = ctdb_reply_control_db_pull(reply, &num_records);
603 talloc_free(reply);
604 if (num_records != state->num_records) {
605 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
606 num_records, state->num_records,
607 recdb_name(state->recdb));
608 tevent_req_error(req, EIO);
609 return;
612 D_INFO("Pulled %d records for db %s from node %d\n",
613 state->num_records, recdb_name(state->recdb), state->pnn);
615 subreq = ctdb_client_remove_message_handler_send(
616 state, state->ev, state->client,
617 state->srvid, req);
618 if (tevent_req_nomem(subreq, req)) {
619 return;
621 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
624 static void pull_database_unregister_done(struct tevent_req *subreq)
626 struct tevent_req *req = tevent_req_callback_data(
627 subreq, struct tevent_req);
628 struct pull_database_state *state = tevent_req_data(
629 req, struct pull_database_state);
630 int ret;
631 bool status;
633 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
634 TALLOC_FREE(subreq);
635 if (! status) {
636 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
637 recdb_name(state->recdb));
638 tevent_req_error(req, ret);
639 return;
642 tevent_req_done(req);
/* Collect the result of pull_database_send(); see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
651 * Push database to specified nodes (old style)
/* State of an old style push, which sends PUSH_DB to one node at a time */
struct push_database_old_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;
	uint32_t *pnn_list;		/* nodes to push to */
	int count;			/* number of entries in pnn_list */
	struct ctdb_rec_buffer *recbuf;	/* all records, marshalled once */
	int index;			/* pnn_list entry being pushed to */
};

static void push_database_old_push_done(struct tevent_req *subreq);
666 static struct tevent_req *push_database_old_send(
667 TALLOC_CTX *mem_ctx,
668 struct tevent_context *ev,
669 struct ctdb_client_context *client,
670 uint32_t *pnn_list, int count,
671 struct recdb_context *recdb)
673 struct tevent_req *req, *subreq;
674 struct push_database_old_state *state;
675 struct ctdb_req_control request;
676 uint32_t pnn;
678 req = tevent_req_create(mem_ctx, &state,
679 struct push_database_old_state);
680 if (req == NULL) {
681 return NULL;
684 state->ev = ev;
685 state->client = client;
686 state->recdb = recdb;
687 state->pnn_list = pnn_list;
688 state->count = count;
689 state->index = 0;
691 state->recbuf = recdb_records(recdb, state,
692 ctdb_client_pnn(client));
693 if (tevent_req_nomem(state->recbuf, req)) {
694 return tevent_req_post(req, ev);
697 pnn = state->pnn_list[state->index];
699 ctdb_req_control_push_db(&request, state->recbuf);
700 subreq = ctdb_client_control_send(state, ev, client, pnn,
701 TIMEOUT(), &request);
702 if (tevent_req_nomem(subreq, req)) {
703 return tevent_req_post(req, ev);
705 tevent_req_set_callback(subreq, push_database_old_push_done, req);
707 return req;
710 static void push_database_old_push_done(struct tevent_req *subreq)
712 struct tevent_req *req = tevent_req_callback_data(
713 subreq, struct tevent_req);
714 struct push_database_old_state *state = tevent_req_data(
715 req, struct push_database_old_state);
716 struct ctdb_req_control request;
717 uint32_t pnn;
718 int ret;
719 bool status;
721 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
722 TALLOC_FREE(subreq);
723 if (! status) {
724 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
725 recdb_name(state->recdb), state->pnn_list[state->index],
726 ret);
727 tevent_req_error(req, ret);
728 return;
731 state->index += 1;
732 if (state->index == state->count) {
733 TALLOC_FREE(state->recbuf);
734 tevent_req_done(req);
735 return;
738 pnn = state->pnn_list[state->index];
740 ctdb_req_control_push_db(&request, state->recbuf);
741 subreq = ctdb_client_control_send(state, state->ev, state->client,
742 pnn, TIMEOUT(), &request);
743 if (tevent_req_nomem(subreq, req)) {
744 return;
746 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Collect the result of push_database_old_send(); see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
755 * Push database to specified nodes (new style)
/* State of a new style push, which sends records as a series of messages */
struct push_database_new_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct recdb_context *recdb;
	uint32_t *pnn_list;	/* nodes to push to */
	int count;		/* number of entries in pnn_list */
	uint64_t srvid;		/* srvid the record messages are sent to */
	uint32_t dmaster;	/* pnn of this node */
	int fd;			/* file holding the marshalled buffers */
	int num_buffers;	/* buffers written to fd */
	int num_buffers_sent;	/* buffers sent so far */
	int num_records;	/* records sent so far */
};

static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
777 static struct tevent_req *push_database_new_send(
778 TALLOC_CTX *mem_ctx,
779 struct tevent_context *ev,
780 struct ctdb_client_context *client,
781 uint32_t *pnn_list, int count,
782 struct recdb_context *recdb,
783 int max_size)
785 struct tevent_req *req, *subreq;
786 struct push_database_new_state *state;
787 struct ctdb_req_control request;
788 struct ctdb_pulldb_ext pulldb_ext;
789 char *filename;
790 off_t offset;
792 req = tevent_req_create(mem_ctx, &state,
793 struct push_database_new_state);
794 if (req == NULL) {
795 return NULL;
798 state->ev = ev;
799 state->client = client;
800 state->recdb = recdb;
801 state->pnn_list = pnn_list;
802 state->count = count;
804 state->srvid = srvid_next();
805 state->dmaster = ctdb_client_pnn(client);
806 state->num_buffers_sent = 0;
807 state->num_records = 0;
809 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
810 if (tevent_req_nomem(filename, req)) {
811 return tevent_req_post(req, ev);
814 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
815 if (state->fd == -1) {
816 tevent_req_error(req, errno);
817 return tevent_req_post(req, ev);
819 unlink(filename);
820 talloc_free(filename);
822 state->num_buffers = recdb_file(recdb, state, state->dmaster,
823 state->fd, max_size);
824 if (state->num_buffers == -1) {
825 tevent_req_error(req, ENOMEM);
826 return tevent_req_post(req, ev);
829 offset = lseek(state->fd, 0, SEEK_SET);
830 if (offset != 0) {
831 tevent_req_error(req, EIO);
832 return tevent_req_post(req, ev);
835 pulldb_ext.db_id = recdb_id(recdb);
836 pulldb_ext.srvid = state->srvid;
838 ctdb_req_control_db_push_start(&request, &pulldb_ext);
839 subreq = ctdb_client_control_multi_send(state, ev, client,
840 pnn_list, count,
841 TIMEOUT(), &request);
842 if (tevent_req_nomem(subreq, req)) {
843 return tevent_req_post(req, ev);
845 tevent_req_set_callback(subreq, push_database_new_started, req);
847 return req;
850 static void push_database_new_started(struct tevent_req *subreq)
852 struct tevent_req *req = tevent_req_callback_data(
853 subreq, struct tevent_req);
854 struct push_database_new_state *state = tevent_req_data(
855 req, struct push_database_new_state);
856 int *err_list;
857 int ret;
858 bool status;
860 status = ctdb_client_control_multi_recv(subreq, &ret, state,
861 &err_list, NULL);
862 TALLOC_FREE(subreq);
863 if (! status) {
864 int ret2;
865 uint32_t pnn;
867 ret2 = ctdb_client_control_multi_error(state->pnn_list,
868 state->count,
869 err_list, &pnn);
870 if (ret2 != 0) {
871 D_ERR("control DB_PUSH_START failed for db %s"
872 " on node %u, ret=%d\n",
873 recdb_name(state->recdb), pnn, ret2);
874 } else {
875 D_ERR("control DB_PUSH_START failed for db %s,"
876 " ret=%d\n",
877 recdb_name(state->recdb), ret);
879 talloc_free(err_list);
881 tevent_req_error(req, ret);
882 return;
885 push_database_new_send_msg(req);
888 static void push_database_new_send_msg(struct tevent_req *req)
890 struct push_database_new_state *state = tevent_req_data(
891 req, struct push_database_new_state);
892 struct tevent_req *subreq;
893 struct ctdb_rec_buffer *recbuf;
894 struct ctdb_req_message message;
895 TDB_DATA data;
896 size_t np;
897 int ret;
899 if (state->num_buffers_sent == state->num_buffers) {
900 struct ctdb_req_control request;
902 ctdb_req_control_db_push_confirm(&request,
903 recdb_id(state->recdb));
904 subreq = ctdb_client_control_multi_send(state, state->ev,
905 state->client,
906 state->pnn_list,
907 state->count,
908 TIMEOUT(), &request);
909 if (tevent_req_nomem(subreq, req)) {
910 return;
912 tevent_req_set_callback(subreq, push_database_new_confirmed,
913 req);
914 return;
917 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
918 if (ret != 0) {
919 tevent_req_error(req, ret);
920 return;
923 data.dsize = ctdb_rec_buffer_len(recbuf);
924 data.dptr = talloc_size(state, data.dsize);
925 if (tevent_req_nomem(data.dptr, req)) {
926 return;
929 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
931 message.srvid = state->srvid;
932 message.data.data = data;
934 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
935 state->num_buffers_sent, recbuf->count,
936 recdb_name(state->recdb));
938 subreq = ctdb_client_message_multi_send(state, state->ev,
939 state->client,
940 state->pnn_list, state->count,
941 &message);
942 if (tevent_req_nomem(subreq, req)) {
943 return;
945 tevent_req_set_callback(subreq, push_database_new_send_done, req);
947 state->num_records += recbuf->count;
949 talloc_free(data.dptr);
950 talloc_free(recbuf);
953 static void push_database_new_send_done(struct tevent_req *subreq)
955 struct tevent_req *req = tevent_req_callback_data(
956 subreq, struct tevent_req);
957 struct push_database_new_state *state = tevent_req_data(
958 req, struct push_database_new_state);
959 bool status;
960 int ret;
962 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
963 TALLOC_FREE(subreq);
964 if (! status) {
965 D_ERR("Sending recovery records failed for %s\n",
966 recdb_name(state->recdb));
967 tevent_req_error(req, ret);
968 return;
971 state->num_buffers_sent += 1;
973 push_database_new_send_msg(req);
976 static void push_database_new_confirmed(struct tevent_req *subreq)
978 struct tevent_req *req = tevent_req_callback_data(
979 subreq, struct tevent_req);
980 struct push_database_new_state *state = tevent_req_data(
981 req, struct push_database_new_state);
982 struct ctdb_reply_control **reply;
983 int *err_list;
984 bool status;
985 int ret, i;
986 uint32_t num_records;
988 status = ctdb_client_control_multi_recv(subreq, &ret, state,
989 &err_list, &reply);
990 TALLOC_FREE(subreq);
991 if (! status) {
992 int ret2;
993 uint32_t pnn;
995 ret2 = ctdb_client_control_multi_error(state->pnn_list,
996 state->count, err_list,
997 &pnn);
998 if (ret2 != 0) {
999 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1000 " on node %u, ret=%d\n",
1001 recdb_name(state->recdb), pnn, ret2);
1002 } else {
1003 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1004 " ret=%d\n",
1005 recdb_name(state->recdb), ret);
1007 tevent_req_error(req, ret);
1008 return;
1011 for (i=0; i<state->count; i++) {
1012 ret = ctdb_reply_control_db_push_confirm(reply[i],
1013 &num_records);
1014 if (ret != 0) {
1015 tevent_req_error(req, EPROTO);
1016 return;
1019 if (num_records != state->num_records) {
1020 D_ERR("Node %u received %d of %d records for %s\n",
1021 state->pnn_list[i], num_records,
1022 state->num_records, recdb_name(state->recdb));
1023 tevent_req_error(req, EPROTO);
1024 return;
1028 talloc_free(reply);
1030 D_INFO("Pushed %d records for db %s\n",
1031 state->num_records, recdb_name(state->recdb));
1033 tevent_req_done(req);
/* Collect the result of push_database_new_send(); see generic_recv() */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1042 * wrapper for push_database_old and push_database_new
/* Completion flags for the old style and new style halves of a push */
struct push_database_state {
	bool old_done, new_done;
};

static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1052 static struct tevent_req *push_database_send(
1053 TALLOC_CTX *mem_ctx,
1054 struct tevent_context *ev,
1055 struct ctdb_client_context *client,
1056 uint32_t *pnn_list, int count, uint32_t *caps,
1057 struct ctdb_tunable_list *tun_list,
1058 struct recdb_context *recdb)
1060 struct tevent_req *req, *subreq;
1061 struct push_database_state *state;
1062 uint32_t *old_list, *new_list;
1063 int old_count, new_count;
1064 int i;
1066 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1067 if (req == NULL) {
1068 return NULL;
1071 state->old_done = false;
1072 state->new_done = false;
1074 old_count = 0;
1075 new_count = 0;
1076 old_list = talloc_array(state, uint32_t, count);
1077 new_list = talloc_array(state, uint32_t, count);
1078 if (tevent_req_nomem(old_list, req) ||
1079 tevent_req_nomem(new_list,req)) {
1080 return tevent_req_post(req, ev);
1083 for (i=0; i<count; i++) {
1084 uint32_t pnn = pnn_list[i];
1086 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1087 new_list[new_count] = pnn;
1088 new_count += 1;
1089 } else {
1090 old_list[old_count] = pnn;
1091 old_count += 1;
1095 if (old_count > 0) {
1096 subreq = push_database_old_send(state, ev, client,
1097 old_list, old_count, recdb);
1098 if (tevent_req_nomem(subreq, req)) {
1099 return tevent_req_post(req, ev);
1101 tevent_req_set_callback(subreq, push_database_old_done, req);
1102 } else {
1103 state->old_done = true;
1106 if (new_count > 0) {
1107 subreq = push_database_new_send(state, ev, client,
1108 new_list, new_count, recdb,
1109 tun_list->rec_buffer_size_limit);
1110 if (tevent_req_nomem(subreq, req)) {
1111 return tevent_req_post(req, ev);
1113 tevent_req_set_callback(subreq, push_database_new_done, req);
1114 } else {
1115 state->new_done = true;
1118 return req;
1121 static void push_database_old_done(struct tevent_req *subreq)
1123 struct tevent_req *req = tevent_req_callback_data(
1124 subreq, struct tevent_req);
1125 struct push_database_state *state = tevent_req_data(
1126 req, struct push_database_state);
1127 bool status;
1128 int ret;
1130 status = push_database_old_recv(subreq, &ret);
1131 if (! status) {
1132 tevent_req_error(req, ret);
1133 return;
1136 state->old_done = true;
1138 if (state->old_done && state->new_done) {
1139 tevent_req_done(req);
1143 static void push_database_new_done(struct tevent_req *subreq)
1145 struct tevent_req *req = tevent_req_callback_data(
1146 subreq, struct tevent_req);
1147 struct push_database_state *state = tevent_req_data(
1148 req, struct push_database_state);
1149 bool status;
1150 int ret;
1152 status = push_database_new_recv(subreq, &ret);
1153 if (! status) {
1154 tevent_req_error(req, ret);
1155 return;
1158 state->new_done = true;
1160 if (state->old_done && state->new_done) {
1161 tevent_req_done(req);
/* Collect the result of push_database_send(); see generic_recv() */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1171 * Collect databases using highest sequence number
/* State of a request that pulls a database from the highest-seqnum node */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t *pnn_list;		/* nodes asked for their seqnum */
	int count;			/* number of entries in pnn_list */
	uint32_t *caps;			/* capabilities, indexed by pnn */
	uint32_t *ban_credits;		/* per-pnn counter bumped on failure */
	uint32_t db_id;
	struct recdb_context *recdb;
	uint32_t max_pnn;		/* node chosen to pull from */
};

static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1189 static struct tevent_req *collect_highseqnum_db_send(
1190 TALLOC_CTX *mem_ctx,
1191 struct tevent_context *ev,
1192 struct ctdb_client_context *client,
1193 uint32_t *pnn_list, int count, uint32_t *caps,
1194 uint32_t *ban_credits, uint32_t db_id,
1195 struct recdb_context *recdb)
1197 struct tevent_req *req, *subreq;
1198 struct collect_highseqnum_db_state *state;
1199 struct ctdb_req_control request;
1201 req = tevent_req_create(mem_ctx, &state,
1202 struct collect_highseqnum_db_state);
1203 if (req == NULL) {
1204 return NULL;
1207 state->ev = ev;
1208 state->client = client;
1209 state->pnn_list = pnn_list;
1210 state->count = count;
1211 state->caps = caps;
1212 state->ban_credits = ban_credits;
1213 state->db_id = db_id;
1214 state->recdb = recdb;
1216 ctdb_req_control_get_db_seqnum(&request, db_id);
1217 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1218 state->pnn_list, state->count,
1219 TIMEOUT(), &request);
1220 if (tevent_req_nomem(subreq, req)) {
1221 return tevent_req_post(req, ev);
1223 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1224 req);
1226 return req;
1229 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1231 struct tevent_req *req = tevent_req_callback_data(
1232 subreq, struct tevent_req);
1233 struct collect_highseqnum_db_state *state = tevent_req_data(
1234 req, struct collect_highseqnum_db_state);
1235 struct ctdb_reply_control **reply;
1236 int *err_list;
1237 bool status;
1238 int ret, i;
1239 uint64_t seqnum, max_seqnum;
1241 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1242 &err_list, &reply);
1243 TALLOC_FREE(subreq);
1244 if (! status) {
1245 int ret2;
1246 uint32_t pnn;
1248 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1249 state->count, err_list,
1250 &pnn);
1251 if (ret2 != 0) {
1252 D_ERR("control GET_DB_SEQNUM failed for db %s"
1253 " on node %u, ret=%d\n",
1254 recdb_name(state->recdb), pnn, ret2);
1255 } else {
1256 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1257 " ret=%d\n",
1258 recdb_name(state->recdb), ret);
1260 tevent_req_error(req, ret);
1261 return;
1264 max_seqnum = 0;
1265 state->max_pnn = state->pnn_list[0];
1266 for (i=0; i<state->count; i++) {
1267 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1268 if (ret != 0) {
1269 tevent_req_error(req, EPROTO);
1270 return;
1273 if (max_seqnum < seqnum) {
1274 max_seqnum = seqnum;
1275 state->max_pnn = state->pnn_list[i];
1279 talloc_free(reply);
1281 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1282 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1284 subreq = pull_database_send(state, state->ev, state->client,
1285 state->max_pnn,
1286 state->caps[state->max_pnn],
1287 state->recdb);
1288 if (tevent_req_nomem(subreq, req)) {
1289 return;
1291 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1292 req);
1295 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1297 struct tevent_req *req = tevent_req_callback_data(
1298 subreq, struct tevent_req);
1299 struct collect_highseqnum_db_state *state = tevent_req_data(
1300 req, struct collect_highseqnum_db_state);
1301 int ret;
1302 bool status;
1304 status = pull_database_recv(subreq, &ret);
1305 TALLOC_FREE(subreq);
1306 if (! status) {
1307 state->ban_credits[state->max_pnn] += 1;
1308 tevent_req_error(req, ret);
1309 return;
1312 tevent_req_done(req);
/*
 * Fetch the result of collect_highseqnum_db_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect all databases
 */
1324 struct collect_all_db_state {
1325 struct tevent_context *ev;
1326 struct ctdb_client_context *client;
1327 uint32_t *pnn_list;
1328 int count;
1329 uint32_t *caps;
1330 uint32_t *ban_credits;
1331 uint32_t db_id;
1332 struct recdb_context *recdb;
1333 struct ctdb_pulldb pulldb;
1334 int index;
1337 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1339 static struct tevent_req *collect_all_db_send(
1340 TALLOC_CTX *mem_ctx,
1341 struct tevent_context *ev,
1342 struct ctdb_client_context *client,
1343 uint32_t *pnn_list, int count, uint32_t *caps,
1344 uint32_t *ban_credits, uint32_t db_id,
1345 struct recdb_context *recdb)
1347 struct tevent_req *req, *subreq;
1348 struct collect_all_db_state *state;
1349 uint32_t pnn;
1351 req = tevent_req_create(mem_ctx, &state,
1352 struct collect_all_db_state);
1353 if (req == NULL) {
1354 return NULL;
1357 state->ev = ev;
1358 state->client = client;
1359 state->pnn_list = pnn_list;
1360 state->count = count;
1361 state->caps = caps;
1362 state->ban_credits = ban_credits;
1363 state->db_id = db_id;
1364 state->recdb = recdb;
1365 state->index = 0;
1367 pnn = state->pnn_list[state->index];
1369 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1370 if (tevent_req_nomem(subreq, req)) {
1371 return tevent_req_post(req, ev);
1373 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1375 return req;
1378 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1380 struct tevent_req *req = tevent_req_callback_data(
1381 subreq, struct tevent_req);
1382 struct collect_all_db_state *state = tevent_req_data(
1383 req, struct collect_all_db_state);
1384 uint32_t pnn;
1385 int ret;
1386 bool status;
1388 status = pull_database_recv(subreq, &ret);
1389 TALLOC_FREE(subreq);
1390 if (! status) {
1391 pnn = state->pnn_list[state->index];
1392 state->ban_credits[pnn] += 1;
1393 tevent_req_error(req, ret);
1394 return;
1397 state->index += 1;
1398 if (state->index == state->count) {
1399 tevent_req_done(req);
1400 return;
1403 pnn = state->pnn_list[state->index];
1404 subreq = pull_database_send(state, state->ev, state->client,
1405 pnn, state->caps[pnn], state->recdb);
1406 if (tevent_req_nomem(subreq, req)) {
1407 return;
1409 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Fetch the result of collect_all_db_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * For each database do the following:
 *  - Get DB name
 *  - Get DB path
 *  - Freeze database on all nodes
 *  - Start transaction on all nodes
 *  - Collect database from all nodes
 *  - Wipe database on all nodes
 *  - Push database to all nodes
 *  - Commit transaction on all nodes
 *  - Thaw database on all nodes
 */
1431 struct recover_db_state {
1432 struct tevent_context *ev;
1433 struct ctdb_client_context *client;
1434 struct ctdb_tunable_list *tun_list;
1435 uint32_t *pnn_list;
1436 int count;
1437 uint32_t *caps;
1438 uint32_t *ban_credits;
1439 uint32_t db_id;
1440 uint8_t db_flags;
1442 uint32_t destnode;
1443 struct ctdb_transdb transdb;
1445 const char *db_name, *db_path;
1446 struct recdb_context *recdb;
1449 static void recover_db_name_done(struct tevent_req *subreq);
1450 static void recover_db_path_done(struct tevent_req *subreq);
1451 static void recover_db_freeze_done(struct tevent_req *subreq);
1452 static void recover_db_transaction_started(struct tevent_req *subreq);
1453 static void recover_db_collect_done(struct tevent_req *subreq);
1454 static void recover_db_wipedb_done(struct tevent_req *subreq);
1455 static void recover_db_pushdb_done(struct tevent_req *subreq);
1456 static void recover_db_transaction_committed(struct tevent_req *subreq);
1457 static void recover_db_thaw_done(struct tevent_req *subreq);
1459 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1460 struct tevent_context *ev,
1461 struct ctdb_client_context *client,
1462 struct ctdb_tunable_list *tun_list,
1463 uint32_t *pnn_list, int count,
1464 uint32_t *caps,
1465 uint32_t *ban_credits,
1466 uint32_t generation,
1467 uint32_t db_id, uint8_t db_flags)
1469 struct tevent_req *req, *subreq;
1470 struct recover_db_state *state;
1471 struct ctdb_req_control request;
1473 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1474 if (req == NULL) {
1475 return NULL;
1478 state->ev = ev;
1479 state->client = client;
1480 state->tun_list = tun_list;
1481 state->pnn_list = pnn_list;
1482 state->count = count;
1483 state->caps = caps;
1484 state->ban_credits = ban_credits;
1485 state->db_id = db_id;
1486 state->db_flags = db_flags;
1488 state->destnode = ctdb_client_pnn(client);
1489 state->transdb.db_id = db_id;
1490 state->transdb.tid = generation;
1492 ctdb_req_control_get_dbname(&request, db_id);
1493 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1494 TIMEOUT(), &request);
1495 if (tevent_req_nomem(subreq, req)) {
1496 return tevent_req_post(req, ev);
1498 tevent_req_set_callback(subreq, recover_db_name_done, req);
1500 return req;
1503 static void recover_db_name_done(struct tevent_req *subreq)
1505 struct tevent_req *req = tevent_req_callback_data(
1506 subreq, struct tevent_req);
1507 struct recover_db_state *state = tevent_req_data(
1508 req, struct recover_db_state);
1509 struct ctdb_reply_control *reply;
1510 struct ctdb_req_control request;
1511 int ret;
1512 bool status;
1514 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1515 TALLOC_FREE(subreq);
1516 if (! status) {
1517 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1518 state->db_id, ret);
1519 tevent_req_error(req, ret);
1520 return;
1523 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1524 if (ret != 0) {
1525 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1526 state->db_id, ret);
1527 tevent_req_error(req, EPROTO);
1528 return;
1531 talloc_free(reply);
1533 ctdb_req_control_getdbpath(&request, state->db_id);
1534 subreq = ctdb_client_control_send(state, state->ev, state->client,
1535 state->destnode, TIMEOUT(),
1536 &request);
1537 if (tevent_req_nomem(subreq, req)) {
1538 return;
1540 tevent_req_set_callback(subreq, recover_db_path_done, req);
1543 static void recover_db_path_done(struct tevent_req *subreq)
1545 struct tevent_req *req = tevent_req_callback_data(
1546 subreq, struct tevent_req);
1547 struct recover_db_state *state = tevent_req_data(
1548 req, struct recover_db_state);
1549 struct ctdb_reply_control *reply;
1550 struct ctdb_req_control request;
1551 int ret;
1552 bool status;
1554 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1555 TALLOC_FREE(subreq);
1556 if (! status) {
1557 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1558 state->db_name, ret);
1559 tevent_req_error(req, ret);
1560 return;
1563 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1564 if (ret != 0) {
1565 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1566 state->db_name, ret);
1567 tevent_req_error(req, EPROTO);
1568 return;
1571 talloc_free(reply);
1573 ctdb_req_control_db_freeze(&request, state->db_id);
1574 subreq = ctdb_client_control_multi_send(state, state->ev,
1575 state->client,
1576 state->pnn_list, state->count,
1577 TIMEOUT(), &request);
1578 if (tevent_req_nomem(subreq, req)) {
1579 return;
1581 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1584 static void recover_db_freeze_done(struct tevent_req *subreq)
1586 struct tevent_req *req = tevent_req_callback_data(
1587 subreq, struct tevent_req);
1588 struct recover_db_state *state = tevent_req_data(
1589 req, struct recover_db_state);
1590 struct ctdb_req_control request;
1591 int *err_list;
1592 int ret;
1593 bool status;
1595 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1596 NULL);
1597 TALLOC_FREE(subreq);
1598 if (! status) {
1599 int ret2;
1600 uint32_t pnn;
1602 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1603 state->count, err_list,
1604 &pnn);
1605 if (ret2 != 0) {
1606 D_ERR("control FREEZE_DB failed for db %s"
1607 " on node %u, ret=%d\n",
1608 state->db_name, pnn, ret2);
1609 state->ban_credits[pnn] += 1;
1610 } else {
1611 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1612 state->db_name, ret);
1614 tevent_req_error(req, ret);
1615 return;
1618 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1619 subreq = ctdb_client_control_multi_send(state, state->ev,
1620 state->client,
1621 state->pnn_list, state->count,
1622 TIMEOUT(), &request);
1623 if (tevent_req_nomem(subreq, req)) {
1624 return;
1626 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1629 static void recover_db_transaction_started(struct tevent_req *subreq)
1631 struct tevent_req *req = tevent_req_callback_data(
1632 subreq, struct tevent_req);
1633 struct recover_db_state *state = tevent_req_data(
1634 req, struct recover_db_state);
1635 int *err_list;
1636 int ret;
1637 bool status;
1639 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1640 NULL);
1641 TALLOC_FREE(subreq);
1642 if (! status) {
1643 int ret2;
1644 uint32_t pnn;
1646 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1647 state->count,
1648 err_list, &pnn);
1649 if (ret2 != 0) {
1650 D_ERR("control TRANSACTION_DB failed for db=%s"
1651 " on node %u, ret=%d\n",
1652 state->db_name, pnn, ret2);
1653 } else {
1654 D_ERR("control TRANSACTION_DB failed for db=%s,"
1655 " ret=%d\n", state->db_name, ret);
1657 tevent_req_error(req, ret);
1658 return;
1661 state->recdb = recdb_create(state, state->db_id, state->db_name,
1662 state->db_path,
1663 state->tun_list->database_hash_size,
1664 state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1665 if (tevent_req_nomem(state->recdb, req)) {
1666 return;
1669 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1670 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1671 subreq = collect_highseqnum_db_send(
1672 state, state->ev, state->client,
1673 state->pnn_list, state->count, state->caps,
1674 state->ban_credits, state->db_id,
1675 state->recdb);
1676 } else {
1677 subreq = collect_all_db_send(
1678 state, state->ev, state->client,
1679 state->pnn_list, state->count, state->caps,
1680 state->ban_credits, state->db_id,
1681 state->recdb);
1683 if (tevent_req_nomem(subreq, req)) {
1684 return;
1686 tevent_req_set_callback(subreq, recover_db_collect_done, req);
1689 static void recover_db_collect_done(struct tevent_req *subreq)
1691 struct tevent_req *req = tevent_req_callback_data(
1692 subreq, struct tevent_req);
1693 struct recover_db_state *state = tevent_req_data(
1694 req, struct recover_db_state);
1695 struct ctdb_req_control request;
1696 int ret;
1697 bool status;
1699 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1700 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1701 status = collect_highseqnum_db_recv(subreq, &ret);
1702 } else {
1703 status = collect_all_db_recv(subreq, &ret);
1705 TALLOC_FREE(subreq);
1706 if (! status) {
1707 tevent_req_error(req, ret);
1708 return;
1711 ctdb_req_control_wipe_database(&request, &state->transdb);
1712 subreq = ctdb_client_control_multi_send(state, state->ev,
1713 state->client,
1714 state->pnn_list, state->count,
1715 TIMEOUT(), &request);
1716 if (tevent_req_nomem(subreq, req)) {
1717 return;
1719 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1722 static void recover_db_wipedb_done(struct tevent_req *subreq)
1724 struct tevent_req *req = tevent_req_callback_data(
1725 subreq, struct tevent_req);
1726 struct recover_db_state *state = tevent_req_data(
1727 req, struct recover_db_state);
1728 int *err_list;
1729 int ret;
1730 bool status;
1732 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1733 NULL);
1734 TALLOC_FREE(subreq);
1735 if (! status) {
1736 int ret2;
1737 uint32_t pnn;
1739 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1740 state->count,
1741 err_list, &pnn);
1742 if (ret2 != 0) {
1743 D_ERR("control WIPEDB failed for db %s on node %u,"
1744 " ret=%d\n", state->db_name, pnn, ret2);
1745 } else {
1746 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1747 state->db_name, ret);
1749 tevent_req_error(req, ret);
1750 return;
1753 subreq = push_database_send(state, state->ev, state->client,
1754 state->pnn_list, state->count,
1755 state->caps, state->tun_list,
1756 state->recdb);
1757 if (tevent_req_nomem(subreq, req)) {
1758 return;
1760 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1763 static void recover_db_pushdb_done(struct tevent_req *subreq)
1765 struct tevent_req *req = tevent_req_callback_data(
1766 subreq, struct tevent_req);
1767 struct recover_db_state *state = tevent_req_data(
1768 req, struct recover_db_state);
1769 struct ctdb_req_control request;
1770 int ret;
1771 bool status;
1773 status = push_database_recv(subreq, &ret);
1774 TALLOC_FREE(subreq);
1775 if (! status) {
1776 tevent_req_error(req, ret);
1777 return;
1780 TALLOC_FREE(state->recdb);
1782 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1783 subreq = ctdb_client_control_multi_send(state, state->ev,
1784 state->client,
1785 state->pnn_list, state->count,
1786 TIMEOUT(), &request);
1787 if (tevent_req_nomem(subreq, req)) {
1788 return;
1790 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1793 static void recover_db_transaction_committed(struct tevent_req *subreq)
1795 struct tevent_req *req = tevent_req_callback_data(
1796 subreq, struct tevent_req);
1797 struct recover_db_state *state = tevent_req_data(
1798 req, struct recover_db_state);
1799 struct ctdb_req_control request;
1800 int *err_list;
1801 int ret;
1802 bool status;
1804 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1805 NULL);
1806 TALLOC_FREE(subreq);
1807 if (! status) {
1808 int ret2;
1809 uint32_t pnn;
1811 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1812 state->count,
1813 err_list, &pnn);
1814 if (ret2 != 0) {
1815 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1816 " on node %u, ret=%d\n",
1817 state->db_name, pnn, ret2);
1818 } else {
1819 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1820 " ret=%d\n", state->db_name, ret);
1822 tevent_req_error(req, ret);
1823 return;
1826 ctdb_req_control_db_thaw(&request, state->db_id);
1827 subreq = ctdb_client_control_multi_send(state, state->ev,
1828 state->client,
1829 state->pnn_list, state->count,
1830 TIMEOUT(), &request);
1831 if (tevent_req_nomem(subreq, req)) {
1832 return;
1834 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1837 static void recover_db_thaw_done(struct tevent_req *subreq)
1839 struct tevent_req *req = tevent_req_callback_data(
1840 subreq, struct tevent_req);
1841 struct recover_db_state *state = tevent_req_data(
1842 req, struct recover_db_state);
1843 int *err_list;
1844 int ret;
1845 bool status;
1847 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1848 NULL);
1849 TALLOC_FREE(subreq);
1850 if (! status) {
1851 int ret2;
1852 uint32_t pnn;
1854 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1855 state->count,
1856 err_list, &pnn);
1857 if (ret2 != 0) {
1858 D_ERR("control DB_THAW failed for db %s on node %u,"
1859 " ret=%d\n", state->db_name, pnn, ret2);
1860 } else {
1861 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1862 state->db_name, ret);
1864 tevent_req_error(req, ret);
1865 return;
1868 tevent_req_done(req);
/*
 * Fetch the result of recover_db_send().
 *
 * Returns true on success and false on failure; the error code is not
 * reported to the caller.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
/*
 * Start database recovery for each database
 *
 * Try to recover each database NUM_RETRIES (3) times before failing
 * recovery.
 */
/* Overall state for recovering every database in dbmap */
struct db_recovery_state {
	struct tevent_context *ev;
	struct ctdb_dbid_map *dbmap;	/* databases being recovered */
	int num_replies;		/* databases finished, good or bad */
	int num_failed;			/* databases that could not be recovered */
};
/*
 * Per-database state.  It holds everything needed to call
 * recover_db_send() again when an attempt fails.
 */
struct db_recovery_one_state {
	struct tevent_req *req;		/* the parent db_recovery request */
	struct ctdb_client_context *client;
	struct ctdb_dbid_map *dbmap;
	struct ctdb_tunable_list *tun_list;
	uint32_t *pnn_list;
	int count;			/* number of entries in pnn_list */
	uint32_t *caps;
	uint32_t *ban_credits;
	uint32_t generation;
	uint32_t db_id;
	uint8_t db_flags;
	int num_fails;			/* failed attempts so far */
};

static void db_recovery_one_done(struct tevent_req *subreq);
1907 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1908 struct tevent_context *ev,
1909 struct ctdb_client_context *client,
1910 struct ctdb_dbid_map *dbmap,
1911 struct ctdb_tunable_list *tun_list,
1912 uint32_t *pnn_list, int count,
1913 uint32_t *caps,
1914 uint32_t *ban_credits,
1915 uint32_t generation)
1917 struct tevent_req *req, *subreq;
1918 struct db_recovery_state *state;
1919 int i;
1921 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1922 if (req == NULL) {
1923 return NULL;
1926 state->ev = ev;
1927 state->dbmap = dbmap;
1928 state->num_replies = 0;
1929 state->num_failed = 0;
1931 if (dbmap->num == 0) {
1932 tevent_req_done(req);
1933 return tevent_req_post(req, ev);
1936 for (i=0; i<dbmap->num; i++) {
1937 struct db_recovery_one_state *substate;
1939 substate = talloc_zero(state, struct db_recovery_one_state);
1940 if (tevent_req_nomem(substate, req)) {
1941 return tevent_req_post(req, ev);
1944 substate->req = req;
1945 substate->client = client;
1946 substate->dbmap = dbmap;
1947 substate->tun_list = tun_list;
1948 substate->pnn_list = pnn_list;
1949 substate->count = count;
1950 substate->caps = caps;
1951 substate->ban_credits = ban_credits;
1952 substate->generation = generation;
1953 substate->db_id = dbmap->dbs[i].db_id;
1954 substate->db_flags = dbmap->dbs[i].flags;
1956 subreq = recover_db_send(state, ev, client, tun_list,
1957 pnn_list, count, caps, ban_credits,
1958 generation, substate->db_id,
1959 substate->db_flags);
1960 if (tevent_req_nomem(subreq, req)) {
1961 return tevent_req_post(req, ev);
1963 tevent_req_set_callback(subreq, db_recovery_one_done,
1964 substate);
1965 D_NOTICE("recover database 0x%08x\n", substate->db_id);
1968 return req;
1971 static void db_recovery_one_done(struct tevent_req *subreq)
1973 struct db_recovery_one_state *substate = tevent_req_callback_data(
1974 subreq, struct db_recovery_one_state);
1975 struct tevent_req *req = substate->req;
1976 struct db_recovery_state *state = tevent_req_data(
1977 req, struct db_recovery_state);
1978 bool status;
1980 status = recover_db_recv(subreq);
1981 TALLOC_FREE(subreq);
1983 if (status) {
1984 talloc_free(substate);
1985 goto done;
1988 substate->num_fails += 1;
1989 if (substate->num_fails < NUM_RETRIES) {
1990 subreq = recover_db_send(state, state->ev, substate->client,
1991 substate->tun_list,
1992 substate->pnn_list, substate->count,
1993 substate->caps, substate->ban_credits,
1994 substate->generation, substate->db_id,
1995 substate->db_flags);
1996 if (tevent_req_nomem(subreq, req)) {
1997 goto failed;
1999 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2000 D_NOTICE("recover database 0x%08x, attempt %d\n",
2001 substate->db_id, substate->num_fails+1);
2002 return;
2005 failed:
2006 state->num_failed += 1;
2008 done:
2009 state->num_replies += 1;
2011 if (state->num_replies == state->dbmap->num) {
2012 tevent_req_done(req);
2016 static bool db_recovery_recv(struct tevent_req *req, int *count)
2018 struct db_recovery_state *state = tevent_req_data(
2019 req, struct db_recovery_state);
2020 int err;
2022 if (tevent_req_is_unix_error(req, &err)) {
2023 *count = 0;
2024 return false;
2027 *count = state->num_replies - state->num_failed;
2029 if (state->num_failed > 0) {
2030 return false;
2033 return true;
/*
 * Run the parallel database recovery
 *
 *  - Get tunables
 *  - Get nodemap
 *  - Get vnnmap
 *  - Get capabilities from all nodes
 *  - Get dbmap
 *  - Set RECOVERY_ACTIVE
 *  - Send START_RECOVERY
 *  - Update vnnmap on all nodes
 *  - Run database recovery
 *  - Set RECOVERY_NORMAL
 *  - Send END_RECOVERY
 */
/* State for one complete run of the parallel database recovery */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t generation;		/* generation for the new vnnmap */
	uint32_t *pnn_list;		/* active nodes */
	int count;			/* number of entries in pnn_list */
	uint32_t destnode;		/* PNN reported by the client context */
	struct ctdb_node_map *nodemap;
	uint32_t *caps;			/* nodemap->num entries */
	uint32_t *ban_credits;		/* nodemap->num entries */
	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_dbid_map *dbmap;
};

/* Completion handlers for the individual recovery steps */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2081 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2082 struct tevent_context *ev,
2083 struct ctdb_client_context *client,
2084 uint32_t generation)
2086 struct tevent_req *req, *subreq;
2087 struct recovery_state *state;
2088 struct ctdb_req_control request;
2090 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2091 if (req == NULL) {
2092 return NULL;
2095 state->ev = ev;
2096 state->client = client;
2097 state->generation = generation;
2098 state->destnode = ctdb_client_pnn(client);
2100 ctdb_req_control_get_all_tunables(&request);
2101 subreq = ctdb_client_control_send(state, state->ev, state->client,
2102 state->destnode, TIMEOUT(),
2103 &request);
2104 if (tevent_req_nomem(subreq, req)) {
2105 return tevent_req_post(req, ev);
2107 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2109 return req;
2112 static void recovery_tunables_done(struct tevent_req *subreq)
2114 struct tevent_req *req = tevent_req_callback_data(
2115 subreq, struct tevent_req);
2116 struct recovery_state *state = tevent_req_data(
2117 req, struct recovery_state);
2118 struct ctdb_reply_control *reply;
2119 struct ctdb_req_control request;
2120 int ret;
2121 bool status;
2123 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2124 TALLOC_FREE(subreq);
2125 if (! status) {
2126 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2127 tevent_req_error(req, ret);
2128 return;
2131 ret = ctdb_reply_control_get_all_tunables(reply, state,
2132 &state->tun_list);
2133 if (ret != 0) {
2134 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2135 tevent_req_error(req, EPROTO);
2136 return;
2139 talloc_free(reply);
2141 recover_timeout = state->tun_list->recover_timeout;
2143 ctdb_req_control_get_nodemap(&request);
2144 subreq = ctdb_client_control_send(state, state->ev, state->client,
2145 state->destnode, TIMEOUT(),
2146 &request);
2147 if (tevent_req_nomem(subreq, req)) {
2148 return;
2150 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2153 static void recovery_nodemap_done(struct tevent_req *subreq)
2155 struct tevent_req *req = tevent_req_callback_data(
2156 subreq, struct tevent_req);
2157 struct recovery_state *state = tevent_req_data(
2158 req, struct recovery_state);
2159 struct ctdb_reply_control *reply;
2160 struct ctdb_req_control request;
2161 bool status;
2162 int ret;
2164 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2165 TALLOC_FREE(subreq);
2166 if (! status) {
2167 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2168 state->destnode, ret);
2169 tevent_req_error(req, ret);
2170 return;
2173 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2174 if (ret != 0) {
2175 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2176 tevent_req_error(req, ret);
2177 return;
2180 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2181 state, &state->pnn_list);
2182 if (state->count <= 0) {
2183 tevent_req_error(req, ENOMEM);
2184 return;
2187 state->ban_credits = talloc_zero_array(state, uint32_t,
2188 state->nodemap->num);
2189 if (tevent_req_nomem(state->ban_credits, req)) {
2190 return;
2193 ctdb_req_control_getvnnmap(&request);
2194 subreq = ctdb_client_control_send(state, state->ev, state->client,
2195 state->destnode, TIMEOUT(),
2196 &request);
2197 if (tevent_req_nomem(subreq, req)) {
2198 return;
2200 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2203 static void recovery_vnnmap_done(struct tevent_req *subreq)
2205 struct tevent_req *req = tevent_req_callback_data(
2206 subreq, struct tevent_req);
2207 struct recovery_state *state = tevent_req_data(
2208 req, struct recovery_state);
2209 struct ctdb_reply_control *reply;
2210 struct ctdb_req_control request;
2211 bool status;
2212 int ret;
2214 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2215 TALLOC_FREE(subreq);
2216 if (! status) {
2217 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2218 state->destnode, ret);
2219 tevent_req_error(req, ret);
2220 return;
2223 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2224 if (ret != 0) {
2225 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2226 tevent_req_error(req, ret);
2227 return;
2230 ctdb_req_control_get_capabilities(&request);
2231 subreq = ctdb_client_control_multi_send(state, state->ev,
2232 state->client,
2233 state->pnn_list, state->count,
2234 TIMEOUT(), &request);
2235 if (tevent_req_nomem(subreq, req)) {
2236 return;
2238 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2241 static void recovery_capabilities_done(struct tevent_req *subreq)
2243 struct tevent_req *req = tevent_req_callback_data(
2244 subreq, struct tevent_req);
2245 struct recovery_state *state = tevent_req_data(
2246 req, struct recovery_state);
2247 struct ctdb_reply_control **reply;
2248 struct ctdb_req_control request;
2249 int *err_list;
2250 int ret, i;
2251 bool status;
2253 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2254 &reply);
2255 TALLOC_FREE(subreq);
2256 if (! status) {
2257 int ret2;
2258 uint32_t pnn;
2260 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2261 state->count,
2262 err_list, &pnn);
2263 if (ret2 != 0) {
2264 D_ERR("control GET_CAPABILITIES failed on node %u,"
2265 " ret=%d\n", pnn, ret2);
2266 } else {
2267 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2268 ret);
2270 tevent_req_error(req, ret);
2271 return;
2274 /* Make the array size same as nodemap */
2275 state->caps = talloc_zero_array(state, uint32_t,
2276 state->nodemap->num);
2277 if (tevent_req_nomem(state->caps, req)) {
2278 return;
2281 for (i=0; i<state->count; i++) {
2282 uint32_t pnn;
2284 pnn = state->pnn_list[i];
2285 ret = ctdb_reply_control_get_capabilities(reply[i],
2286 &state->caps[pnn]);
2287 if (ret != 0) {
2288 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2289 pnn);
2290 tevent_req_error(req, EPROTO);
2291 return;
2295 talloc_free(reply);
2297 ctdb_req_control_get_dbmap(&request);
2298 subreq = ctdb_client_control_send(state, state->ev, state->client,
2299 state->destnode, TIMEOUT(),
2300 &request);
2301 if (tevent_req_nomem(subreq, req)) {
2302 return;
2304 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2307 static void recovery_dbmap_done(struct tevent_req *subreq)
2309 struct tevent_req *req = tevent_req_callback_data(
2310 subreq, struct tevent_req);
2311 struct recovery_state *state = tevent_req_data(
2312 req, struct recovery_state);
2313 struct ctdb_reply_control *reply;
2314 struct ctdb_req_control request;
2315 int ret;
2316 bool status;
2318 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2319 TALLOC_FREE(subreq);
2320 if (! status) {
2321 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2322 state->destnode, ret);
2323 tevent_req_error(req, ret);
2324 return;
2327 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2328 if (ret != 0) {
2329 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2330 tevent_req_error(req, ret);
2331 return;
2334 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2335 subreq = ctdb_client_control_multi_send(state, state->ev,
2336 state->client,
2337 state->pnn_list, state->count,
2338 TIMEOUT(), &request);
2339 if (tevent_req_nomem(subreq, req)) {
2340 return;
2342 tevent_req_set_callback(subreq, recovery_active_done, req);
2345 static void recovery_active_done(struct tevent_req *subreq)
2347 struct tevent_req *req = tevent_req_callback_data(
2348 subreq, struct tevent_req);
2349 struct recovery_state *state = tevent_req_data(
2350 req, struct recovery_state);
2351 struct ctdb_req_control request;
2352 struct ctdb_vnn_map *vnnmap;
2353 int *err_list;
2354 int ret, count, i;
2355 bool status;
2357 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2358 NULL);
2359 TALLOC_FREE(subreq);
2360 if (! status) {
2361 int ret2;
2362 uint32_t pnn;
2364 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2365 state->count,
2366 err_list, &pnn);
2367 if (ret2 != 0) {
2368 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2369 " ret=%d\n", pnn, ret2);
2370 } else {
2371 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2372 ret);
2374 tevent_req_error(req, ret);
2375 return;
2378 D_ERR("Set recovery mode to ACTIVE\n");
2380 /* Calculate new VNNMAP */
2381 count = 0;
2382 for (i=0; i<state->nodemap->num; i++) {
2383 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2384 continue;
2386 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2387 continue;
2389 count += 1;
2392 if (count == 0) {
2393 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2396 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2397 if (tevent_req_nomem(vnnmap, req)) {
2398 return;
2401 vnnmap->size = (count == 0 ? 1 : count);
2402 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2403 if (tevent_req_nomem(vnnmap->map, req)) {
2404 return;
2407 if (count == 0) {
2408 vnnmap->map[0] = state->destnode;
2409 } else {
2410 count = 0;
2411 for (i=0; i<state->nodemap->num; i++) {
2412 if (state->nodemap->node[i].flags &
2413 NODE_FLAGS_INACTIVE) {
2414 continue;
2416 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2417 continue;
2420 vnnmap->map[count] = state->nodemap->node[i].pnn;
2421 count += 1;
2425 vnnmap->generation = state->generation;
2427 talloc_free(state->vnnmap);
2428 state->vnnmap = vnnmap;
2430 ctdb_req_control_start_recovery(&request);
2431 subreq = ctdb_client_control_multi_send(state, state->ev,
2432 state->client,
2433 state->pnn_list, state->count,
2434 TIMEOUT(), &request);
2435 if (tevent_req_nomem(subreq, req)) {
2436 return;
2438 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2441 static void recovery_start_recovery_done(struct tevent_req *subreq)
2443 struct tevent_req *req = tevent_req_callback_data(
2444 subreq, struct tevent_req);
2445 struct recovery_state *state = tevent_req_data(
2446 req, struct recovery_state);
2447 struct ctdb_req_control request;
2448 int *err_list;
2449 int ret;
2450 bool status;
2452 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2453 NULL);
2454 TALLOC_FREE(subreq);
2455 if (! status) {
2456 int ret2;
2457 uint32_t pnn;
2459 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2460 state->count,
2461 err_list, &pnn);
2462 if (ret2 != 0) {
2463 D_ERR("failed to run start_recovery event on node %u,"
2464 " ret=%d\n", pnn, ret2);
2465 } else {
2466 D_ERR("failed to run start_recovery event, ret=%d\n",
2467 ret);
2469 tevent_req_error(req, ret);
2470 return;
2473 D_ERR("start_recovery event finished\n");
2475 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2476 subreq = ctdb_client_control_multi_send(state, state->ev,
2477 state->client,
2478 state->pnn_list, state->count,
2479 TIMEOUT(), &request);
2480 if (tevent_req_nomem(subreq, req)) {
2481 return;
2483 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2486 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2488 struct tevent_req *req = tevent_req_callback_data(
2489 subreq, struct tevent_req);
2490 struct recovery_state *state = tevent_req_data(
2491 req, struct recovery_state);
2492 int *err_list;
2493 int ret;
2494 bool status;
2496 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2497 NULL);
2498 TALLOC_FREE(subreq);
2499 if (! status) {
2500 int ret2;
2501 uint32_t pnn;
2503 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2504 state->count,
2505 err_list, &pnn);
2506 if (ret2 != 0) {
2507 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2508 pnn, ret2);
2509 } else {
2510 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2512 tevent_req_error(req, ret);
2513 return;
2516 D_NOTICE("updated VNNMAP\n");
2518 subreq = db_recovery_send(state, state->ev, state->client,
2519 state->dbmap, state->tun_list,
2520 state->pnn_list, state->count,
2521 state->caps, state->ban_credits,
2522 state->vnnmap->generation);
2523 if (tevent_req_nomem(subreq, req)) {
2524 return;
2526 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2529 static void recovery_db_recovery_done(struct tevent_req *subreq)
2531 struct tevent_req *req = tevent_req_callback_data(
2532 subreq, struct tevent_req);
2533 struct recovery_state *state = tevent_req_data(
2534 req, struct recovery_state);
2535 struct ctdb_req_control request;
2536 bool status;
2537 int count;
2539 status = db_recovery_recv(subreq, &count);
2540 TALLOC_FREE(subreq);
2542 D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2544 if (! status) {
2545 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2546 int i;
2548 /* Bans are not enabled */
2549 if (state->tun_list->enable_bans == 0) {
2550 tevent_req_error(req, EIO);
2551 return;
2554 for (i=0; i<state->count; i++) {
2555 uint32_t pnn;
2556 pnn = state->pnn_list[i];
2557 if (state->ban_credits[pnn] > max_credits) {
2558 max_pnn = pnn;
2559 max_credits = state->ban_credits[pnn];
2563 /* If pulling database fails multiple times */
2564 if (max_credits >= NUM_RETRIES) {
2565 struct ctdb_req_message message;
2567 D_ERR("Assigning banning credits to node %u\n",
2568 max_pnn);
2570 message.srvid = CTDB_SRVID_BANNING;
2571 message.data.pnn = max_pnn;
2573 subreq = ctdb_client_message_send(
2574 state, state->ev, state->client,
2575 ctdb_client_pnn(state->client),
2576 &message);
2577 if (tevent_req_nomem(subreq, req)) {
2578 return;
2580 tevent_req_set_callback(subreq, recovery_failed_done,
2581 req);
2582 } else {
2583 tevent_req_error(req, EIO);
2585 return;
2588 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2589 subreq = ctdb_client_control_multi_send(state, state->ev,
2590 state->client,
2591 state->pnn_list, state->count,
2592 TIMEOUT(), &request);
2593 if (tevent_req_nomem(subreq, req)) {
2594 return;
2596 tevent_req_set_callback(subreq, recovery_normal_done, req);
2599 static void recovery_failed_done(struct tevent_req *subreq)
2601 struct tevent_req *req = tevent_req_callback_data(
2602 subreq, struct tevent_req);
2603 int ret;
2604 bool status;
2606 status = ctdb_client_message_recv(subreq, &ret);
2607 TALLOC_FREE(subreq);
2608 if (! status) {
2609 D_ERR("failed to assign banning credits, ret=%d\n", ret);
2612 tevent_req_error(req, EIO);
2615 static void recovery_normal_done(struct tevent_req *subreq)
2617 struct tevent_req *req = tevent_req_callback_data(
2618 subreq, struct tevent_req);
2619 struct recovery_state *state = tevent_req_data(
2620 req, struct recovery_state);
2621 struct ctdb_req_control request;
2622 int *err_list;
2623 int ret;
2624 bool status;
2626 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2627 NULL);
2628 TALLOC_FREE(subreq);
2629 if (! status) {
2630 int ret2;
2631 uint32_t pnn;
2633 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2634 state->count,
2635 err_list, &pnn);
2636 if (ret2 != 0) {
2637 D_ERR("failed to set recovery mode NORMAL on node %u,"
2638 " ret=%d\n", pnn, ret2);
2639 } else {
2640 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2641 ret);
2643 tevent_req_error(req, ret);
2644 return;
2647 D_ERR("Set recovery mode to NORMAL\n");
2649 ctdb_req_control_end_recovery(&request);
2650 subreq = ctdb_client_control_multi_send(state, state->ev,
2651 state->client,
2652 state->pnn_list, state->count,
2653 TIMEOUT(), &request);
2654 if (tevent_req_nomem(subreq, req)) {
2655 return;
2657 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2660 static void recovery_end_recovery_done(struct tevent_req *subreq)
2662 struct tevent_req *req = tevent_req_callback_data(
2663 subreq, struct tevent_req);
2664 struct recovery_state *state = tevent_req_data(
2665 req, struct recovery_state);
2666 int *err_list;
2667 int ret;
2668 bool status;
2670 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2671 NULL);
2672 TALLOC_FREE(subreq);
2673 if (! status) {
2674 int ret2;
2675 uint32_t pnn;
2677 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2678 state->count,
2679 err_list, &pnn);
2680 if (ret2 != 0) {
2681 D_ERR("failed to run recovered event on node %u,"
2682 " ret=%d\n", pnn, ret2);
2683 } else {
2684 D_ERR("failed to run recovered event, ret=%d\n", ret);
2686 tevent_req_error(req, ret);
2687 return;
2690 D_ERR("recovered event finished\n");
2692 tevent_req_done(req);
/*
 * Collect the result of a recovery request.
 *
 * If perr is not NULL, *perr receives the request's error code on failure
 * and 0 on success.  generic_recv() only writes *perr when the request
 * failed, so the success case is stored explicitly here; otherwise a
 * caller that tests *perr would be reading whatever value it held before
 * the call.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	if (generic_recv(req, perr)) {
		if (perr != NULL) {
			*perr = 0;
		}
	}
}
/* Print the command-line synopsis for this helper to stderr. */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
2708 * Arguments - log fd, write fd, socket path, generation
2710 int main(int argc, char *argv[])
2712 int write_fd;
2713 const char *sockpath;
2714 TALLOC_CTX *mem_ctx;
2715 struct tevent_context *ev;
2716 struct ctdb_client_context *client;
2717 int ret;
2718 struct tevent_req *req;
2719 uint32_t generation;
2721 if (argc != 4) {
2722 usage(argv[0]);
2723 exit(1);
2726 write_fd = atoi(argv[1]);
2727 sockpath = argv[2];
2728 generation = (uint32_t)strtoul(argv[3], NULL, 0);
2730 mem_ctx = talloc_new(NULL);
2731 if (mem_ctx == NULL) {
2732 fprintf(stderr, "recovery: talloc_new() failed\n");
2733 goto failed;
2736 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2737 if (ret != 0) {
2738 fprintf(stderr, "recovery: Unable to initialize logging\n");
2739 goto failed;
2742 ev = tevent_context_init(mem_ctx);
2743 if (ev == NULL) {
2744 D_ERR("tevent_context_init() failed\n");
2745 goto failed;
2748 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2749 if (ret != 0) {
2750 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2751 goto failed;
2754 req = recovery_send(mem_ctx, ev, client, generation);
2755 if (req == NULL) {
2756 D_ERR("database_recover_send() failed\n");
2757 goto failed;
2760 if (! tevent_req_poll(req, ev)) {
2761 D_ERR("tevent_req_poll() failed\n");
2762 goto failed;
2765 recovery_recv(req, &ret);
2766 TALLOC_FREE(req);
2767 if (ret != 0) {
2768 D_ERR("database recovery failed, ret=%d\n", ret);
2769 goto failed;
2772 sys_write(write_fd, &ret, sizeof(ret));
2773 return 0;
2775 failed:
2776 TALLOC_FREE(mem_ctx);
2777 return 1;