/*
   ctdb parallel database recovery

   Copyright (C) Amitay Isaacs  2015

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/sys_rw.h"
32 #include "lib/util/time.h"
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/util.h"
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "client/client.h"
40 #include "common/logging.h"
/*
 * Timeout value handed to timeval_current_ofs() as its seconds argument.
 * 30 is only the initial value; the variable is not const.
 */
static int recover_timeout = 30;

/*
 * Expiry time for a single request, recover_timeout seconds from the
 * moment the macro is evaluated.  Passed as the timeout argument to the
 * ctdb_client_control_send()/ctdb_client_control_multi_send() calls in
 * this file.
 */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
52 static bool generic_recv(struct tevent_req
*req
, int *perr
)
56 if (tevent_req_is_unix_error(req
, &err
)) {
66 static uint64_t rec_srvid
= CTDB_SRVID_RECOVERY
;
68 static uint64_t srvid_next(void)
/*
 * Node related functions
 */
81 uint32_t *ban_credits
;
86 static struct node_list
*node_list_init(TALLOC_CTX
*mem_ctx
, unsigned int size
)
88 struct node_list
*nlist
;
91 nlist
= talloc_zero(mem_ctx
, struct node_list
);
96 nlist
->pnn_list
= talloc_array(nlist
, uint32_t, size
);
97 nlist
->caps
= talloc_zero_array(nlist
, uint32_t, size
);
98 nlist
->ban_credits
= talloc_zero_array(nlist
, uint32_t, size
);
100 if (nlist
->pnn_list
== NULL
||
101 nlist
->caps
== NULL
||
102 nlist
->ban_credits
== NULL
) {
108 for (i
=0; i
<nlist
->size
; i
++) {
109 nlist
->pnn_list
[i
] = CTDB_UNKNOWN_PNN
;
115 static bool node_list_add(struct node_list
*nlist
, uint32_t pnn
)
119 if (nlist
->count
== nlist
->size
) {
123 for (i
=0; i
<nlist
->count
; i
++) {
124 if (nlist
->pnn_list
[i
] == pnn
) {
129 nlist
->pnn_list
[nlist
->count
] = pnn
;
135 static uint32_t *node_list_lmaster(struct node_list
*nlist
,
137 unsigned int *pnn_count
)
140 unsigned int count
, i
;
142 pnn_list
= talloc_zero_array(mem_ctx
, uint32_t, nlist
->count
);
143 if (pnn_list
== NULL
) {
148 for (i
=0; i
<nlist
->count
; i
++) {
149 if (!(nlist
->caps
[i
] & CTDB_CAP_LMASTER
)) {
153 pnn_list
[count
] = nlist
->pnn_list
[i
];
161 static void node_list_ban_credits(struct node_list
*nlist
, uint32_t pnn
)
165 for (i
=0; i
<nlist
->count
; i
++) {
166 if (nlist
->pnn_list
[i
] == pnn
) {
167 nlist
->ban_credits
[i
] += 1;
/*
 * Database list functions
 *
 * Simple, naive implementation that could be updated to a db_hash or similar
 */
180 struct db
*prev
, *next
;
185 unsigned int num_nodes
;
189 unsigned int num_dbs
;
191 unsigned int num_nodes
;
194 static struct db_list
*db_list_init(TALLOC_CTX
*mem_ctx
, unsigned int num_nodes
)
198 l
= talloc_zero(mem_ctx
, struct db_list
);
199 l
->num_nodes
= num_nodes
;
204 static struct db
*db_list_find(struct db_list
*dblist
, uint32_t db_id
)
208 if (dblist
== NULL
) {
213 while (db
!= NULL
&& db
->db_id
!= db_id
) {
220 static int db_list_add(struct db_list
*dblist
,
225 struct db
*db
= NULL
;
227 if (dblist
== NULL
) {
231 db
= talloc_zero(dblist
, struct db
);
237 db
->db_flags
= db_flags
;
238 db
->pnn_list
= talloc_zero_array(db
, uint32_t, dblist
->num_nodes
);
239 if (db
->pnn_list
== NULL
) {
243 db
->pnn_list
[0] = node
;
246 DLIST_ADD_END(dblist
->db
, db
);
252 static int db_list_check_and_add(struct db_list
*dblist
,
257 struct db
*db
= NULL
;
/*
 * These flags are masked out because they are only set on a
 * node when a client attaches to that node, so they might not
 * be set yet.  They can't be passed as part of the attach, so
 * they're no use here.
 */
266 db_flags
&= ~(CTDB_DB_FLAGS_READONLY
| CTDB_DB_FLAGS_STICKY
);
268 if (dblist
== NULL
) {
272 db
= db_list_find(dblist
, db_id
);
274 ret
= db_list_add(dblist
, db_id
, db_flags
, node
);
278 if (db
->db_flags
!= db_flags
) {
279 D_ERR("Incompatible database flags for 0x%"PRIx32
" "
280 "(0x%"PRIx32
" != 0x%"PRIx32
")\n",
287 if (db
->num_nodes
>= dblist
->num_nodes
) {
291 db
->pnn_list
[db
->num_nodes
] = node
;
/*
 * Recovery database functions
 */
301 struct recdb_context
{
309 static struct recdb_context
*recdb_create(TALLOC_CTX
*mem_ctx
, uint32_t db_id
,
312 uint32_t hash_size
, bool persistent
)
314 static char *db_dir_state
= NULL
;
315 struct recdb_context
*recdb
;
316 unsigned int tdb_flags
;
318 recdb
= talloc(mem_ctx
, struct recdb_context
);
323 if (db_dir_state
== NULL
) {
324 db_dir_state
= getenv("CTDB_DBDIR_STATE");
327 recdb
->db_name
= db_name
;
328 recdb
->db_id
= db_id
;
329 recdb
->db_path
= talloc_asprintf(recdb
, "%s/recdb.%s",
330 db_dir_state
!= NULL
?
332 dirname(discard_const(db_path
)),
334 if (recdb
->db_path
== NULL
) {
338 unlink(recdb
->db_path
);
340 tdb_flags
= TDB_NOLOCK
| TDB_INCOMPATIBLE_HASH
| TDB_DISALLOW_NESTING
;
341 recdb
->db
= tdb_wrap_open(mem_ctx
, recdb
->db_path
, hash_size
,
342 tdb_flags
, O_RDWR
|O_CREAT
|O_EXCL
, 0600);
343 if (recdb
->db
== NULL
) {
345 D_ERR("failed to create recovery db %s\n", recdb
->db_path
);
349 recdb
->persistent
= persistent
;
354 static uint32_t recdb_id(struct recdb_context
*recdb
)
359 static const char *recdb_name(struct recdb_context
*recdb
)
361 return recdb
->db_name
;
364 static const char *recdb_path(struct recdb_context
*recdb
)
366 return recdb
->db_path
;
369 static struct tdb_context
*recdb_tdb(struct recdb_context
*recdb
)
371 return recdb
->db
->tdb
;
374 static bool recdb_persistent(struct recdb_context
*recdb
)
376 return recdb
->persistent
;
379 struct recdb_add_traverse_state
{
380 struct recdb_context
*recdb
;
384 static int recdb_add_traverse(uint32_t reqid
, struct ctdb_ltdb_header
*header
,
385 TDB_DATA key
, TDB_DATA data
,
388 struct recdb_add_traverse_state
*state
=
389 (struct recdb_add_traverse_state
*)private_data
;
390 struct ctdb_ltdb_header
*hdr
;
394 /* header is not marshalled separately in the pulldb control */
395 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
399 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
401 /* fetch the existing record, if any */
402 prev_data
= tdb_fetch(recdb_tdb(state
->recdb
), key
);
404 if (prev_data
.dptr
!= NULL
) {
405 struct ctdb_ltdb_header prev_hdr
;
407 prev_hdr
= *(struct ctdb_ltdb_header
*)prev_data
.dptr
;
408 free(prev_data
.dptr
);
409 if (hdr
->rsn
< prev_hdr
.rsn
||
410 (hdr
->rsn
== prev_hdr
.rsn
&&
411 prev_hdr
.dmaster
!= state
->mypnn
)) {
416 ret
= tdb_store(recdb_tdb(state
->recdb
), key
, data
, TDB_REPLACE
);
423 static bool recdb_add(struct recdb_context
*recdb
, int mypnn
,
424 struct ctdb_rec_buffer
*recbuf
)
426 struct recdb_add_traverse_state state
;
432 ret
= ctdb_rec_buffer_traverse(recbuf
, recdb_add_traverse
, &state
);
440 /* This function decides which records from recdb are retained */
441 static int recbuf_filter_add(struct ctdb_rec_buffer
*recbuf
, bool persistent
,
442 uint32_t reqid
, uint32_t dmaster
,
443 TDB_DATA key
, TDB_DATA data
)
445 struct ctdb_ltdb_header
*header
;
448 /* Skip empty records */
449 if (data
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
453 /* update the dmaster field to point to us */
454 header
= (struct ctdb_ltdb_header
*)data
.dptr
;
456 header
->dmaster
= dmaster
;
457 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
460 ret
= ctdb_rec_buffer_add(recbuf
, recbuf
, reqid
, NULL
, key
, data
);
468 struct recdb_records_traverse_state
{
469 struct ctdb_rec_buffer
*recbuf
;
476 static int recdb_records_traverse(struct tdb_context
*tdb
,
477 TDB_DATA key
, TDB_DATA data
,
480 struct recdb_records_traverse_state
*state
=
481 (struct recdb_records_traverse_state
*)private_data
;
484 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
485 state
->reqid
, state
->dmaster
, key
, data
);
487 state
->failed
= true;
494 static struct ctdb_rec_buffer
*recdb_records(struct recdb_context
*recdb
,
498 struct recdb_records_traverse_state state
;
501 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
502 if (state
.recbuf
== NULL
) {
505 state
.dmaster
= dmaster
;
507 state
.persistent
= recdb_persistent(recdb
);
508 state
.failed
= false;
510 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_records_traverse
,
512 if (ret
== -1 || state
.failed
) {
513 D_ERR("Failed to marshall recovery records for %s\n",
515 TALLOC_FREE(state
.recbuf
);
522 struct recdb_file_traverse_state
{
523 struct ctdb_rec_buffer
*recbuf
;
524 struct recdb_context
*recdb
;
532 unsigned int num_buffers
;
535 static int recdb_file_traverse(struct tdb_context
*tdb
,
536 TDB_DATA key
, TDB_DATA data
,
539 struct recdb_file_traverse_state
*state
=
540 (struct recdb_file_traverse_state
*)private_data
;
543 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
544 state
->reqid
, state
->dmaster
, key
, data
);
546 state
->failed
= true;
550 if (ctdb_rec_buffer_len(state
->recbuf
) > state
->max_size
) {
551 ret
= ctdb_rec_buffer_write(state
->recbuf
, state
->fd
);
553 D_ERR("Failed to collect recovery records for %s\n",
554 recdb_name(state
->recdb
));
555 state
->failed
= true;
559 state
->num_buffers
+= 1;
561 TALLOC_FREE(state
->recbuf
);
562 state
->recbuf
= ctdb_rec_buffer_init(state
->mem_ctx
,
563 recdb_id(state
->recdb
));
564 if (state
->recbuf
== NULL
) {
565 state
->failed
= true;
573 static int recdb_file(struct recdb_context
*recdb
, TALLOC_CTX
*mem_ctx
,
574 uint32_t dmaster
, int fd
, int max_size
)
576 struct recdb_file_traverse_state state
;
579 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
580 if (state
.recbuf
== NULL
) {
584 state
.mem_ctx
= mem_ctx
;
585 state
.dmaster
= dmaster
;
587 state
.persistent
= recdb_persistent(recdb
);
588 state
.failed
= false;
590 state
.max_size
= max_size
;
591 state
.num_buffers
= 0;
593 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_file_traverse
, &state
);
594 if (ret
== -1 || state
.failed
) {
595 TALLOC_FREE(state
.recbuf
);
599 ret
= ctdb_rec_buffer_write(state
.recbuf
, fd
);
601 D_ERR("Failed to collect recovery records for %s\n",
603 TALLOC_FREE(state
.recbuf
);
606 state
.num_buffers
+= 1;
608 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
609 state
.num_buffers
, recdb_name(recdb
));
611 return state
.num_buffers
;
/*
 * Pull database from a single node
 */
618 struct pull_database_state
{
619 struct tevent_context
*ev
;
620 struct ctdb_client_context
*client
;
621 struct recdb_context
*recdb
;
624 unsigned int num_records
;
628 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
/*
 * Forward declarations of the completion callbacks used by the
 * pull_database request; each one is installed on a sub-request with
 * tevent_req_set_callback() further down in this file.
 */
static void pull_database_register_done(struct tevent_req *subreq);
static void pull_database_old_done(struct tevent_req *subreq);
static void pull_database_unregister_done(struct tevent_req *subreq);
static void pull_database_new_done(struct tevent_req *subreq);
635 static struct tevent_req
*pull_database_send(
637 struct tevent_context
*ev
,
638 struct ctdb_client_context
*client
,
639 uint32_t pnn
, uint32_t caps
,
640 struct recdb_context
*recdb
)
642 struct tevent_req
*req
, *subreq
;
643 struct pull_database_state
*state
;
644 struct ctdb_req_control request
;
646 req
= tevent_req_create(mem_ctx
, &state
, struct pull_database_state
);
652 state
->client
= client
;
653 state
->recdb
= recdb
;
655 state
->srvid
= srvid_next();
657 if (caps
& CTDB_CAP_FRAGMENTED_CONTROLS
) {
658 subreq
= ctdb_client_set_message_handler_send(
659 state
, state
->ev
, state
->client
,
660 state
->srvid
, pull_database_handler
,
662 if (tevent_req_nomem(subreq
, req
)) {
663 return tevent_req_post(req
, ev
);
666 tevent_req_set_callback(subreq
, pull_database_register_done
,
670 struct ctdb_pulldb pulldb
;
672 pulldb
.db_id
= recdb_id(recdb
);
673 pulldb
.lmaster
= CTDB_LMASTER_ANY
;
675 ctdb_req_control_pull_db(&request
, &pulldb
);
676 subreq
= ctdb_client_control_send(state
, state
->ev
,
680 if (tevent_req_nomem(subreq
, req
)) {
681 return tevent_req_post(req
, ev
);
683 tevent_req_set_callback(subreq
, pull_database_old_done
, req
);
689 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
692 struct tevent_req
*req
= talloc_get_type_abort(
693 private_data
, struct tevent_req
);
694 struct pull_database_state
*state
= tevent_req_data(
695 req
, struct pull_database_state
);
696 struct ctdb_rec_buffer
*recbuf
;
701 if (srvid
!= state
->srvid
) {
705 ret
= ctdb_rec_buffer_pull(data
.dptr
, data
.dsize
, state
, &recbuf
, &np
);
707 D_ERR("Invalid data received for DB_PULL messages\n");
711 if (recbuf
->db_id
!= recdb_id(state
->recdb
)) {
713 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
714 recbuf
->db_id
, recdb_name(state
->recdb
));
718 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
722 D_ERR("Failed to add records to recdb for %s\n",
723 recdb_name(state
->recdb
));
727 state
->num_records
+= recbuf
->count
;
731 static void pull_database_register_done(struct tevent_req
*subreq
)
733 struct tevent_req
*req
= tevent_req_callback_data(
734 subreq
, struct tevent_req
);
735 struct pull_database_state
*state
= tevent_req_data(
736 req
, struct pull_database_state
);
737 struct ctdb_req_control request
;
738 struct ctdb_pulldb_ext pulldb_ext
;
742 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
745 D_ERR("Failed to set message handler for DB_PULL for %s\n",
746 recdb_name(state
->recdb
));
747 tevent_req_error(req
, ret
);
751 pulldb_ext
.db_id
= recdb_id(state
->recdb
);
752 pulldb_ext
.lmaster
= CTDB_LMASTER_ANY
;
753 pulldb_ext
.srvid
= state
->srvid
;
755 ctdb_req_control_db_pull(&request
, &pulldb_ext
);
756 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
757 state
->pnn
, TIMEOUT(), &request
);
758 if (tevent_req_nomem(subreq
, req
)) {
761 tevent_req_set_callback(subreq
, pull_database_new_done
, req
);
764 static void pull_database_old_done(struct tevent_req
*subreq
)
766 struct tevent_req
*req
= tevent_req_callback_data(
767 subreq
, struct tevent_req
);
768 struct pull_database_state
*state
= tevent_req_data(
769 req
, struct pull_database_state
);
770 struct ctdb_reply_control
*reply
;
771 struct ctdb_rec_buffer
*recbuf
;
775 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
778 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
779 recdb_name(state
->recdb
), state
->pnn
, ret
);
780 tevent_req_error(req
, ret
);
784 ret
= ctdb_reply_control_pull_db(reply
, state
, &recbuf
);
787 tevent_req_error(req
, ret
);
791 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
795 tevent_req_error(req
, EIO
);
799 state
->num_records
= recbuf
->count
;
802 D_INFO("Pulled %d records for db %s from node %d\n",
803 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
805 tevent_req_done(req
);
808 static void pull_database_new_done(struct tevent_req
*subreq
)
810 struct tevent_req
*req
= tevent_req_callback_data(
811 subreq
, struct tevent_req
);
812 struct pull_database_state
*state
= tevent_req_data(
813 req
, struct pull_database_state
);
814 struct ctdb_reply_control
*reply
;
815 uint32_t num_records
;
819 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
822 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
823 recdb_name(state
->recdb
), state
->pnn
, ret
);
828 ret
= ctdb_reply_control_db_pull(reply
, &num_records
);
830 if (num_records
!= state
->num_records
) {
831 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
832 num_records
, state
->num_records
,
833 recdb_name(state
->recdb
));
838 D_INFO("Pulled %d records for db %s from node %d\n",
839 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
843 subreq
= ctdb_client_remove_message_handler_send(
844 state
, state
->ev
, state
->client
,
846 if (tevent_req_nomem(subreq
, req
)) {
849 tevent_req_set_callback(subreq
, pull_database_unregister_done
, req
);
852 static void pull_database_unregister_done(struct tevent_req
*subreq
)
854 struct tevent_req
*req
= tevent_req_callback_data(
855 subreq
, struct tevent_req
);
856 struct pull_database_state
*state
= tevent_req_data(
857 req
, struct pull_database_state
);
861 status
= ctdb_client_remove_message_handler_recv(subreq
, &ret
);
864 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
865 recdb_name(state
->recdb
));
866 tevent_req_error(req
, ret
);
870 if (state
->result
!= 0) {
871 tevent_req_error(req
, state
->result
);
875 tevent_req_done(req
);
/*
 * Collect the outcome of a request started with pull_database_send().
 *
 * Thin wrapper: req and perr are forwarded unchanged to generic_recv()
 * and its return value is passed straight back to the caller.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
/*
 * Push database to specified nodes (old style)
 */
887 struct push_database_old_state
{
888 struct tevent_context
*ev
;
889 struct ctdb_client_context
*client
;
890 struct recdb_context
*recdb
;
893 struct ctdb_rec_buffer
*recbuf
;
/*
 * Forward declaration: completion callback for each PUSH_DB control sent
 * by the old-style push request (installed via tevent_req_set_callback()).
 */
static void push_database_old_push_done(struct tevent_req *subreq);
899 static struct tevent_req
*push_database_old_send(
901 struct tevent_context
*ev
,
902 struct ctdb_client_context
*client
,
905 struct recdb_context
*recdb
)
907 struct tevent_req
*req
, *subreq
;
908 struct push_database_old_state
*state
;
909 struct ctdb_req_control request
;
912 req
= tevent_req_create(mem_ctx
, &state
,
913 struct push_database_old_state
);
919 state
->client
= client
;
920 state
->recdb
= recdb
;
921 state
->pnn_list
= pnn_list
;
922 state
->count
= count
;
925 state
->recbuf
= recdb_records(recdb
, state
,
926 ctdb_client_pnn(client
));
927 if (tevent_req_nomem(state
->recbuf
, req
)) {
928 return tevent_req_post(req
, ev
);
931 pnn
= state
->pnn_list
[state
->index
];
933 ctdb_req_control_push_db(&request
, state
->recbuf
);
934 subreq
= ctdb_client_control_send(state
, ev
, client
, pnn
,
935 TIMEOUT(), &request
);
936 if (tevent_req_nomem(subreq
, req
)) {
937 return tevent_req_post(req
, ev
);
939 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
944 static void push_database_old_push_done(struct tevent_req
*subreq
)
946 struct tevent_req
*req
= tevent_req_callback_data(
947 subreq
, struct tevent_req
);
948 struct push_database_old_state
*state
= tevent_req_data(
949 req
, struct push_database_old_state
);
950 struct ctdb_req_control request
;
955 status
= ctdb_client_control_recv(subreq
, &ret
, NULL
, NULL
);
958 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
959 recdb_name(state
->recdb
), state
->pnn_list
[state
->index
],
961 tevent_req_error(req
, ret
);
966 if (state
->index
== state
->count
) {
967 TALLOC_FREE(state
->recbuf
);
968 tevent_req_done(req
);
972 pnn
= state
->pnn_list
[state
->index
];
974 ctdb_req_control_push_db(&request
, state
->recbuf
);
975 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
976 pnn
, TIMEOUT(), &request
);
977 if (tevent_req_nomem(subreq
, req
)) {
980 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
/*
 * Collect the outcome of a request started with push_database_old_send().
 *
 * Thin wrapper: req and perr are forwarded unchanged to generic_recv()
 * and its return value is passed straight back to the caller.
 */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
/*
 * Push database to specified nodes (new style)
 */
992 struct push_database_new_state
{
993 struct tevent_context
*ev
;
994 struct ctdb_client_context
*client
;
995 struct recdb_context
*recdb
;
1002 int num_buffers_sent
;
1003 unsigned int num_records
;
/*
 * Forward declarations for the new-style push request.
 *
 * _started, _send_done and _confirmed are sub-request completion
 * callbacks (installed with tevent_req_set_callback()); _send_msg takes
 * the parent request itself and is called directly from _started and
 * _send_done.
 */
static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
1011 static struct tevent_req
*push_database_new_send(
1012 TALLOC_CTX
*mem_ctx
,
1013 struct tevent_context
*ev
,
1014 struct ctdb_client_context
*client
,
1017 struct recdb_context
*recdb
,
1020 struct tevent_req
*req
, *subreq
;
1021 struct push_database_new_state
*state
;
1022 struct ctdb_req_control request
;
1023 struct ctdb_pulldb_ext pulldb_ext
;
1027 req
= tevent_req_create(mem_ctx
, &state
,
1028 struct push_database_new_state
);
1034 state
->client
= client
;
1035 state
->recdb
= recdb
;
1036 state
->pnn_list
= pnn_list
;
1037 state
->count
= count
;
1039 state
->srvid
= srvid_next();
1040 state
->dmaster
= ctdb_client_pnn(client
);
1041 state
->num_buffers_sent
= 0;
1042 state
->num_records
= 0;
1044 filename
= talloc_asprintf(state
, "%s.dat", recdb_path(recdb
));
1045 if (tevent_req_nomem(filename
, req
)) {
1046 return tevent_req_post(req
, ev
);
1049 state
->fd
= open(filename
, O_RDWR
|O_CREAT
, 0644);
1050 if (state
->fd
== -1) {
1051 tevent_req_error(req
, errno
);
1052 return tevent_req_post(req
, ev
);
1055 talloc_free(filename
);
1057 state
->num_buffers
= recdb_file(recdb
, state
, state
->dmaster
,
1058 state
->fd
, max_size
);
1059 if (state
->num_buffers
== -1) {
1060 tevent_req_error(req
, ENOMEM
);
1061 return tevent_req_post(req
, ev
);
1064 offset
= lseek(state
->fd
, 0, SEEK_SET
);
1066 tevent_req_error(req
, EIO
);
1067 return tevent_req_post(req
, ev
);
1070 pulldb_ext
.db_id
= recdb_id(recdb
);
1071 pulldb_ext
.srvid
= state
->srvid
;
1073 ctdb_req_control_db_push_start(&request
, &pulldb_ext
);
1074 subreq
= ctdb_client_control_multi_send(state
, ev
, client
,
1076 TIMEOUT(), &request
);
1077 if (tevent_req_nomem(subreq
, req
)) {
1078 return tevent_req_post(req
, ev
);
1080 tevent_req_set_callback(subreq
, push_database_new_started
, req
);
1085 static void push_database_new_started(struct tevent_req
*subreq
)
1087 struct tevent_req
*req
= tevent_req_callback_data(
1088 subreq
, struct tevent_req
);
1089 struct push_database_new_state
*state
= tevent_req_data(
1090 req
, struct push_database_new_state
);
1095 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1097 TALLOC_FREE(subreq
);
1102 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1106 D_ERR("control DB_PUSH_START failed for db %s"
1107 " on node %u, ret=%d\n",
1108 recdb_name(state
->recdb
), pnn
, ret2
);
1110 D_ERR("control DB_PUSH_START failed for db %s,"
1112 recdb_name(state
->recdb
), ret
);
1114 talloc_free(err_list
);
1116 tevent_req_error(req
, ret
);
1120 push_database_new_send_msg(req
);
1123 static void push_database_new_send_msg(struct tevent_req
*req
)
1125 struct push_database_new_state
*state
= tevent_req_data(
1126 req
, struct push_database_new_state
);
1127 struct tevent_req
*subreq
;
1128 struct ctdb_rec_buffer
*recbuf
;
1129 struct ctdb_req_message message
;
1134 if (state
->num_buffers_sent
== state
->num_buffers
) {
1135 struct ctdb_req_control request
;
1137 ctdb_req_control_db_push_confirm(&request
,
1138 recdb_id(state
->recdb
));
1139 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1143 TIMEOUT(), &request
);
1144 if (tevent_req_nomem(subreq
, req
)) {
1147 tevent_req_set_callback(subreq
, push_database_new_confirmed
,
1152 ret
= ctdb_rec_buffer_read(state
->fd
, state
, &recbuf
);
1154 tevent_req_error(req
, ret
);
1158 data
.dsize
= ctdb_rec_buffer_len(recbuf
);
1159 data
.dptr
= talloc_size(state
, data
.dsize
);
1160 if (tevent_req_nomem(data
.dptr
, req
)) {
1164 ctdb_rec_buffer_push(recbuf
, data
.dptr
, &np
);
1166 message
.srvid
= state
->srvid
;
1167 message
.data
.data
= data
;
1169 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
1170 state
->num_buffers_sent
, recbuf
->count
,
1171 recdb_name(state
->recdb
));
1173 subreq
= ctdb_client_message_multi_send(state
, state
->ev
,
1175 state
->pnn_list
, state
->count
,
1177 if (tevent_req_nomem(subreq
, req
)) {
1180 tevent_req_set_callback(subreq
, push_database_new_send_done
, req
);
1182 state
->num_records
+= recbuf
->count
;
1184 talloc_free(data
.dptr
);
1185 talloc_free(recbuf
);
1188 static void push_database_new_send_done(struct tevent_req
*subreq
)
1190 struct tevent_req
*req
= tevent_req_callback_data(
1191 subreq
, struct tevent_req
);
1192 struct push_database_new_state
*state
= tevent_req_data(
1193 req
, struct push_database_new_state
);
1197 status
= ctdb_client_message_multi_recv(subreq
, &ret
, NULL
, NULL
);
1198 TALLOC_FREE(subreq
);
1200 D_ERR("Sending recovery records failed for %s\n",
1201 recdb_name(state
->recdb
));
1202 tevent_req_error(req
, ret
);
1206 state
->num_buffers_sent
+= 1;
1208 push_database_new_send_msg(req
);
1211 static void push_database_new_confirmed(struct tevent_req
*subreq
)
1213 struct tevent_req
*req
= tevent_req_callback_data(
1214 subreq
, struct tevent_req
);
1215 struct push_database_new_state
*state
= tevent_req_data(
1216 req
, struct push_database_new_state
);
1217 struct ctdb_reply_control
**reply
;
1222 uint32_t num_records
;
1224 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1226 TALLOC_FREE(subreq
);
1231 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1232 state
->count
, err_list
,
1235 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1236 " on node %u, ret=%d\n",
1237 recdb_name(state
->recdb
), pnn
, ret2
);
1239 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1241 recdb_name(state
->recdb
), ret
);
1243 tevent_req_error(req
, ret
);
1247 for (i
=0; i
<state
->count
; i
++) {
1248 ret
= ctdb_reply_control_db_push_confirm(reply
[i
],
1251 tevent_req_error(req
, EPROTO
);
1255 if (num_records
!= state
->num_records
) {
1256 D_ERR("Node %u received %d of %d records for %s\n",
1257 state
->pnn_list
[i
], num_records
,
1258 state
->num_records
, recdb_name(state
->recdb
));
1259 tevent_req_error(req
, EPROTO
);
1266 D_INFO("Pushed %d records for db %s\n",
1267 state
->num_records
, recdb_name(state
->recdb
));
1269 tevent_req_done(req
);
/*
 * Collect the outcome of a request started with push_database_new_send().
 *
 * Thin wrapper: req and perr are forwarded unchanged to generic_recv()
 * and its return value is passed straight back to the caller.
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
/*
 * wrapper for push_database_old and push_database_new
 */
1281 struct push_database_state
{
1282 bool old_done
, new_done
;
/*
 * Forward declarations of the completion callbacks for the two
 * sub-requests (old-style and new-style push) that push_database_send()
 * may start.
 */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1288 static struct tevent_req
*push_database_send(
1289 TALLOC_CTX
*mem_ctx
,
1290 struct tevent_context
*ev
,
1291 struct ctdb_client_context
*client
,
1292 struct node_list
*nlist
,
1293 struct ctdb_tunable_list
*tun_list
,
1294 struct recdb_context
*recdb
)
1296 struct tevent_req
*req
, *subreq
;
1297 struct push_database_state
*state
;
1298 uint32_t *old_list
, *new_list
;
1299 unsigned int old_count
, new_count
;
1302 req
= tevent_req_create(mem_ctx
, &state
, struct push_database_state
);
1307 state
->old_done
= false;
1308 state
->new_done
= false;
1312 old_list
= talloc_array(state
, uint32_t, nlist
->count
);
1313 new_list
= talloc_array(state
, uint32_t, nlist
->count
);
1314 if (tevent_req_nomem(old_list
, req
) ||
1315 tevent_req_nomem(new_list
,req
)) {
1316 return tevent_req_post(req
, ev
);
1319 for (i
=0; i
<nlist
->count
; i
++) {
1320 if (nlist
->caps
[i
] & CTDB_CAP_FRAGMENTED_CONTROLS
) {
1321 new_list
[new_count
] = nlist
->pnn_list
[i
];
1324 old_list
[old_count
] = nlist
->pnn_list
[i
];
1329 if (old_count
> 0) {
1330 subreq
= push_database_old_send(state
, ev
, client
,
1331 old_list
, old_count
, recdb
);
1332 if (tevent_req_nomem(subreq
, req
)) {
1333 return tevent_req_post(req
, ev
);
1335 tevent_req_set_callback(subreq
, push_database_old_done
, req
);
1337 state
->old_done
= true;
1340 if (new_count
> 0) {
1341 subreq
= push_database_new_send(state
, ev
, client
,
1342 new_list
, new_count
, recdb
,
1343 tun_list
->rec_buffer_size_limit
);
1344 if (tevent_req_nomem(subreq
, req
)) {
1345 return tevent_req_post(req
, ev
);
1347 tevent_req_set_callback(subreq
, push_database_new_done
, req
);
1349 state
->new_done
= true;
1355 static void push_database_old_done(struct tevent_req
*subreq
)
1357 struct tevent_req
*req
= tevent_req_callback_data(
1358 subreq
, struct tevent_req
);
1359 struct push_database_state
*state
= tevent_req_data(
1360 req
, struct push_database_state
);
1364 status
= push_database_old_recv(subreq
, &ret
);
1366 tevent_req_error(req
, ret
);
1370 state
->old_done
= true;
1372 if (state
->old_done
&& state
->new_done
) {
1373 tevent_req_done(req
);
1377 static void push_database_new_done(struct tevent_req
*subreq
)
1379 struct tevent_req
*req
= tevent_req_callback_data(
1380 subreq
, struct tevent_req
);
1381 struct push_database_state
*state
= tevent_req_data(
1382 req
, struct push_database_state
);
1386 status
= push_database_new_recv(subreq
, &ret
);
1388 tevent_req_error(req
, ret
);
1392 state
->new_done
= true;
1394 if (state
->old_done
&& state
->new_done
) {
1395 tevent_req_done(req
);
/*
 * Collect the outcome of a request started with push_database_send().
 *
 * Thin wrapper: req and perr are forwarded unchanged to generic_recv()
 * and its return value is passed straight back to the caller.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
/*
 * Collect databases using highest sequence number
 */
1408 struct collect_highseqnum_db_state
{
1409 struct tevent_context
*ev
;
1410 struct ctdb_client_context
*client
;
1411 struct node_list
*nlist
;
1413 struct recdb_context
*recdb
;
/*
 * Forward declarations of the completion callbacks for the
 * collect_highseqnum_db request: first the GET_DB_SEQNUM controls, then
 * the pull_database sub-request.
 */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1421 static struct tevent_req
*collect_highseqnum_db_send(
1422 TALLOC_CTX
*mem_ctx
,
1423 struct tevent_context
*ev
,
1424 struct ctdb_client_context
*client
,
1425 struct node_list
*nlist
,
1427 struct recdb_context
*recdb
)
1429 struct tevent_req
*req
, *subreq
;
1430 struct collect_highseqnum_db_state
*state
;
1431 struct ctdb_req_control request
;
1433 req
= tevent_req_create(mem_ctx
, &state
,
1434 struct collect_highseqnum_db_state
);
1440 state
->client
= client
;
1441 state
->nlist
= nlist
;
1442 state
->db_id
= db_id
;
1443 state
->recdb
= recdb
;
1445 ctdb_req_control_get_db_seqnum(&request
, db_id
);
1446 subreq
= ctdb_client_control_multi_send(mem_ctx
,
1453 if (tevent_req_nomem(subreq
, req
)) {
1454 return tevent_req_post(req
, ev
);
1456 tevent_req_set_callback(subreq
, collect_highseqnum_db_seqnum_done
,
1462 static void collect_highseqnum_db_seqnum_done(struct tevent_req
*subreq
)
1464 struct tevent_req
*req
= tevent_req_callback_data(
1465 subreq
, struct tevent_req
);
1466 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1467 req
, struct collect_highseqnum_db_state
);
1468 struct ctdb_reply_control
**reply
;
1473 uint64_t seqnum
, max_seqnum
;
1476 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1478 TALLOC_FREE(subreq
);
1483 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
1484 state
->nlist
->count
,
1488 D_ERR("control GET_DB_SEQNUM failed for db %s"
1489 " on node %u, ret=%d\n",
1490 recdb_name(state
->recdb
), pnn
, ret2
);
1492 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1494 recdb_name(state
->recdb
), ret
);
1496 tevent_req_error(req
, ret
);
1501 state
->max_pnn
= state
->nlist
->pnn_list
[0];
1502 max_caps
= state
->nlist
->caps
[0];
1503 for (i
=0; i
<state
->nlist
->count
; i
++) {
1504 ret
= ctdb_reply_control_get_db_seqnum(reply
[i
], &seqnum
);
1506 tevent_req_error(req
, EPROTO
);
1510 if (max_seqnum
< seqnum
) {
1511 max_seqnum
= seqnum
;
1512 state
->max_pnn
= state
->nlist
->pnn_list
[i
];
1513 max_caps
= state
->nlist
->caps
[i
];
1519 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64
"\n",
1520 recdb_name(state
->recdb
), state
->max_pnn
, max_seqnum
);
1522 subreq
= pull_database_send(state
,
1528 if (tevent_req_nomem(subreq
, req
)) {
1531 tevent_req_set_callback(subreq
, collect_highseqnum_db_pulldb_done
,
/*
 * Completion callback for the pull_database_send() request started for
 * the node recorded in state->max_pnn.
 *
 * On the failure path that node is charged via node_list_ban_credits()
 * and the request fails with the error from pull_database_recv();
 * otherwise the request is marked done.
 */
1535 static void collect_highseqnum_db_pulldb_done(struct tevent_req
*subreq
)
1537 struct tevent_req
*req
= tevent_req_callback_data(
1538 subreq
, struct tevent_req
);
1539 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1540 req
, struct collect_highseqnum_db_state
);
1544 status
= pull_database_recv(subreq
, &ret
);
1545 TALLOC_FREE(subreq
);
1547 node_list_ban_credits(state
->nlist
, state
->max_pnn
);
1548 tevent_req_error(req
, ret
);
1552 tevent_req_done(req
);
/*
 * Receive the result of a collect_highseqnum_db request.  Delegates to
 * generic_recv(), passing perr through.
 */
1555 static bool collect_highseqnum_db_recv(struct tevent_req
*req
, int *perr
)
1557 return generic_recv(req
, perr
);
1561 * Collect all databases
/*
 * Request state for collect_all_db_send(): the event context, the ctdb
 * client context, the node list to pull from, the recdb context handed
 * in by the caller and a ctdb_pulldb structure.  It is followed by the
 * forward declaration of the completion callback used for each pull.
 */
1564 struct collect_all_db_state
{
1565 struct tevent_context
*ev
;
1566 struct ctdb_client_context
*client
;
1567 struct node_list
*nlist
;
1569 struct recdb_context
*recdb
;
1571 struct ctdb_pulldb pulldb
;
1575 static void collect_all_db_pulldb_done(struct tevent_req
*subreq
);
/*
 * Start collecting a database from the nodes in nlist.
 *
 * Creates the request, stores client, nlist, db_id and recdb in the
 * request state and issues a pull_database_send() for the node at
 * nlist->pnn_list[state->index] (with its capabilities from
 * nlist->caps[state->index]).  collect_all_db_pulldb_done() is installed
 * as the completion callback.  If the sub-request cannot be allocated the
 * request is posted with an out-of-memory error.
 */
1577 static struct tevent_req
*collect_all_db_send(
1578 TALLOC_CTX
*mem_ctx
,
1579 struct tevent_context
*ev
,
1580 struct ctdb_client_context
*client
,
1581 struct node_list
*nlist
,
1583 struct recdb_context
*recdb
)
1585 struct tevent_req
*req
, *subreq
;
1586 struct collect_all_db_state
*state
;
1588 req
= tevent_req_create(mem_ctx
, &state
,
1589 struct collect_all_db_state
);
1595 state
->client
= client
;
1596 state
->nlist
= nlist
;
1597 state
->db_id
= db_id
;
1598 state
->recdb
= recdb
;
1601 subreq
= pull_database_send(state
,
1604 nlist
->pnn_list
[state
->index
],
1605 nlist
->caps
[state
->index
],
1607 if (tevent_req_nomem(subreq
, req
)) {
1608 return tevent_req_post(req
, ev
);
1610 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
/*
 * Completion callback for each pull_database_send() issued by the
 * collect_all_db request.
 *
 * On the failure path the node at state->nlist->pnn_list[state->index]
 * is charged via node_list_ban_credits() and the request fails with the
 * error from pull_database_recv().  When state->index equals
 * state->nlist->count the request is done; otherwise another
 * pull_database_send() is issued for the node at state->index and this
 * function is installed again as its callback.
 */
1615 static void collect_all_db_pulldb_done(struct tevent_req
*subreq
)
1617 struct tevent_req
*req
= tevent_req_callback_data(
1618 subreq
, struct tevent_req
);
1619 struct collect_all_db_state
*state
= tevent_req_data(
1620 req
, struct collect_all_db_state
);
1624 status
= pull_database_recv(subreq
, &ret
);
1625 TALLOC_FREE(subreq
);
1627 node_list_ban_credits(state
->nlist
,
1628 state
->nlist
->pnn_list
[state
->index
]);
1629 tevent_req_error(req
, ret
);
1634 if (state
->index
== state
->nlist
->count
) {
1635 tevent_req_done(req
);
1639 subreq
= pull_database_send(state
,
1642 state
->nlist
->pnn_list
[state
->index
],
1643 state
->nlist
->caps
[state
->index
],
1645 if (tevent_req_nomem(subreq
, req
)) {
1648 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
/*
 * Receive the result of a collect_all_db request.  Delegates to
 * generic_recv(), passing perr through.
 */
1651 static bool collect_all_db_recv(struct tevent_req
*req
, int *perr
)
1653 return generic_recv(req
, perr
);
1658 * For each database do the following:
1661 * - Freeze database on all nodes
1662 * - Start transaction on all nodes
1663 * - Collect database from all nodes
1664 * - Wipe database on all nodes
1665 * - Push database to all nodes
1666 * - Commit transaction on all nodes
1667 * - Thaw database on all nodes
/*
 * Request state for the recovery of a single database: event context,
 * client context, tunables, node list, the ctdb_transdb (db_id + tid)
 * used for the transaction controls, the database name and path fetched
 * from the daemon and the recdb context used while collecting records.
 *
 * The forward declarations that follow are the continuation callbacks;
 * their order matches the steps taken by the recover_db request.
 */
1670 struct recover_db_state
{
1671 struct tevent_context
*ev
;
1672 struct ctdb_client_context
*client
;
1673 struct ctdb_tunable_list
*tun_list
;
1674 struct node_list
*nlist
;
1679 struct ctdb_transdb transdb
;
1681 const char *db_name
, *db_path
;
1682 struct recdb_context
*recdb
;
1685 static void recover_db_name_done(struct tevent_req
*subreq
);
1686 static void recover_db_path_done(struct tevent_req
*subreq
);
1687 static void recover_db_freeze_done(struct tevent_req
*subreq
);
1688 static void recover_db_transaction_started(struct tevent_req
*subreq
);
1689 static void recover_db_collect_done(struct tevent_req
*subreq
);
1690 static void recover_db_wipedb_done(struct tevent_req
*subreq
);
1691 static void recover_db_pushdb_done(struct tevent_req
*subreq
);
1692 static void recover_db_transaction_committed(struct tevent_req
*subreq
);
1693 static void recover_db_thaw_done(struct tevent_req
*subreq
);
/*
 * Start the recovery of one database.
 *
 * Stores the arguments in the request state, sets state->destnode to the
 * value returned by ctdb_client_pnn(client) and prepares state->transdb
 * with db_id and with generation as the transaction id.  The first step
 * is a GET_DBNAME control sent to state->destnode using TIMEOUT();
 * recover_db_name_done() continues from there.
 */
1695 static struct tevent_req
*recover_db_send(TALLOC_CTX
*mem_ctx
,
1696 struct tevent_context
*ev
,
1697 struct ctdb_client_context
*client
,
1698 struct ctdb_tunable_list
*tun_list
,
1699 struct node_list
*nlist
,
1700 uint32_t generation
,
1704 struct tevent_req
*req
, *subreq
;
1705 struct recover_db_state
*state
;
1706 struct ctdb_req_control request
;
1708 req
= tevent_req_create(mem_ctx
, &state
, struct recover_db_state
);
1714 state
->client
= client
;
1715 state
->tun_list
= tun_list
;
1716 state
->nlist
= nlist
;
1717 state
->db_id
= db_id
;
1718 state
->db_flags
= db_flags
;
1720 state
->destnode
= ctdb_client_pnn(client
);
1721 state
->transdb
.db_id
= db_id
;
1722 state
->transdb
.tid
= generation
;
1724 ctdb_req_control_get_dbname(&request
, db_id
);
1725 subreq
= ctdb_client_control_send(state
, ev
, client
, state
->destnode
,
1726 TIMEOUT(), &request
);
1727 if (tevent_req_nomem(subreq
, req
)) {
1728 return tevent_req_post(req
, ev
);
1730 tevent_req_set_callback(subreq
, recover_db_name_done
, req
);
/*
 * Completion callback for the GET_DBNAME control.
 *
 * On the failure path the error is logged and the request fails with
 * ret.  The reply is parsed into state->db_name (allocated on state);
 * a reply that cannot be parsed fails the request with EPROTO.  The next
 * step is a GETDBPATH control to state->destnode, continued in
 * recover_db_path_done().
 */
1735 static void recover_db_name_done(struct tevent_req
*subreq
)
1737 struct tevent_req
*req
= tevent_req_callback_data(
1738 subreq
, struct tevent_req
);
1739 struct recover_db_state
*state
= tevent_req_data(
1740 req
, struct recover_db_state
);
1741 struct ctdb_reply_control
*reply
;
1742 struct ctdb_req_control request
;
1746 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1747 TALLOC_FREE(subreq
);
1749 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1751 tevent_req_error(req
, ret
);
1755 ret
= ctdb_reply_control_get_dbname(reply
, state
, &state
->db_name
);
1757 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1759 tevent_req_error(req
, EPROTO
);
1765 ctdb_req_control_getdbpath(&request
, state
->db_id
);
1766 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
1767 state
->destnode
, TIMEOUT(),
1769 if (tevent_req_nomem(subreq
, req
)) {
1772 tevent_req_set_callback(subreq
, recover_db_path_done
, req
);
/*
 * Completion callback for the GETDBPATH control.
 *
 * On the failure path the error is logged and the request fails with
 * ret.  The reply is parsed into state->db_path (allocated on state);
 * a reply that cannot be parsed fails the request with EPROTO.  The next
 * step freezes the database: a DB_FREEZE control is sent to the nodes in
 * state->nlist and recover_db_freeze_done() continues.
 */
1775 static void recover_db_path_done(struct tevent_req
*subreq
)
1777 struct tevent_req
*req
= tevent_req_callback_data(
1778 subreq
, struct tevent_req
);
1779 struct recover_db_state
*state
= tevent_req_data(
1780 req
, struct recover_db_state
);
1781 struct ctdb_reply_control
*reply
;
1782 struct ctdb_req_control request
;
1786 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1787 TALLOC_FREE(subreq
);
1789 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1790 state
->db_name
, ret
);
1791 tevent_req_error(req
, ret
);
1795 ret
= ctdb_reply_control_getdbpath(reply
, state
, &state
->db_path
);
1797 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1798 state
->db_name
, ret
);
1799 tevent_req_error(req
, EPROTO
);
1805 ctdb_req_control_db_freeze(&request
, state
->db_id
);
1806 subreq
= ctdb_client_control_multi_send(state
,
1809 state
->nlist
->pnn_list
,
1810 state
->nlist
->count
,
1813 if (tevent_req_nomem(subreq
, req
)) {
1816 tevent_req_set_callback(subreq
, recover_db_freeze_done
, req
);
/*
 * Completion callback for the DB_FREEZE control sent to all nodes.
 *
 * On the failure path the failing node reported by
 * ctdb_client_control_multi_error() is logged and charged via
 * node_list_ban_credits(), and the request fails with ret.  Otherwise a
 * DB_TRANSACTION_START control carrying state->transdb is sent to the
 * nodes in state->nlist; recover_db_transaction_started() continues.
 */
1819 static void recover_db_freeze_done(struct tevent_req
*subreq
)
1821 struct tevent_req
*req
= tevent_req_callback_data(
1822 subreq
, struct tevent_req
);
1823 struct recover_db_state
*state
= tevent_req_data(
1824 req
, struct recover_db_state
);
1825 struct ctdb_req_control request
;
1830 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1832 TALLOC_FREE(subreq
);
1837 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
1838 state
->nlist
->count
,
1842 D_ERR("control FREEZE_DB failed for db %s"
1843 " on node %u, ret=%d\n",
1844 state
->db_name
, pnn
, ret2
);
1846 node_list_ban_credits(state
->nlist
, pnn
);
1848 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1849 state
->db_name
, ret
);
1851 tevent_req_error(req
, ret
);
1855 ctdb_req_control_db_transaction_start(&request
, &state
->transdb
);
1856 subreq
= ctdb_client_control_multi_send(state
,
1859 state
->nlist
->pnn_list
,
1860 state
->nlist
->count
,
1863 if (tevent_req_nomem(subreq
, req
)) {
1866 tevent_req_set_callback(subreq
, recover_db_transaction_started
, req
);
/*
 * Completion callback for the DB_TRANSACTION_START control sent to all
 * nodes.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret.  Otherwise a recdb context is created with recdb_create()
 * (out-of-memory fails the request) and collection of the database is
 * started: collect_highseqnum_db_send() is used when the database has
 * CTDB_DB_FLAGS_PERSISTENT or CTDB_DB_FLAGS_REPLICATED set, with
 * collect_all_db_send() as the alternative.  recover_db_collect_done()
 * continues.
 */
1869 static void recover_db_transaction_started(struct tevent_req
*subreq
)
1871 struct tevent_req
*req
= tevent_req_callback_data(
1872 subreq
, struct tevent_req
);
1873 struct recover_db_state
*state
= tevent_req_data(
1874 req
, struct recover_db_state
);
1879 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1881 TALLOC_FREE(subreq
);
1886 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
1887 state
->nlist
->count
,
1891 D_ERR("control TRANSACTION_DB failed for db=%s"
1892 " on node %u, ret=%d\n",
1893 state
->db_name
, pnn
, ret2
);
1895 D_ERR("control TRANSACTION_DB failed for db=%s,"
1896 " ret=%d\n", state
->db_name
, ret
);
1898 tevent_req_error(req
, ret
);
1902 state
->recdb
= recdb_create(state
, state
->db_id
, state
->db_name
,
1904 state
->tun_list
->database_hash_size
,
1905 state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
);
1906 if (tevent_req_nomem(state
->recdb
, req
)) {
1910 if ((state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) ||
1911 (state
->db_flags
& CTDB_DB_FLAGS_REPLICATED
)) {
1912 subreq
= collect_highseqnum_db_send(state
,
1919 subreq
= collect_all_db_send(state
,
1926 if (tevent_req_nomem(subreq
, req
)) {
1929 tevent_req_set_callback(subreq
, recover_db_collect_done
, req
);
/*
 * Completion callback for the database collection step.
 *
 * The result is received with collect_highseqnum_db_recv() when the
 * database has CTDB_DB_FLAGS_PERSISTENT or CTDB_DB_FLAGS_REPLICATED set,
 * with collect_all_db_recv() as the alternative; this mirrors the choice
 * made when the collection was started.  On the failure path the
 * request fails with ret.  Otherwise a WIPE_DATABASE control carrying
 * state->transdb is sent to the nodes in state->nlist and
 * recover_db_wipedb_done() continues.
 */
1932 static void recover_db_collect_done(struct tevent_req
*subreq
)
1934 struct tevent_req
*req
= tevent_req_callback_data(
1935 subreq
, struct tevent_req
);
1936 struct recover_db_state
*state
= tevent_req_data(
1937 req
, struct recover_db_state
);
1938 struct ctdb_req_control request
;
1942 if ((state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) ||
1943 (state
->db_flags
& CTDB_DB_FLAGS_REPLICATED
)) {
1944 status
= collect_highseqnum_db_recv(subreq
, &ret
);
1946 status
= collect_all_db_recv(subreq
, &ret
);
1948 TALLOC_FREE(subreq
);
1950 tevent_req_error(req
, ret
);
1954 ctdb_req_control_wipe_database(&request
, &state
->transdb
);
1955 subreq
= ctdb_client_control_multi_send(state
,
1958 state
->nlist
->pnn_list
,
1959 state
->nlist
->count
,
1962 if (tevent_req_nomem(subreq
, req
)) {
1965 tevent_req_set_callback(subreq
, recover_db_wipedb_done
, req
);
/*
 * Completion callback for the WIPE_DATABASE control sent to all nodes.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret.  Otherwise the collected records are pushed out with
 * push_database_send(); recover_db_pushdb_done() continues.
 */
1968 static void recover_db_wipedb_done(struct tevent_req
*subreq
)
1970 struct tevent_req
*req
= tevent_req_callback_data(
1971 subreq
, struct tevent_req
);
1972 struct recover_db_state
*state
= tevent_req_data(
1973 req
, struct recover_db_state
);
1978 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1980 TALLOC_FREE(subreq
);
1985 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
1986 state
->nlist
->count
,
1990 D_ERR("control WIPEDB failed for db %s on node %u,"
1991 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1993 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1994 state
->db_name
, ret
);
1996 tevent_req_error(req
, ret
);
2000 subreq
= push_database_send(state
,
2006 if (tevent_req_nomem(subreq
, req
)) {
2009 tevent_req_set_callback(subreq
, recover_db_pushdb_done
, req
);
/*
 * Completion callback for push_database_send().
 *
 * On the failure path the request fails with the error from
 * push_database_recv().  Otherwise state->recdb is released, a
 * DB_TRANSACTION_COMMIT control carrying state->transdb is sent to the
 * nodes in state->nlist and recover_db_transaction_committed()
 * continues.
 */
2012 static void recover_db_pushdb_done(struct tevent_req
*subreq
)
2014 struct tevent_req
*req
= tevent_req_callback_data(
2015 subreq
, struct tevent_req
);
2016 struct recover_db_state
*state
= tevent_req_data(
2017 req
, struct recover_db_state
);
2018 struct ctdb_req_control request
;
2022 status
= push_database_recv(subreq
, &ret
);
2023 TALLOC_FREE(subreq
);
2025 tevent_req_error(req
, ret
);
2029 TALLOC_FREE(state
->recdb
);
2031 ctdb_req_control_db_transaction_commit(&request
, &state
->transdb
);
2032 subreq
= ctdb_client_control_multi_send(state
,
2035 state
->nlist
->pnn_list
,
2036 state
->nlist
->count
,
2039 if (tevent_req_nomem(subreq
, req
)) {
2042 tevent_req_set_callback(subreq
, recover_db_transaction_committed
, req
);
/*
 * Completion callback for the DB_TRANSACTION_COMMIT control sent to all
 * nodes.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret.  Otherwise a DB_THAW control for state->db_id is sent to the
 * nodes in state->nlist and recover_db_thaw_done() continues.
 */
2045 static void recover_db_transaction_committed(struct tevent_req
*subreq
)
2047 struct tevent_req
*req
= tevent_req_callback_data(
2048 subreq
, struct tevent_req
);
2049 struct recover_db_state
*state
= tevent_req_data(
2050 req
, struct recover_db_state
);
2051 struct ctdb_req_control request
;
2056 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2058 TALLOC_FREE(subreq
);
2063 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
2064 state
->nlist
->count
,
2068 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
2069 " on node %u, ret=%d\n",
2070 state
->db_name
, pnn
, ret2
);
2072 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
2073 " ret=%d\n", state
->db_name
, ret
);
2075 tevent_req_error(req
, ret
);
2079 ctdb_req_control_db_thaw(&request
, state
->db_id
);
2080 subreq
= ctdb_client_control_multi_send(state
,
2083 state
->nlist
->pnn_list
,
2084 state
->nlist
->count
,
2087 if (tevent_req_nomem(subreq
, req
)) {
2090 tevent_req_set_callback(subreq
, recover_db_thaw_done
, req
);
/*
 * Completion callback for the DB_THAW control sent to all nodes; this is
 * the last step of the recover_db request.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret; otherwise the request is marked done.
 */
2093 static void recover_db_thaw_done(struct tevent_req
*subreq
)
2095 struct tevent_req
*req
= tevent_req_callback_data(
2096 subreq
, struct tevent_req
);
2097 struct recover_db_state
*state
= tevent_req_data(
2098 req
, struct recover_db_state
);
2103 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2105 TALLOC_FREE(subreq
);
2110 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
2111 state
->nlist
->count
,
2115 D_ERR("control DB_THAW failed for db %s on node %u,"
2116 " ret=%d\n", state
->db_name
, pnn
, ret2
);
2118 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
2119 state
->db_name
, ret
);
2121 tevent_req_error(req
, ret
);
2125 tevent_req_done(req
);
/*
 * Receive the result of a recover_db request.  Delegates to
 * generic_recv() with a NULL error pointer, so only success or failure
 * is reported to the caller.
 */
2128 static bool recover_db_recv(struct tevent_req
*req
)
2130 return generic_recv(req
, NULL
);
2135 * Start database recovery for each database
2137 * Try to recover each database up to NUM_RETRIES times before failing recovery.
/*
 * db_recovery_state is the state of the overall db_recovery request:
 * the database list plus counters for the number of per-database
 * results received (num_replies) and the number that failed
 * (num_failed).
 *
 * db_recovery_one_state is allocated once per database and is used as
 * the callback data of that database's recover_db request; it keeps a
 * pointer back to the parent request together with the arguments needed
 * to re-issue recover_db_send() on a retry.
 */
2140 struct db_recovery_state
{
2141 struct tevent_context
*ev
;
2142 struct db_list
*dblist
;
2143 unsigned int num_replies
;
2144 unsigned int num_failed
;
2147 struct db_recovery_one_state
{
2148 struct tevent_req
*req
;
2149 struct ctdb_client_context
*client
;
2150 struct db_list
*dblist
;
2151 struct ctdb_tunable_list
*tun_list
;
2152 struct node_list
*nlist
;
2153 uint32_t generation
;
2159 static void db_recovery_one_done(struct tevent_req
*subreq
);
/*
 * Start recovery of every database in dblist.
 *
 * The reply and failure counters are reset.  An empty list
 * (dblist->num_dbs == 0) completes the request immediately.  Otherwise,
 * for each entry of the dblist->db linked list, a db_recovery_one_state
 * is allocated on the request state, filled with the arguments and the
 * database id/flags, and a recover_db_send() request is started with
 * db_recovery_one_done() as its callback.  The loop starts one request
 * per database without waiting for earlier ones to complete.
 */
2161 static struct tevent_req
*db_recovery_send(TALLOC_CTX
*mem_ctx
,
2162 struct tevent_context
*ev
,
2163 struct ctdb_client_context
*client
,
2164 struct db_list
*dblist
,
2165 struct ctdb_tunable_list
*tun_list
,
2166 struct node_list
*nlist
,
2167 uint32_t generation
)
2169 struct tevent_req
*req
, *subreq
;
2170 struct db_recovery_state
*state
;
2173 req
= tevent_req_create(mem_ctx
, &state
, struct db_recovery_state
);
2179 state
->dblist
= dblist
;
2180 state
->num_replies
= 0;
2181 state
->num_failed
= 0;
2183 if (dblist
->num_dbs
== 0) {
2184 tevent_req_done(req
);
2185 return tevent_req_post(req
, ev
);
2188 for (db
= dblist
->db
; db
!= NULL
; db
= db
->next
) {
2189 struct db_recovery_one_state
*substate
;
2191 substate
= talloc_zero(state
, struct db_recovery_one_state
);
2192 if (tevent_req_nomem(substate
, req
)) {
2193 return tevent_req_post(req
, ev
);
2196 substate
->req
= req
;
2197 substate
->client
= client
;
2198 substate
->dblist
= dblist
;
2199 substate
->tun_list
= tun_list
;
2200 substate
->nlist
= nlist
;
2201 substate
->generation
= generation
;
2202 substate
->db_id
= db
->db_id
;
2203 substate
->db_flags
= db
->db_flags
;
2205 subreq
= recover_db_send(state
,
2212 substate
->db_flags
);
2213 if (tevent_req_nomem(subreq
, req
)) {
2214 return tevent_req_post(req
, ev
);
2216 tevent_req_set_callback(subreq
, db_recovery_one_done
,
2218 D_NOTICE("recover database 0x%08x\n", substate
->db_id
);
/*
 * Completion callback for one database's recover_db request.
 *
 * The callback data is the per-database db_recovery_one_state; the
 * parent request is reached through substate->req.  The substate is
 * freed once it is no longer needed (NOTE(review): presumably on the
 * success path - the guarding condition is not visible here).  On
 * failure substate->num_fails is incremented and, while it is below
 * NUM_RETRIES, recover_db_send() is issued again with the same substate
 * as callback data; once retries are exhausted state->num_failed is
 * incremented.  Each final result increments state->num_replies, and the
 * parent request is marked done when a result has been recorded for
 * every database in state->dblist.
 */
2224 static void db_recovery_one_done(struct tevent_req
*subreq
)
2226 struct db_recovery_one_state
*substate
= tevent_req_callback_data(
2227 subreq
, struct db_recovery_one_state
);
2228 struct tevent_req
*req
= substate
->req
;
2229 struct db_recovery_state
*state
= tevent_req_data(
2230 req
, struct db_recovery_state
);
2233 status
= recover_db_recv(subreq
);
2234 TALLOC_FREE(subreq
);
2237 talloc_free(substate
);
2241 substate
->num_fails
+= 1;
2242 if (substate
->num_fails
< NUM_RETRIES
) {
2243 subreq
= recover_db_send(state
,
2248 substate
->generation
,
2250 substate
->db_flags
);
2251 if (tevent_req_nomem(subreq
, req
)) {
2254 tevent_req_set_callback(subreq
, db_recovery_one_done
, substate
);
2255 D_NOTICE("recover database 0x%08x, attempt %d\n",
2256 substate
->db_id
, substate
->num_fails
+1);
2261 state
->num_failed
+= 1;
2264 state
->num_replies
+= 1;
2266 if (state
->num_replies
== state
->dblist
->num_dbs
) {
2267 tevent_req_done(req
);
/*
 * Receive the result of a db_recovery request.
 *
 * The request's unix error state is checked first.  *count is set to the
 * number of databases that were recovered, i.e. num_replies minus
 * num_failed, and state->num_failed is then tested (NOTE(review):
 * presumably to report failure when any database could not be recovered
 * - the branch body is not visible here).
 */
2271 static bool db_recovery_recv(struct tevent_req
*req
, unsigned int *count
)
2273 struct db_recovery_state
*state
= tevent_req_data(
2274 req
, struct db_recovery_state
);
2277 if (tevent_req_is_unix_error(req
, &err
)) {
2282 *count
= state
->num_replies
- state
->num_failed
;
2284 if (state
->num_failed
> 0) {
/*
 * Request state for ban_node_send(): event context, client context,
 * tunables (enable_bans and recovery_ban_period are read from it) and
 * the node list whose ban_credits are examined.
 *
 * The forward declarations that follow are the helper that selects the
 * node to ban and the two continuation callbacks.
 */
2291 struct ban_node_state
{
2292 struct tevent_context
*ev
;
2293 struct ctdb_client_context
*client
;
2294 struct ctdb_tunable_list
*tun_list
;
2295 struct node_list
*nlist
;
2301 static bool ban_node_check(struct tevent_req
*req
);
2302 static void ban_node_check_done(struct tevent_req
*subreq
);
2303 static void ban_node_done(struct tevent_req
*subreq
);
/*
 * Start a request that bans the node which accumulated the most ban
 * credits, if any node qualifies.
 *
 * Stores the arguments in the request state and sets state->destnode to
 * the value returned by ctdb_client_pnn(client).  When the enable_bans
 * tunable is 0 a message is logged and the request completes
 * immediately.  Otherwise ban_node_check() is called to pick a
 * candidate.
 */
2305 static struct tevent_req
*ban_node_send(TALLOC_CTX
*mem_ctx
,
2306 struct tevent_context
*ev
,
2307 struct ctdb_client_context
*client
,
2308 struct ctdb_tunable_list
*tun_list
,
2309 struct node_list
*nlist
)
2311 struct tevent_req
*req
;
2312 struct ban_node_state
*state
;
2315 req
= tevent_req_create(mem_ctx
, &state
, struct ban_node_state
);
2321 state
->client
= client
;
2322 state
->tun_list
= tun_list
;
2323 state
->nlist
= nlist
;
2324 state
->destnode
= ctdb_client_pnn(client
);
2326 /* Bans are not enabled */
2327 if (state
->tun_list
->enable_bans
== 0) {
2328 D_ERR("Bans are not enabled\n");
2329 tevent_req_done(req
);
2330 return tevent_req_post(req
, ev
);
2333 ok
= ban_node_check(req
);
2335 return tevent_req_post(req
, ev
);
/*
 * Pick the node with the highest ban credits and decide whether it
 * should be banned.
 *
 * The ban_credits array of state->nlist is scanned; the node with the
 * largest (non-zero) value is recorded in state->max_pnn.  If the
 * maximum is below NUM_RETRIES no node is banned and the request is
 * marked done.  Otherwise a GET_NODEMAP control is sent so that the
 * candidate's current flags can be checked; ban_node_check_done()
 * continues.
 */
2341 static bool ban_node_check(struct tevent_req
*req
)
2343 struct tevent_req
*subreq
;
2344 struct ban_node_state
*state
= tevent_req_data(
2345 req
, struct ban_node_state
);
2346 struct ctdb_req_control request
;
2347 unsigned max_credits
= 0, i
;
2349 for (i
=0; i
<state
->nlist
->count
; i
++) {
2350 if (state
->nlist
->ban_credits
[i
] > max_credits
) {
2351 state
->max_pnn
= state
->nlist
->pnn_list
[i
];
2352 max_credits
= state
->nlist
->ban_credits
[i
];
2356 if (max_credits
< NUM_RETRIES
) {
2357 tevent_req_done(req
);
2361 ctdb_req_control_get_nodemap(&request
);
2362 subreq
= ctdb_client_control_send(state
,
2368 if (tevent_req_nomem(subreq
, req
)) {
2371 tevent_req_set_callback(subreq
, ban_node_check_done
, req
);
/*
 * Completion callback for the GET_NODEMAP control issued by
 * ban_node_check().
 *
 * On the failure path, or if the reply cannot be parsed, the error is
 * logged and the request fails with ret.  The nodemap entry whose pnn
 * matches state->max_pnn is examined: if it has NODE_FLAGS_INACTIVE set,
 * the ban credits recorded for that node in state->nlist are reset to 0
 * and state->max_pnn is set to CTDB_UNKNOWN_PNN.  In that case
 * ban_node_check() is run again to pick the next candidate.  Otherwise a
 * ctdb_ban_state for state->max_pnn with the recovery_ban_period tunable
 * as ban time is sent via a SET_BAN_STATE control; ban_node_done()
 * continues.
 */
2376 static void ban_node_check_done(struct tevent_req
*subreq
)
2378 struct tevent_req
*req
= tevent_req_callback_data(
2379 subreq
, struct tevent_req
);
2380 struct ban_node_state
*state
= tevent_req_data(
2381 req
, struct ban_node_state
);
2382 struct ctdb_reply_control
*reply
;
2383 struct ctdb_node_map
*nodemap
;
2384 struct ctdb_req_control request
;
2385 struct ctdb_ban_state ban
;
2390 ok
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2391 TALLOC_FREE(subreq
);
2393 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2394 state
->max_pnn
, ret
);
2395 tevent_req_error(req
, ret
);
2399 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &nodemap
);
2401 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret
);
2402 tevent_req_error(req
, ret
);
2406 for (i
=0; i
<nodemap
->num
; i
++) {
2407 if (nodemap
->node
[i
].pnn
!= state
->max_pnn
) {
2411 /* If the node became inactive, reset ban_credits */
2412 if (nodemap
->node
[i
].flags
& NODE_FLAGS_INACTIVE
) {
2415 for (j
=0; j
<state
->nlist
->count
; j
++) {
2416 if (state
->nlist
->pnn_list
[j
] ==
2418 state
->nlist
->ban_credits
[j
] = 0;
2422 state
->max_pnn
= CTDB_UNKNOWN_PNN
;
2426 talloc_free(nodemap
);
2429 /* If node becomes inactive during recovery, pick next */
2430 if (state
->max_pnn
== CTDB_UNKNOWN_PNN
) {
2431 (void) ban_node_check(req
);
2435 ban
= (struct ctdb_ban_state
) {
2436 .pnn
= state
->max_pnn
,
2437 .time
= state
->tun_list
->recovery_ban_period
,
2440 D_ERR("Banning node %u for %u seconds\n", ban
.pnn
, ban
.time
);
2442 ctdb_req_control_set_ban_state(&request
, &ban
);
2443 subreq
= ctdb_client_control_send(state
,
2449 if (tevent_req_nomem(subreq
, req
)) {
2452 tevent_req_set_callback(subreq
, ban_node_done
, req
);
/*
 * Completion callback for the SET_BAN_STATE control issued by
 * ban_node_check_done(); this is the last step of the ban_node request.
 *
 * The request state must be fetched as struct ban_node_state, which is
 * the type the request was created with in ban_node_send().
 * tevent_req_data() type-checks the state by its talloc name, so asking
 * for any other struct name fails that check at runtime.
 *
 * On the failure path the request fails with ret.  A reply that does not
 * parse as a SET_BAN_STATE reply is logged and also fails the request
 * with ret; otherwise the request is marked done.
 */
2455 static void ban_node_done(struct tevent_req
*subreq
)
2457 struct tevent_req
*req
= tevent_req_callback_data(
2458 subreq
, struct tevent_req
);
2459 struct ban_node_state
*state
= tevent_req_data(
2460 req
, struct ban_node_state
);
2461 struct ctdb_reply_control
*reply
;
2465 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2466 TALLOC_FREE(subreq
);
2468 tevent_req_error(req
, ret
);
2472 ret
= ctdb_reply_control_set_ban_state(reply
);
2474 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret
);
2475 tevent_req_error(req
, ret
);
2480 tevent_req_done(req
);
/*
 * Receive the result of a ban_node request.  The request's unix error
 * state is checked with tevent_req_is_unix_error(), which stores the
 * error code through perr when the request failed.
 */
2483 static bool ban_node_recv(struct tevent_req
*req
, int *perr
)
2485 if (tevent_req_is_unix_error(req
, perr
)) {
2493 * Run the parallel database recovery
2496 * - Get nodemap from all nodes
2497 * - Get capabilities from all nodes
2499 * - Set RECOVERY_ACTIVE
2500 * - Send START_RECOVERY
2501 * - Update vnnmap on all nodes
2502 * - Run database recovery
2503 * - Set RECOVERY_NORMAL
2504 * - Send END_RECOVERY
/*
 * Request state for the overall recovery run: event context, client
 * context, the generation to use for the new VNN map, and the objects
 * built up along the way - node list, tunables, VNN map and database
 * list.
 *
 * The forward declarations that follow are the continuation callbacks of
 * the recovery request.
 */
2507 struct recovery_state
{
2508 struct tevent_context
*ev
;
2509 struct ctdb_client_context
*client
;
2510 uint32_t generation
;
2512 struct node_list
*nlist
;
2513 struct ctdb_tunable_list
*tun_list
;
2514 struct ctdb_vnn_map
*vnnmap
;
2515 struct db_list
*dblist
;
2518 static void recovery_tunables_done(struct tevent_req
*subreq
);
2519 static void recovery_nodemap_done(struct tevent_req
*subreq
);
2520 static void recovery_nodemap_verify(struct tevent_req
*subreq
);
2521 static void recovery_capabilities_done(struct tevent_req
*subreq
);
2522 static void recovery_dbmap_done(struct tevent_req
*subreq
);
2523 static void recovery_active_done(struct tevent_req
*subreq
);
2524 static void recovery_start_recovery_done(struct tevent_req
*subreq
);
2525 static void recovery_vnnmap_update_done(struct tevent_req
*subreq
);
2526 static void recovery_db_recovery_done(struct tevent_req
*subreq
);
2527 static void recovery_failed_done(struct tevent_req
*subreq
);
2528 static void recovery_normal_done(struct tevent_req
*subreq
);
2529 static void recovery_end_recovery_done(struct tevent_req
*subreq
);
/*
 * Start a recovery run.
 *
 * Stores client and generation in the request state and sets
 * state->destnode to the value returned by ctdb_client_pnn(client).  The
 * first step is a GET_ALL_TUNABLES control sent to state->destnode;
 * recovery_tunables_done() continues.
 */
2531 static struct tevent_req
*recovery_send(TALLOC_CTX
*mem_ctx
,
2532 struct tevent_context
*ev
,
2533 struct ctdb_client_context
*client
,
2534 uint32_t generation
)
2536 struct tevent_req
*req
, *subreq
;
2537 struct recovery_state
*state
;
2538 struct ctdb_req_control request
;
2540 req
= tevent_req_create(mem_ctx
, &state
, struct recovery_state
);
2546 state
->client
= client
;
2547 state
->generation
= generation
;
2548 state
->destnode
= ctdb_client_pnn(client
);
2550 ctdb_req_control_get_all_tunables(&request
);
2551 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2552 state
->destnode
, TIMEOUT(),
2554 if (tevent_req_nomem(subreq
, req
)) {
2555 return tevent_req_post(req
, ev
);
2557 tevent_req_set_callback(subreq
, recovery_tunables_done
, req
);
/*
 * Completion callback for the GET_ALL_TUNABLES control.
 *
 * On the failure path the error is logged and the request fails with
 * ret; a reply that cannot be parsed fails the request with EPROTO.
 * The file-scope recover_timeout is then overwritten with the
 * recover_timeout tunable, which changes the value produced by TIMEOUT()
 * for all subsequent controls.  The next step is a GET_NODEMAP control
 * to state->destnode; recovery_nodemap_done() continues.
 */
2562 static void recovery_tunables_done(struct tevent_req
*subreq
)
2564 struct tevent_req
*req
= tevent_req_callback_data(
2565 subreq
, struct tevent_req
);
2566 struct recovery_state
*state
= tevent_req_data(
2567 req
, struct recovery_state
);
2568 struct ctdb_reply_control
*reply
;
2569 struct ctdb_req_control request
;
2573 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2574 TALLOC_FREE(subreq
);
2576 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2577 tevent_req_error(req
, ret
);
2581 ret
= ctdb_reply_control_get_all_tunables(reply
, state
,
2584 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2585 tevent_req_error(req
, EPROTO
);
2591 recover_timeout
= state
->tun_list
->recover_timeout
;
2593 ctdb_req_control_get_nodemap(&request
);
2594 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2595 state
->destnode
, TIMEOUT(),
2597 if (tevent_req_nomem(subreq
, req
)) {
2600 tevent_req_set_callback(subreq
, recovery_nodemap_done
, req
);
/*
 * Completion callback for the GET_NODEMAP control sent to
 * state->destnode.
 *
 * On the failure path, or if the reply cannot be parsed, the error is
 * logged and the request fails with ret.  A node list sized for
 * nodemap->num entries is created and the nodemap is walked: each
 * node's NODE_FLAGS_DISCONNECTED flag is tested (NOTE(review):
 * presumably such nodes are skipped - the branch body is not visible
 * here) and nodes are added with node_list_add(), failing the request
 * with EINVAL if that does not succeed.  The nodemap is then freed and a
 * GET_NODEMAP control is sent to every node in the list so that each
 * node's own view of its flags can be verified;
 * recovery_nodemap_verify() continues.
 */
2603 static void recovery_nodemap_done(struct tevent_req
*subreq
)
2605 struct tevent_req
*req
= tevent_req_callback_data(
2606 subreq
, struct tevent_req
);
2607 struct recovery_state
*state
= tevent_req_data(
2608 req
, struct recovery_state
);
2609 struct ctdb_reply_control
*reply
;
2610 struct ctdb_req_control request
;
2611 struct ctdb_node_map
*nodemap
;
2616 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2617 TALLOC_FREE(subreq
);
2619 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2620 state
->destnode
, ret
);
2621 tevent_req_error(req
, ret
);
2625 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &nodemap
);
2627 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret
);
2628 tevent_req_error(req
, ret
);
2632 state
->nlist
= node_list_init(state
, nodemap
->num
);
2633 if (tevent_req_nomem(state
->nlist
, req
)) {
2637 for (i
=0; i
<nodemap
->num
; i
++) {
2640 if (nodemap
->node
[i
].flags
& NODE_FLAGS_DISCONNECTED
) {
2644 ok
= node_list_add(state
->nlist
, nodemap
->node
[i
].pnn
);
2646 tevent_req_error(req
, EINVAL
);
2651 talloc_free(nodemap
);
2654 /* Verify flags by getting local node information from each node */
2655 ctdb_req_control_get_nodemap(&request
);
2656 subreq
= ctdb_client_control_multi_send(state
,
2659 state
->nlist
->pnn_list
,
2660 state
->nlist
->count
,
2663 if (tevent_req_nomem(subreq
, req
)) {
2666 tevent_req_set_callback(subreq
, recovery_nodemap_verify
, req
);
/*
 * Completion callback for the GET_NODEMAP control sent to every node in
 * state->nlist.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret.  Otherwise a fresh node list of the same size is built: for
 * each node its own reply is parsed (EPROTO on failure) and the flags
 * that node reports for itself are looked up, starting from
 * NODE_FLAGS_DISCONNECTED in case no matching entry is found.  The
 * NODE_FLAGS_INACTIVE flag is then tested (NOTE(review): presumably
 * inactive nodes are skipped - the branch body is not visible here) and
 * nodes are added with node_list_add(), failing with EINVAL if that does
 * not succeed.  The old list is freed and replaced by the new one, and
 * a GET_CAPABILITIES control is sent to the nodes in the new list;
 * recovery_capabilities_done() continues.
 */
2669 static void recovery_nodemap_verify(struct tevent_req
*subreq
)
2671 struct tevent_req
*req
= tevent_req_callback_data(
2672 subreq
, struct tevent_req
);
2673 struct recovery_state
*state
= tevent_req_data(
2674 req
, struct recovery_state
);
2675 struct ctdb_req_control request
;
2676 struct ctdb_reply_control
**reply
;
2677 struct node_list
*nlist
;
2683 status
= ctdb_client_control_multi_recv(subreq
,
2688 TALLOC_FREE(subreq
);
2693 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
2694 state
->nlist
->count
,
2698 D_ERR("control GET_NODEMAP failed on node %u,"
2699 " ret=%d\n", pnn
, ret2
);
2701 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret
);
2703 tevent_req_error(req
, ret
);
2707 nlist
= node_list_init(state
, state
->nlist
->size
);
2708 if (tevent_req_nomem(nlist
, req
)) {
2712 for (i
=0; i
<state
->nlist
->count
; i
++) {
2713 struct ctdb_node_map
*nodemap
= NULL
;
2714 uint32_t pnn
, flags
;
2718 pnn
= state
->nlist
->pnn_list
[i
];
2719 ret
= ctdb_reply_control_get_nodemap(reply
[i
],
2723 D_ERR("control GET_NODEMAP failed on node %u\n", pnn
);
2724 tevent_req_error(req
, EPROTO
);
2728 flags
= NODE_FLAGS_DISCONNECTED
;
2729 for (j
=0; j
<nodemap
->num
; j
++) {
2730 if (nodemap
->node
[j
].pnn
== pnn
) {
2731 flags
= nodemap
->node
[j
].flags
;
2736 TALLOC_FREE(nodemap
);
2738 if (flags
& NODE_FLAGS_INACTIVE
) {
2742 ok
= node_list_add(nlist
, pnn
);
2744 tevent_req_error(req
, EINVAL
);
2751 talloc_free(state
->nlist
);
2752 state
->nlist
= nlist
;
2754 ctdb_req_control_get_capabilities(&request
);
2755 subreq
= ctdb_client_control_multi_send(state
,
2758 state
->nlist
->pnn_list
,
2759 state
->nlist
->count
,
2762 if (tevent_req_nomem(subreq
, req
)) {
2765 tevent_req_set_callback(subreq
, recovery_capabilities_done
, req
);
/*
 * Completion callback for the GET_CAPABILITIES control sent to every
 * node in state->nlist.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret.  Otherwise each reply is parsed with
 * ctdb_reply_control_get_capabilities() (EPROTO on failure) and the
 * result is stored in state->nlist->caps at the same index as the node.
 * The next step is a GET_DBMAP control to state->destnode;
 * recovery_dbmap_done() continues.
 */
2768 static void recovery_capabilities_done(struct tevent_req
*subreq
)
2770 struct tevent_req
*req
= tevent_req_callback_data(
2771 subreq
, struct tevent_req
);
2772 struct recovery_state
*state
= tevent_req_data(
2773 req
, struct recovery_state
);
2774 struct ctdb_reply_control
**reply
;
2775 struct ctdb_req_control request
;
2781 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2783 TALLOC_FREE(subreq
);
2788 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
2789 state
->nlist
->count
,
2793 D_ERR("control GET_CAPABILITIES failed on node %u,"
2794 " ret=%d\n", pnn
, ret2
);
2796 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2799 tevent_req_error(req
, ret
);
2803 for (i
=0; i
<state
->nlist
->count
; i
++) {
2806 ret
= ctdb_reply_control_get_capabilities(reply
[i
], &caps
);
2808 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2809 state
->nlist
->pnn_list
[i
]);
2810 tevent_req_error(req
, EPROTO
);
2814 state
->nlist
->caps
[i
] = caps
;
2819 ctdb_req_control_get_dbmap(&request
);
2820 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2821 state
->destnode
, TIMEOUT(),
2823 if (tevent_req_nomem(subreq
, req
)) {
2826 tevent_req_set_callback(subreq
, recovery_dbmap_done
, req
);
/*
 * Completion callback for the GET_DBMAP control sent to state->destnode.
 *
 * On the failure path the error is logged and the request fails with
 * ret.  A database list is created with db_list_init(), passing the
 * number of nodes in state->nlist; allocation failure is logged and
 * fails the request.  The reply is parsed into a dbmap (failing with ret
 * if it cannot be parsed) and every entry's db_id and flags are added
 * via db_list_check_and_add(), failing the request with ret on error.
 * The next step sets the recovery mode to CTDB_RECOVERY_ACTIVE on the
 * nodes in state->nlist; recovery_active_done() continues.
 */
2829 static void recovery_dbmap_done(struct tevent_req
*subreq
)
2831 struct tevent_req
*req
= tevent_req_callback_data(
2832 subreq
, struct tevent_req
);
2833 struct recovery_state
*state
= tevent_req_data(
2834 req
, struct recovery_state
);
2835 struct ctdb_reply_control
*reply
;
2836 struct ctdb_req_control request
;
2837 struct ctdb_dbid_map
*dbmap
= NULL
;
2842 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2843 TALLOC_FREE(subreq
);
2845 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2846 state
->destnode
, ret
);
2847 tevent_req_error(req
, ret
);
2851 state
->dblist
= db_list_init(state
, state
->nlist
->count
);
2852 if (tevent_req_nomem(state
->dblist
, req
)) {
2853 D_ERR("memory allocation error\n");
2857 ret
= ctdb_reply_control_get_dbmap(reply
, state
, &dbmap
);
2859 D_ERR("control GET_DBMAP failed, ret=%d\n", ret
);
2860 tevent_req_error(req
, ret
);
2864 for (j
= 0; j
< dbmap
->num
; j
++) {
2865 ret
= db_list_check_and_add(state
->dblist
,
2866 dbmap
->dbs
[j
].db_id
,
2867 dbmap
->dbs
[j
].flags
,
2870 D_ERR("failed to add database list entry, ret=%d\n",
2872 tevent_req_error(req
, ret
);
2877 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_ACTIVE
);
2878 subreq
= ctdb_client_control_multi_send(state
,
2881 state
->nlist
->pnn_list
,
2882 state
->nlist
->count
,
2885 if (tevent_req_nomem(subreq
, req
)) {
2888 tevent_req_set_callback(subreq
, recovery_active_done
, req
);
/*
 * Completion callback for the SET_RECMODE(ACTIVE) control sent to every
 * node in state->nlist.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret.  Otherwise the new VNN map is computed: its map and size
 * come from node_list_lmaster(); if that yields no entries a warning is
 * logged and state->destnode is placed in the first slot.  The map's
 * generation is taken from state->generation and the result is kept in
 * state->vnnmap.  The next step is a START_RECOVERY control to the nodes
 * in state->nlist; recovery_start_recovery_done() continues.
 */
2891 static void recovery_active_done(struct tevent_req
*subreq
)
2893 struct tevent_req
*req
= tevent_req_callback_data(
2894 subreq
, struct tevent_req
);
2895 struct recovery_state
*state
= tevent_req_data(
2896 req
, struct recovery_state
);
2897 struct ctdb_req_control request
;
2898 struct ctdb_vnn_map
*vnnmap
;
2903 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2905 TALLOC_FREE(subreq
);
2910 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
2911 state
->nlist
->count
,
2915 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2916 " ret=%d\n", pnn
, ret2
);
2918 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2921 tevent_req_error(req
, ret
);
2925 D_ERR("Set recovery mode to ACTIVE\n");
2927 /* Calculate new VNNMAP */
2928 vnnmap
= talloc_zero(state
, struct ctdb_vnn_map
);
2929 if (tevent_req_nomem(vnnmap
, req
)) {
2933 vnnmap
->map
= node_list_lmaster(state
->nlist
, vnnmap
, &vnnmap
->size
);
2934 if (tevent_req_nomem(vnnmap
->map
, req
)) {
2938 if (vnnmap
->size
== 0) {
2939 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2940 vnnmap
->map
[0] = state
->destnode
;
2944 vnnmap
->generation
= state
->generation
;
2946 state
->vnnmap
= vnnmap
;
2948 ctdb_req_control_start_recovery(&request
);
2949 subreq
= ctdb_client_control_multi_send(state
,
2952 state
->nlist
->pnn_list
,
2953 state
->nlist
->count
,
2956 if (tevent_req_nomem(subreq
, req
)) {
2959 tevent_req_set_callback(subreq
, recovery_start_recovery_done
, req
);
/*
 * Completion callback for the START_RECOVERY control sent to every node
 * in state->nlist.
 *
 * On the failure path the failing node is logged and the request fails
 * with ret.  Otherwise completion of the start_recovery event is logged
 * and the VNN map held in state->vnnmap is distributed with a SETVNNMAP
 * control to the nodes in state->nlist; recovery_vnnmap_update_done()
 * continues.
 */
2962 static void recovery_start_recovery_done(struct tevent_req
*subreq
)
2964 struct tevent_req
*req
= tevent_req_callback_data(
2965 subreq
, struct tevent_req
);
2966 struct recovery_state
*state
= tevent_req_data(
2967 req
, struct recovery_state
);
2968 struct ctdb_req_control request
;
2973 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2975 TALLOC_FREE(subreq
);
2980 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
2981 state
->nlist
->count
,
2985 D_ERR("failed to run start_recovery event on node %u,"
2986 " ret=%d\n", pnn
, ret2
);
2988 D_ERR("failed to run start_recovery event, ret=%d\n",
2991 tevent_req_error(req
, ret
);
2995 D_ERR("start_recovery event finished\n");
2997 ctdb_req_control_setvnnmap(&request
, state
->vnnmap
);
2998 subreq
= ctdb_client_control_multi_send(state
,
3001 state
->nlist
->pnn_list
,
3002 state
->nlist
->count
,
3005 if (tevent_req_nomem(subreq
, req
)) {
3008 tevent_req_set_callback(subreq
, recovery_vnnmap_update_done
, req
);
/*
 * Completion callback for the multi-node control whose failure is
 * reported as "failed to update VNNMAP".
 *
 * The error path logs the failing node (from
 * ctdb_client_control_multi_error()) or the overall ret, and fails the
 * parent request with ret.
 *
 * Otherwise database recovery is started with db_recovery_send(), which
 * receives state->vnnmap->generation as its final argument, and
 * processing continues in recovery_db_recovery_done().
 *
 * NOTE(review): the remaining db_recovery_send() arguments and the
 * condition guarding the error path are not visible in this copy.
 */
3011 static void recovery_vnnmap_update_done(struct tevent_req
*subreq
)
3013 struct tevent_req
*req
= tevent_req_callback_data(
3014 subreq
, struct tevent_req
);
3015 struct recovery_state
*state
= tevent_req_data(
3016 req
, struct recovery_state
);
3021 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
3023 TALLOC_FREE(subreq
);
/* Error reporting: try to name the node that failed */
3028 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
3029 state
->nlist
->count
,
3033 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
3036 D_ERR("failed to update VNNMAP, ret=%d\n", ret
);
3038 tevent_req_error(req
, ret
);
3042 D_NOTICE("updated VNNMAP\n");
/* Next step: recover the databases */
3044 subreq
= db_recovery_send(state
,
3050 state
->vnnmap
->generation
);
3051 if (tevent_req_nomem(subreq
, req
)) {
3054 tevent_req_set_callback(subreq
, recovery_db_recovery_done
, req
);
/*
 * Completion callback for db_recovery_send().
 *
 * db_recovery_recv() yields a status and the number of databases that
 * were recovered; that count is logged against state->dblist->num_dbs.
 *
 * Two continuations are visible:
 *  - ban_node_send(), completing in recovery_failed_done();
 *  - a CTDB_RECOVERY_NORMAL set_recmode control sent to the
 *    state->nlist->count nodes in state->nlist->pnn_list, completing in
 *    recovery_normal_done().
 *
 * NOTE(review): the condition selecting between them is not visible in
 * this copy; presumably the ban path is taken when db_recovery_recv()
 * reports failure - confirm.
 */
3057 static void recovery_db_recovery_done(struct tevent_req
*subreq
)
3059 struct tevent_req
*req
= tevent_req_callback_data(
3060 subreq
, struct tevent_req
);
3061 struct recovery_state
*state
= tevent_req_data(
3062 req
, struct recovery_state
);
3063 struct ctdb_req_control request
;
3067 status
= db_recovery_recv(subreq
, &count
);
3068 TALLOC_FREE(subreq
);
3070 D_ERR("%d of %d databases recovered\n", count
, state
->dblist
->num_dbs
);
/* Ban path */
3073 subreq
= ban_node_send(state
,
3078 if (tevent_req_nomem(subreq
, req
)) {
3081 tevent_req_set_callback(subreq
, recovery_failed_done
, req
);
/* Normal path: switch the listed nodes back to recovery mode NORMAL */
3085 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_NORMAL
);
3086 subreq
= ctdb_client_control_multi_send(state
,
3089 state
->nlist
->pnn_list
,
3090 state
->nlist
->count
,
3093 if (tevent_req_nomem(subreq
, req
)) {
3096 tevent_req_set_callback(subreq
, recovery_normal_done
, req
);
/*
 * Completion callback for ban_node_send().
 *
 * Collects the result with ban_node_recv(), logs "failed to ban node"
 * with the returned ret, and fails the parent request with EIO.
 *
 * NOTE(review): the condition around the log message is not visible in
 * this copy; presumably it is logged only when ban_node_recv() reports
 * failure, while the request is failed with EIO either way - confirm.
 */
3099 static void recovery_failed_done(struct tevent_req
*subreq
)
3101 struct tevent_req
*req
= tevent_req_callback_data(
3102 subreq
, struct tevent_req
);
3106 status
= ban_node_recv(subreq
, &ret
);
3107 TALLOC_FREE(subreq
);
3109 D_ERR("failed to ban node, ret=%d\n", ret
);
3112 tevent_req_error(req
, EIO
);
/*
 * Completion callback for the multi-node control whose failure is
 * reported as "failed to set recovery mode NORMAL".
 *
 * ctdb_client_control_multi_recv() is given state as its third
 * argument here.  The error path logs the failing node (from
 * ctdb_client_control_multi_error()) or the overall ret, and fails the
 * parent request with ret.
 *
 * Otherwise the control prepared by ctdb_req_control_end_recovery() is
 * sent to the state->nlist->count nodes in state->nlist->pnn_list and
 * processing continues in recovery_end_recovery_done().
 *
 * NOTE(review): the condition guarding the error path is not visible in
 * this copy; presumably it is the result of the recv call - confirm.
 */
3115 static void recovery_normal_done(struct tevent_req
*subreq
)
3117 struct tevent_req
*req
= tevent_req_callback_data(
3118 subreq
, struct tevent_req
);
3119 struct recovery_state
*state
= tevent_req_data(
3120 req
, struct recovery_state
);
3121 struct ctdb_req_control request
;
3126 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
3128 TALLOC_FREE(subreq
);
/* Error reporting: try to name the node that failed */
3133 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
3134 state
->nlist
->count
,
3138 D_ERR("failed to set recovery mode NORMAL on node %u,"
3139 " ret=%d\n", pnn
, ret2
);
3141 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
3144 tevent_req_error(req
, ret
);
3148 D_ERR("Set recovery mode to NORMAL\n");
/* Next step: end_recovery control to all listed nodes */
3150 ctdb_req_control_end_recovery(&request
);
3151 subreq
= ctdb_client_control_multi_send(state
,
3154 state
->nlist
->pnn_list
,
3155 state
->nlist
->count
,
3158 if (tevent_req_nomem(subreq
, req
)) {
3161 tevent_req_set_callback(subreq
, recovery_end_recovery_done
, req
);
/*
 * Completion callback for the multi-node control whose failure is
 * reported as "failed to run recovered event".  This is the last step
 * visible in the chain.
 *
 * The error path logs the failing node (from
 * ctdb_client_control_multi_error()) or the overall ret, and fails the
 * parent request with ret.  Otherwise "recovered event finished" is
 * logged and the parent request is completed with tevent_req_done().
 *
 * NOTE(review): the condition guarding the error path is not visible in
 * this copy; presumably it is the result of the recv call - confirm.
 */
3164 static void recovery_end_recovery_done(struct tevent_req
*subreq
)
3166 struct tevent_req
*req
= tevent_req_callback_data(
3167 subreq
, struct tevent_req
);
3168 struct recovery_state
*state
= tevent_req_data(
3169 req
, struct recovery_state
);
3174 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
3176 TALLOC_FREE(subreq
);
/* Error reporting: try to name the node that failed */
3181 ret2
= ctdb_client_control_multi_error(state
->nlist
->pnn_list
,
3182 state
->nlist
->count
,
3186 D_ERR("failed to run recovered event on node %u,"
3187 " ret=%d\n", pnn
, ret2
);
3189 D_ERR("failed to run recovered event, ret=%d\n", ret
);
3191 tevent_req_error(req
, ret
);
3195 D_ERR("recovered event finished\n");
3197 tevent_req_done(req
);
/*
 * Receive the result of a recovery request.
 *
 * Forwards req and perr to generic_recv().  The function returns void,
 * so the bool returned by generic_recv() is discarded; callers get the
 * outcome only through *perr.
 */
3200 static void recovery_recv(struct tevent_req
*req
, int *perr
)
3202 generic_recv(req
, perr
);
/*
 * Print the command-line synopsis to stderr.
 *
 * The helper takes three arguments: <output-fd>, <ctdb-socket-path>
 * and <generation>.
 * NOTE(review): the argument supplying %s is not visible in this copy;
 * presumably it is progname - confirm.
 */
3205 static void usage(const char *progname
)
3207 fprintf(stderr
, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
3213 * Arguments - write fd, socket path, generation
/*
 * Entry point of the recovery helper.
 *
 * Visible steps, in order: read the result fd from argv[1] and the
 * generation from argv[3]; create a talloc context; initialise logging
 * under the name "ctdb-recovery"; create a tevent context; connect to
 * ctdb with ctdb_client_init() using sockpath; start recovery_send()
 * and drive it with tevent_req_poll(); fetch the result with
 * recovery_recv(); write ret to write_fd with sys_write(); free mem_ctx.
 *
 * NOTE(review): the argument-count check, the assignment of sockpath,
 * the failure conditions and the return statements are not visible in
 * this copy of the function.
 */
3215 int main(int argc
, char *argv
[])
3218 const char *sockpath
;
3219 TALLOC_CTX
*mem_ctx
= NULL
;
3220 struct tevent_context
*ev
;
3221 struct ctdb_client_context
*client
;
3223 struct tevent_req
*req
;
3224 uint32_t generation
;
/*
 * NOTE(review): atoi() cannot report malformed input; a non-numeric
 * argv[1] yields 0 - consider a checked conversion.
 */
3231 write_fd
= atoi(argv
[1]);
3233 generation
= (uint32_t)smb_strtoul(argv
[3],
3239 fprintf(stderr
, "recovery: unable to initialize generation\n");
3243 mem_ctx
= talloc_new(NULL
);
3244 if (mem_ctx
== NULL
) {
3245 fprintf(stderr
, "recovery: talloc_new() failed\n");
/* Errors up to and including logging_init() are reported on stderr */
3249 ret
= logging_init(mem_ctx
, NULL
, NULL
, "ctdb-recovery");
3251 fprintf(stderr
, "recovery: Unable to initialize logging\n");
3255 ev
= tevent_context_init(mem_ctx
);
3257 D_ERR("tevent_context_init() failed\n");
3261 ret
= ctdb_client_init(mem_ctx
, ev
, sockpath
, &client
);
3263 D_ERR("ctdb_client_init() failed, ret=%d\n", ret
);
3267 req
= recovery_send(mem_ctx
, ev
, client
, generation
);
/*
 * NOTE(review): the message below names database_recover_send(), but
 * the call above is recovery_send().
 */
3269 D_ERR("database_recover_send() failed\n");
3273 if (! tevent_req_poll(req
, ev
)) {
3274 D_ERR("tevent_req_poll() failed\n");
3278 recovery_recv(req
, &ret
);
3281 D_ERR("database recovery failed, ret=%d\n", ret
);
/*
 * Report the result as a raw int on write_fd.
 * NOTE(review): the return value of sys_write() is not checked.
 */
3285 sys_write(write_fd
, &ret
, sizeof(ret
));
3289 TALLOC_FREE(mem_ctx
);