s3: VFS: vfs_snapper: Make chflags return errno = EROFS on a shadow copy path.
[Samba.git] / ctdb / server / ctdb_recovery_helper.c
blob0597c507ba6edb2f9c1098bb6cb05728912b5515
1 /*
2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
33 #include "lib/util/util.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client.h"
39 #include "common/logging.h"
/* First argument (seconds) handed to timeval_current_ofs() by TIMEOUT() */
static int recover_timeout = 30;

/* NOTE(review): not referenced in the part of the file visible here */
#define NUM_RETRIES 3

/* Timeout value passed to the client control calls in this file */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * Utility functions
 */
/*
 * Common receive helper for the tevent requests in this file.
 *
 * Returns true when the request did not finish with a unix error.
 * Otherwise returns false and, if perr is not NULL, stores the error
 * code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (!tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
65 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
67 static uint64_t srvid_next(void)
69 rec_srvid += 1;
70 return rec_srvid;
/*
 * Recovery database functions
 */
/* Handle for the temporary recovery database of one ctdb database */
struct recdb_context {
	/* id of the database being recovered */
	uint32_t db_id;
	/* name of the database; pointer supplied by the creator */
	const char *db_name;
	/* path of the recovery tdb file ("<dir>/recdb.<db_name>") */
	const char *db_path;
	/* open handle on the recovery tdb */
	struct tdb_wrap *db;
	/* true for persistent databases */
	bool persistent;
};
85 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
86 const char *db_name,
87 const char *db_path,
88 uint32_t hash_size, bool persistent)
90 static char *db_dir_state = NULL;
91 struct recdb_context *recdb;
92 unsigned int tdb_flags;
94 recdb = talloc(mem_ctx, struct recdb_context);
95 if (recdb == NULL) {
96 return NULL;
99 if (db_dir_state == NULL) {
100 db_dir_state = getenv("CTDB_DBDIR_STATE");
103 recdb->db_name = db_name;
104 recdb->db_id = db_id;
105 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
106 db_dir_state != NULL ?
107 db_dir_state :
108 dirname(discard_const(db_path)),
109 db_name);
110 if (recdb->db_path == NULL) {
111 talloc_free(recdb);
112 return NULL;
114 unlink(recdb->db_path);
116 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
117 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
118 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
119 if (recdb->db == NULL) {
120 talloc_free(recdb);
121 D_ERR("failed to create recovery db %s\n", recdb->db_path);
122 return NULL;
125 recdb->persistent = persistent;
127 return recdb;
130 static uint32_t recdb_id(struct recdb_context *recdb)
132 return recdb->db_id;
135 static const char *recdb_name(struct recdb_context *recdb)
137 return recdb->db_name;
140 static const char *recdb_path(struct recdb_context *recdb)
142 return recdb->db_path;
145 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
147 return recdb->db->tdb;
150 static bool recdb_persistent(struct recdb_context *recdb)
152 return recdb->persistent;
/* Private data handed to recdb_add_traverse() */
struct recdb_add_traverse_state {
	/* recovery db the records are merged into */
	struct recdb_context *recdb;
	/* pnn compared against the dmaster of an existing record on an rsn tie */
	uint32_t mypnn;
};
160 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
161 TDB_DATA key, TDB_DATA data,
162 void *private_data)
164 struct recdb_add_traverse_state *state =
165 (struct recdb_add_traverse_state *)private_data;
166 struct ctdb_ltdb_header *hdr;
167 TDB_DATA prev_data;
168 int ret;
170 /* header is not marshalled separately in the pulldb control */
171 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
172 return -1;
175 hdr = (struct ctdb_ltdb_header *)data.dptr;
177 /* fetch the existing record, if any */
178 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
180 if (prev_data.dptr != NULL) {
181 struct ctdb_ltdb_header prev_hdr;
183 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
184 free(prev_data.dptr);
185 if (hdr->rsn < prev_hdr.rsn ||
186 (hdr->rsn == prev_hdr.rsn &&
187 prev_hdr.dmaster != state->mypnn)) {
188 return 0;
192 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
193 if (ret != 0) {
194 return -1;
196 return 0;
199 static bool recdb_add(struct recdb_context *recdb, int mypnn,
200 struct ctdb_rec_buffer *recbuf)
202 struct recdb_add_traverse_state state;
203 int ret;
205 state.recdb = recdb;
206 state.mypnn = mypnn;
208 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
209 if (ret != 0) {
210 return false;
213 return true;
216 /* This function decides which records from recdb are retained */
217 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
218 uint32_t reqid, uint32_t dmaster,
219 TDB_DATA key, TDB_DATA data)
221 struct ctdb_ltdb_header *header;
222 int ret;
224 /* Skip empty records */
225 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
226 return 0;
229 /* update the dmaster field to point to us */
230 header = (struct ctdb_ltdb_header *)data.dptr;
231 if (!persistent) {
232 header->dmaster = dmaster;
233 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
236 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
237 if (ret != 0) {
238 return ret;
241 return 0;
/* Private data handed to recdb_records_traverse() */
struct recdb_records_traverse_state {
	/* buffer the retained records are appended to */
	struct ctdb_rec_buffer *recbuf;
	/* dmaster written into non-persistent records */
	uint32_t dmaster;
	/* reqid passed along with every record */
	uint32_t reqid;
	/* whether the database is persistent */
	bool persistent;
	/* set by the callback when adding a record failed */
	bool failed;
};
252 static int recdb_records_traverse(struct tdb_context *tdb,
253 TDB_DATA key, TDB_DATA data,
254 void *private_data)
256 struct recdb_records_traverse_state *state =
257 (struct recdb_records_traverse_state *)private_data;
258 int ret;
260 ret = recbuf_filter_add(state->recbuf, state->persistent,
261 state->reqid, state->dmaster, key, data);
262 if (ret != 0) {
263 state->failed = true;
264 return ret;
267 return 0;
270 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
271 TALLOC_CTX *mem_ctx,
272 uint32_t dmaster)
274 struct recdb_records_traverse_state state;
275 int ret;
277 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
278 if (state.recbuf == NULL) {
279 return NULL;
281 state.dmaster = dmaster;
282 state.reqid = 0;
283 state.persistent = recdb_persistent(recdb);
284 state.failed = false;
286 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
287 &state);
288 if (ret == -1 || state.failed) {
289 D_ERR("Failed to marshall recovery records for %s\n",
290 recdb_name(recdb));
291 TALLOC_FREE(state.recbuf);
292 return NULL;
295 return state.recbuf;
298 struct recdb_file_traverse_state {
299 struct ctdb_rec_buffer *recbuf;
300 struct recdb_context *recdb;
301 TALLOC_CTX *mem_ctx;
302 uint32_t dmaster;
303 uint32_t reqid;
304 bool persistent;
305 bool failed;
306 int fd;
307 size_t max_size;
308 unsigned int num_buffers;
311 static int recdb_file_traverse(struct tdb_context *tdb,
312 TDB_DATA key, TDB_DATA data,
313 void *private_data)
315 struct recdb_file_traverse_state *state =
316 (struct recdb_file_traverse_state *)private_data;
317 int ret;
319 ret = recbuf_filter_add(state->recbuf, state->persistent,
320 state->reqid, state->dmaster, key, data);
321 if (ret != 0) {
322 state->failed = true;
323 return ret;
326 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
327 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
328 if (ret != 0) {
329 D_ERR("Failed to collect recovery records for %s\n",
330 recdb_name(state->recdb));
331 state->failed = true;
332 return ret;
335 state->num_buffers += 1;
337 TALLOC_FREE(state->recbuf);
338 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
339 recdb_id(state->recdb));
340 if (state->recbuf == NULL) {
341 state->failed = true;
342 return ENOMEM;
346 return 0;
349 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
350 uint32_t dmaster, int fd, int max_size)
352 struct recdb_file_traverse_state state;
353 int ret;
355 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
356 if (state.recbuf == NULL) {
357 return -1;
359 state.recdb = recdb;
360 state.mem_ctx = mem_ctx;
361 state.dmaster = dmaster;
362 state.reqid = 0;
363 state.persistent = recdb_persistent(recdb);
364 state.failed = false;
365 state.fd = fd;
366 state.max_size = max_size;
367 state.num_buffers = 0;
369 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
370 if (ret == -1 || state.failed) {
371 TALLOC_FREE(state.recbuf);
372 return -1;
375 ret = ctdb_rec_buffer_write(state.recbuf, fd);
376 if (ret != 0) {
377 D_ERR("Failed to collect recovery records for %s\n",
378 recdb_name(recdb));
379 TALLOC_FREE(state.recbuf);
380 return -1;
382 state.num_buffers += 1;
384 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
385 state.num_buffers, recdb_name(recdb));
387 return state.num_buffers;
/*
 * Pull database from a single node
 */
/* State of one pull_database request */
struct pull_database_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* recovery db the pulled records are merged into */
	struct recdb_context *recdb;
	/* node the database is pulled from */
	uint32_t pnn;
	/* srvid on which DB_PULL record messages are received */
	uint64_t srvid;
	/* number of records received so far */
	unsigned int num_records;
	/* error remembered while the message handler is being removed */
	int result;
};
404 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
405 void *private_data);
406 static void pull_database_register_done(struct tevent_req *subreq);
407 static void pull_database_old_done(struct tevent_req *subreq);
408 static void pull_database_unregister_done(struct tevent_req *subreq);
409 static void pull_database_new_done(struct tevent_req *subreq);
411 static struct tevent_req *pull_database_send(
412 TALLOC_CTX *mem_ctx,
413 struct tevent_context *ev,
414 struct ctdb_client_context *client,
415 uint32_t pnn, uint32_t caps,
416 struct recdb_context *recdb)
418 struct tevent_req *req, *subreq;
419 struct pull_database_state *state;
420 struct ctdb_req_control request;
422 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
423 if (req == NULL) {
424 return NULL;
427 state->ev = ev;
428 state->client = client;
429 state->recdb = recdb;
430 state->pnn = pnn;
431 state->srvid = srvid_next();
433 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
434 subreq = ctdb_client_set_message_handler_send(
435 state, state->ev, state->client,
436 state->srvid, pull_database_handler,
437 req);
438 if (tevent_req_nomem(subreq, req)) {
439 return tevent_req_post(req, ev);
442 tevent_req_set_callback(subreq, pull_database_register_done,
443 req);
445 } else {
446 struct ctdb_pulldb pulldb;
448 pulldb.db_id = recdb_id(recdb);
449 pulldb.lmaster = CTDB_LMASTER_ANY;
451 ctdb_req_control_pull_db(&request, &pulldb);
452 subreq = ctdb_client_control_send(state, state->ev,
453 state->client,
454 pnn, TIMEOUT(),
455 &request);
456 if (tevent_req_nomem(subreq, req)) {
457 return tevent_req_post(req, ev);
459 tevent_req_set_callback(subreq, pull_database_old_done, req);
462 return req;
465 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
466 void *private_data)
468 struct tevent_req *req = talloc_get_type_abort(
469 private_data, struct tevent_req);
470 struct pull_database_state *state = tevent_req_data(
471 req, struct pull_database_state);
472 struct ctdb_rec_buffer *recbuf;
473 size_t np;
474 int ret;
475 bool status;
477 if (srvid != state->srvid) {
478 return;
481 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
482 if (ret != 0) {
483 D_ERR("Invalid data received for DB_PULL messages\n");
484 return;
487 if (recbuf->db_id != recdb_id(state->recdb)) {
488 talloc_free(recbuf);
489 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
490 recbuf->db_id, recdb_name(state->recdb));
491 return;
494 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
495 recbuf);
496 if (! status) {
497 talloc_free(recbuf);
498 D_ERR("Failed to add records to recdb for %s\n",
499 recdb_name(state->recdb));
500 return;
503 state->num_records += recbuf->count;
504 talloc_free(recbuf);
507 static void pull_database_register_done(struct tevent_req *subreq)
509 struct tevent_req *req = tevent_req_callback_data(
510 subreq, struct tevent_req);
511 struct pull_database_state *state = tevent_req_data(
512 req, struct pull_database_state);
513 struct ctdb_req_control request;
514 struct ctdb_pulldb_ext pulldb_ext;
515 int ret;
516 bool status;
518 status = ctdb_client_set_message_handler_recv(subreq, &ret);
519 TALLOC_FREE(subreq);
520 if (! status) {
521 D_ERR("Failed to set message handler for DB_PULL for %s\n",
522 recdb_name(state->recdb));
523 tevent_req_error(req, ret);
524 return;
527 pulldb_ext.db_id = recdb_id(state->recdb);
528 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
529 pulldb_ext.srvid = state->srvid;
531 ctdb_req_control_db_pull(&request, &pulldb_ext);
532 subreq = ctdb_client_control_send(state, state->ev, state->client,
533 state->pnn, TIMEOUT(), &request);
534 if (tevent_req_nomem(subreq, req)) {
535 return;
537 tevent_req_set_callback(subreq, pull_database_new_done, req);
540 static void pull_database_old_done(struct tevent_req *subreq)
542 struct tevent_req *req = tevent_req_callback_data(
543 subreq, struct tevent_req);
544 struct pull_database_state *state = tevent_req_data(
545 req, struct pull_database_state);
546 struct ctdb_reply_control *reply;
547 struct ctdb_rec_buffer *recbuf;
548 int ret;
549 bool status;
551 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
552 TALLOC_FREE(subreq);
553 if (! status) {
554 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
555 recdb_name(state->recdb), state->pnn, ret);
556 tevent_req_error(req, ret);
557 return;
560 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
561 talloc_free(reply);
562 if (ret != 0) {
563 tevent_req_error(req, ret);
564 return;
567 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
568 recbuf);
569 if (! status) {
570 talloc_free(recbuf);
571 tevent_req_error(req, EIO);
572 return;
575 state->num_records = recbuf->count;
576 talloc_free(recbuf);
578 D_INFO("Pulled %d records for db %s from node %d\n",
579 state->num_records, recdb_name(state->recdb), state->pnn);
581 tevent_req_done(req);
584 static void pull_database_new_done(struct tevent_req *subreq)
586 struct tevent_req *req = tevent_req_callback_data(
587 subreq, struct tevent_req);
588 struct pull_database_state *state = tevent_req_data(
589 req, struct pull_database_state);
590 struct ctdb_reply_control *reply;
591 uint32_t num_records;
592 int ret;
593 bool status;
595 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
596 TALLOC_FREE(subreq);
597 if (! status) {
598 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
599 recdb_name(state->recdb), state->pnn, ret);
600 state->result = ret;
601 goto unregister;
604 ret = ctdb_reply_control_db_pull(reply, &num_records);
605 talloc_free(reply);
606 if (num_records != state->num_records) {
607 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
608 num_records, state->num_records,
609 recdb_name(state->recdb));
610 state->result = EIO;
611 goto unregister;
614 D_INFO("Pulled %d records for db %s from node %d\n",
615 state->num_records, recdb_name(state->recdb), state->pnn);
617 unregister:
619 subreq = ctdb_client_remove_message_handler_send(
620 state, state->ev, state->client,
621 state->srvid, req);
622 if (tevent_req_nomem(subreq, req)) {
623 return;
625 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
628 static void pull_database_unregister_done(struct tevent_req *subreq)
630 struct tevent_req *req = tevent_req_callback_data(
631 subreq, struct tevent_req);
632 struct pull_database_state *state = tevent_req_data(
633 req, struct pull_database_state);
634 int ret;
635 bool status;
637 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
638 TALLOC_FREE(subreq);
639 if (! status) {
640 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
641 recdb_name(state->recdb));
642 tevent_req_error(req, ret);
643 return;
646 if (state->result != 0) {
647 tevent_req_error(req, state->result);
648 return;
651 tevent_req_done(req);
/* Collect the result of a pull_database request; see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (old style)
 */
/* State of one old style push request */
struct push_database_old_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* recovery db whose records are pushed */
	struct recdb_context *recdb;
	/* nodes to push to, one after the other */
	uint32_t *pnn_list;
	/* number of entries in pnn_list */
	int count;
	/* marshalled records, sent unchanged to every node */
	struct ctdb_rec_buffer *recbuf;
	/* position in pnn_list of the node currently being pushed to */
	int index;
};
/* Completion callback for each PUSH_DB control of the old style push */
static void push_database_old_push_done(struct tevent_req *subreq);
675 static struct tevent_req *push_database_old_send(
676 TALLOC_CTX *mem_ctx,
677 struct tevent_context *ev,
678 struct ctdb_client_context *client,
679 uint32_t *pnn_list, int count,
680 struct recdb_context *recdb)
682 struct tevent_req *req, *subreq;
683 struct push_database_old_state *state;
684 struct ctdb_req_control request;
685 uint32_t pnn;
687 req = tevent_req_create(mem_ctx, &state,
688 struct push_database_old_state);
689 if (req == NULL) {
690 return NULL;
693 state->ev = ev;
694 state->client = client;
695 state->recdb = recdb;
696 state->pnn_list = pnn_list;
697 state->count = count;
698 state->index = 0;
700 state->recbuf = recdb_records(recdb, state,
701 ctdb_client_pnn(client));
702 if (tevent_req_nomem(state->recbuf, req)) {
703 return tevent_req_post(req, ev);
706 pnn = state->pnn_list[state->index];
708 ctdb_req_control_push_db(&request, state->recbuf);
709 subreq = ctdb_client_control_send(state, ev, client, pnn,
710 TIMEOUT(), &request);
711 if (tevent_req_nomem(subreq, req)) {
712 return tevent_req_post(req, ev);
714 tevent_req_set_callback(subreq, push_database_old_push_done, req);
716 return req;
719 static void push_database_old_push_done(struct tevent_req *subreq)
721 struct tevent_req *req = tevent_req_callback_data(
722 subreq, struct tevent_req);
723 struct push_database_old_state *state = tevent_req_data(
724 req, struct push_database_old_state);
725 struct ctdb_req_control request;
726 uint32_t pnn;
727 int ret;
728 bool status;
730 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
731 TALLOC_FREE(subreq);
732 if (! status) {
733 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
734 recdb_name(state->recdb), state->pnn_list[state->index],
735 ret);
736 tevent_req_error(req, ret);
737 return;
740 state->index += 1;
741 if (state->index == state->count) {
742 TALLOC_FREE(state->recbuf);
743 tevent_req_done(req);
744 return;
747 pnn = state->pnn_list[state->index];
749 ctdb_req_control_push_db(&request, state->recbuf);
750 subreq = ctdb_client_control_send(state, state->ev, state->client,
751 pnn, TIMEOUT(), &request);
752 if (tevent_req_nomem(subreq, req)) {
753 return;
755 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Collect the result of an old style push; see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Push database to specified nodes (new style)
 */
/* State of one new style push request */
struct push_database_new_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* recovery db whose records are pushed */
	struct recdb_context *recdb;
	/* nodes to push to */
	uint32_t *pnn_list;
	/* number of entries in pnn_list */
	int count;
	/* srvid the record messages are sent to */
	uint64_t srvid;
	/* this node's pnn, written as dmaster into the pushed records */
	uint32_t dmaster;
	/* scratch file holding the marshalled record buffers */
	int fd;
	/* number of buffers stored in the scratch file */
	int num_buffers;
	/* number of buffers sent and acknowledged so far */
	int num_buffers_sent;
	/* number of records handed to the message layer so far */
	unsigned int num_records;
};
/* Steps of the new style push, in the order they run */
static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
786 static struct tevent_req *push_database_new_send(
787 TALLOC_CTX *mem_ctx,
788 struct tevent_context *ev,
789 struct ctdb_client_context *client,
790 uint32_t *pnn_list, int count,
791 struct recdb_context *recdb,
792 int max_size)
794 struct tevent_req *req, *subreq;
795 struct push_database_new_state *state;
796 struct ctdb_req_control request;
797 struct ctdb_pulldb_ext pulldb_ext;
798 char *filename;
799 off_t offset;
801 req = tevent_req_create(mem_ctx, &state,
802 struct push_database_new_state);
803 if (req == NULL) {
804 return NULL;
807 state->ev = ev;
808 state->client = client;
809 state->recdb = recdb;
810 state->pnn_list = pnn_list;
811 state->count = count;
813 state->srvid = srvid_next();
814 state->dmaster = ctdb_client_pnn(client);
815 state->num_buffers_sent = 0;
816 state->num_records = 0;
818 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
819 if (tevent_req_nomem(filename, req)) {
820 return tevent_req_post(req, ev);
823 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
824 if (state->fd == -1) {
825 tevent_req_error(req, errno);
826 return tevent_req_post(req, ev);
828 unlink(filename);
829 talloc_free(filename);
831 state->num_buffers = recdb_file(recdb, state, state->dmaster,
832 state->fd, max_size);
833 if (state->num_buffers == -1) {
834 tevent_req_error(req, ENOMEM);
835 return tevent_req_post(req, ev);
838 offset = lseek(state->fd, 0, SEEK_SET);
839 if (offset != 0) {
840 tevent_req_error(req, EIO);
841 return tevent_req_post(req, ev);
844 pulldb_ext.db_id = recdb_id(recdb);
845 pulldb_ext.srvid = state->srvid;
847 ctdb_req_control_db_push_start(&request, &pulldb_ext);
848 subreq = ctdb_client_control_multi_send(state, ev, client,
849 pnn_list, count,
850 TIMEOUT(), &request);
851 if (tevent_req_nomem(subreq, req)) {
852 return tevent_req_post(req, ev);
854 tevent_req_set_callback(subreq, push_database_new_started, req);
856 return req;
859 static void push_database_new_started(struct tevent_req *subreq)
861 struct tevent_req *req = tevent_req_callback_data(
862 subreq, struct tevent_req);
863 struct push_database_new_state *state = tevent_req_data(
864 req, struct push_database_new_state);
865 int *err_list;
866 int ret;
867 bool status;
869 status = ctdb_client_control_multi_recv(subreq, &ret, state,
870 &err_list, NULL);
871 TALLOC_FREE(subreq);
872 if (! status) {
873 int ret2;
874 uint32_t pnn;
876 ret2 = ctdb_client_control_multi_error(state->pnn_list,
877 state->count,
878 err_list, &pnn);
879 if (ret2 != 0) {
880 D_ERR("control DB_PUSH_START failed for db %s"
881 " on node %u, ret=%d\n",
882 recdb_name(state->recdb), pnn, ret2);
883 } else {
884 D_ERR("control DB_PUSH_START failed for db %s,"
885 " ret=%d\n",
886 recdb_name(state->recdb), ret);
888 talloc_free(err_list);
890 tevent_req_error(req, ret);
891 return;
894 push_database_new_send_msg(req);
897 static void push_database_new_send_msg(struct tevent_req *req)
899 struct push_database_new_state *state = tevent_req_data(
900 req, struct push_database_new_state);
901 struct tevent_req *subreq;
902 struct ctdb_rec_buffer *recbuf;
903 struct ctdb_req_message message;
904 TDB_DATA data;
905 size_t np;
906 int ret;
908 if (state->num_buffers_sent == state->num_buffers) {
909 struct ctdb_req_control request;
911 ctdb_req_control_db_push_confirm(&request,
912 recdb_id(state->recdb));
913 subreq = ctdb_client_control_multi_send(state, state->ev,
914 state->client,
915 state->pnn_list,
916 state->count,
917 TIMEOUT(), &request);
918 if (tevent_req_nomem(subreq, req)) {
919 return;
921 tevent_req_set_callback(subreq, push_database_new_confirmed,
922 req);
923 return;
926 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
927 if (ret != 0) {
928 tevent_req_error(req, ret);
929 return;
932 data.dsize = ctdb_rec_buffer_len(recbuf);
933 data.dptr = talloc_size(state, data.dsize);
934 if (tevent_req_nomem(data.dptr, req)) {
935 return;
938 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
940 message.srvid = state->srvid;
941 message.data.data = data;
943 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
944 state->num_buffers_sent, recbuf->count,
945 recdb_name(state->recdb));
947 subreq = ctdb_client_message_multi_send(state, state->ev,
948 state->client,
949 state->pnn_list, state->count,
950 &message);
951 if (tevent_req_nomem(subreq, req)) {
952 return;
954 tevent_req_set_callback(subreq, push_database_new_send_done, req);
956 state->num_records += recbuf->count;
958 talloc_free(data.dptr);
959 talloc_free(recbuf);
962 static void push_database_new_send_done(struct tevent_req *subreq)
964 struct tevent_req *req = tevent_req_callback_data(
965 subreq, struct tevent_req);
966 struct push_database_new_state *state = tevent_req_data(
967 req, struct push_database_new_state);
968 bool status;
969 int ret;
971 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
972 TALLOC_FREE(subreq);
973 if (! status) {
974 D_ERR("Sending recovery records failed for %s\n",
975 recdb_name(state->recdb));
976 tevent_req_error(req, ret);
977 return;
980 state->num_buffers_sent += 1;
982 push_database_new_send_msg(req);
985 static void push_database_new_confirmed(struct tevent_req *subreq)
987 struct tevent_req *req = tevent_req_callback_data(
988 subreq, struct tevent_req);
989 struct push_database_new_state *state = tevent_req_data(
990 req, struct push_database_new_state);
991 struct ctdb_reply_control **reply;
992 int *err_list;
993 bool status;
994 int ret, i;
995 uint32_t num_records;
997 status = ctdb_client_control_multi_recv(subreq, &ret, state,
998 &err_list, &reply);
999 TALLOC_FREE(subreq);
1000 if (! status) {
1001 int ret2;
1002 uint32_t pnn;
1004 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1005 state->count, err_list,
1006 &pnn);
1007 if (ret2 != 0) {
1008 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1009 " on node %u, ret=%d\n",
1010 recdb_name(state->recdb), pnn, ret2);
1011 } else {
1012 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1013 " ret=%d\n",
1014 recdb_name(state->recdb), ret);
1016 tevent_req_error(req, ret);
1017 return;
1020 for (i=0; i<state->count; i++) {
1021 ret = ctdb_reply_control_db_push_confirm(reply[i],
1022 &num_records);
1023 if (ret != 0) {
1024 tevent_req_error(req, EPROTO);
1025 return;
1028 if (num_records != state->num_records) {
1029 D_ERR("Node %u received %d of %d records for %s\n",
1030 state->pnn_list[i], num_records,
1031 state->num_records, recdb_name(state->recdb));
1032 tevent_req_error(req, EPROTO);
1033 return;
1037 talloc_free(reply);
1039 D_INFO("Pushed %d records for db %s\n",
1040 state->num_records, recdb_name(state->recdb));
1042 tevent_req_done(req);
/* Collect the result of a new style push; see generic_recv() */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Wrapper for push_database_old and push_database_new
 */
/* State of the combined push request */
struct push_database_state {
	/* old style push has finished, or was not needed */
	bool old_done;
	/* new style push has finished, or was not needed */
	bool new_done;
};
/* Completion callbacks for the two sub-requests of push_database_send() */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1061 static struct tevent_req *push_database_send(
1062 TALLOC_CTX *mem_ctx,
1063 struct tevent_context *ev,
1064 struct ctdb_client_context *client,
1065 uint32_t *pnn_list, int count, uint32_t *caps,
1066 struct ctdb_tunable_list *tun_list,
1067 struct recdb_context *recdb)
1069 struct tevent_req *req, *subreq;
1070 struct push_database_state *state;
1071 uint32_t *old_list, *new_list;
1072 unsigned int old_count, new_count;
1073 int i;
1075 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1076 if (req == NULL) {
1077 return NULL;
1080 state->old_done = false;
1081 state->new_done = false;
1083 old_count = 0;
1084 new_count = 0;
1085 old_list = talloc_array(state, uint32_t, count);
1086 new_list = talloc_array(state, uint32_t, count);
1087 if (tevent_req_nomem(old_list, req) ||
1088 tevent_req_nomem(new_list,req)) {
1089 return tevent_req_post(req, ev);
1092 for (i=0; i<count; i++) {
1093 uint32_t pnn = pnn_list[i];
1095 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1096 new_list[new_count] = pnn;
1097 new_count += 1;
1098 } else {
1099 old_list[old_count] = pnn;
1100 old_count += 1;
1104 if (old_count > 0) {
1105 subreq = push_database_old_send(state, ev, client,
1106 old_list, old_count, recdb);
1107 if (tevent_req_nomem(subreq, req)) {
1108 return tevent_req_post(req, ev);
1110 tevent_req_set_callback(subreq, push_database_old_done, req);
1111 } else {
1112 state->old_done = true;
1115 if (new_count > 0) {
1116 subreq = push_database_new_send(state, ev, client,
1117 new_list, new_count, recdb,
1118 tun_list->rec_buffer_size_limit);
1119 if (tevent_req_nomem(subreq, req)) {
1120 return tevent_req_post(req, ev);
1122 tevent_req_set_callback(subreq, push_database_new_done, req);
1123 } else {
1124 state->new_done = true;
1127 return req;
1130 static void push_database_old_done(struct tevent_req *subreq)
1132 struct tevent_req *req = tevent_req_callback_data(
1133 subreq, struct tevent_req);
1134 struct push_database_state *state = tevent_req_data(
1135 req, struct push_database_state);
1136 bool status;
1137 int ret;
1139 status = push_database_old_recv(subreq, &ret);
1140 if (! status) {
1141 tevent_req_error(req, ret);
1142 return;
1145 state->old_done = true;
1147 if (state->old_done && state->new_done) {
1148 tevent_req_done(req);
1152 static void push_database_new_done(struct tevent_req *subreq)
1154 struct tevent_req *req = tevent_req_callback_data(
1155 subreq, struct tevent_req);
1156 struct push_database_state *state = tevent_req_data(
1157 req, struct push_database_state);
1158 bool status;
1159 int ret;
1161 status = push_database_new_recv(subreq, &ret);
1162 if (! status) {
1163 tevent_req_error(req, ret);
1164 return;
1167 state->new_done = true;
1169 if (state->old_done && state->new_done) {
1170 tevent_req_done(req);
/* Collect the result of the combined push; see generic_recv() */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
 * Collect databases using highest sequence number
 */
/* State of one collect_highseqnum_db request */
struct collect_highseqnum_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* nodes asked for their sequence number */
	uint32_t *pnn_list;
	/* number of entries in pnn_list */
	int count;
	/* node capabilities, indexed by pnn */
	uint32_t *caps;
	/* ban credits, indexed by pnn; bumped for a node whose pull fails */
	uint32_t *ban_credits;
	/* id of the database being collected */
	uint32_t db_id;
	/* recovery db the records are pulled into */
	struct recdb_context *recdb;
	/* node that reported the highest sequence number */
	uint32_t max_pnn;
};
/* Completion callbacks of the collect_highseqnum_db request */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1198 static struct tevent_req *collect_highseqnum_db_send(
1199 TALLOC_CTX *mem_ctx,
1200 struct tevent_context *ev,
1201 struct ctdb_client_context *client,
1202 uint32_t *pnn_list, int count, uint32_t *caps,
1203 uint32_t *ban_credits, uint32_t db_id,
1204 struct recdb_context *recdb)
1206 struct tevent_req *req, *subreq;
1207 struct collect_highseqnum_db_state *state;
1208 struct ctdb_req_control request;
1210 req = tevent_req_create(mem_ctx, &state,
1211 struct collect_highseqnum_db_state);
1212 if (req == NULL) {
1213 return NULL;
1216 state->ev = ev;
1217 state->client = client;
1218 state->pnn_list = pnn_list;
1219 state->count = count;
1220 state->caps = caps;
1221 state->ban_credits = ban_credits;
1222 state->db_id = db_id;
1223 state->recdb = recdb;
1225 ctdb_req_control_get_db_seqnum(&request, db_id);
1226 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1227 state->pnn_list, state->count,
1228 TIMEOUT(), &request);
1229 if (tevent_req_nomem(subreq, req)) {
1230 return tevent_req_post(req, ev);
1232 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1233 req);
1235 return req;
1238 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1240 struct tevent_req *req = tevent_req_callback_data(
1241 subreq, struct tevent_req);
1242 struct collect_highseqnum_db_state *state = tevent_req_data(
1243 req, struct collect_highseqnum_db_state);
1244 struct ctdb_reply_control **reply;
1245 int *err_list;
1246 bool status;
1247 int ret, i;
1248 uint64_t seqnum, max_seqnum;
1250 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1251 &err_list, &reply);
1252 TALLOC_FREE(subreq);
1253 if (! status) {
1254 int ret2;
1255 uint32_t pnn;
1257 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1258 state->count, err_list,
1259 &pnn);
1260 if (ret2 != 0) {
1261 D_ERR("control GET_DB_SEQNUM failed for db %s"
1262 " on node %u, ret=%d\n",
1263 recdb_name(state->recdb), pnn, ret2);
1264 } else {
1265 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1266 " ret=%d\n",
1267 recdb_name(state->recdb), ret);
1269 tevent_req_error(req, ret);
1270 return;
1273 max_seqnum = 0;
1274 state->max_pnn = state->pnn_list[0];
1275 for (i=0; i<state->count; i++) {
1276 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1277 if (ret != 0) {
1278 tevent_req_error(req, EPROTO);
1279 return;
1282 if (max_seqnum < seqnum) {
1283 max_seqnum = seqnum;
1284 state->max_pnn = state->pnn_list[i];
1288 talloc_free(reply);
1290 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1291 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1293 subreq = pull_database_send(state, state->ev, state->client,
1294 state->max_pnn,
1295 state->caps[state->max_pnn],
1296 state->recdb);
1297 if (tevent_req_nomem(subreq, req)) {
1298 return;
1300 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1301 req);
1304 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1306 struct tevent_req *req = tevent_req_callback_data(
1307 subreq, struct tevent_req);
1308 struct collect_highseqnum_db_state *state = tevent_req_data(
1309 req, struct collect_highseqnum_db_state);
1310 int ret;
1311 bool status;
1313 status = pull_database_recv(subreq, &ret);
1314 TALLOC_FREE(subreq);
1315 if (! status) {
1316 state->ban_credits[state->max_pnn] += 1;
1317 tevent_req_error(req, ret);
1318 return;
1321 tevent_req_done(req);
/*
 * Collect the result of collect_highseqnum_db_send().
 *
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code there.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1330 * Collect all databases
1333 struct collect_all_db_state {
1334 struct tevent_context *ev;
1335 struct ctdb_client_context *client;
1336 uint32_t *pnn_list;
1337 int count;
1338 uint32_t *caps;
1339 uint32_t *ban_credits;
1340 uint32_t db_id;
1341 struct recdb_context *recdb;
1342 struct ctdb_pulldb pulldb;
1343 int index;
1346 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1348 static struct tevent_req *collect_all_db_send(
1349 TALLOC_CTX *mem_ctx,
1350 struct tevent_context *ev,
1351 struct ctdb_client_context *client,
1352 uint32_t *pnn_list, int count, uint32_t *caps,
1353 uint32_t *ban_credits, uint32_t db_id,
1354 struct recdb_context *recdb)
1356 struct tevent_req *req, *subreq;
1357 struct collect_all_db_state *state;
1358 uint32_t pnn;
1360 req = tevent_req_create(mem_ctx, &state,
1361 struct collect_all_db_state);
1362 if (req == NULL) {
1363 return NULL;
1366 state->ev = ev;
1367 state->client = client;
1368 state->pnn_list = pnn_list;
1369 state->count = count;
1370 state->caps = caps;
1371 state->ban_credits = ban_credits;
1372 state->db_id = db_id;
1373 state->recdb = recdb;
1374 state->index = 0;
1376 pnn = state->pnn_list[state->index];
1378 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1379 if (tevent_req_nomem(subreq, req)) {
1380 return tevent_req_post(req, ev);
1382 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1384 return req;
1387 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1389 struct tevent_req *req = tevent_req_callback_data(
1390 subreq, struct tevent_req);
1391 struct collect_all_db_state *state = tevent_req_data(
1392 req, struct collect_all_db_state);
1393 uint32_t pnn;
1394 int ret;
1395 bool status;
1397 status = pull_database_recv(subreq, &ret);
1398 TALLOC_FREE(subreq);
1399 if (! status) {
1400 pnn = state->pnn_list[state->index];
1401 state->ban_credits[pnn] += 1;
1402 tevent_req_error(req, ret);
1403 return;
1406 state->index += 1;
1407 if (state->index == state->count) {
1408 tevent_req_done(req);
1409 return;
1412 pnn = state->pnn_list[state->index];
1413 subreq = pull_database_send(state, state->ev, state->client,
1414 pnn, state->caps[pnn], state->recdb);
1415 if (tevent_req_nomem(subreq, req)) {
1416 return;
1418 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Collect the result of collect_all_db_send().
 *
 * Returns true on success; on failure returns false and, if perr is not
 * NULL, stores the error code there.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
1428 * For each database do the following:
1429 * - Get DB name
1430 * - Get DB path
1431 * - Freeze database on all nodes
1432 * - Start transaction on all nodes
1433 * - Collect database from all nodes
1434 * - Wipe database on all nodes
1435 * - Push database to all nodes
1436 * - Commit transaction on all nodes
1437 * - Thaw database on all nodes
1440 struct recover_db_state {
1441 struct tevent_context *ev;
1442 struct ctdb_client_context *client;
1443 struct ctdb_tunable_list *tun_list;
1444 uint32_t *pnn_list;
1445 int count;
1446 uint32_t *caps;
1447 uint32_t *ban_credits;
1448 uint32_t db_id;
1449 uint8_t db_flags;
1451 uint32_t destnode;
1452 struct ctdb_transdb transdb;
1454 const char *db_name, *db_path;
1455 struct recdb_context *recdb;
1458 static void recover_db_name_done(struct tevent_req *subreq);
1459 static void recover_db_path_done(struct tevent_req *subreq);
1460 static void recover_db_freeze_done(struct tevent_req *subreq);
1461 static void recover_db_transaction_started(struct tevent_req *subreq);
1462 static void recover_db_collect_done(struct tevent_req *subreq);
1463 static void recover_db_wipedb_done(struct tevent_req *subreq);
1464 static void recover_db_pushdb_done(struct tevent_req *subreq);
1465 static void recover_db_transaction_committed(struct tevent_req *subreq);
1466 static void recover_db_thaw_done(struct tevent_req *subreq);
1468 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1469 struct tevent_context *ev,
1470 struct ctdb_client_context *client,
1471 struct ctdb_tunable_list *tun_list,
1472 uint32_t *pnn_list, int count,
1473 uint32_t *caps,
1474 uint32_t *ban_credits,
1475 uint32_t generation,
1476 uint32_t db_id, uint8_t db_flags)
1478 struct tevent_req *req, *subreq;
1479 struct recover_db_state *state;
1480 struct ctdb_req_control request;
1482 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1483 if (req == NULL) {
1484 return NULL;
1487 state->ev = ev;
1488 state->client = client;
1489 state->tun_list = tun_list;
1490 state->pnn_list = pnn_list;
1491 state->count = count;
1492 state->caps = caps;
1493 state->ban_credits = ban_credits;
1494 state->db_id = db_id;
1495 state->db_flags = db_flags;
1497 state->destnode = ctdb_client_pnn(client);
1498 state->transdb.db_id = db_id;
1499 state->transdb.tid = generation;
1501 ctdb_req_control_get_dbname(&request, db_id);
1502 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1503 TIMEOUT(), &request);
1504 if (tevent_req_nomem(subreq, req)) {
1505 return tevent_req_post(req, ev);
1507 tevent_req_set_callback(subreq, recover_db_name_done, req);
1509 return req;
1512 static void recover_db_name_done(struct tevent_req *subreq)
1514 struct tevent_req *req = tevent_req_callback_data(
1515 subreq, struct tevent_req);
1516 struct recover_db_state *state = tevent_req_data(
1517 req, struct recover_db_state);
1518 struct ctdb_reply_control *reply;
1519 struct ctdb_req_control request;
1520 int ret;
1521 bool status;
1523 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1524 TALLOC_FREE(subreq);
1525 if (! status) {
1526 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1527 state->db_id, ret);
1528 tevent_req_error(req, ret);
1529 return;
1532 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1533 if (ret != 0) {
1534 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1535 state->db_id, ret);
1536 tevent_req_error(req, EPROTO);
1537 return;
1540 talloc_free(reply);
1542 ctdb_req_control_getdbpath(&request, state->db_id);
1543 subreq = ctdb_client_control_send(state, state->ev, state->client,
1544 state->destnode, TIMEOUT(),
1545 &request);
1546 if (tevent_req_nomem(subreq, req)) {
1547 return;
1549 tevent_req_set_callback(subreq, recover_db_path_done, req);
1552 static void recover_db_path_done(struct tevent_req *subreq)
1554 struct tevent_req *req = tevent_req_callback_data(
1555 subreq, struct tevent_req);
1556 struct recover_db_state *state = tevent_req_data(
1557 req, struct recover_db_state);
1558 struct ctdb_reply_control *reply;
1559 struct ctdb_req_control request;
1560 int ret;
1561 bool status;
1563 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1564 TALLOC_FREE(subreq);
1565 if (! status) {
1566 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1567 state->db_name, ret);
1568 tevent_req_error(req, ret);
1569 return;
1572 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1573 if (ret != 0) {
1574 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1575 state->db_name, ret);
1576 tevent_req_error(req, EPROTO);
1577 return;
1580 talloc_free(reply);
1582 ctdb_req_control_db_freeze(&request, state->db_id);
1583 subreq = ctdb_client_control_multi_send(state, state->ev,
1584 state->client,
1585 state->pnn_list, state->count,
1586 TIMEOUT(), &request);
1587 if (tevent_req_nomem(subreq, req)) {
1588 return;
1590 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1593 static void recover_db_freeze_done(struct tevent_req *subreq)
1595 struct tevent_req *req = tevent_req_callback_data(
1596 subreq, struct tevent_req);
1597 struct recover_db_state *state = tevent_req_data(
1598 req, struct recover_db_state);
1599 struct ctdb_req_control request;
1600 int *err_list;
1601 int ret;
1602 bool status;
1604 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1605 NULL);
1606 TALLOC_FREE(subreq);
1607 if (! status) {
1608 int ret2;
1609 uint32_t pnn;
1611 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1612 state->count, err_list,
1613 &pnn);
1614 if (ret2 != 0) {
1615 D_ERR("control FREEZE_DB failed for db %s"
1616 " on node %u, ret=%d\n",
1617 state->db_name, pnn, ret2);
1618 state->ban_credits[pnn] += 1;
1619 } else {
1620 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1621 state->db_name, ret);
1623 tevent_req_error(req, ret);
1624 return;
1627 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1628 subreq = ctdb_client_control_multi_send(state, state->ev,
1629 state->client,
1630 state->pnn_list, state->count,
1631 TIMEOUT(), &request);
1632 if (tevent_req_nomem(subreq, req)) {
1633 return;
1635 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1638 static void recover_db_transaction_started(struct tevent_req *subreq)
1640 struct tevent_req *req = tevent_req_callback_data(
1641 subreq, struct tevent_req);
1642 struct recover_db_state *state = tevent_req_data(
1643 req, struct recover_db_state);
1644 int *err_list;
1645 int ret;
1646 bool status;
1648 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1649 NULL);
1650 TALLOC_FREE(subreq);
1651 if (! status) {
1652 int ret2;
1653 uint32_t pnn;
1655 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1656 state->count,
1657 err_list, &pnn);
1658 if (ret2 != 0) {
1659 D_ERR("control TRANSACTION_DB failed for db=%s"
1660 " on node %u, ret=%d\n",
1661 state->db_name, pnn, ret2);
1662 } else {
1663 D_ERR("control TRANSACTION_DB failed for db=%s,"
1664 " ret=%d\n", state->db_name, ret);
1666 tevent_req_error(req, ret);
1667 return;
1670 state->recdb = recdb_create(state, state->db_id, state->db_name,
1671 state->db_path,
1672 state->tun_list->database_hash_size,
1673 state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1674 if (tevent_req_nomem(state->recdb, req)) {
1675 return;
1678 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1679 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1680 subreq = collect_highseqnum_db_send(
1681 state, state->ev, state->client,
1682 state->pnn_list, state->count, state->caps,
1683 state->ban_credits, state->db_id,
1684 state->recdb);
1685 } else {
1686 subreq = collect_all_db_send(
1687 state, state->ev, state->client,
1688 state->pnn_list, state->count, state->caps,
1689 state->ban_credits, state->db_id,
1690 state->recdb);
1692 if (tevent_req_nomem(subreq, req)) {
1693 return;
1695 tevent_req_set_callback(subreq, recover_db_collect_done, req);
1698 static void recover_db_collect_done(struct tevent_req *subreq)
1700 struct tevent_req *req = tevent_req_callback_data(
1701 subreq, struct tevent_req);
1702 struct recover_db_state *state = tevent_req_data(
1703 req, struct recover_db_state);
1704 struct ctdb_req_control request;
1705 int ret;
1706 bool status;
1708 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1709 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1710 status = collect_highseqnum_db_recv(subreq, &ret);
1711 } else {
1712 status = collect_all_db_recv(subreq, &ret);
1714 TALLOC_FREE(subreq);
1715 if (! status) {
1716 tevent_req_error(req, ret);
1717 return;
1720 ctdb_req_control_wipe_database(&request, &state->transdb);
1721 subreq = ctdb_client_control_multi_send(state, state->ev,
1722 state->client,
1723 state->pnn_list, state->count,
1724 TIMEOUT(), &request);
1725 if (tevent_req_nomem(subreq, req)) {
1726 return;
1728 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1731 static void recover_db_wipedb_done(struct tevent_req *subreq)
1733 struct tevent_req *req = tevent_req_callback_data(
1734 subreq, struct tevent_req);
1735 struct recover_db_state *state = tevent_req_data(
1736 req, struct recover_db_state);
1737 int *err_list;
1738 int ret;
1739 bool status;
1741 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1742 NULL);
1743 TALLOC_FREE(subreq);
1744 if (! status) {
1745 int ret2;
1746 uint32_t pnn;
1748 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1749 state->count,
1750 err_list, &pnn);
1751 if (ret2 != 0) {
1752 D_ERR("control WIPEDB failed for db %s on node %u,"
1753 " ret=%d\n", state->db_name, pnn, ret2);
1754 } else {
1755 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1756 state->db_name, ret);
1758 tevent_req_error(req, ret);
1759 return;
1762 subreq = push_database_send(state, state->ev, state->client,
1763 state->pnn_list, state->count,
1764 state->caps, state->tun_list,
1765 state->recdb);
1766 if (tevent_req_nomem(subreq, req)) {
1767 return;
1769 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1772 static void recover_db_pushdb_done(struct tevent_req *subreq)
1774 struct tevent_req *req = tevent_req_callback_data(
1775 subreq, struct tevent_req);
1776 struct recover_db_state *state = tevent_req_data(
1777 req, struct recover_db_state);
1778 struct ctdb_req_control request;
1779 int ret;
1780 bool status;
1782 status = push_database_recv(subreq, &ret);
1783 TALLOC_FREE(subreq);
1784 if (! status) {
1785 tevent_req_error(req, ret);
1786 return;
1789 TALLOC_FREE(state->recdb);
1791 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1792 subreq = ctdb_client_control_multi_send(state, state->ev,
1793 state->client,
1794 state->pnn_list, state->count,
1795 TIMEOUT(), &request);
1796 if (tevent_req_nomem(subreq, req)) {
1797 return;
1799 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1802 static void recover_db_transaction_committed(struct tevent_req *subreq)
1804 struct tevent_req *req = tevent_req_callback_data(
1805 subreq, struct tevent_req);
1806 struct recover_db_state *state = tevent_req_data(
1807 req, struct recover_db_state);
1808 struct ctdb_req_control request;
1809 int *err_list;
1810 int ret;
1811 bool status;
1813 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1814 NULL);
1815 TALLOC_FREE(subreq);
1816 if (! status) {
1817 int ret2;
1818 uint32_t pnn;
1820 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1821 state->count,
1822 err_list, &pnn);
1823 if (ret2 != 0) {
1824 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1825 " on node %u, ret=%d\n",
1826 state->db_name, pnn, ret2);
1827 } else {
1828 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1829 " ret=%d\n", state->db_name, ret);
1831 tevent_req_error(req, ret);
1832 return;
1835 ctdb_req_control_db_thaw(&request, state->db_id);
1836 subreq = ctdb_client_control_multi_send(state, state->ev,
1837 state->client,
1838 state->pnn_list, state->count,
1839 TIMEOUT(), &request);
1840 if (tevent_req_nomem(subreq, req)) {
1841 return;
1843 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1846 static void recover_db_thaw_done(struct tevent_req *subreq)
1848 struct tevent_req *req = tevent_req_callback_data(
1849 subreq, struct tevent_req);
1850 struct recover_db_state *state = tevent_req_data(
1851 req, struct recover_db_state);
1852 int *err_list;
1853 int ret;
1854 bool status;
1856 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1857 NULL);
1858 TALLOC_FREE(subreq);
1859 if (! status) {
1860 int ret2;
1861 uint32_t pnn;
1863 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1864 state->count,
1865 err_list, &pnn);
1866 if (ret2 != 0) {
1867 D_ERR("control DB_THAW failed for db %s on node %u,"
1868 " ret=%d\n", state->db_name, pnn, ret2);
1869 } else {
1870 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1871 state->db_name, ret);
1873 tevent_req_error(req, ret);
1874 return;
1877 tevent_req_done(req);
/*
 * Collect the result of recover_db_send().
 *
 * Returns true on success, false on failure; the error code is discarded.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
1887 * Start database recovery for each database
1889 * Try to recover each database NUM_RETRIES (3) times before failing recovery.
/* Overall state for recovering every database in the map */
struct db_recovery_state {
	struct tevent_context *ev;
	/* Databases to recover */
	struct ctdb_dbid_map *dbmap;
	/* Databases whose recovery has finished, successfully or not */
	unsigned int num_replies;
	/* Databases whose recovery failed for good */
	unsigned int num_failed;
};
/*
 * Per-database bookkeeping; keeps everything needed to restart
 * recover_db_send() for this database after a failed attempt.
 */
struct db_recovery_one_state {
	/* The overall db_recovery request this database belongs to */
	struct tevent_req *req;
	struct ctdb_client_context *client;
	struct ctdb_dbid_map *dbmap;
	struct ctdb_tunable_list *tun_list;

	/* Nodes taking part and how many there are */
	uint32_t *pnn_list;
	int count;

	uint32_t *caps;
	uint32_t *ban_credits;
	uint32_t generation;

	/* Database handled by this sub-state */
	uint32_t db_id;
	uint8_t db_flags;

	/* Number of failed recovery attempts so far */
	int num_fails;
};
1914 static void db_recovery_one_done(struct tevent_req *subreq);
1916 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1917 struct tevent_context *ev,
1918 struct ctdb_client_context *client,
1919 struct ctdb_dbid_map *dbmap,
1920 struct ctdb_tunable_list *tun_list,
1921 uint32_t *pnn_list, int count,
1922 uint32_t *caps,
1923 uint32_t *ban_credits,
1924 uint32_t generation)
1926 struct tevent_req *req, *subreq;
1927 struct db_recovery_state *state;
1928 unsigned int i;
1930 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1931 if (req == NULL) {
1932 return NULL;
1935 state->ev = ev;
1936 state->dbmap = dbmap;
1937 state->num_replies = 0;
1938 state->num_failed = 0;
1940 if (dbmap->num == 0) {
1941 tevent_req_done(req);
1942 return tevent_req_post(req, ev);
1945 for (i=0; i<dbmap->num; i++) {
1946 struct db_recovery_one_state *substate;
1948 substate = talloc_zero(state, struct db_recovery_one_state);
1949 if (tevent_req_nomem(substate, req)) {
1950 return tevent_req_post(req, ev);
1953 substate->req = req;
1954 substate->client = client;
1955 substate->dbmap = dbmap;
1956 substate->tun_list = tun_list;
1957 substate->pnn_list = pnn_list;
1958 substate->count = count;
1959 substate->caps = caps;
1960 substate->ban_credits = ban_credits;
1961 substate->generation = generation;
1962 substate->db_id = dbmap->dbs[i].db_id;
1963 substate->db_flags = dbmap->dbs[i].flags;
1965 subreq = recover_db_send(state, ev, client, tun_list,
1966 pnn_list, count, caps, ban_credits,
1967 generation, substate->db_id,
1968 substate->db_flags);
1969 if (tevent_req_nomem(subreq, req)) {
1970 return tevent_req_post(req, ev);
1972 tevent_req_set_callback(subreq, db_recovery_one_done,
1973 substate);
1974 D_NOTICE("recover database 0x%08x\n", substate->db_id);
1977 return req;
1980 static void db_recovery_one_done(struct tevent_req *subreq)
1982 struct db_recovery_one_state *substate = tevent_req_callback_data(
1983 subreq, struct db_recovery_one_state);
1984 struct tevent_req *req = substate->req;
1985 struct db_recovery_state *state = tevent_req_data(
1986 req, struct db_recovery_state);
1987 bool status;
1989 status = recover_db_recv(subreq);
1990 TALLOC_FREE(subreq);
1992 if (status) {
1993 talloc_free(substate);
1994 goto done;
1997 substate->num_fails += 1;
1998 if (substate->num_fails < NUM_RETRIES) {
1999 subreq = recover_db_send(state, state->ev, substate->client,
2000 substate->tun_list,
2001 substate->pnn_list, substate->count,
2002 substate->caps, substate->ban_credits,
2003 substate->generation, substate->db_id,
2004 substate->db_flags);
2005 if (tevent_req_nomem(subreq, req)) {
2006 goto failed;
2008 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2009 D_NOTICE("recover database 0x%08x, attempt %d\n",
2010 substate->db_id, substate->num_fails+1);
2011 return;
2014 failed:
2015 state->num_failed += 1;
2017 done:
2018 state->num_replies += 1;
2020 if (state->num_replies == state->dbmap->num) {
2021 tevent_req_done(req);
2025 static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
2027 struct db_recovery_state *state = tevent_req_data(
2028 req, struct db_recovery_state);
2029 int err;
2031 if (tevent_req_is_unix_error(req, &err)) {
2032 *count = 0;
2033 return false;
2036 *count = state->num_replies - state->num_failed;
2038 if (state->num_failed > 0) {
2039 return false;
2042 return true;
2047 * Run the parallel database recovery
2049 * - Get tunables
2050 * - Get nodemap
2051 * - Get vnnmap
2052 * - Get capabilities from all nodes
2053 * - Get dbmap
2054 * - Set RECOVERY_ACTIVE
2055 * - Send START_RECOVERY
2056 * - Update vnnmap on all nodes
2057 * - Run database recovery
2058 * - Set RECOVERY_NORMAL
2059 * - Send END_RECOVERY
/* State for one complete run of the parallel database recovery */
struct recovery_state {
	/* Event context and ctdb client used for all subrequests */
	struct tevent_context *ev;
	struct ctdb_client_context *client;

	/* Generation used for the new VNN map */
	uint32_t generation;

	/* Active nodes and how many there are */
	uint32_t *pnn_list;
	unsigned int count;

	/* The client's own PNN; target of the single-node controls */
	uint32_t destnode;

	struct ctdb_node_map *nodemap;

	/* Arrays with one entry per nodemap entry */
	uint32_t *caps;
	uint32_t *ban_credits;

	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_dbid_map *dbmap;
};
2077 static void recovery_tunables_done(struct tevent_req *subreq);
2078 static void recovery_nodemap_done(struct tevent_req *subreq);
2079 static void recovery_vnnmap_done(struct tevent_req *subreq);
2080 static void recovery_capabilities_done(struct tevent_req *subreq);
2081 static void recovery_dbmap_done(struct tevent_req *subreq);
2082 static void recovery_active_done(struct tevent_req *subreq);
2083 static void recovery_start_recovery_done(struct tevent_req *subreq);
2084 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2085 static void recovery_db_recovery_done(struct tevent_req *subreq);
2086 static void recovery_failed_done(struct tevent_req *subreq);
2087 static void recovery_normal_done(struct tevent_req *subreq);
2088 static void recovery_end_recovery_done(struct tevent_req *subreq);
2090 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2091 struct tevent_context *ev,
2092 struct ctdb_client_context *client,
2093 uint32_t generation)
2095 struct tevent_req *req, *subreq;
2096 struct recovery_state *state;
2097 struct ctdb_req_control request;
2099 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2100 if (req == NULL) {
2101 return NULL;
2104 state->ev = ev;
2105 state->client = client;
2106 state->generation = generation;
2107 state->destnode = ctdb_client_pnn(client);
2109 ctdb_req_control_get_all_tunables(&request);
2110 subreq = ctdb_client_control_send(state, state->ev, state->client,
2111 state->destnode, TIMEOUT(),
2112 &request);
2113 if (tevent_req_nomem(subreq, req)) {
2114 return tevent_req_post(req, ev);
2116 tevent_req_set_callback(subreq, recovery_tunables_done, req);
2118 return req;
2121 static void recovery_tunables_done(struct tevent_req *subreq)
2123 struct tevent_req *req = tevent_req_callback_data(
2124 subreq, struct tevent_req);
2125 struct recovery_state *state = tevent_req_data(
2126 req, struct recovery_state);
2127 struct ctdb_reply_control *reply;
2128 struct ctdb_req_control request;
2129 int ret;
2130 bool status;
2132 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2133 TALLOC_FREE(subreq);
2134 if (! status) {
2135 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2136 tevent_req_error(req, ret);
2137 return;
2140 ret = ctdb_reply_control_get_all_tunables(reply, state,
2141 &state->tun_list);
2142 if (ret != 0) {
2143 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2144 tevent_req_error(req, EPROTO);
2145 return;
2148 talloc_free(reply);
2150 recover_timeout = state->tun_list->recover_timeout;
2152 ctdb_req_control_get_nodemap(&request);
2153 subreq = ctdb_client_control_send(state, state->ev, state->client,
2154 state->destnode, TIMEOUT(),
2155 &request);
2156 if (tevent_req_nomem(subreq, req)) {
2157 return;
2159 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2162 static void recovery_nodemap_done(struct tevent_req *subreq)
2164 struct tevent_req *req = tevent_req_callback_data(
2165 subreq, struct tevent_req);
2166 struct recovery_state *state = tevent_req_data(
2167 req, struct recovery_state);
2168 struct ctdb_reply_control *reply;
2169 struct ctdb_req_control request;
2170 bool status;
2171 int ret;
2173 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2174 TALLOC_FREE(subreq);
2175 if (! status) {
2176 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2177 state->destnode, ret);
2178 tevent_req_error(req, ret);
2179 return;
2182 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2183 if (ret != 0) {
2184 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2185 tevent_req_error(req, ret);
2186 return;
2189 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2190 state, &state->pnn_list);
2191 if (state->count <= 0) {
2192 tevent_req_error(req, ENOMEM);
2193 return;
2196 state->ban_credits = talloc_zero_array(state, uint32_t,
2197 state->nodemap->num);
2198 if (tevent_req_nomem(state->ban_credits, req)) {
2199 return;
2202 ctdb_req_control_getvnnmap(&request);
2203 subreq = ctdb_client_control_send(state, state->ev, state->client,
2204 state->destnode, TIMEOUT(),
2205 &request);
2206 if (tevent_req_nomem(subreq, req)) {
2207 return;
2209 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2212 static void recovery_vnnmap_done(struct tevent_req *subreq)
2214 struct tevent_req *req = tevent_req_callback_data(
2215 subreq, struct tevent_req);
2216 struct recovery_state *state = tevent_req_data(
2217 req, struct recovery_state);
2218 struct ctdb_reply_control *reply;
2219 struct ctdb_req_control request;
2220 bool status;
2221 int ret;
2223 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2224 TALLOC_FREE(subreq);
2225 if (! status) {
2226 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2227 state->destnode, ret);
2228 tevent_req_error(req, ret);
2229 return;
2232 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2233 if (ret != 0) {
2234 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2235 tevent_req_error(req, ret);
2236 return;
2239 ctdb_req_control_get_capabilities(&request);
2240 subreq = ctdb_client_control_multi_send(state, state->ev,
2241 state->client,
2242 state->pnn_list, state->count,
2243 TIMEOUT(), &request);
2244 if (tevent_req_nomem(subreq, req)) {
2245 return;
2247 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2250 static void recovery_capabilities_done(struct tevent_req *subreq)
2252 struct tevent_req *req = tevent_req_callback_data(
2253 subreq, struct tevent_req);
2254 struct recovery_state *state = tevent_req_data(
2255 req, struct recovery_state);
2256 struct ctdb_reply_control **reply;
2257 struct ctdb_req_control request;
2258 int *err_list;
2259 unsigned int i;
2260 int ret;
2261 bool status;
2263 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2264 &reply);
2265 TALLOC_FREE(subreq);
2266 if (! status) {
2267 int ret2;
2268 uint32_t pnn;
2270 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2271 state->count,
2272 err_list, &pnn);
2273 if (ret2 != 0) {
2274 D_ERR("control GET_CAPABILITIES failed on node %u,"
2275 " ret=%d\n", pnn, ret2);
2276 } else {
2277 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2278 ret);
2280 tevent_req_error(req, ret);
2281 return;
2284 /* Make the array size same as nodemap */
2285 state->caps = talloc_zero_array(state, uint32_t,
2286 state->nodemap->num);
2287 if (tevent_req_nomem(state->caps, req)) {
2288 return;
2291 for (i=0; i<state->count; i++) {
2292 uint32_t pnn;
2294 pnn = state->pnn_list[i];
2295 ret = ctdb_reply_control_get_capabilities(reply[i],
2296 &state->caps[pnn]);
2297 if (ret != 0) {
2298 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2299 pnn);
2300 tevent_req_error(req, EPROTO);
2301 return;
2305 talloc_free(reply);
2307 ctdb_req_control_get_dbmap(&request);
2308 subreq = ctdb_client_control_send(state, state->ev, state->client,
2309 state->destnode, TIMEOUT(),
2310 &request);
2311 if (tevent_req_nomem(subreq, req)) {
2312 return;
2314 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2317 static void recovery_dbmap_done(struct tevent_req *subreq)
2319 struct tevent_req *req = tevent_req_callback_data(
2320 subreq, struct tevent_req);
2321 struct recovery_state *state = tevent_req_data(
2322 req, struct recovery_state);
2323 struct ctdb_reply_control *reply;
2324 struct ctdb_req_control request;
2325 int ret;
2326 bool status;
2328 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2329 TALLOC_FREE(subreq);
2330 if (! status) {
2331 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2332 state->destnode, ret);
2333 tevent_req_error(req, ret);
2334 return;
2337 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2338 if (ret != 0) {
2339 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2340 tevent_req_error(req, ret);
2341 return;
2344 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2345 subreq = ctdb_client_control_multi_send(state, state->ev,
2346 state->client,
2347 state->pnn_list, state->count,
2348 TIMEOUT(), &request);
2349 if (tevent_req_nomem(subreq, req)) {
2350 return;
2352 tevent_req_set_callback(subreq, recovery_active_done, req);
2355 static void recovery_active_done(struct tevent_req *subreq)
2357 struct tevent_req *req = tevent_req_callback_data(
2358 subreq, struct tevent_req);
2359 struct recovery_state *state = tevent_req_data(
2360 req, struct recovery_state);
2361 struct ctdb_req_control request;
2362 struct ctdb_vnn_map *vnnmap;
2363 int *err_list;
2364 int ret;
2365 unsigned int count, i;
2366 bool status;
2368 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2369 NULL);
2370 TALLOC_FREE(subreq);
2371 if (! status) {
2372 int ret2;
2373 uint32_t pnn;
2375 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2376 state->count,
2377 err_list, &pnn);
2378 if (ret2 != 0) {
2379 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2380 " ret=%d\n", pnn, ret2);
2381 } else {
2382 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2383 ret);
2385 tevent_req_error(req, ret);
2386 return;
2389 D_ERR("Set recovery mode to ACTIVE\n");
2391 /* Calculate new VNNMAP */
2392 count = 0;
2393 for (i=0; i<state->nodemap->num; i++) {
2394 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2395 continue;
2397 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2398 continue;
2400 count += 1;
2403 if (count == 0) {
2404 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2407 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2408 if (tevent_req_nomem(vnnmap, req)) {
2409 return;
2412 vnnmap->size = (count == 0 ? 1 : count);
2413 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2414 if (tevent_req_nomem(vnnmap->map, req)) {
2415 return;
2418 if (count == 0) {
2419 vnnmap->map[0] = state->destnode;
2420 } else {
2421 count = 0;
2422 for (i=0; i<state->nodemap->num; i++) {
2423 if (state->nodemap->node[i].flags &
2424 NODE_FLAGS_INACTIVE) {
2425 continue;
2427 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2428 continue;
2431 vnnmap->map[count] = state->nodemap->node[i].pnn;
2432 count += 1;
2436 vnnmap->generation = state->generation;
2438 talloc_free(state->vnnmap);
2439 state->vnnmap = vnnmap;
2441 ctdb_req_control_start_recovery(&request);
2442 subreq = ctdb_client_control_multi_send(state, state->ev,
2443 state->client,
2444 state->pnn_list, state->count,
2445 TIMEOUT(), &request);
2446 if (tevent_req_nomem(subreq, req)) {
2447 return;
2449 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2452 static void recovery_start_recovery_done(struct tevent_req *subreq)
2454 struct tevent_req *req = tevent_req_callback_data(
2455 subreq, struct tevent_req);
2456 struct recovery_state *state = tevent_req_data(
2457 req, struct recovery_state);
2458 struct ctdb_req_control request;
2459 int *err_list;
2460 int ret;
2461 bool status;
2463 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2464 NULL);
2465 TALLOC_FREE(subreq);
2466 if (! status) {
2467 int ret2;
2468 uint32_t pnn;
2470 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2471 state->count,
2472 err_list, &pnn);
2473 if (ret2 != 0) {
2474 D_ERR("failed to run start_recovery event on node %u,"
2475 " ret=%d\n", pnn, ret2);
2476 } else {
2477 D_ERR("failed to run start_recovery event, ret=%d\n",
2478 ret);
2480 tevent_req_error(req, ret);
2481 return;
2484 D_ERR("start_recovery event finished\n");
2486 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2487 subreq = ctdb_client_control_multi_send(state, state->ev,
2488 state->client,
2489 state->pnn_list, state->count,
2490 TIMEOUT(), &request);
2491 if (tevent_req_nomem(subreq, req)) {
2492 return;
2494 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2497 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2499 struct tevent_req *req = tevent_req_callback_data(
2500 subreq, struct tevent_req);
2501 struct recovery_state *state = tevent_req_data(
2502 req, struct recovery_state);
2503 int *err_list;
2504 int ret;
2505 bool status;
2507 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2508 NULL);
2509 TALLOC_FREE(subreq);
2510 if (! status) {
2511 int ret2;
2512 uint32_t pnn;
2514 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2515 state->count,
2516 err_list, &pnn);
2517 if (ret2 != 0) {
2518 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2519 pnn, ret2);
2520 } else {
2521 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2523 tevent_req_error(req, ret);
2524 return;
2527 D_NOTICE("updated VNNMAP\n");
2529 subreq = db_recovery_send(state, state->ev, state->client,
2530 state->dbmap, state->tun_list,
2531 state->pnn_list, state->count,
2532 state->caps, state->ban_credits,
2533 state->vnnmap->generation);
2534 if (tevent_req_nomem(subreq, req)) {
2535 return;
2537 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2540 static void recovery_db_recovery_done(struct tevent_req *subreq)
2542 struct tevent_req *req = tevent_req_callback_data(
2543 subreq, struct tevent_req);
2544 struct recovery_state *state = tevent_req_data(
2545 req, struct recovery_state);
2546 struct ctdb_req_control request;
2547 bool status;
2548 unsigned int count;
2550 status = db_recovery_recv(subreq, &count);
2551 TALLOC_FREE(subreq);
2553 D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2555 if (! status) {
2556 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2557 unsigned int i;
2559 /* Bans are not enabled */
2560 if (state->tun_list->enable_bans == 0) {
2561 tevent_req_error(req, EIO);
2562 return;
2565 for (i=0; i<state->count; i++) {
2566 uint32_t pnn;
2567 pnn = state->pnn_list[i];
2568 if (state->ban_credits[pnn] > max_credits) {
2569 max_pnn = pnn;
2570 max_credits = state->ban_credits[pnn];
2574 /* If pulling database fails multiple times */
2575 if (max_credits >= NUM_RETRIES) {
2576 struct ctdb_ban_state ban_state = {
2577 .pnn = max_pnn,
2578 .time = state->tun_list->recovery_ban_period,
2581 D_ERR("Banning node %u for %u seconds\n",
2582 ban_state.pnn,
2583 ban_state.time);
2585 ctdb_req_control_set_ban_state(&request,
2586 &ban_state);
2587 subreq = ctdb_client_control_send(state,
2588 state->ev,
2589 state->client,
2590 ban_state.pnn,
2591 TIMEOUT(),
2592 &request);
2593 if (tevent_req_nomem(subreq, req)) {
2594 return;
2596 tevent_req_set_callback(subreq,
2597 recovery_failed_done,
2598 req);
2599 } else {
2600 tevent_req_error(req, EIO);
2602 return;
2605 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2606 subreq = ctdb_client_control_multi_send(state, state->ev,
2607 state->client,
2608 state->pnn_list, state->count,
2609 TIMEOUT(), &request);
2610 if (tevent_req_nomem(subreq, req)) {
2611 return;
2613 tevent_req_set_callback(subreq, recovery_normal_done, req);
2616 static void recovery_failed_done(struct tevent_req *subreq)
2618 struct tevent_req *req = tevent_req_callback_data(
2619 subreq, struct tevent_req);
2620 struct recovery_state *state = tevent_req_data(
2621 req, struct recovery_state);
2622 struct ctdb_reply_control *reply;
2623 int ret;
2624 bool status;
2626 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2627 TALLOC_FREE(subreq);
2628 if (! status) {
2629 D_ERR("failed to ban node, ret=%d\n", ret);
2630 goto done;
2633 ret = ctdb_reply_control_set_ban_state(reply);
2634 if (ret != 0) {
2635 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2638 done:
2639 tevent_req_error(req, EIO);
2642 static void recovery_normal_done(struct tevent_req *subreq)
2644 struct tevent_req *req = tevent_req_callback_data(
2645 subreq, struct tevent_req);
2646 struct recovery_state *state = tevent_req_data(
2647 req, struct recovery_state);
2648 struct ctdb_req_control request;
2649 int *err_list;
2650 int ret;
2651 bool status;
2653 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2654 NULL);
2655 TALLOC_FREE(subreq);
2656 if (! status) {
2657 int ret2;
2658 uint32_t pnn;
2660 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2661 state->count,
2662 err_list, &pnn);
2663 if (ret2 != 0) {
2664 D_ERR("failed to set recovery mode NORMAL on node %u,"
2665 " ret=%d\n", pnn, ret2);
2666 } else {
2667 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2668 ret);
2670 tevent_req_error(req, ret);
2671 return;
2674 D_ERR("Set recovery mode to NORMAL\n");
2676 ctdb_req_control_end_recovery(&request);
2677 subreq = ctdb_client_control_multi_send(state, state->ev,
2678 state->client,
2679 state->pnn_list, state->count,
2680 TIMEOUT(), &request);
2681 if (tevent_req_nomem(subreq, req)) {
2682 return;
2684 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2687 static void recovery_end_recovery_done(struct tevent_req *subreq)
2689 struct tevent_req *req = tevent_req_callback_data(
2690 subreq, struct tevent_req);
2691 struct recovery_state *state = tevent_req_data(
2692 req, struct recovery_state);
2693 int *err_list;
2694 int ret;
2695 bool status;
2697 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2698 NULL);
2699 TALLOC_FREE(subreq);
2700 if (! status) {
2701 int ret2;
2702 uint32_t pnn;
2704 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2705 state->count,
2706 err_list, &pnn);
2707 if (ret2 != 0) {
2708 D_ERR("failed to run recovered event on node %u,"
2709 " ret=%d\n", pnn, ret2);
2710 } else {
2711 D_ERR("failed to run recovered event, ret=%d\n", ret);
2713 tevent_req_error(req, ret);
2714 return;
2717 D_ERR("recovered event finished\n");
2719 tevent_req_done(req);
/*
 * Collect the outcome of a recovery request.
 *
 * Thin wrapper around generic_recv() that discards its boolean
 * result: *perr is written only when the request ended in error, so
 * the caller must initialise it before the call.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void) generic_recv(req, perr);
}
/* Print the command line synopsis for this helper on stderr. */
static void usage(const char *progname)
{
	FILE *out = stderr;

	fprintf(out,
		"\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
2735 * Arguments - log fd, write fd, socket path, generation
2737 int main(int argc, char *argv[])
2739 int write_fd;
2740 const char *sockpath;
2741 TALLOC_CTX *mem_ctx = NULL;
2742 struct tevent_context *ev;
2743 struct ctdb_client_context *client;
2744 int ret = 0;
2745 struct tevent_req *req;
2746 uint32_t generation;
2748 if (argc != 4) {
2749 usage(argv[0]);
2750 exit(1);
2753 write_fd = atoi(argv[1]);
2754 sockpath = argv[2];
2755 generation = (uint32_t)smb_strtoul(argv[3],
2756 NULL,
2758 &ret,
2759 SMB_STR_STANDARD);
2760 if (ret != 0) {
2761 fprintf(stderr, "recovery: unable to initialize generation\n");
2762 goto failed;
2765 mem_ctx = talloc_new(NULL);
2766 if (mem_ctx == NULL) {
2767 fprintf(stderr, "recovery: talloc_new() failed\n");
2768 goto failed;
2771 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2772 if (ret != 0) {
2773 fprintf(stderr, "recovery: Unable to initialize logging\n");
2774 goto failed;
2777 ev = tevent_context_init(mem_ctx);
2778 if (ev == NULL) {
2779 D_ERR("tevent_context_init() failed\n");
2780 goto failed;
2783 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2784 if (ret != 0) {
2785 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2786 goto failed;
2789 req = recovery_send(mem_ctx, ev, client, generation);
2790 if (req == NULL) {
2791 D_ERR("database_recover_send() failed\n");
2792 goto failed;
2795 if (! tevent_req_poll(req, ev)) {
2796 D_ERR("tevent_req_poll() failed\n");
2797 goto failed;
2800 recovery_recv(req, &ret);
2801 TALLOC_FREE(req);
2802 if (ret != 0) {
2803 D_ERR("database recovery failed, ret=%d\n", ret);
2804 goto failed;
2807 sys_write(write_fd, &ret, sizeof(ret));
2808 return 0;
2810 failed:
2811 TALLOC_FREE(mem_ctx);
2812 return 1;