2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
33 #include "lib/util/util.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client.h"
39 #include "common/logging.h"
41 static int recover_timeout
= 30;
45 #define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
51 static bool generic_recv(struct tevent_req
*req
, int *perr
)
55 if (tevent_req_is_unix_error(req
, &err
)) {
65 static uint64_t rec_srvid
= CTDB_SRVID_RECOVERY
;
67 static uint64_t srvid_next(void)
74 * Recovery database functions
77 struct recdb_context
{
85 static struct recdb_context
*recdb_create(TALLOC_CTX
*mem_ctx
, uint32_t db_id
,
88 uint32_t hash_size
, bool persistent
)
90 static char *db_dir_state
= NULL
;
91 struct recdb_context
*recdb
;
92 unsigned int tdb_flags
;
94 recdb
= talloc(mem_ctx
, struct recdb_context
);
99 if (db_dir_state
== NULL
) {
100 db_dir_state
= getenv("CTDB_DBDIR_STATE");
103 recdb
->db_name
= db_name
;
104 recdb
->db_id
= db_id
;
105 recdb
->db_path
= talloc_asprintf(recdb
, "%s/recdb.%s",
106 db_dir_state
!= NULL
?
108 dirname(discard_const(db_path
)),
110 if (recdb
->db_path
== NULL
) {
114 unlink(recdb
->db_path
);
116 tdb_flags
= TDB_NOLOCK
| TDB_INCOMPATIBLE_HASH
| TDB_DISALLOW_NESTING
;
117 recdb
->db
= tdb_wrap_open(mem_ctx
, recdb
->db_path
, hash_size
,
118 tdb_flags
, O_RDWR
|O_CREAT
|O_EXCL
, 0600);
119 if (recdb
->db
== NULL
) {
121 D_ERR("failed to create recovery db %s\n", recdb
->db_path
);
125 recdb
->persistent
= persistent
;
130 static uint32_t recdb_id(struct recdb_context
*recdb
)
135 static const char *recdb_name(struct recdb_context
*recdb
)
137 return recdb
->db_name
;
140 static const char *recdb_path(struct recdb_context
*recdb
)
142 return recdb
->db_path
;
145 static struct tdb_context
*recdb_tdb(struct recdb_context
*recdb
)
147 return recdb
->db
->tdb
;
150 static bool recdb_persistent(struct recdb_context
*recdb
)
152 return recdb
->persistent
;
155 struct recdb_add_traverse_state
{
156 struct recdb_context
*recdb
;
160 static int recdb_add_traverse(uint32_t reqid
, struct ctdb_ltdb_header
*header
,
161 TDB_DATA key
, TDB_DATA data
,
164 struct recdb_add_traverse_state
*state
=
165 (struct recdb_add_traverse_state
*)private_data
;
166 struct ctdb_ltdb_header
*hdr
;
170 /* header is not marshalled separately in the pulldb control */
171 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
175 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
177 /* fetch the existing record, if any */
178 prev_data
= tdb_fetch(recdb_tdb(state
->recdb
), key
);
180 if (prev_data
.dptr
!= NULL
) {
181 struct ctdb_ltdb_header prev_hdr
;
183 prev_hdr
= *(struct ctdb_ltdb_header
*)prev_data
.dptr
;
184 free(prev_data
.dptr
);
185 if (hdr
->rsn
< prev_hdr
.rsn
||
186 (hdr
->rsn
== prev_hdr
.rsn
&&
187 prev_hdr
.dmaster
!= state
->mypnn
)) {
192 ret
= tdb_store(recdb_tdb(state
->recdb
), key
, data
, TDB_REPLACE
);
199 static bool recdb_add(struct recdb_context
*recdb
, int mypnn
,
200 struct ctdb_rec_buffer
*recbuf
)
202 struct recdb_add_traverse_state state
;
208 ret
= ctdb_rec_buffer_traverse(recbuf
, recdb_add_traverse
, &state
);
216 /* This function decides which records from recdb are retained */
217 static int recbuf_filter_add(struct ctdb_rec_buffer
*recbuf
, bool persistent
,
218 uint32_t reqid
, uint32_t dmaster
,
219 TDB_DATA key
, TDB_DATA data
)
221 struct ctdb_ltdb_header
*header
;
224 /* Skip empty records */
225 if (data
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
229 /* update the dmaster field to point to us */
230 header
= (struct ctdb_ltdb_header
*)data
.dptr
;
232 header
->dmaster
= dmaster
;
233 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
236 ret
= ctdb_rec_buffer_add(recbuf
, recbuf
, reqid
, NULL
, key
, data
);
244 struct recdb_records_traverse_state
{
245 struct ctdb_rec_buffer
*recbuf
;
252 static int recdb_records_traverse(struct tdb_context
*tdb
,
253 TDB_DATA key
, TDB_DATA data
,
256 struct recdb_records_traverse_state
*state
=
257 (struct recdb_records_traverse_state
*)private_data
;
260 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
261 state
->reqid
, state
->dmaster
, key
, data
);
263 state
->failed
= true;
270 static struct ctdb_rec_buffer
*recdb_records(struct recdb_context
*recdb
,
274 struct recdb_records_traverse_state state
;
277 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
278 if (state
.recbuf
== NULL
) {
281 state
.dmaster
= dmaster
;
283 state
.persistent
= recdb_persistent(recdb
);
284 state
.failed
= false;
286 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_records_traverse
,
288 if (ret
== -1 || state
.failed
) {
289 D_ERR("Failed to marshall recovery records for %s\n",
291 TALLOC_FREE(state
.recbuf
);
298 struct recdb_file_traverse_state
{
299 struct ctdb_rec_buffer
*recbuf
;
300 struct recdb_context
*recdb
;
308 unsigned int num_buffers
;
311 static int recdb_file_traverse(struct tdb_context
*tdb
,
312 TDB_DATA key
, TDB_DATA data
,
315 struct recdb_file_traverse_state
*state
=
316 (struct recdb_file_traverse_state
*)private_data
;
319 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
320 state
->reqid
, state
->dmaster
, key
, data
);
322 state
->failed
= true;
326 if (ctdb_rec_buffer_len(state
->recbuf
) > state
->max_size
) {
327 ret
= ctdb_rec_buffer_write(state
->recbuf
, state
->fd
);
329 D_ERR("Failed to collect recovery records for %s\n",
330 recdb_name(state
->recdb
));
331 state
->failed
= true;
335 state
->num_buffers
+= 1;
337 TALLOC_FREE(state
->recbuf
);
338 state
->recbuf
= ctdb_rec_buffer_init(state
->mem_ctx
,
339 recdb_id(state
->recdb
));
340 if (state
->recbuf
== NULL
) {
341 state
->failed
= true;
349 static int recdb_file(struct recdb_context
*recdb
, TALLOC_CTX
*mem_ctx
,
350 uint32_t dmaster
, int fd
, int max_size
)
352 struct recdb_file_traverse_state state
;
355 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
356 if (state
.recbuf
== NULL
) {
360 state
.mem_ctx
= mem_ctx
;
361 state
.dmaster
= dmaster
;
363 state
.persistent
= recdb_persistent(recdb
);
364 state
.failed
= false;
366 state
.max_size
= max_size
;
367 state
.num_buffers
= 0;
369 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_file_traverse
, &state
);
370 if (ret
== -1 || state
.failed
) {
371 TALLOC_FREE(state
.recbuf
);
375 ret
= ctdb_rec_buffer_write(state
.recbuf
, fd
);
377 D_ERR("Failed to collect recovery records for %s\n",
379 TALLOC_FREE(state
.recbuf
);
382 state
.num_buffers
+= 1;
384 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
385 state
.num_buffers
, recdb_name(recdb
));
387 return state
.num_buffers
;
391 * Pull database from a single node
394 struct pull_database_state
{
395 struct tevent_context
*ev
;
396 struct ctdb_client_context
*client
;
397 struct recdb_context
*recdb
;
400 unsigned int num_records
;
404 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
406 static void pull_database_register_done(struct tevent_req
*subreq
);
407 static void pull_database_old_done(struct tevent_req
*subreq
);
408 static void pull_database_unregister_done(struct tevent_req
*subreq
);
409 static void pull_database_new_done(struct tevent_req
*subreq
);
411 static struct tevent_req
*pull_database_send(
413 struct tevent_context
*ev
,
414 struct ctdb_client_context
*client
,
415 uint32_t pnn
, uint32_t caps
,
416 struct recdb_context
*recdb
)
418 struct tevent_req
*req
, *subreq
;
419 struct pull_database_state
*state
;
420 struct ctdb_req_control request
;
422 req
= tevent_req_create(mem_ctx
, &state
, struct pull_database_state
);
428 state
->client
= client
;
429 state
->recdb
= recdb
;
431 state
->srvid
= srvid_next();
433 if (caps
& CTDB_CAP_FRAGMENTED_CONTROLS
) {
434 subreq
= ctdb_client_set_message_handler_send(
435 state
, state
->ev
, state
->client
,
436 state
->srvid
, pull_database_handler
,
438 if (tevent_req_nomem(subreq
, req
)) {
439 return tevent_req_post(req
, ev
);
442 tevent_req_set_callback(subreq
, pull_database_register_done
,
446 struct ctdb_pulldb pulldb
;
448 pulldb
.db_id
= recdb_id(recdb
);
449 pulldb
.lmaster
= CTDB_LMASTER_ANY
;
451 ctdb_req_control_pull_db(&request
, &pulldb
);
452 subreq
= ctdb_client_control_send(state
, state
->ev
,
456 if (tevent_req_nomem(subreq
, req
)) {
457 return tevent_req_post(req
, ev
);
459 tevent_req_set_callback(subreq
, pull_database_old_done
, req
);
465 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
468 struct tevent_req
*req
= talloc_get_type_abort(
469 private_data
, struct tevent_req
);
470 struct pull_database_state
*state
= tevent_req_data(
471 req
, struct pull_database_state
);
472 struct ctdb_rec_buffer
*recbuf
;
477 if (srvid
!= state
->srvid
) {
481 ret
= ctdb_rec_buffer_pull(data
.dptr
, data
.dsize
, state
, &recbuf
, &np
);
483 D_ERR("Invalid data received for DB_PULL messages\n");
487 if (recbuf
->db_id
!= recdb_id(state
->recdb
)) {
489 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
490 recbuf
->db_id
, recdb_name(state
->recdb
));
494 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
498 D_ERR("Failed to add records to recdb for %s\n",
499 recdb_name(state
->recdb
));
503 state
->num_records
+= recbuf
->count
;
507 static void pull_database_register_done(struct tevent_req
*subreq
)
509 struct tevent_req
*req
= tevent_req_callback_data(
510 subreq
, struct tevent_req
);
511 struct pull_database_state
*state
= tevent_req_data(
512 req
, struct pull_database_state
);
513 struct ctdb_req_control request
;
514 struct ctdb_pulldb_ext pulldb_ext
;
518 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
521 D_ERR("Failed to set message handler for DB_PULL for %s\n",
522 recdb_name(state
->recdb
));
523 tevent_req_error(req
, ret
);
527 pulldb_ext
.db_id
= recdb_id(state
->recdb
);
528 pulldb_ext
.lmaster
= CTDB_LMASTER_ANY
;
529 pulldb_ext
.srvid
= state
->srvid
;
531 ctdb_req_control_db_pull(&request
, &pulldb_ext
);
532 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
533 state
->pnn
, TIMEOUT(), &request
);
534 if (tevent_req_nomem(subreq
, req
)) {
537 tevent_req_set_callback(subreq
, pull_database_new_done
, req
);
540 static void pull_database_old_done(struct tevent_req
*subreq
)
542 struct tevent_req
*req
= tevent_req_callback_data(
543 subreq
, struct tevent_req
);
544 struct pull_database_state
*state
= tevent_req_data(
545 req
, struct pull_database_state
);
546 struct ctdb_reply_control
*reply
;
547 struct ctdb_rec_buffer
*recbuf
;
551 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
554 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
555 recdb_name(state
->recdb
), state
->pnn
, ret
);
556 tevent_req_error(req
, ret
);
560 ret
= ctdb_reply_control_pull_db(reply
, state
, &recbuf
);
563 tevent_req_error(req
, ret
);
567 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
571 tevent_req_error(req
, EIO
);
575 state
->num_records
= recbuf
->count
;
578 D_INFO("Pulled %d records for db %s from node %d\n",
579 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
581 tevent_req_done(req
);
584 static void pull_database_new_done(struct tevent_req
*subreq
)
586 struct tevent_req
*req
= tevent_req_callback_data(
587 subreq
, struct tevent_req
);
588 struct pull_database_state
*state
= tevent_req_data(
589 req
, struct pull_database_state
);
590 struct ctdb_reply_control
*reply
;
591 uint32_t num_records
;
595 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
598 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
599 recdb_name(state
->recdb
), state
->pnn
, ret
);
604 ret
= ctdb_reply_control_db_pull(reply
, &num_records
);
606 if (num_records
!= state
->num_records
) {
607 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
608 num_records
, state
->num_records
,
609 recdb_name(state
->recdb
));
614 D_INFO("Pulled %d records for db %s from node %d\n",
615 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
619 subreq
= ctdb_client_remove_message_handler_send(
620 state
, state
->ev
, state
->client
,
622 if (tevent_req_nomem(subreq
, req
)) {
625 tevent_req_set_callback(subreq
, pull_database_unregister_done
, req
);
628 static void pull_database_unregister_done(struct tevent_req
*subreq
)
630 struct tevent_req
*req
= tevent_req_callback_data(
631 subreq
, struct tevent_req
);
632 struct pull_database_state
*state
= tevent_req_data(
633 req
, struct pull_database_state
);
637 status
= ctdb_client_remove_message_handler_recv(subreq
, &ret
);
640 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
641 recdb_name(state
->recdb
));
642 tevent_req_error(req
, ret
);
646 if (state
->result
!= 0) {
647 tevent_req_error(req
, state
->result
);
651 tevent_req_done(req
);
654 static bool pull_database_recv(struct tevent_req
*req
, int *perr
)
656 return generic_recv(req
, perr
);
660 * Push database to specified nodes (old style)
663 struct push_database_old_state
{
664 struct tevent_context
*ev
;
665 struct ctdb_client_context
*client
;
666 struct recdb_context
*recdb
;
669 struct ctdb_rec_buffer
*recbuf
;
673 static void push_database_old_push_done(struct tevent_req
*subreq
);
675 static struct tevent_req
*push_database_old_send(
677 struct tevent_context
*ev
,
678 struct ctdb_client_context
*client
,
679 uint32_t *pnn_list
, int count
,
680 struct recdb_context
*recdb
)
682 struct tevent_req
*req
, *subreq
;
683 struct push_database_old_state
*state
;
684 struct ctdb_req_control request
;
687 req
= tevent_req_create(mem_ctx
, &state
,
688 struct push_database_old_state
);
694 state
->client
= client
;
695 state
->recdb
= recdb
;
696 state
->pnn_list
= pnn_list
;
697 state
->count
= count
;
700 state
->recbuf
= recdb_records(recdb
, state
,
701 ctdb_client_pnn(client
));
702 if (tevent_req_nomem(state
->recbuf
, req
)) {
703 return tevent_req_post(req
, ev
);
706 pnn
= state
->pnn_list
[state
->index
];
708 ctdb_req_control_push_db(&request
, state
->recbuf
);
709 subreq
= ctdb_client_control_send(state
, ev
, client
, pnn
,
710 TIMEOUT(), &request
);
711 if (tevent_req_nomem(subreq
, req
)) {
712 return tevent_req_post(req
, ev
);
714 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
719 static void push_database_old_push_done(struct tevent_req
*subreq
)
721 struct tevent_req
*req
= tevent_req_callback_data(
722 subreq
, struct tevent_req
);
723 struct push_database_old_state
*state
= tevent_req_data(
724 req
, struct push_database_old_state
);
725 struct ctdb_req_control request
;
730 status
= ctdb_client_control_recv(subreq
, &ret
, NULL
, NULL
);
733 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
734 recdb_name(state
->recdb
), state
->pnn_list
[state
->index
],
736 tevent_req_error(req
, ret
);
741 if (state
->index
== state
->count
) {
742 TALLOC_FREE(state
->recbuf
);
743 tevent_req_done(req
);
747 pnn
= state
->pnn_list
[state
->index
];
749 ctdb_req_control_push_db(&request
, state
->recbuf
);
750 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
751 pnn
, TIMEOUT(), &request
);
752 if (tevent_req_nomem(subreq
, req
)) {
755 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
758 static bool push_database_old_recv(struct tevent_req
*req
, int *perr
)
760 return generic_recv(req
, perr
);
764 * Push database to specified nodes (new style)
767 struct push_database_new_state
{
768 struct tevent_context
*ev
;
769 struct ctdb_client_context
*client
;
770 struct recdb_context
*recdb
;
777 int num_buffers_sent
;
778 unsigned int num_records
;
781 static void push_database_new_started(struct tevent_req
*subreq
);
782 static void push_database_new_send_msg(struct tevent_req
*req
);
783 static void push_database_new_send_done(struct tevent_req
*subreq
);
784 static void push_database_new_confirmed(struct tevent_req
*subreq
);
786 static struct tevent_req
*push_database_new_send(
788 struct tevent_context
*ev
,
789 struct ctdb_client_context
*client
,
790 uint32_t *pnn_list
, int count
,
791 struct recdb_context
*recdb
,
794 struct tevent_req
*req
, *subreq
;
795 struct push_database_new_state
*state
;
796 struct ctdb_req_control request
;
797 struct ctdb_pulldb_ext pulldb_ext
;
801 req
= tevent_req_create(mem_ctx
, &state
,
802 struct push_database_new_state
);
808 state
->client
= client
;
809 state
->recdb
= recdb
;
810 state
->pnn_list
= pnn_list
;
811 state
->count
= count
;
813 state
->srvid
= srvid_next();
814 state
->dmaster
= ctdb_client_pnn(client
);
815 state
->num_buffers_sent
= 0;
816 state
->num_records
= 0;
818 filename
= talloc_asprintf(state
, "%s.dat", recdb_path(recdb
));
819 if (tevent_req_nomem(filename
, req
)) {
820 return tevent_req_post(req
, ev
);
823 state
->fd
= open(filename
, O_RDWR
|O_CREAT
, 0644);
824 if (state
->fd
== -1) {
825 tevent_req_error(req
, errno
);
826 return tevent_req_post(req
, ev
);
829 talloc_free(filename
);
831 state
->num_buffers
= recdb_file(recdb
, state
, state
->dmaster
,
832 state
->fd
, max_size
);
833 if (state
->num_buffers
== -1) {
834 tevent_req_error(req
, ENOMEM
);
835 return tevent_req_post(req
, ev
);
838 offset
= lseek(state
->fd
, 0, SEEK_SET
);
840 tevent_req_error(req
, EIO
);
841 return tevent_req_post(req
, ev
);
844 pulldb_ext
.db_id
= recdb_id(recdb
);
845 pulldb_ext
.srvid
= state
->srvid
;
847 ctdb_req_control_db_push_start(&request
, &pulldb_ext
);
848 subreq
= ctdb_client_control_multi_send(state
, ev
, client
,
850 TIMEOUT(), &request
);
851 if (tevent_req_nomem(subreq
, req
)) {
852 return tevent_req_post(req
, ev
);
854 tevent_req_set_callback(subreq
, push_database_new_started
, req
);
859 static void push_database_new_started(struct tevent_req
*subreq
)
861 struct tevent_req
*req
= tevent_req_callback_data(
862 subreq
, struct tevent_req
);
863 struct push_database_new_state
*state
= tevent_req_data(
864 req
, struct push_database_new_state
);
869 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
876 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
880 D_ERR("control DB_PUSH_START failed for db %s"
881 " on node %u, ret=%d\n",
882 recdb_name(state
->recdb
), pnn
, ret2
);
884 D_ERR("control DB_PUSH_START failed for db %s,"
886 recdb_name(state
->recdb
), ret
);
888 talloc_free(err_list
);
890 tevent_req_error(req
, ret
);
894 push_database_new_send_msg(req
);
897 static void push_database_new_send_msg(struct tevent_req
*req
)
899 struct push_database_new_state
*state
= tevent_req_data(
900 req
, struct push_database_new_state
);
901 struct tevent_req
*subreq
;
902 struct ctdb_rec_buffer
*recbuf
;
903 struct ctdb_req_message message
;
908 if (state
->num_buffers_sent
== state
->num_buffers
) {
909 struct ctdb_req_control request
;
911 ctdb_req_control_db_push_confirm(&request
,
912 recdb_id(state
->recdb
));
913 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
917 TIMEOUT(), &request
);
918 if (tevent_req_nomem(subreq
, req
)) {
921 tevent_req_set_callback(subreq
, push_database_new_confirmed
,
926 ret
= ctdb_rec_buffer_read(state
->fd
, state
, &recbuf
);
928 tevent_req_error(req
, ret
);
932 data
.dsize
= ctdb_rec_buffer_len(recbuf
);
933 data
.dptr
= talloc_size(state
, data
.dsize
);
934 if (tevent_req_nomem(data
.dptr
, req
)) {
938 ctdb_rec_buffer_push(recbuf
, data
.dptr
, &np
);
940 message
.srvid
= state
->srvid
;
941 message
.data
.data
= data
;
943 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
944 state
->num_buffers_sent
, recbuf
->count
,
945 recdb_name(state
->recdb
));
947 subreq
= ctdb_client_message_multi_send(state
, state
->ev
,
949 state
->pnn_list
, state
->count
,
951 if (tevent_req_nomem(subreq
, req
)) {
954 tevent_req_set_callback(subreq
, push_database_new_send_done
, req
);
956 state
->num_records
+= recbuf
->count
;
958 talloc_free(data
.dptr
);
962 static void push_database_new_send_done(struct tevent_req
*subreq
)
964 struct tevent_req
*req
= tevent_req_callback_data(
965 subreq
, struct tevent_req
);
966 struct push_database_new_state
*state
= tevent_req_data(
967 req
, struct push_database_new_state
);
971 status
= ctdb_client_message_multi_recv(subreq
, &ret
, NULL
, NULL
);
974 D_ERR("Sending recovery records failed for %s\n",
975 recdb_name(state
->recdb
));
976 tevent_req_error(req
, ret
);
980 state
->num_buffers_sent
+= 1;
982 push_database_new_send_msg(req
);
985 static void push_database_new_confirmed(struct tevent_req
*subreq
)
987 struct tevent_req
*req
= tevent_req_callback_data(
988 subreq
, struct tevent_req
);
989 struct push_database_new_state
*state
= tevent_req_data(
990 req
, struct push_database_new_state
);
991 struct ctdb_reply_control
**reply
;
995 uint32_t num_records
;
997 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1004 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1005 state
->count
, err_list
,
1008 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1009 " on node %u, ret=%d\n",
1010 recdb_name(state
->recdb
), pnn
, ret2
);
1012 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1014 recdb_name(state
->recdb
), ret
);
1016 tevent_req_error(req
, ret
);
1020 for (i
=0; i
<state
->count
; i
++) {
1021 ret
= ctdb_reply_control_db_push_confirm(reply
[i
],
1024 tevent_req_error(req
, EPROTO
);
1028 if (num_records
!= state
->num_records
) {
1029 D_ERR("Node %u received %d of %d records for %s\n",
1030 state
->pnn_list
[i
], num_records
,
1031 state
->num_records
, recdb_name(state
->recdb
));
1032 tevent_req_error(req
, EPROTO
);
1039 D_INFO("Pushed %d records for db %s\n",
1040 state
->num_records
, recdb_name(state
->recdb
));
1042 tevent_req_done(req
);
1045 static bool push_database_new_recv(struct tevent_req
*req
, int *perr
)
1047 return generic_recv(req
, perr
);
1051 * wrapper for push_database_old and push_database_new
1054 struct push_database_state
{
1055 bool old_done
, new_done
;
1058 static void push_database_old_done(struct tevent_req
*subreq
);
1059 static void push_database_new_done(struct tevent_req
*subreq
);
1061 static struct tevent_req
*push_database_send(
1062 TALLOC_CTX
*mem_ctx
,
1063 struct tevent_context
*ev
,
1064 struct ctdb_client_context
*client
,
1065 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1066 struct ctdb_tunable_list
*tun_list
,
1067 struct recdb_context
*recdb
)
1069 struct tevent_req
*req
, *subreq
;
1070 struct push_database_state
*state
;
1071 uint32_t *old_list
, *new_list
;
1072 unsigned int old_count
, new_count
;
1075 req
= tevent_req_create(mem_ctx
, &state
, struct push_database_state
);
1080 state
->old_done
= false;
1081 state
->new_done
= false;
1085 old_list
= talloc_array(state
, uint32_t, count
);
1086 new_list
= talloc_array(state
, uint32_t, count
);
1087 if (tevent_req_nomem(old_list
, req
) ||
1088 tevent_req_nomem(new_list
,req
)) {
1089 return tevent_req_post(req
, ev
);
1092 for (i
=0; i
<count
; i
++) {
1093 uint32_t pnn
= pnn_list
[i
];
1095 if (caps
[pnn
] & CTDB_CAP_FRAGMENTED_CONTROLS
) {
1096 new_list
[new_count
] = pnn
;
1099 old_list
[old_count
] = pnn
;
1104 if (old_count
> 0) {
1105 subreq
= push_database_old_send(state
, ev
, client
,
1106 old_list
, old_count
, recdb
);
1107 if (tevent_req_nomem(subreq
, req
)) {
1108 return tevent_req_post(req
, ev
);
1110 tevent_req_set_callback(subreq
, push_database_old_done
, req
);
1112 state
->old_done
= true;
1115 if (new_count
> 0) {
1116 subreq
= push_database_new_send(state
, ev
, client
,
1117 new_list
, new_count
, recdb
,
1118 tun_list
->rec_buffer_size_limit
);
1119 if (tevent_req_nomem(subreq
, req
)) {
1120 return tevent_req_post(req
, ev
);
1122 tevent_req_set_callback(subreq
, push_database_new_done
, req
);
1124 state
->new_done
= true;
1130 static void push_database_old_done(struct tevent_req
*subreq
)
1132 struct tevent_req
*req
= tevent_req_callback_data(
1133 subreq
, struct tevent_req
);
1134 struct push_database_state
*state
= tevent_req_data(
1135 req
, struct push_database_state
);
1139 status
= push_database_old_recv(subreq
, &ret
);
1141 tevent_req_error(req
, ret
);
1145 state
->old_done
= true;
1147 if (state
->old_done
&& state
->new_done
) {
1148 tevent_req_done(req
);
1152 static void push_database_new_done(struct tevent_req
*subreq
)
1154 struct tevent_req
*req
= tevent_req_callback_data(
1155 subreq
, struct tevent_req
);
1156 struct push_database_state
*state
= tevent_req_data(
1157 req
, struct push_database_state
);
1161 status
= push_database_new_recv(subreq
, &ret
);
1163 tevent_req_error(req
, ret
);
1167 state
->new_done
= true;
1169 if (state
->old_done
&& state
->new_done
) {
1170 tevent_req_done(req
);
1174 static bool push_database_recv(struct tevent_req
*req
, int *perr
)
1176 return generic_recv(req
, perr
);
1180 * Collect databases using highest sequence number
1183 struct collect_highseqnum_db_state
{
1184 struct tevent_context
*ev
;
1185 struct ctdb_client_context
*client
;
1189 uint32_t *ban_credits
;
1191 struct recdb_context
*recdb
;
1195 static void collect_highseqnum_db_seqnum_done(struct tevent_req
*subreq
);
1196 static void collect_highseqnum_db_pulldb_done(struct tevent_req
*subreq
);
1198 static struct tevent_req
*collect_highseqnum_db_send(
1199 TALLOC_CTX
*mem_ctx
,
1200 struct tevent_context
*ev
,
1201 struct ctdb_client_context
*client
,
1202 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1203 uint32_t *ban_credits
, uint32_t db_id
,
1204 struct recdb_context
*recdb
)
1206 struct tevent_req
*req
, *subreq
;
1207 struct collect_highseqnum_db_state
*state
;
1208 struct ctdb_req_control request
;
1210 req
= tevent_req_create(mem_ctx
, &state
,
1211 struct collect_highseqnum_db_state
);
1217 state
->client
= client
;
1218 state
->pnn_list
= pnn_list
;
1219 state
->count
= count
;
1221 state
->ban_credits
= ban_credits
;
1222 state
->db_id
= db_id
;
1223 state
->recdb
= recdb
;
1225 ctdb_req_control_get_db_seqnum(&request
, db_id
);
1226 subreq
= ctdb_client_control_multi_send(mem_ctx
, ev
, client
,
1227 state
->pnn_list
, state
->count
,
1228 TIMEOUT(), &request
);
1229 if (tevent_req_nomem(subreq
, req
)) {
1230 return tevent_req_post(req
, ev
);
1232 tevent_req_set_callback(subreq
, collect_highseqnum_db_seqnum_done
,
1238 static void collect_highseqnum_db_seqnum_done(struct tevent_req
*subreq
)
1240 struct tevent_req
*req
= tevent_req_callback_data(
1241 subreq
, struct tevent_req
);
1242 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1243 req
, struct collect_highseqnum_db_state
);
1244 struct ctdb_reply_control
**reply
;
1248 uint64_t seqnum
, max_seqnum
;
1250 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1252 TALLOC_FREE(subreq
);
1257 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1258 state
->count
, err_list
,
1261 D_ERR("control GET_DB_SEQNUM failed for db %s"
1262 " on node %u, ret=%d\n",
1263 recdb_name(state
->recdb
), pnn
, ret2
);
1265 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1267 recdb_name(state
->recdb
), ret
);
1269 tevent_req_error(req
, ret
);
1274 state
->max_pnn
= state
->pnn_list
[0];
1275 for (i
=0; i
<state
->count
; i
++) {
1276 ret
= ctdb_reply_control_get_db_seqnum(reply
[i
], &seqnum
);
1278 tevent_req_error(req
, EPROTO
);
1282 if (max_seqnum
< seqnum
) {
1283 max_seqnum
= seqnum
;
1284 state
->max_pnn
= state
->pnn_list
[i
];
1290 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64
"\n",
1291 recdb_name(state
->recdb
), state
->max_pnn
, max_seqnum
);
1293 subreq
= pull_database_send(state
, state
->ev
, state
->client
,
1295 state
->caps
[state
->max_pnn
],
1297 if (tevent_req_nomem(subreq
, req
)) {
1300 tevent_req_set_callback(subreq
, collect_highseqnum_db_pulldb_done
,
1304 static void collect_highseqnum_db_pulldb_done(struct tevent_req
*subreq
)
1306 struct tevent_req
*req
= tevent_req_callback_data(
1307 subreq
, struct tevent_req
);
1308 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1309 req
, struct collect_highseqnum_db_state
);
1313 status
= pull_database_recv(subreq
, &ret
);
1314 TALLOC_FREE(subreq
);
1316 state
->ban_credits
[state
->max_pnn
] += 1;
1317 tevent_req_error(req
, ret
);
1321 tevent_req_done(req
);
1324 static bool collect_highseqnum_db_recv(struct tevent_req
*req
, int *perr
)
1326 return generic_recv(req
, perr
);
1330 * Collect all databases
1333 struct collect_all_db_state
{
1334 struct tevent_context
*ev
;
1335 struct ctdb_client_context
*client
;
1339 uint32_t *ban_credits
;
1341 struct recdb_context
*recdb
;
1342 struct ctdb_pulldb pulldb
;
1346 static void collect_all_db_pulldb_done(struct tevent_req
*subreq
);
1348 static struct tevent_req
*collect_all_db_send(
1349 TALLOC_CTX
*mem_ctx
,
1350 struct tevent_context
*ev
,
1351 struct ctdb_client_context
*client
,
1352 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1353 uint32_t *ban_credits
, uint32_t db_id
,
1354 struct recdb_context
*recdb
)
1356 struct tevent_req
*req
, *subreq
;
1357 struct collect_all_db_state
*state
;
1360 req
= tevent_req_create(mem_ctx
, &state
,
1361 struct collect_all_db_state
);
1367 state
->client
= client
;
1368 state
->pnn_list
= pnn_list
;
1369 state
->count
= count
;
1371 state
->ban_credits
= ban_credits
;
1372 state
->db_id
= db_id
;
1373 state
->recdb
= recdb
;
1376 pnn
= state
->pnn_list
[state
->index
];
1378 subreq
= pull_database_send(state
, ev
, client
, pnn
, caps
[pnn
], recdb
);
1379 if (tevent_req_nomem(subreq
, req
)) {
1380 return tevent_req_post(req
, ev
);
1382 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
1387 static void collect_all_db_pulldb_done(struct tevent_req
*subreq
)
1389 struct tevent_req
*req
= tevent_req_callback_data(
1390 subreq
, struct tevent_req
);
1391 struct collect_all_db_state
*state
= tevent_req_data(
1392 req
, struct collect_all_db_state
);
1397 status
= pull_database_recv(subreq
, &ret
);
1398 TALLOC_FREE(subreq
);
1400 pnn
= state
->pnn_list
[state
->index
];
1401 state
->ban_credits
[pnn
] += 1;
1402 tevent_req_error(req
, ret
);
1407 if (state
->index
== state
->count
) {
1408 tevent_req_done(req
);
1412 pnn
= state
->pnn_list
[state
->index
];
1413 subreq
= pull_database_send(state
, state
->ev
, state
->client
,
1414 pnn
, state
->caps
[pnn
], state
->recdb
);
1415 if (tevent_req_nomem(subreq
, req
)) {
1418 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
1421 static bool collect_all_db_recv(struct tevent_req
*req
, int *perr
)
1423 return generic_recv(req
, perr
);
1428 * For each database do the following:
1431 * - Freeze database on all nodes
1432 * - Start transaction on all nodes
1433 * - Collect database from all nodes
1434 * - Wipe database on all nodes
1435 * - Push database to all nodes
1436 * - Commit transaction on all nodes
1437 * - Thaw database on all nodes
1440 struct recover_db_state
{
1441 struct tevent_context
*ev
;
1442 struct ctdb_client_context
*client
;
1443 struct ctdb_tunable_list
*tun_list
;
1447 uint32_t *ban_credits
;
1452 struct ctdb_transdb transdb
;
1454 const char *db_name
, *db_path
;
1455 struct recdb_context
*recdb
;
/*
 * Callbacks of the recover_db request chain, declared here because each
 * step installs the next one as its subrequest callback.
 */
1458 static void recover_db_name_done(struct tevent_req
*subreq
);
1459 static void recover_db_path_done(struct tevent_req
*subreq
);
1460 static void recover_db_freeze_done(struct tevent_req
*subreq
);
1461 static void recover_db_transaction_started(struct tevent_req
*subreq
);
1462 static void recover_db_collect_done(struct tevent_req
*subreq
);
1463 static void recover_db_wipedb_done(struct tevent_req
*subreq
);
1464 static void recover_db_pushdb_done(struct tevent_req
*subreq
);
1465 static void recover_db_transaction_committed(struct tevent_req
*subreq
);
1466 static void recover_db_thaw_done(struct tevent_req
*subreq
);
/*
 * Start recovery of one database.
 *
 * Creates the request, copies the caller's arguments into the state,
 * records the local node (ctdb_client_pnn) as destnode, fills
 * state->transdb with db_id and the generation (as tid), and sends the
 * GET_DBNAME control to destnode.  Processing continues in
 * recover_db_name_done().
 *
 * NOTE(review): one parameter line and a few state assignments are not
 * visible in this excerpt (callers pass a caps array between count and
 * ban_credits).
 */
1468 static struct tevent_req
*recover_db_send(TALLOC_CTX
*mem_ctx
,
1469 struct tevent_context
*ev
,
1470 struct ctdb_client_context
*client
,
1471 struct ctdb_tunable_list
*tun_list
,
1472 uint32_t *pnn_list
, int count
,
1474 uint32_t *ban_credits
,
1475 uint32_t generation
,
1476 uint32_t db_id
, uint8_t db_flags
)
1478 struct tevent_req
*req
, *subreq
;
1479 struct recover_db_state
*state
;
1480 struct ctdb_req_control request
;
1482 req
= tevent_req_create(mem_ctx
, &state
, struct recover_db_state
);
1488 state
->client
= client
;
1489 state
->tun_list
= tun_list
;
1490 state
->pnn_list
= pnn_list
;
1491 state
->count
= count
;
1493 state
->ban_credits
= ban_credits
;
1494 state
->db_id
= db_id
;
1495 state
->db_flags
= db_flags
;
/* Single-node controls in this chain go to the local node. */
1497 state
->destnode
= ctdb_client_pnn(client
);
1498 state
->transdb
.db_id
= db_id
;
1499 state
->transdb
.tid
= generation
;
1501 ctdb_req_control_get_dbname(&request
, db_id
);
1502 subreq
= ctdb_client_control_send(state
, ev
, client
, state
->destnode
,
1503 TIMEOUT(), &request
);
1504 if (tevent_req_nomem(subreq
, req
)) {
1505 return tevent_req_post(req
, ev
);
1507 tevent_req_set_callback(subreq
, recover_db_name_done
, req
);
/*
 * Callback for the GET_DBNAME control.
 *
 * Parses the reply into state->db_name, then sends the GETDBPATH
 * control to destnode; processing continues in recover_db_path_done().
 * The visible error paths log the failure and fail the request with ret
 * (control failure) or EPROTO (reply could not be parsed).
 *
 * NOTE(review): the guard conditions and returns around the error paths
 * are not visible in this excerpt.
 */
1512 static void recover_db_name_done(struct tevent_req
*subreq
)
1514 struct tevent_req
*req
= tevent_req_callback_data(
1515 subreq
, struct tevent_req
);
1516 struct recover_db_state
*state
= tevent_req_data(
1517 req
, struct recover_db_state
);
1518 struct ctdb_reply_control
*reply
;
1519 struct ctdb_req_control request
;
1523 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1524 TALLOC_FREE(subreq
);
1526 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1528 tevent_req_error(req
, ret
);
1532 ret
= ctdb_reply_control_get_dbname(reply
, state
, &state
->db_name
);
1534 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1536 tevent_req_error(req
, EPROTO
);
/* Next step: ask for the database path. */
1542 ctdb_req_control_getdbpath(&request
, state
->db_id
);
1543 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
1544 state
->destnode
, TIMEOUT(),
1546 if (tevent_req_nomem(subreq
, req
)) {
1549 tevent_req_set_callback(subreq
, recover_db_path_done
, req
);
/*
 * Callback for the GETDBPATH control.
 *
 * Parses the reply into state->db_path, then sends the DB_FREEZE control
 * to every node in state->pnn_list; processing continues in
 * recover_db_freeze_done().  The visible error paths log the failure and
 * fail the request with ret or EPROTO.
 *
 * NOTE(review): guard conditions and returns are not visible in this
 * excerpt.
 */
1552 static void recover_db_path_done(struct tevent_req
*subreq
)
1554 struct tevent_req
*req
= tevent_req_callback_data(
1555 subreq
, struct tevent_req
);
1556 struct recover_db_state
*state
= tevent_req_data(
1557 req
, struct recover_db_state
);
1558 struct ctdb_reply_control
*reply
;
1559 struct ctdb_req_control request
;
1563 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1564 TALLOC_FREE(subreq
);
1566 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1567 state
->db_name
, ret
);
1568 tevent_req_error(req
, ret
);
1572 ret
= ctdb_reply_control_getdbpath(reply
, state
, &state
->db_path
);
1574 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1575 state
->db_name
, ret
);
1576 tevent_req_error(req
, EPROTO
);
/* Next step: freeze the database on all nodes in pnn_list. */
1582 ctdb_req_control_db_freeze(&request
, state
->db_id
);
1583 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1585 state
->pnn_list
, state
->count
,
1586 TIMEOUT(), &request
);
1587 if (tevent_req_nomem(subreq
, req
)) {
1590 tevent_req_set_callback(subreq
, recover_db_freeze_done
, req
);
/*
 * Callback for the multi-node DB_FREEZE control.
 *
 * On the visible error path ctdb_client_control_multi_error() is used to
 * pick a failing node; that node is logged and given one ban credit, and
 * the request fails with ret.  Otherwise the DB_TRANSACTION_START
 * control carrying state->transdb is sent to all nodes and processing
 * continues in recover_db_transaction_started().
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
1593 static void recover_db_freeze_done(struct tevent_req
*subreq
)
1595 struct tevent_req
*req
= tevent_req_callback_data(
1596 subreq
, struct tevent_req
);
1597 struct recover_db_state
*state
= tevent_req_data(
1598 req
, struct recover_db_state
);
1599 struct ctdb_req_control request
;
1604 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1606 TALLOC_FREE(subreq
);
1611 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1612 state
->count
, err_list
,
1615 D_ERR("control FREEZE_DB failed for db %s"
1616 " on node %u, ret=%d\n",
1617 state
->db_name
, pnn
, ret2
);
/* A node that fails to freeze is charged a ban credit. */
1618 state
->ban_credits
[pnn
] += 1;
1620 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1621 state
->db_name
, ret
);
1623 tevent_req_error(req
, ret
);
/* Next step: start the recovery transaction on all nodes. */
1627 ctdb_req_control_db_transaction_start(&request
, &state
->transdb
);
1628 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1630 state
->pnn_list
, state
->count
,
1631 TIMEOUT(), &request
);
1632 if (tevent_req_nomem(subreq
, req
)) {
1635 tevent_req_set_callback(subreq
, recover_db_transaction_started
, req
);
/*
 * Callback for the multi-node DB_TRANSACTION_START control.
 *
 * After the control result is checked, a local recovery database is
 * created with recdb_create().  Databases flagged PERSISTENT or
 * REPLICATED are collected with collect_highseqnum_db_send(); the other
 * visible call is collect_all_db_send().  Processing continues in
 * recover_db_collect_done().
 *
 * NOTE(review): declarations, guard conditions, the else keyword between
 * the two collect calls and some call arguments are not visible in this
 * excerpt.
 */
1638 static void recover_db_transaction_started(struct tevent_req
*subreq
)
1640 struct tevent_req
*req
= tevent_req_callback_data(
1641 subreq
, struct tevent_req
);
1642 struct recover_db_state
*state
= tevent_req_data(
1643 req
, struct recover_db_state
);
1648 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1650 TALLOC_FREE(subreq
);
1655 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1659 D_ERR("control TRANSACTION_DB failed for db=%s"
1660 " on node %u, ret=%d\n",
1661 state
->db_name
, pnn
, ret2
);
1663 D_ERR("control TRANSACTION_DB failed for db=%s,"
1664 " ret=%d\n", state
->db_name
, ret
);
1666 tevent_req_error(req
, ret
);
/* Local database that will hold the collected records. */
1670 state
->recdb
= recdb_create(state
, state
->db_id
, state
->db_name
,
1672 state
->tun_list
->database_hash_size
,
1673 state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
);
1674 if (tevent_req_nomem(state
->recdb
, req
)) {
/* The collection strategy depends on the database flags. */
1678 if ((state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) ||
1679 (state
->db_flags
& CTDB_DB_FLAGS_REPLICATED
)) {
1680 subreq
= collect_highseqnum_db_send(
1681 state
, state
->ev
, state
->client
,
1682 state
->pnn_list
, state
->count
, state
->caps
,
1683 state
->ban_credits
, state
->db_id
,
1686 subreq
= collect_all_db_send(
1687 state
, state
->ev
, state
->client
,
1688 state
->pnn_list
, state
->count
, state
->caps
,
1689 state
->ban_credits
, state
->db_id
,
1692 if (tevent_req_nomem(subreq
, req
)) {
1695 tevent_req_set_callback(subreq
, recover_db_collect_done
, req
);
/*
 * Callback for the database collection step.
 *
 * The result is fetched with the recv function matching the send used in
 * recover_db_transaction_started() (same PERSISTENT/REPLICATED test).
 * The visible error path fails the request with ret.  Otherwise the
 * WIPE_DATABASE control carrying state->transdb is sent to all nodes and
 * processing continues in recover_db_wipedb_done().
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
1698 static void recover_db_collect_done(struct tevent_req
*subreq
)
1700 struct tevent_req
*req
= tevent_req_callback_data(
1701 subreq
, struct tevent_req
);
1702 struct recover_db_state
*state
= tevent_req_data(
1703 req
, struct recover_db_state
);
1704 struct ctdb_req_control request
;
1708 if ((state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) ||
1709 (state
->db_flags
& CTDB_DB_FLAGS_REPLICATED
)) {
1710 status
= collect_highseqnum_db_recv(subreq
, &ret
);
1712 status
= collect_all_db_recv(subreq
, &ret
);
1714 TALLOC_FREE(subreq
);
1716 tevent_req_error(req
, ret
);
/* Next step: wipe the database on all nodes. */
1720 ctdb_req_control_wipe_database(&request
, &state
->transdb
);
1721 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1723 state
->pnn_list
, state
->count
,
1724 TIMEOUT(), &request
);
1725 if (tevent_req_nomem(subreq
, req
)) {
1728 tevent_req_set_callback(subreq
, recover_db_wipedb_done
, req
);
/*
 * Callback for the multi-node WIPE_DATABASE control.
 *
 * The visible error path logs the failing node (or the overall ret) and
 * fails the request with ret.  Otherwise push_database_send() is started
 * for all nodes in pnn_list and processing continues in
 * recover_db_pushdb_done().
 *
 * NOTE(review): declarations, guard conditions and some call arguments
 * are not visible in this excerpt.
 */
1731 static void recover_db_wipedb_done(struct tevent_req
*subreq
)
1733 struct tevent_req
*req
= tevent_req_callback_data(
1734 subreq
, struct tevent_req
);
1735 struct recover_db_state
*state
= tevent_req_data(
1736 req
, struct recover_db_state
);
1741 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1743 TALLOC_FREE(subreq
);
1748 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1752 D_ERR("control WIPEDB failed for db %s on node %u,"
1753 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1755 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1756 state
->db_name
, ret
);
1758 tevent_req_error(req
, ret
);
/* Next step: push the database to the nodes. */
1762 subreq
= push_database_send(state
, state
->ev
, state
->client
,
1763 state
->pnn_list
, state
->count
,
1764 state
->caps
, state
->tun_list
,
1766 if (tevent_req_nomem(subreq
, req
)) {
1769 tevent_req_set_callback(subreq
, recover_db_pushdb_done
, req
);
/*
 * Callback for the push_database step.
 *
 * The visible error path fails the request with ret.  Otherwise the
 * local recovery database (state->recdb) is freed, the
 * DB_TRANSACTION_COMMIT control carrying state->transdb is sent to all
 * nodes, and processing continues in recover_db_transaction_committed().
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
1772 static void recover_db_pushdb_done(struct tevent_req
*subreq
)
1774 struct tevent_req
*req
= tevent_req_callback_data(
1775 subreq
, struct tevent_req
);
1776 struct recover_db_state
*state
= tevent_req_data(
1777 req
, struct recover_db_state
);
1778 struct ctdb_req_control request
;
1782 status
= push_database_recv(subreq
, &ret
);
1783 TALLOC_FREE(subreq
);
1785 tevent_req_error(req
, ret
);
/* The local recovery database is released at this point. */
1789 TALLOC_FREE(state
->recdb
);
1791 ctdb_req_control_db_transaction_commit(&request
, &state
->transdb
);
1792 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1794 state
->pnn_list
, state
->count
,
1795 TIMEOUT(), &request
);
1796 if (tevent_req_nomem(subreq
, req
)) {
1799 tevent_req_set_callback(subreq
, recover_db_transaction_committed
, req
);
/*
 * Callback for the multi-node DB_TRANSACTION_COMMIT control.
 *
 * The visible error path logs the failing node (or the overall ret) and
 * fails the request with ret.  Otherwise the DB_THAW control for
 * state->db_id is sent to all nodes and processing continues in
 * recover_db_thaw_done().
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
1802 static void recover_db_transaction_committed(struct tevent_req
*subreq
)
1804 struct tevent_req
*req
= tevent_req_callback_data(
1805 subreq
, struct tevent_req
);
1806 struct recover_db_state
*state
= tevent_req_data(
1807 req
, struct recover_db_state
);
1808 struct ctdb_req_control request
;
1813 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1815 TALLOC_FREE(subreq
);
1820 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1824 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1825 " on node %u, ret=%d\n",
1826 state
->db_name
, pnn
, ret2
);
1828 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1829 " ret=%d\n", state
->db_name
, ret
);
1831 tevent_req_error(req
, ret
);
/* Next step: thaw the database on all nodes. */
1835 ctdb_req_control_db_thaw(&request
, state
->db_id
);
1836 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1838 state
->pnn_list
, state
->count
,
1839 TIMEOUT(), &request
);
1840 if (tevent_req_nomem(subreq
, req
)) {
1843 tevent_req_set_callback(subreq
, recover_db_thaw_done
, req
);
/*
 * Callback for the multi-node DB_THAW control; last step of the
 * recover_db chain.
 *
 * The visible error path logs the failing node (or the overall ret) and
 * fails the request with ret; otherwise the request is marked done.
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
1846 static void recover_db_thaw_done(struct tevent_req
*subreq
)
1848 struct tevent_req
*req
= tevent_req_callback_data(
1849 subreq
, struct tevent_req
);
1850 struct recover_db_state
*state
= tevent_req_data(
1851 req
, struct recover_db_state
);
1856 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1858 TALLOC_FREE(subreq
);
1863 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1867 D_ERR("control DB_THAW failed for db %s on node %u,"
1868 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1870 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1871 state
->db_name
, ret
);
1873 tevent_req_error(req
, ret
);
1877 tevent_req_done(req
);
/*
 * Fetch the outcome of a recover_db request.
 *
 * Delegates to generic_recv() with a NULL error pointer, so only the
 * success/failure result is reported to the caller.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
1887 * Start database recovery for each database
1889 * Try to recover each database up to NUM_RETRIES times before failing recovery.
/*
 * State of the request that recovers every database in a dbmap.
 * num_replies counts databases whose recovery has finished, num_failed
 * those among them that ended in failure (see db_recovery_one_done()).
 */
1892 struct db_recovery_state
{
1893 struct tevent_context
*ev
;
1894 struct ctdb_dbid_map
*dbmap
;
1895 unsigned int num_replies
;
1896 unsigned int num_failed
;
/*
 * Per-database state kept by db_recovery_send(): the arguments needed to
 * restart recover_db_send() for that database, plus a back pointer to
 * the parent request.
 *
 * NOTE(review): the member list shown here is incomplete; the code also
 * uses pnn_list, count, caps, db_id, db_flags and num_fails, whose
 * declarations are not visible in this excerpt.
 */
1899 struct db_recovery_one_state
{
/* Parent db_recovery request. */
1900 struct tevent_req
*req
;
1901 struct ctdb_client_context
*client
;
1902 struct ctdb_dbid_map
*dbmap
;
1903 struct ctdb_tunable_list
*tun_list
;
1907 uint32_t *ban_credits
;
1908 uint32_t generation
;
/* Completion callback for each per-database recover_db request. */
1914 static void db_recovery_one_done(struct tevent_req
*subreq
);
/*
 * Start recovery of all databases listed in dbmap.
 *
 * An empty dbmap completes the request immediately.  Otherwise, for each
 * entry a db_recovery_one_state is allocated and filled with the
 * arguments, and a recover_db request is started for that database with
 * db_recovery_one_done() as callback.  The loop issues every
 * recover_db_send() call before returning.
 *
 * NOTE(review): one parameter line (caps is used in the body), some
 * local declarations and the callback-data argument of
 * tevent_req_set_callback() are not visible in this excerpt.
 */
1916 static struct tevent_req
*db_recovery_send(TALLOC_CTX
*mem_ctx
,
1917 struct tevent_context
*ev
,
1918 struct ctdb_client_context
*client
,
1919 struct ctdb_dbid_map
*dbmap
,
1920 struct ctdb_tunable_list
*tun_list
,
1921 uint32_t *pnn_list
, int count
,
1923 uint32_t *ban_credits
,
1924 uint32_t generation
)
1926 struct tevent_req
*req
, *subreq
;
1927 struct db_recovery_state
*state
;
1930 req
= tevent_req_create(mem_ctx
, &state
, struct db_recovery_state
);
1936 state
->dbmap
= dbmap
;
1937 state
->num_replies
= 0;
1938 state
->num_failed
= 0;
/* Nothing to recover. */
1940 if (dbmap
->num
== 0) {
1941 tevent_req_done(req
);
1942 return tevent_req_post(req
, ev
);
1945 for (i
=0; i
<dbmap
->num
; i
++) {
1946 struct db_recovery_one_state
*substate
;
1948 substate
= talloc_zero(state
, struct db_recovery_one_state
);
1949 if (tevent_req_nomem(substate
, req
)) {
1950 return tevent_req_post(req
, ev
);
1953 substate
->req
= req
;
1954 substate
->client
= client
;
1955 substate
->dbmap
= dbmap
;
1956 substate
->tun_list
= tun_list
;
1957 substate
->pnn_list
= pnn_list
;
1958 substate
->count
= count
;
1959 substate
->caps
= caps
;
1960 substate
->ban_credits
= ban_credits
;
1961 substate
->generation
= generation
;
1962 substate
->db_id
= dbmap
->dbs
[i
].db_id
;
1963 substate
->db_flags
= dbmap
->dbs
[i
].flags
;
1965 subreq
= recover_db_send(state
, ev
, client
, tun_list
,
1966 pnn_list
, count
, caps
, ban_credits
,
1967 generation
, substate
->db_id
,
1968 substate
->db_flags
);
1969 if (tevent_req_nomem(subreq
, req
)) {
1970 return tevent_req_post(req
, ev
);
1972 tevent_req_set_callback(subreq
, db_recovery_one_done
,
1974 D_NOTICE("recover database 0x%08x\n", substate
->db_id
);
/*
 * Callback for one per-database recover_db request.
 *
 * Visible behaviour: the result is fetched with recover_db_recv(); one
 * path frees the substate; otherwise substate->num_fails is incremented
 * and, while it is below NUM_RETRIES, recover_db_send() is issued again
 * for the same database with this function as callback.  The parent
 * state's num_failed and num_replies counters are updated, and the
 * parent request is completed once num_replies equals dbmap->num.
 *
 * NOTE(review): the guard conditions, labels and returns that connect
 * these paths are not visible in this excerpt - presumably the substate
 * is freed on success and num_failed is only bumped after the retries
 * are exhausted; confirm against the full file.
 */
1980 static void db_recovery_one_done(struct tevent_req
*subreq
)
1982 struct db_recovery_one_state
*substate
= tevent_req_callback_data(
1983 subreq
, struct db_recovery_one_state
);
1984 struct tevent_req
*req
= substate
->req
;
1985 struct db_recovery_state
*state
= tevent_req_data(
1986 req
, struct db_recovery_state
);
1989 status
= recover_db_recv(subreq
);
1990 TALLOC_FREE(subreq
);
1993 talloc_free(substate
);
/* Retry the same database until NUM_RETRIES attempts have failed. */
1997 substate
->num_fails
+= 1;
1998 if (substate
->num_fails
< NUM_RETRIES
) {
1999 subreq
= recover_db_send(state
, state
->ev
, substate
->client
,
2001 substate
->pnn_list
, substate
->count
,
2002 substate
->caps
, substate
->ban_credits
,
2003 substate
->generation
, substate
->db_id
,
2004 substate
->db_flags
);
2005 if (tevent_req_nomem(subreq
, req
)) {
2008 tevent_req_set_callback(subreq
, db_recovery_one_done
, substate
);
2009 D_NOTICE("recover database 0x%08x, attempt %d\n",
2010 substate
->db_id
, substate
->num_fails
+1);
2015 state
->num_failed
+= 1;
2018 state
->num_replies
+= 1;
/* Every database has reported a final result. */
2020 if (state
->num_replies
== state
->dbmap
->num
) {
2021 tevent_req_done(req
);
/*
 * Fetch the outcome of a db_recovery request.
 *
 * *count receives the number of databases recovered successfully
 * (num_replies - num_failed).  The request's unix error and a non-zero
 * num_failed are both checked.
 *
 * NOTE(review): the bodies of the two checks and the return statements
 * are not visible in this excerpt.
 */
2025 static bool db_recovery_recv(struct tevent_req
*req
, unsigned int *count
)
2027 struct db_recovery_state
*state
= tevent_req_data(
2028 req
, struct db_recovery_state
);
2031 if (tevent_req_is_unix_error(req
, &err
)) {
2036 *count
= state
->num_replies
- state
->num_failed
;
2038 if (state
->num_failed
> 0) {
2047 * Run the parallel database recovery
2052 * - Get capabilities from all nodes
2054 * - Set RECOVERY_ACTIVE
2055 * - Send START_RECOVERY
2056 * - Update vnnmap on all nodes
2057 * - Run database recovery
2058 * - Set RECOVERY_NORMAL
2059 * - Send END_RECOVERY
/*
 * State of the top-level recovery request (recovery_send() and its
 * callbacks).
 *
 * NOTE(review): the member list shown here is incomplete; the callbacks
 * also use destnode, pnn_list, count and caps, whose declarations are
 * not visible in this excerpt.
 */
2062 struct recovery_state
{
2063 struct tevent_context
*ev
;
2064 struct ctdb_client_context
*client
;
/* Generation passed to recovery_send(); stored in the new vnnmap. */
2065 uint32_t generation
;
2069 struct ctdb_node_map
*nodemap
;
/* Zero-initialised array with nodemap->num entries, indexed by pnn. */
2071 uint32_t *ban_credits
;
2072 struct ctdb_tunable_list
*tun_list
;
2073 struct ctdb_vnn_map
*vnnmap
;
2074 struct ctdb_dbid_map
*dbmap
;
/*
 * Callbacks of the top-level recovery request chain, declared here
 * because each step installs the next one as its subrequest callback.
 */
2077 static void recovery_tunables_done(struct tevent_req
*subreq
);
2078 static void recovery_nodemap_done(struct tevent_req
*subreq
);
2079 static void recovery_vnnmap_done(struct tevent_req
*subreq
);
2080 static void recovery_capabilities_done(struct tevent_req
*subreq
);
2081 static void recovery_dbmap_done(struct tevent_req
*subreq
);
2082 static void recovery_active_done(struct tevent_req
*subreq
);
2083 static void recovery_start_recovery_done(struct tevent_req
*subreq
);
2084 static void recovery_vnnmap_update_done(struct tevent_req
*subreq
);
2085 static void recovery_db_recovery_done(struct tevent_req
*subreq
);
2086 static void recovery_failed_done(struct tevent_req
*subreq
);
2087 static void recovery_normal_done(struct tevent_req
*subreq
);
2088 static void recovery_end_recovery_done(struct tevent_req
*subreq
);
/*
 * Start the parallel database recovery.
 *
 * Creates the request, records client, generation and the local node
 * (ctdb_client_pnn) as destnode, and sends the GET_ALL_TUNABLES control
 * to destnode.  Processing continues in recovery_tunables_done().
 *
 * NOTE(review): the assignment of state->ev and the final call argument
 * are not visible in this excerpt.
 */
2090 static struct tevent_req
*recovery_send(TALLOC_CTX
*mem_ctx
,
2091 struct tevent_context
*ev
,
2092 struct ctdb_client_context
*client
,
2093 uint32_t generation
)
2095 struct tevent_req
*req
, *subreq
;
2096 struct recovery_state
*state
;
2097 struct ctdb_req_control request
;
2099 req
= tevent_req_create(mem_ctx
, &state
, struct recovery_state
);
2105 state
->client
= client
;
2106 state
->generation
= generation
;
2107 state
->destnode
= ctdb_client_pnn(client
);
2109 ctdb_req_control_get_all_tunables(&request
);
2110 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2111 state
->destnode
, TIMEOUT(),
2113 if (tevent_req_nomem(subreq
, req
)) {
2114 return tevent_req_post(req
, ev
);
2116 tevent_req_set_callback(subreq
, recovery_tunables_done
, req
);
/*
 * Callback for the GET_ALL_TUNABLES control.
 *
 * Parses the reply, then overrides the file-level recover_timeout (used
 * by the TIMEOUT() macro) with the recover_timeout tunable, and sends
 * the GET_NODEMAP control to destnode.  Processing continues in
 * recovery_nodemap_done().  The visible error paths fail the request
 * with ret or EPROTO.
 *
 * NOTE(review): guard conditions, returns and some call arguments are
 * not visible in this excerpt.
 */
2121 static void recovery_tunables_done(struct tevent_req
*subreq
)
2123 struct tevent_req
*req
= tevent_req_callback_data(
2124 subreq
, struct tevent_req
);
2125 struct recovery_state
*state
= tevent_req_data(
2126 req
, struct recovery_state
);
2127 struct ctdb_reply_control
*reply
;
2128 struct ctdb_req_control request
;
2132 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2133 TALLOC_FREE(subreq
);
2135 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2136 tevent_req_error(req
, ret
);
2140 ret
= ctdb_reply_control_get_all_tunables(reply
, state
,
2143 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2144 tevent_req_error(req
, EPROTO
);
/* From here on TIMEOUT() uses the tunable instead of the default. */
2150 recover_timeout
= state
->tun_list
->recover_timeout
;
2152 ctdb_req_control_get_nodemap(&request
);
2153 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2154 state
->destnode
, TIMEOUT(),
2156 if (tevent_req_nomem(subreq
, req
)) {
2159 tevent_req_set_callback(subreq
, recovery_nodemap_done
, req
);
/*
 * Callback for the GET_NODEMAP control.
 *
 * Parses the reply into state->nodemap, builds state->pnn_list with
 * list_of_active_nodes() (a count <= 0 fails the request with ENOMEM),
 * allocates the zeroed ban_credits array with one entry per nodemap
 * node, and sends the GETVNNMAP control to destnode.  Processing
 * continues in recovery_vnnmap_done().
 *
 * NOTE(review): guard conditions, returns and some call arguments are
 * not visible in this excerpt.
 */
2162 static void recovery_nodemap_done(struct tevent_req
*subreq
)
2164 struct tevent_req
*req
= tevent_req_callback_data(
2165 subreq
, struct tevent_req
);
2166 struct recovery_state
*state
= tevent_req_data(
2167 req
, struct recovery_state
);
2168 struct ctdb_reply_control
*reply
;
2169 struct ctdb_req_control request
;
2173 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2174 TALLOC_FREE(subreq
);
2176 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2177 state
->destnode
, ret
);
2178 tevent_req_error(req
, ret
);
2182 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &state
->nodemap
);
2184 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret
);
2185 tevent_req_error(req
, ret
);
/* pnn_list/count drive every multi-node control that follows. */
2189 state
->count
= list_of_active_nodes(state
->nodemap
, CTDB_UNKNOWN_PNN
,
2190 state
, &state
->pnn_list
);
2191 if (state
->count
<= 0) {
2192 tevent_req_error(req
, ENOMEM
);
/* Sized by nodemap->num so it can be indexed directly by pnn. */
2196 state
->ban_credits
= talloc_zero_array(state
, uint32_t,
2197 state
->nodemap
->num
);
2198 if (tevent_req_nomem(state
->ban_credits
, req
)) {
2202 ctdb_req_control_getvnnmap(&request
);
2203 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2204 state
->destnode
, TIMEOUT(),
2206 if (tevent_req_nomem(subreq
, req
)) {
2209 tevent_req_set_callback(subreq
, recovery_vnnmap_done
, req
);
/*
 * Callback for the GETVNNMAP control.
 *
 * Parses the reply into state->vnnmap, then sends the GET_CAPABILITIES
 * control to every node in state->pnn_list.  Processing continues in
 * recovery_capabilities_done().  Both visible error paths fail the
 * request with ret.
 *
 * NOTE(review): guard conditions, returns and some call arguments are
 * not visible in this excerpt.
 */
2212 static void recovery_vnnmap_done(struct tevent_req
*subreq
)
2214 struct tevent_req
*req
= tevent_req_callback_data(
2215 subreq
, struct tevent_req
);
2216 struct recovery_state
*state
= tevent_req_data(
2217 req
, struct recovery_state
);
2218 struct ctdb_reply_control
*reply
;
2219 struct ctdb_req_control request
;
2223 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2224 TALLOC_FREE(subreq
);
2226 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2227 state
->destnode
, ret
);
2228 tevent_req_error(req
, ret
);
2232 ret
= ctdb_reply_control_getvnnmap(reply
, state
, &state
->vnnmap
);
2234 D_ERR("control GETVNNMAP failed, ret=%d\n", ret
);
2235 tevent_req_error(req
, ret
);
2239 ctdb_req_control_get_capabilities(&request
);
2240 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2242 state
->pnn_list
, state
->count
,
2243 TIMEOUT(), &request
);
2244 if (tevent_req_nomem(subreq
, req
)) {
2247 tevent_req_set_callback(subreq
, recovery_capabilities_done
, req
);
/*
 * Callback for the multi-node GET_CAPABILITIES control.
 *
 * Allocates state->caps with one zeroed entry per nodemap node, parses
 * the reply of each node in pnn_list with
 * ctdb_reply_control_get_capabilities() (a parse failure fails the
 * request with EPROTO), and then sends the GET_DBMAP control to
 * destnode.  Processing continues in recovery_dbmap_done().
 *
 * NOTE(review): declarations, guard conditions and the lines that store
 * each parsed capability value are not visible in this excerpt.
 */
2250 static void recovery_capabilities_done(struct tevent_req
*subreq
)
2252 struct tevent_req
*req
= tevent_req_callback_data(
2253 subreq
, struct tevent_req
);
2254 struct recovery_state
*state
= tevent_req_data(
2255 req
, struct recovery_state
);
2256 struct ctdb_reply_control
**reply
;
2257 struct ctdb_req_control request
;
2263 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2265 TALLOC_FREE(subreq
);
2270 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2274 D_ERR("control GET_CAPABILITIES failed on node %u,"
2275 " ret=%d\n", pnn
, ret2
);
2277 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2280 tevent_req_error(req
, ret
);
2284 /* Make the array size same as nodemap */
2285 state
->caps
= talloc_zero_array(state
, uint32_t,
2286 state
->nodemap
->num
);
2287 if (tevent_req_nomem(state
->caps
, req
)) {
/* reply[i] is the answer from the node pnn_list[i]. */
2291 for (i
=0; i
<state
->count
; i
++) {
2294 pnn
= state
->pnn_list
[i
];
2295 ret
= ctdb_reply_control_get_capabilities(reply
[i
],
2298 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2300 tevent_req_error(req
, EPROTO
);
2307 ctdb_req_control_get_dbmap(&request
);
2308 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2309 state
->destnode
, TIMEOUT(),
2311 if (tevent_req_nomem(subreq
, req
)) {
2314 tevent_req_set_callback(subreq
, recovery_dbmap_done
, req
);
/*
 * Callback for the GET_DBMAP control.
 *
 * Parses the reply into state->dbmap, then sends the SET_RECMODE control
 * with CTDB_RECOVERY_ACTIVE to every node in state->pnn_list.
 * Processing continues in recovery_active_done().  Both visible error
 * paths fail the request with ret.
 *
 * NOTE(review): guard conditions, returns and some call arguments are
 * not visible in this excerpt.
 */
2317 static void recovery_dbmap_done(struct tevent_req
*subreq
)
2319 struct tevent_req
*req
= tevent_req_callback_data(
2320 subreq
, struct tevent_req
);
2321 struct recovery_state
*state
= tevent_req_data(
2322 req
, struct recovery_state
);
2323 struct ctdb_reply_control
*reply
;
2324 struct ctdb_req_control request
;
2328 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2329 TALLOC_FREE(subreq
);
2331 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2332 state
->destnode
, ret
);
2333 tevent_req_error(req
, ret
);
2337 ret
= ctdb_reply_control_get_dbmap(reply
, state
, &state
->dbmap
);
2339 D_ERR("control GET_DBMAP failed, ret=%d\n", ret
);
2340 tevent_req_error(req
, ret
);
2344 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_ACTIVE
);
2345 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2347 state
->pnn_list
, state
->count
,
2348 TIMEOUT(), &request
);
2349 if (tevent_req_nomem(subreq
, req
)) {
2352 tevent_req_set_callback(subreq
, recovery_active_done
, req
);
/*
 * Callback for the multi-node SET_RECMODE(ACTIVE) control.
 *
 * After the control result is checked, a new vnnmap is built: the node
 * map is scanned twice, testing NODE_FLAGS_INACTIVE and the
 * CTDB_CAP_LMASTER capability of each node; the map gets at least one
 * entry (destnode is stored in slot 0 and a warning is logged when no
 * active lmaster was found).  The new map takes state->generation,
 * replaces state->vnnmap, and the START_RECOVERY control is sent to all
 * nodes.  Processing continues in recovery_start_recovery_done().
 *
 * NOTE(review): the loop bodies (continue/count statements), guard
 * conditions and returns are not visible in this excerpt - presumably
 * inactive nodes and nodes without the lmaster capability are skipped.
 */
2355 static void recovery_active_done(struct tevent_req
*subreq
)
2357 struct tevent_req
*req
= tevent_req_callback_data(
2358 subreq
, struct tevent_req
);
2359 struct recovery_state
*state
= tevent_req_data(
2360 req
, struct recovery_state
);
2361 struct ctdb_req_control request
;
2362 struct ctdb_vnn_map
*vnnmap
;
2365 unsigned int count
, i
;
2368 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2370 TALLOC_FREE(subreq
);
2375 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2379 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2380 " ret=%d\n", pnn
, ret2
);
2382 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2385 tevent_req_error(req
, ret
);
2389 D_ERR("Set recovery mode to ACTIVE\n");
2391 /* Calculate new VNNMAP */
/* First pass over the node map: determine the size of the new map. */
2393 for (i
=0; i
<state
->nodemap
->num
; i
++) {
2394 if (state
->nodemap
->node
[i
].flags
& NODE_FLAGS_INACTIVE
) {
2397 if (!(state
->caps
[i
] & CTDB_CAP_LMASTER
)) {
2404 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2407 vnnmap
= talloc_zero(state
, struct ctdb_vnn_map
);
2408 if (tevent_req_nomem(vnnmap
, req
)) {
/* The map always has at least one slot. */
2412 vnnmap
->size
= (count
== 0 ? 1 : count
);
2413 vnnmap
->map
= talloc_array(vnnmap
, uint32_t, vnnmap
->size
);
2414 if (tevent_req_nomem(vnnmap
->map
, req
)) {
2419 vnnmap
->map
[0] = state
->destnode
;
/* Second pass over the node map: fill in the pnn entries. */
2422 for (i
=0; i
<state
->nodemap
->num
; i
++) {
2423 if (state
->nodemap
->node
[i
].flags
&
2424 NODE_FLAGS_INACTIVE
) {
2427 if (!(state
->caps
[i
] & CTDB_CAP_LMASTER
)) {
2431 vnnmap
->map
[count
] = state
->nodemap
->node
[i
].pnn
;
2436 vnnmap
->generation
= state
->generation
;
/* Replace the vnnmap fetched earlier with the newly computed one. */
2438 talloc_free(state
->vnnmap
);
2439 state
->vnnmap
= vnnmap
;
2441 ctdb_req_control_start_recovery(&request
);
2442 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2444 state
->pnn_list
, state
->count
,
2445 TIMEOUT(), &request
);
2446 if (tevent_req_nomem(subreq
, req
)) {
2449 tevent_req_set_callback(subreq
, recovery_start_recovery_done
, req
);
/*
 * Callback for the multi-node START_RECOVERY control.
 *
 * The visible error path logs the failing node (or the overall ret) and
 * fails the request with ret.  Otherwise the SETVNNMAP control carrying
 * the newly computed state->vnnmap is sent to all nodes and processing
 * continues in recovery_vnnmap_update_done().
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
2452 static void recovery_start_recovery_done(struct tevent_req
*subreq
)
2454 struct tevent_req
*req
= tevent_req_callback_data(
2455 subreq
, struct tevent_req
);
2456 struct recovery_state
*state
= tevent_req_data(
2457 req
, struct recovery_state
);
2458 struct ctdb_req_control request
;
2463 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2465 TALLOC_FREE(subreq
);
2470 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2474 D_ERR("failed to run start_recovery event on node %u,"
2475 " ret=%d\n", pnn
, ret2
);
2477 D_ERR("failed to run start_recovery event, ret=%d\n",
2480 tevent_req_error(req
, ret
);
2484 D_ERR("start_recovery event finished\n");
2486 ctdb_req_control_setvnnmap(&request
, state
->vnnmap
);
2487 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2489 state
->pnn_list
, state
->count
,
2490 TIMEOUT(), &request
);
2491 if (tevent_req_nomem(subreq
, req
)) {
2494 tevent_req_set_callback(subreq
, recovery_vnnmap_update_done
, req
);
/*
 * Callback for the multi-node SETVNNMAP control.
 *
 * The visible error path logs the failure and fails the request with
 * ret.  Otherwise db_recovery_send() is started for all databases in
 * state->dbmap, using the generation of the new vnnmap, and processing
 * continues in recovery_db_recovery_done().
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
2497 static void recovery_vnnmap_update_done(struct tevent_req
*subreq
)
2499 struct tevent_req
*req
= tevent_req_callback_data(
2500 subreq
, struct tevent_req
);
2501 struct recovery_state
*state
= tevent_req_data(
2502 req
, struct recovery_state
);
2507 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2509 TALLOC_FREE(subreq
);
2514 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2518 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2521 D_ERR("failed to update VNNMAP, ret=%d\n", ret
);
2523 tevent_req_error(req
, ret
);
2527 D_NOTICE("updated VNNMAP\n");
2529 subreq
= db_recovery_send(state
, state
->ev
, state
->client
,
2530 state
->dbmap
, state
->tun_list
,
2531 state
->pnn_list
, state
->count
,
2532 state
->caps
, state
->ban_credits
,
2533 state
->vnnmap
->generation
);
2534 if (tevent_req_nomem(subreq
, req
)) {
2537 tevent_req_set_callback(subreq
, recovery_db_recovery_done
, req
);
/*
 * Callback for the db_recovery step.
 *
 * Logs how many databases were recovered.  The visible failure handling
 * works as follows: if the enable_bans tunable is 0 the request fails
 * with EIO; otherwise the node in pnn_list with the most ban credits is
 * determined and, when that maximum is at least NUM_RETRIES, a
 * SET_BAN_STATE control is sent with recovery_failed_done() as callback;
 * the remaining visible path fails the request with EIO.  On the
 * non-failure path the SET_RECMODE control with CTDB_RECOVERY_NORMAL is
 * sent to all nodes and processing continues in recovery_normal_done().
 *
 * NOTE(review): the guard that selects the failure handling, several
 * declarations, the ban_state initialiser fields and most call arguments
 * of the ban control are not visible in this excerpt.
 */
2540 static void recovery_db_recovery_done(struct tevent_req
*subreq
)
2542 struct tevent_req
*req
= tevent_req_callback_data(
2543 subreq
, struct tevent_req
);
2544 struct recovery_state
*state
= tevent_req_data(
2545 req
, struct recovery_state
);
2546 struct ctdb_req_control request
;
2550 status
= db_recovery_recv(subreq
, &count
);
2551 TALLOC_FREE(subreq
);
2553 D_ERR("%d of %d databases recovered\n", count
, state
->dbmap
->num
);
2556 uint32_t max_pnn
= CTDB_UNKNOWN_PNN
, max_credits
= 0;
2559 /* Bans are not enabled */
2560 if (state
->tun_list
->enable_bans
== 0) {
2561 tevent_req_error(req
, EIO
);
/* Look for the node that collected the most ban credits. */
2565 for (i
=0; i
<state
->count
; i
++) {
2567 pnn
= state
->pnn_list
[i
];
2568 if (state
->ban_credits
[pnn
] > max_credits
) {
2570 max_credits
= state
->ban_credits
[pnn
];
2574 /* If pulling database fails multiple times */
2575 if (max_credits
>= NUM_RETRIES
) {
2576 struct ctdb_ban_state ban_state
= {
2578 .time
= state
->tun_list
->recovery_ban_period
,
2581 D_ERR("Banning node %u for %u seconds\n",
2585 ctdb_req_control_set_ban_state(&request
,
2587 subreq
= ctdb_client_control_send(state
,
2593 if (tevent_req_nomem(subreq
, req
)) {
2596 tevent_req_set_callback(subreq
,
2597 recovery_failed_done
,
2600 tevent_req_error(req
, EIO
);
/* Database recovery succeeded: switch the nodes back to NORMAL. */
2605 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_NORMAL
);
2606 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2608 state
->pnn_list
, state
->count
,
2609 TIMEOUT(), &request
);
2610 if (tevent_req_nomem(subreq
, req
)) {
2613 tevent_req_set_callback(subreq
, recovery_normal_done
, req
);
/*
 * Callback for the SET_BAN_STATE control sent after a failed database
 * recovery.
 *
 * Failures of the control or of parsing its reply are logged.  The last
 * visible statement fails the request with EIO, so this path always
 * reports the recovery as failed.
 *
 * NOTE(review): declarations and the guard conditions around the two log
 * messages are not visible in this excerpt.
 */
2616 static void recovery_failed_done(struct tevent_req
*subreq
)
2618 struct tevent_req
*req
= tevent_req_callback_data(
2619 subreq
, struct tevent_req
);
2620 struct recovery_state
*state
= tevent_req_data(
2621 req
, struct recovery_state
);
2622 struct ctdb_reply_control
*reply
;
2626 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2627 TALLOC_FREE(subreq
);
2629 D_ERR("failed to ban node, ret=%d\n", ret
);
2633 ret
= ctdb_reply_control_set_ban_state(reply
);
2635 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret
);
2639 tevent_req_error(req
, EIO
);
/*
 * Callback for the multi-node SET_RECMODE(NORMAL) control.
 *
 * The visible error path logs the failing node (or the overall ret) and
 * fails the request with ret.  Otherwise the END_RECOVERY control is
 * sent to all nodes and processing continues in
 * recovery_end_recovery_done().
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
2642 static void recovery_normal_done(struct tevent_req
*subreq
)
2644 struct tevent_req
*req
= tevent_req_callback_data(
2645 subreq
, struct tevent_req
);
2646 struct recovery_state
*state
= tevent_req_data(
2647 req
, struct recovery_state
);
2648 struct ctdb_req_control request
;
2653 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2655 TALLOC_FREE(subreq
);
2660 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2664 D_ERR("failed to set recovery mode NORMAL on node %u,"
2665 " ret=%d\n", pnn
, ret2
);
2667 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2670 tevent_req_error(req
, ret
);
2674 D_ERR("Set recovery mode to NORMAL\n");
2676 ctdb_req_control_end_recovery(&request
);
2677 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2679 state
->pnn_list
, state
->count
,
2680 TIMEOUT(), &request
);
2681 if (tevent_req_nomem(subreq
, req
)) {
2684 tevent_req_set_callback(subreq
, recovery_end_recovery_done
, req
);
/*
 * Callback for the multi-node END_RECOVERY control; last step of the
 * recovery chain.
 *
 * The visible error path logs the failing node (or the overall ret) and
 * fails the request with ret; otherwise the request is marked done.
 *
 * NOTE(review): declarations, guard conditions and returns are not
 * visible in this excerpt.
 */
2687 static void recovery_end_recovery_done(struct tevent_req
*subreq
)
2689 struct tevent_req
*req
= tevent_req_callback_data(
2690 subreq
, struct tevent_req
);
2691 struct recovery_state
*state
= tevent_req_data(
2692 req
, struct recovery_state
);
2697 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2699 TALLOC_FREE(subreq
);
2704 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2708 D_ERR("failed to run recovered event on node %u,"
2709 " ret=%d\n", pnn
, ret2
);
2711 D_ERR("failed to run recovered event, ret=%d\n", ret
);
2713 tevent_req_error(req
, ret
);
2717 D_ERR("recovered event finished\n");
2719 tevent_req_done(req
);
/*
 * Fetch the outcome of the top-level recovery request.
 *
 * Delegates to generic_recv() with perr passed through unchanged; the
 * boolean result of generic_recv() is deliberately not returned.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void)generic_recv(req, perr);
}
/*
 * Print the command-line synopsis to stderr: output fd, ctdb socket
 * path and generation.
 *
 * NOTE(review): the trailing argument of the fprintf() call is not
 * visible in this excerpt; presumably it is progname.
 */
2727 static void usage(const char *progname
)
2729 fprintf(stderr
, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
2735 * Arguments - write fd, socket path, generation
2737 int main(int argc
, char *argv
[])
2740 const char *sockpath
;
2741 TALLOC_CTX
*mem_ctx
= NULL
;
2742 struct tevent_context
*ev
;
2743 struct ctdb_client_context
*client
;
2745 struct tevent_req
*req
;
2746 uint32_t generation
;
2753 write_fd
= atoi(argv
[1]);
2755 generation
= (uint32_t)smb_strtoul(argv
[3],
2761 fprintf(stderr
, "recovery: unable to initialize generation\n");
2765 mem_ctx
= talloc_new(NULL
);
2766 if (mem_ctx
== NULL
) {
2767 fprintf(stderr
, "recovery: talloc_new() failed\n");
2771 ret
= logging_init(mem_ctx
, NULL
, NULL
, "ctdb-recovery");
2773 fprintf(stderr
, "recovery: Unable to initialize logging\n");
2777 ev
= tevent_context_init(mem_ctx
);
2779 D_ERR("tevent_context_init() failed\n");
2783 ret
= ctdb_client_init(mem_ctx
, ev
, sockpath
, &client
);
2785 D_ERR("ctdb_client_init() failed, ret=%d\n", ret
);
2789 req
= recovery_send(mem_ctx
, ev
, client
, generation
);
2791 D_ERR("database_recover_send() failed\n");
2795 if (! tevent_req_poll(req
, ev
)) {
2796 D_ERR("tevent_req_poll() failed\n");
2800 recovery_recv(req
, &ret
);
2803 D_ERR("database recovery failed, ret=%d\n", ret
);
2807 sys_write(write_fd
, &ret
, sizeof(ret
));
2811 TALLOC_FREE(mem_ctx
);