2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
38 #include "common/logging.h"
/* Timeout in seconds used for the client requests issued by this file. */
static int recover_timeout = 30;

/*
 * Expands to timeval_current_ofs(recover_timeout, 0); this is the value
 * passed as the timeout argument of the ctdb client control calls.
 */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
50 static bool generic_recv(struct tevent_req
*req
, int *perr
)
54 if (tevent_req_is_unix_error(req
, &err
)) {
64 static uint64_t rec_srvid
= CTDB_SRVID_RECOVERY
;
66 static uint64_t srvid_next(void)
73 * Recovery database functions
76 struct recdb_context
{
84 static struct recdb_context
*recdb_create(TALLOC_CTX
*mem_ctx
, uint32_t db_id
,
87 uint32_t hash_size
, bool persistent
)
89 static char *db_dir_state
= NULL
;
90 struct recdb_context
*recdb
;
91 unsigned int tdb_flags
;
93 recdb
= talloc(mem_ctx
, struct recdb_context
);
98 if (db_dir_state
== NULL
) {
99 db_dir_state
= getenv("CTDB_DBDIR_STATE");
102 recdb
->db_name
= db_name
;
103 recdb
->db_id
= db_id
;
104 recdb
->db_path
= talloc_asprintf(recdb
, "%s/recdb.%s",
105 db_dir_state
!= NULL
?
107 dirname(discard_const(db_path
)),
109 if (recdb
->db_path
== NULL
) {
113 unlink(recdb
->db_path
);
115 tdb_flags
= TDB_NOLOCK
| TDB_INCOMPATIBLE_HASH
| TDB_DISALLOW_NESTING
;
116 recdb
->db
= tdb_wrap_open(mem_ctx
, recdb
->db_path
, hash_size
,
117 tdb_flags
, O_RDWR
|O_CREAT
|O_EXCL
, 0600);
118 if (recdb
->db
== NULL
) {
120 D_ERR("failed to create recovery db %s\n", recdb
->db_path
);
124 recdb
->persistent
= persistent
;
129 static uint32_t recdb_id(struct recdb_context
*recdb
)
134 static const char *recdb_name(struct recdb_context
*recdb
)
136 return recdb
->db_name
;
139 static const char *recdb_path(struct recdb_context
*recdb
)
141 return recdb
->db_path
;
144 static struct tdb_context
*recdb_tdb(struct recdb_context
*recdb
)
146 return recdb
->db
->tdb
;
149 static bool recdb_persistent(struct recdb_context
*recdb
)
151 return recdb
->persistent
;
154 struct recdb_add_traverse_state
{
155 struct recdb_context
*recdb
;
159 static int recdb_add_traverse(uint32_t reqid
, struct ctdb_ltdb_header
*header
,
160 TDB_DATA key
, TDB_DATA data
,
163 struct recdb_add_traverse_state
*state
=
164 (struct recdb_add_traverse_state
*)private_data
;
165 struct ctdb_ltdb_header
*hdr
;
169 /* header is not marshalled separately in the pulldb control */
170 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
174 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
176 /* fetch the existing record, if any */
177 prev_data
= tdb_fetch(recdb_tdb(state
->recdb
), key
);
179 if (prev_data
.dptr
!= NULL
) {
180 struct ctdb_ltdb_header prev_hdr
;
182 prev_hdr
= *(struct ctdb_ltdb_header
*)prev_data
.dptr
;
183 free(prev_data
.dptr
);
184 if (hdr
->rsn
< prev_hdr
.rsn
||
185 (hdr
->rsn
== prev_hdr
.rsn
&&
186 prev_hdr
.dmaster
!= state
->mypnn
)) {
191 ret
= tdb_store(recdb_tdb(state
->recdb
), key
, data
, TDB_REPLACE
);
198 static bool recdb_add(struct recdb_context
*recdb
, int mypnn
,
199 struct ctdb_rec_buffer
*recbuf
)
201 struct recdb_add_traverse_state state
;
207 ret
= ctdb_rec_buffer_traverse(recbuf
, recdb_add_traverse
, &state
);
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer
*recbuf
, bool persistent
,
217 uint32_t reqid
, uint32_t dmaster
,
218 TDB_DATA key
, TDB_DATA data
)
220 struct ctdb_ltdb_header
*header
;
223 /* Skip empty records */
224 if (data
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
228 /* update the dmaster field to point to us */
229 header
= (struct ctdb_ltdb_header
*)data
.dptr
;
231 header
->dmaster
= dmaster
;
232 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
235 ret
= ctdb_rec_buffer_add(recbuf
, recbuf
, reqid
, NULL
, key
, data
);
243 struct recdb_records_traverse_state
{
244 struct ctdb_rec_buffer
*recbuf
;
251 static int recdb_records_traverse(struct tdb_context
*tdb
,
252 TDB_DATA key
, TDB_DATA data
,
255 struct recdb_records_traverse_state
*state
=
256 (struct recdb_records_traverse_state
*)private_data
;
259 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
260 state
->reqid
, state
->dmaster
, key
, data
);
262 state
->failed
= true;
269 static struct ctdb_rec_buffer
*recdb_records(struct recdb_context
*recdb
,
273 struct recdb_records_traverse_state state
;
276 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
277 if (state
.recbuf
== NULL
) {
280 state
.dmaster
= dmaster
;
282 state
.persistent
= recdb_persistent(recdb
);
283 state
.failed
= false;
285 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_records_traverse
,
287 if (ret
== -1 || state
.failed
) {
288 D_ERR("Failed to marshall recovery records for %s\n",
290 TALLOC_FREE(state
.recbuf
);
297 struct recdb_file_traverse_state
{
298 struct ctdb_rec_buffer
*recbuf
;
299 struct recdb_context
*recdb
;
310 static int recdb_file_traverse(struct tdb_context
*tdb
,
311 TDB_DATA key
, TDB_DATA data
,
314 struct recdb_file_traverse_state
*state
=
315 (struct recdb_file_traverse_state
*)private_data
;
318 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
319 state
->reqid
, state
->dmaster
, key
, data
);
321 state
->failed
= true;
325 if (ctdb_rec_buffer_len(state
->recbuf
) > state
->max_size
) {
326 ret
= ctdb_rec_buffer_write(state
->recbuf
, state
->fd
);
328 D_ERR("Failed to collect recovery records for %s\n",
329 recdb_name(state
->recdb
));
330 state
->failed
= true;
334 state
->num_buffers
+= 1;
336 TALLOC_FREE(state
->recbuf
);
337 state
->recbuf
= ctdb_rec_buffer_init(state
->mem_ctx
,
338 recdb_id(state
->recdb
));
339 if (state
->recbuf
== NULL
) {
340 state
->failed
= true;
348 static int recdb_file(struct recdb_context
*recdb
, TALLOC_CTX
*mem_ctx
,
349 uint32_t dmaster
, int fd
, int max_size
)
351 struct recdb_file_traverse_state state
;
354 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
355 if (state
.recbuf
== NULL
) {
359 state
.mem_ctx
= mem_ctx
;
360 state
.dmaster
= dmaster
;
362 state
.persistent
= recdb_persistent(recdb
);
363 state
.failed
= false;
365 state
.max_size
= max_size
;
366 state
.num_buffers
= 0;
368 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_file_traverse
, &state
);
369 if (ret
== -1 || state
.failed
) {
370 TALLOC_FREE(state
.recbuf
);
374 ret
= ctdb_rec_buffer_write(state
.recbuf
, fd
);
376 D_ERR("Failed to collect recovery records for %s\n",
378 TALLOC_FREE(state
.recbuf
);
381 state
.num_buffers
+= 1;
383 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384 state
.num_buffers
, recdb_name(recdb
));
386 return state
.num_buffers
;
390 * Pull database from a single node
393 struct pull_database_state
{
394 struct tevent_context
*ev
;
395 struct ctdb_client_context
*client
;
396 struct recdb_context
*recdb
;
402 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
/* Completion callbacks for the sub-requests issued while pulling a database. */
static void pull_database_register_done(struct tevent_req *subreq);
static void pull_database_old_done(struct tevent_req *subreq);
static void pull_database_unregister_done(struct tevent_req *subreq);
static void pull_database_new_done(struct tevent_req *subreq);
409 static struct tevent_req
*pull_database_send(
411 struct tevent_context
*ev
,
412 struct ctdb_client_context
*client
,
413 uint32_t pnn
, uint32_t caps
,
414 struct recdb_context
*recdb
)
416 struct tevent_req
*req
, *subreq
;
417 struct pull_database_state
*state
;
418 struct ctdb_req_control request
;
420 req
= tevent_req_create(mem_ctx
, &state
, struct pull_database_state
);
426 state
->client
= client
;
427 state
->recdb
= recdb
;
429 state
->srvid
= srvid_next();
431 if (caps
& CTDB_CAP_FRAGMENTED_CONTROLS
) {
432 subreq
= ctdb_client_set_message_handler_send(
433 state
, state
->ev
, state
->client
,
434 state
->srvid
, pull_database_handler
,
436 if (tevent_req_nomem(subreq
, req
)) {
437 return tevent_req_post(req
, ev
);
440 tevent_req_set_callback(subreq
, pull_database_register_done
,
444 struct ctdb_pulldb pulldb
;
446 pulldb
.db_id
= recdb_id(recdb
);
447 pulldb
.lmaster
= CTDB_LMASTER_ANY
;
449 ctdb_req_control_pull_db(&request
, &pulldb
);
450 subreq
= ctdb_client_control_send(state
, state
->ev
,
454 if (tevent_req_nomem(subreq
, req
)) {
455 return tevent_req_post(req
, ev
);
457 tevent_req_set_callback(subreq
, pull_database_old_done
, req
);
463 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
466 struct tevent_req
*req
= talloc_get_type_abort(
467 private_data
, struct tevent_req
);
468 struct pull_database_state
*state
= tevent_req_data(
469 req
, struct pull_database_state
);
470 struct ctdb_rec_buffer
*recbuf
;
475 if (srvid
!= state
->srvid
) {
479 ret
= ctdb_rec_buffer_pull(data
.dptr
, data
.dsize
, state
, &recbuf
, &np
);
481 D_ERR("Invalid data received for DB_PULL messages\n");
485 if (recbuf
->db_id
!= recdb_id(state
->recdb
)) {
487 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
488 recbuf
->db_id
, recdb_name(state
->recdb
));
492 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
496 D_ERR("Failed to add records to recdb for %s\n",
497 recdb_name(state
->recdb
));
501 state
->num_records
+= recbuf
->count
;
505 static void pull_database_register_done(struct tevent_req
*subreq
)
507 struct tevent_req
*req
= tevent_req_callback_data(
508 subreq
, struct tevent_req
);
509 struct pull_database_state
*state
= tevent_req_data(
510 req
, struct pull_database_state
);
511 struct ctdb_req_control request
;
512 struct ctdb_pulldb_ext pulldb_ext
;
516 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
519 D_ERR("Failed to set message handler for DB_PULL for %s\n",
520 recdb_name(state
->recdb
));
521 tevent_req_error(req
, ret
);
525 pulldb_ext
.db_id
= recdb_id(state
->recdb
);
526 pulldb_ext
.lmaster
= CTDB_LMASTER_ANY
;
527 pulldb_ext
.srvid
= state
->srvid
;
529 ctdb_req_control_db_pull(&request
, &pulldb_ext
);
530 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
531 state
->pnn
, TIMEOUT(), &request
);
532 if (tevent_req_nomem(subreq
, req
)) {
535 tevent_req_set_callback(subreq
, pull_database_new_done
, req
);
538 static void pull_database_old_done(struct tevent_req
*subreq
)
540 struct tevent_req
*req
= tevent_req_callback_data(
541 subreq
, struct tevent_req
);
542 struct pull_database_state
*state
= tevent_req_data(
543 req
, struct pull_database_state
);
544 struct ctdb_reply_control
*reply
;
545 struct ctdb_rec_buffer
*recbuf
;
549 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
552 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
553 recdb_name(state
->recdb
), state
->pnn
, ret
);
554 tevent_req_error(req
, ret
);
558 ret
= ctdb_reply_control_pull_db(reply
, state
, &recbuf
);
561 tevent_req_error(req
, ret
);
565 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
569 tevent_req_error(req
, EIO
);
573 state
->num_records
= recbuf
->count
;
576 D_INFO("Pulled %d records for db %s from node %d\n",
577 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
579 tevent_req_done(req
);
582 static void pull_database_new_done(struct tevent_req
*subreq
)
584 struct tevent_req
*req
= tevent_req_callback_data(
585 subreq
, struct tevent_req
);
586 struct pull_database_state
*state
= tevent_req_data(
587 req
, struct pull_database_state
);
588 struct ctdb_reply_control
*reply
;
589 uint32_t num_records
;
593 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
596 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
597 recdb_name(state
->recdb
), state
->pnn
, ret
);
598 tevent_req_error(req
, ret
);
602 ret
= ctdb_reply_control_db_pull(reply
, &num_records
);
604 if (num_records
!= state
->num_records
) {
605 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
606 num_records
, state
->num_records
,
607 recdb_name(state
->recdb
));
608 tevent_req_error(req
, EIO
);
612 D_INFO("Pulled %d records for db %s from node %d\n",
613 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
615 subreq
= ctdb_client_remove_message_handler_send(
616 state
, state
->ev
, state
->client
,
618 if (tevent_req_nomem(subreq
, req
)) {
621 tevent_req_set_callback(subreq
, pull_database_unregister_done
, req
);
624 static void pull_database_unregister_done(struct tevent_req
*subreq
)
626 struct tevent_req
*req
= tevent_req_callback_data(
627 subreq
, struct tevent_req
);
628 struct pull_database_state
*state
= tevent_req_data(
629 req
, struct pull_database_state
);
633 status
= ctdb_client_remove_message_handler_recv(subreq
, &ret
);
636 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
637 recdb_name(state
->recdb
));
638 tevent_req_error(req
, ret
);
642 tevent_req_done(req
);
/*
 * Collect the result of a pull_database request.
 *
 * Delegates to generic_recv(): req and perr are passed through unchanged
 * and its return value is returned as is.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
651 * Push database to specified nodes (old style)
654 struct push_database_old_state
{
655 struct tevent_context
*ev
;
656 struct ctdb_client_context
*client
;
657 struct recdb_context
*recdb
;
660 struct ctdb_rec_buffer
*recbuf
;
/* Completion callback for each PUSH_DB control sent by the old-style push. */
static void push_database_old_push_done(struct tevent_req *subreq);
666 static struct tevent_req
*push_database_old_send(
668 struct tevent_context
*ev
,
669 struct ctdb_client_context
*client
,
670 uint32_t *pnn_list
, int count
,
671 struct recdb_context
*recdb
)
673 struct tevent_req
*req
, *subreq
;
674 struct push_database_old_state
*state
;
675 struct ctdb_req_control request
;
678 req
= tevent_req_create(mem_ctx
, &state
,
679 struct push_database_old_state
);
685 state
->client
= client
;
686 state
->recdb
= recdb
;
687 state
->pnn_list
= pnn_list
;
688 state
->count
= count
;
691 state
->recbuf
= recdb_records(recdb
, state
,
692 ctdb_client_pnn(client
));
693 if (tevent_req_nomem(state
->recbuf
, req
)) {
694 return tevent_req_post(req
, ev
);
697 pnn
= state
->pnn_list
[state
->index
];
699 ctdb_req_control_push_db(&request
, state
->recbuf
);
700 subreq
= ctdb_client_control_send(state
, ev
, client
, pnn
,
701 TIMEOUT(), &request
);
702 if (tevent_req_nomem(subreq
, req
)) {
703 return tevent_req_post(req
, ev
);
705 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
710 static void push_database_old_push_done(struct tevent_req
*subreq
)
712 struct tevent_req
*req
= tevent_req_callback_data(
713 subreq
, struct tevent_req
);
714 struct push_database_old_state
*state
= tevent_req_data(
715 req
, struct push_database_old_state
);
716 struct ctdb_req_control request
;
721 status
= ctdb_client_control_recv(subreq
, &ret
, NULL
, NULL
);
724 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
725 recdb_name(state
->recdb
), state
->pnn_list
[state
->index
],
727 tevent_req_error(req
, ret
);
732 if (state
->index
== state
->count
) {
733 TALLOC_FREE(state
->recbuf
);
734 tevent_req_done(req
);
738 pnn
= state
->pnn_list
[state
->index
];
740 ctdb_req_control_push_db(&request
, state
->recbuf
);
741 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
742 pnn
, TIMEOUT(), &request
);
743 if (tevent_req_nomem(subreq
, req
)) {
746 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
/*
 * Collect the result of an old-style push_database request.
 *
 * Delegates to generic_recv(): req and perr are passed through unchanged
 * and its return value is returned as is.
 */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
755 * Push database to specified nodes (new style)
758 struct push_database_new_state
{
759 struct tevent_context
*ev
;
760 struct ctdb_client_context
*client
;
761 struct recdb_context
*recdb
;
768 int num_buffers_sent
;
/*
 * Steps of the new-style push.  push_database_new_send_msg() takes the
 * request itself; the other three are sub-request completion callbacks.
 */
static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
777 static struct tevent_req
*push_database_new_send(
779 struct tevent_context
*ev
,
780 struct ctdb_client_context
*client
,
781 uint32_t *pnn_list
, int count
,
782 struct recdb_context
*recdb
,
785 struct tevent_req
*req
, *subreq
;
786 struct push_database_new_state
*state
;
787 struct ctdb_req_control request
;
788 struct ctdb_pulldb_ext pulldb_ext
;
792 req
= tevent_req_create(mem_ctx
, &state
,
793 struct push_database_new_state
);
799 state
->client
= client
;
800 state
->recdb
= recdb
;
801 state
->pnn_list
= pnn_list
;
802 state
->count
= count
;
804 state
->srvid
= srvid_next();
805 state
->dmaster
= ctdb_client_pnn(client
);
806 state
->num_buffers_sent
= 0;
807 state
->num_records
= 0;
809 filename
= talloc_asprintf(state
, "%s.dat", recdb_path(recdb
));
810 if (tevent_req_nomem(filename
, req
)) {
811 return tevent_req_post(req
, ev
);
814 state
->fd
= open(filename
, O_RDWR
|O_CREAT
, 0644);
815 if (state
->fd
== -1) {
816 tevent_req_error(req
, errno
);
817 return tevent_req_post(req
, ev
);
820 talloc_free(filename
);
822 state
->num_buffers
= recdb_file(recdb
, state
, state
->dmaster
,
823 state
->fd
, max_size
);
824 if (state
->num_buffers
== -1) {
825 tevent_req_error(req
, ENOMEM
);
826 return tevent_req_post(req
, ev
);
829 offset
= lseek(state
->fd
, 0, SEEK_SET
);
831 tevent_req_error(req
, EIO
);
832 return tevent_req_post(req
, ev
);
835 pulldb_ext
.db_id
= recdb_id(recdb
);
836 pulldb_ext
.srvid
= state
->srvid
;
838 ctdb_req_control_db_push_start(&request
, &pulldb_ext
);
839 subreq
= ctdb_client_control_multi_send(state
, ev
, client
,
841 TIMEOUT(), &request
);
842 if (tevent_req_nomem(subreq
, req
)) {
843 return tevent_req_post(req
, ev
);
845 tevent_req_set_callback(subreq
, push_database_new_started
, req
);
850 static void push_database_new_started(struct tevent_req
*subreq
)
852 struct tevent_req
*req
= tevent_req_callback_data(
853 subreq
, struct tevent_req
);
854 struct push_database_new_state
*state
= tevent_req_data(
855 req
, struct push_database_new_state
);
860 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
867 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
871 D_ERR("control DB_PUSH_START failed for db %s"
872 " on node %u, ret=%d\n",
873 recdb_name(state
->recdb
), pnn
, ret2
);
875 D_ERR("control DB_PUSH_START failed for db %s,"
877 recdb_name(state
->recdb
), ret
);
879 talloc_free(err_list
);
881 tevent_req_error(req
, ret
);
885 push_database_new_send_msg(req
);
888 static void push_database_new_send_msg(struct tevent_req
*req
)
890 struct push_database_new_state
*state
= tevent_req_data(
891 req
, struct push_database_new_state
);
892 struct tevent_req
*subreq
;
893 struct ctdb_rec_buffer
*recbuf
;
894 struct ctdb_req_message message
;
899 if (state
->num_buffers_sent
== state
->num_buffers
) {
900 struct ctdb_req_control request
;
902 ctdb_req_control_db_push_confirm(&request
,
903 recdb_id(state
->recdb
));
904 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
908 TIMEOUT(), &request
);
909 if (tevent_req_nomem(subreq
, req
)) {
912 tevent_req_set_callback(subreq
, push_database_new_confirmed
,
917 ret
= ctdb_rec_buffer_read(state
->fd
, state
, &recbuf
);
919 tevent_req_error(req
, ret
);
923 data
.dsize
= ctdb_rec_buffer_len(recbuf
);
924 data
.dptr
= talloc_size(state
, data
.dsize
);
925 if (tevent_req_nomem(data
.dptr
, req
)) {
929 ctdb_rec_buffer_push(recbuf
, data
.dptr
, &np
);
931 message
.srvid
= state
->srvid
;
932 message
.data
.data
= data
;
934 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
935 state
->num_buffers_sent
, recbuf
->count
,
936 recdb_name(state
->recdb
));
938 subreq
= ctdb_client_message_multi_send(state
, state
->ev
,
940 state
->pnn_list
, state
->count
,
942 if (tevent_req_nomem(subreq
, req
)) {
945 tevent_req_set_callback(subreq
, push_database_new_send_done
, req
);
947 state
->num_records
+= recbuf
->count
;
949 talloc_free(data
.dptr
);
953 static void push_database_new_send_done(struct tevent_req
*subreq
)
955 struct tevent_req
*req
= tevent_req_callback_data(
956 subreq
, struct tevent_req
);
957 struct push_database_new_state
*state
= tevent_req_data(
958 req
, struct push_database_new_state
);
962 status
= ctdb_client_message_multi_recv(subreq
, &ret
, NULL
, NULL
);
965 D_ERR("Sending recovery records failed for %s\n",
966 recdb_name(state
->recdb
));
967 tevent_req_error(req
, ret
);
971 state
->num_buffers_sent
+= 1;
973 push_database_new_send_msg(req
);
976 static void push_database_new_confirmed(struct tevent_req
*subreq
)
978 struct tevent_req
*req
= tevent_req_callback_data(
979 subreq
, struct tevent_req
);
980 struct push_database_new_state
*state
= tevent_req_data(
981 req
, struct push_database_new_state
);
982 struct ctdb_reply_control
**reply
;
986 uint32_t num_records
;
988 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
995 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
996 state
->count
, err_list
,
999 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1000 " on node %u, ret=%d\n",
1001 recdb_name(state
->recdb
), pnn
, ret2
);
1003 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1005 recdb_name(state
->recdb
), ret
);
1007 tevent_req_error(req
, ret
);
1011 for (i
=0; i
<state
->count
; i
++) {
1012 ret
= ctdb_reply_control_db_push_confirm(reply
[i
],
1015 tevent_req_error(req
, EPROTO
);
1019 if (num_records
!= state
->num_records
) {
1020 D_ERR("Node %u received %d of %d records for %s\n",
1021 state
->pnn_list
[i
], num_records
,
1022 state
->num_records
, recdb_name(state
->recdb
));
1023 tevent_req_error(req
, EPROTO
);
1030 D_INFO("Pushed %d records for db %s\n",
1031 state
->num_records
, recdb_name(state
->recdb
));
1033 tevent_req_done(req
);
/*
 * Collect the result of a new-style push_database request.
 *
 * Delegates to generic_recv(): req and perr are passed through unchanged
 * and its return value is returned as is.
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
1042 * wrapper for push_database_old and push_database_new
1045 struct push_database_state
{
1046 bool old_done
, new_done
;
/* Completion callbacks for the old-style and new-style push sub-requests. */
static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
1052 static struct tevent_req
*push_database_send(
1053 TALLOC_CTX
*mem_ctx
,
1054 struct tevent_context
*ev
,
1055 struct ctdb_client_context
*client
,
1056 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1057 struct ctdb_tunable_list
*tun_list
,
1058 struct recdb_context
*recdb
)
1060 struct tevent_req
*req
, *subreq
;
1061 struct push_database_state
*state
;
1062 uint32_t *old_list
, *new_list
;
1063 int old_count
, new_count
;
1066 req
= tevent_req_create(mem_ctx
, &state
, struct push_database_state
);
1071 state
->old_done
= false;
1072 state
->new_done
= false;
1076 old_list
= talloc_array(state
, uint32_t, count
);
1077 new_list
= talloc_array(state
, uint32_t, count
);
1078 if (tevent_req_nomem(old_list
, req
) ||
1079 tevent_req_nomem(new_list
,req
)) {
1080 return tevent_req_post(req
, ev
);
1083 for (i
=0; i
<count
; i
++) {
1084 uint32_t pnn
= pnn_list
[i
];
1086 if (caps
[pnn
] & CTDB_CAP_FRAGMENTED_CONTROLS
) {
1087 new_list
[new_count
] = pnn
;
1090 old_list
[old_count
] = pnn
;
1095 if (old_count
> 0) {
1096 subreq
= push_database_old_send(state
, ev
, client
,
1097 old_list
, old_count
, recdb
);
1098 if (tevent_req_nomem(subreq
, req
)) {
1099 return tevent_req_post(req
, ev
);
1101 tevent_req_set_callback(subreq
, push_database_old_done
, req
);
1103 state
->old_done
= true;
1106 if (new_count
> 0) {
1107 subreq
= push_database_new_send(state
, ev
, client
,
1108 new_list
, new_count
, recdb
,
1109 tun_list
->rec_buffer_size_limit
);
1110 if (tevent_req_nomem(subreq
, req
)) {
1111 return tevent_req_post(req
, ev
);
1113 tevent_req_set_callback(subreq
, push_database_new_done
, req
);
1115 state
->new_done
= true;
1121 static void push_database_old_done(struct tevent_req
*subreq
)
1123 struct tevent_req
*req
= tevent_req_callback_data(
1124 subreq
, struct tevent_req
);
1125 struct push_database_state
*state
= tevent_req_data(
1126 req
, struct push_database_state
);
1130 status
= push_database_old_recv(subreq
, &ret
);
1132 tevent_req_error(req
, ret
);
1136 state
->old_done
= true;
1138 if (state
->old_done
&& state
->new_done
) {
1139 tevent_req_done(req
);
1143 static void push_database_new_done(struct tevent_req
*subreq
)
1145 struct tevent_req
*req
= tevent_req_callback_data(
1146 subreq
, struct tevent_req
);
1147 struct push_database_state
*state
= tevent_req_data(
1148 req
, struct push_database_state
);
1152 status
= push_database_new_recv(subreq
, &ret
);
1154 tevent_req_error(req
, ret
);
1158 state
->new_done
= true;
1160 if (state
->old_done
&& state
->new_done
) {
1161 tevent_req_done(req
);
/*
 * Collect the result of a push_database request.
 *
 * Delegates to generic_recv(): req and perr are passed through unchanged
 * and its return value is returned as is.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
1171 * Collect databases using highest sequence number
1174 struct collect_highseqnum_db_state
{
1175 struct tevent_context
*ev
;
1176 struct ctdb_client_context
*client
;
1180 uint32_t *ban_credits
;
1182 struct recdb_context
*recdb
;
/*
 * Completion callbacks for collect_highseqnum_db: first the GET_DB_SEQNUM
 * controls, then the pull from the selected node.
 */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1189 static struct tevent_req
*collect_highseqnum_db_send(
1190 TALLOC_CTX
*mem_ctx
,
1191 struct tevent_context
*ev
,
1192 struct ctdb_client_context
*client
,
1193 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1194 uint32_t *ban_credits
, uint32_t db_id
,
1195 struct recdb_context
*recdb
)
1197 struct tevent_req
*req
, *subreq
;
1198 struct collect_highseqnum_db_state
*state
;
1199 struct ctdb_req_control request
;
1201 req
= tevent_req_create(mem_ctx
, &state
,
1202 struct collect_highseqnum_db_state
);
1208 state
->client
= client
;
1209 state
->pnn_list
= pnn_list
;
1210 state
->count
= count
;
1212 state
->ban_credits
= ban_credits
;
1213 state
->db_id
= db_id
;
1214 state
->recdb
= recdb
;
1216 ctdb_req_control_get_db_seqnum(&request
, db_id
);
1217 subreq
= ctdb_client_control_multi_send(mem_ctx
, ev
, client
,
1218 state
->pnn_list
, state
->count
,
1219 TIMEOUT(), &request
);
1220 if (tevent_req_nomem(subreq
, req
)) {
1221 return tevent_req_post(req
, ev
);
1223 tevent_req_set_callback(subreq
, collect_highseqnum_db_seqnum_done
,
1229 static void collect_highseqnum_db_seqnum_done(struct tevent_req
*subreq
)
1231 struct tevent_req
*req
= tevent_req_callback_data(
1232 subreq
, struct tevent_req
);
1233 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1234 req
, struct collect_highseqnum_db_state
);
1235 struct ctdb_reply_control
**reply
;
1239 uint64_t seqnum
, max_seqnum
;
1241 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1243 TALLOC_FREE(subreq
);
1248 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1249 state
->count
, err_list
,
1252 D_ERR("control GET_DB_SEQNUM failed for db %s"
1253 " on node %u, ret=%d\n",
1254 recdb_name(state
->recdb
), pnn
, ret2
);
1256 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1258 recdb_name(state
->recdb
), ret
);
1260 tevent_req_error(req
, ret
);
1265 state
->max_pnn
= state
->pnn_list
[0];
1266 for (i
=0; i
<state
->count
; i
++) {
1267 ret
= ctdb_reply_control_get_db_seqnum(reply
[i
], &seqnum
);
1269 tevent_req_error(req
, EPROTO
);
1273 if (max_seqnum
< seqnum
) {
1274 max_seqnum
= seqnum
;
1275 state
->max_pnn
= state
->pnn_list
[i
];
1281 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64
"\n",
1282 recdb_name(state
->recdb
), state
->max_pnn
, max_seqnum
);
1284 subreq
= pull_database_send(state
, state
->ev
, state
->client
,
1286 state
->caps
[state
->max_pnn
],
1288 if (tevent_req_nomem(subreq
, req
)) {
1291 tevent_req_set_callback(subreq
, collect_highseqnum_db_pulldb_done
,
1295 static void collect_highseqnum_db_pulldb_done(struct tevent_req
*subreq
)
1297 struct tevent_req
*req
= tevent_req_callback_data(
1298 subreq
, struct tevent_req
);
1299 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1300 req
, struct collect_highseqnum_db_state
);
1304 status
= pull_database_recv(subreq
, &ret
);
1305 TALLOC_FREE(subreq
);
1307 state
->ban_credits
[state
->max_pnn
] += 1;
1308 tevent_req_error(req
, ret
);
1312 tevent_req_done(req
);
/*
 * Collect the result of a collect_highseqnum_db request.
 *
 * Delegates to generic_recv(): req and perr are passed through unchanged
 * and its return value is returned as is.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
1321 * Collect all databases
1324 struct collect_all_db_state
{
1325 struct tevent_context
*ev
;
1326 struct ctdb_client_context
*client
;
1330 uint32_t *ban_credits
;
1332 struct recdb_context
*recdb
;
1333 struct ctdb_pulldb pulldb
;
/* Completion callback for each per-node pull issued by collect_all_db. */
static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1339 static struct tevent_req
*collect_all_db_send(
1340 TALLOC_CTX
*mem_ctx
,
1341 struct tevent_context
*ev
,
1342 struct ctdb_client_context
*client
,
1343 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1344 uint32_t *ban_credits
, uint32_t db_id
,
1345 struct recdb_context
*recdb
)
1347 struct tevent_req
*req
, *subreq
;
1348 struct collect_all_db_state
*state
;
1351 req
= tevent_req_create(mem_ctx
, &state
,
1352 struct collect_all_db_state
);
1358 state
->client
= client
;
1359 state
->pnn_list
= pnn_list
;
1360 state
->count
= count
;
1362 state
->ban_credits
= ban_credits
;
1363 state
->db_id
= db_id
;
1364 state
->recdb
= recdb
;
1367 pnn
= state
->pnn_list
[state
->index
];
1369 subreq
= pull_database_send(state
, ev
, client
, pnn
, caps
[pnn
], recdb
);
1370 if (tevent_req_nomem(subreq
, req
)) {
1371 return tevent_req_post(req
, ev
);
1373 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
1378 static void collect_all_db_pulldb_done(struct tevent_req
*subreq
)
1380 struct tevent_req
*req
= tevent_req_callback_data(
1381 subreq
, struct tevent_req
);
1382 struct collect_all_db_state
*state
= tevent_req_data(
1383 req
, struct collect_all_db_state
);
1388 status
= pull_database_recv(subreq
, &ret
);
1389 TALLOC_FREE(subreq
);
1391 pnn
= state
->pnn_list
[state
->index
];
1392 state
->ban_credits
[pnn
] += 1;
1393 tevent_req_error(req
, ret
);
1398 if (state
->index
== state
->count
) {
1399 tevent_req_done(req
);
1403 pnn
= state
->pnn_list
[state
->index
];
1404 subreq
= pull_database_send(state
, state
->ev
, state
->client
,
1405 pnn
, state
->caps
[pnn
], state
->recdb
);
1406 if (tevent_req_nomem(subreq
, req
)) {
1409 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
/*
 * Collect the result of a collect_all_db request.
 *
 * Delegates to generic_recv(): req and perr are passed through unchanged
 * and its return value is returned as is.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}
1419 * For each database do the following:
1422 * - Freeze database on all nodes
1423 * - Start transaction on all nodes
1424 * - Collect database from all nodes
1425 * - Wipe database on all nodes
1426 * - Push database to all nodes
1427 * - Commit transaction on all nodes
1428 * - Thaw database on all nodes
1431 struct recover_db_state
{
1432 struct tevent_context
*ev
;
1433 struct ctdb_client_context
*client
;
1434 struct ctdb_tunable_list
*tun_list
;
1438 uint32_t *ban_credits
;
1443 struct ctdb_transdb transdb
;
1445 const char *db_name
, *db_path
;
1446 struct recdb_context
*recdb
;
/*
 * Completion callbacks for the recover_db request.
 * recover_db_name_done() handles the GET_DBNAME reply; the remaining
 * callbacks presumably follow the per-database recovery steps in order
 * (freeze, transaction start, collect, wipe, push, commit, thaw).
 */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);
1459 static struct tevent_req
*recover_db_send(TALLOC_CTX
*mem_ctx
,
1460 struct tevent_context
*ev
,
1461 struct ctdb_client_context
*client
,
1462 struct ctdb_tunable_list
*tun_list
,
1463 uint32_t *pnn_list
, int count
,
1465 uint32_t *ban_credits
,
1466 uint32_t generation
,
1467 uint32_t db_id
, uint8_t db_flags
)
1469 struct tevent_req
*req
, *subreq
;
1470 struct recover_db_state
*state
;
1471 struct ctdb_req_control request
;
1473 req
= tevent_req_create(mem_ctx
, &state
, struct recover_db_state
);
1479 state
->client
= client
;
1480 state
->tun_list
= tun_list
;
1481 state
->pnn_list
= pnn_list
;
1482 state
->count
= count
;
1484 state
->ban_credits
= ban_credits
;
1485 state
->db_id
= db_id
;
1486 state
->db_flags
= db_flags
;
1488 state
->destnode
= ctdb_client_pnn(client
);
1489 state
->transdb
.db_id
= db_id
;
1490 state
->transdb
.tid
= generation
;
1492 ctdb_req_control_get_dbname(&request
, db_id
);
1493 subreq
= ctdb_client_control_send(state
, ev
, client
, state
->destnode
,
1494 TIMEOUT(), &request
);
1495 if (tevent_req_nomem(subreq
, req
)) {
1496 return tevent_req_post(req
, ev
);
1498 tevent_req_set_callback(subreq
, recover_db_name_done
, req
);
1503 static void recover_db_name_done(struct tevent_req
*subreq
)
1505 struct tevent_req
*req
= tevent_req_callback_data(
1506 subreq
, struct tevent_req
);
1507 struct recover_db_state
*state
= tevent_req_data(
1508 req
, struct recover_db_state
);
1509 struct ctdb_reply_control
*reply
;
1510 struct ctdb_req_control request
;
1514 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1515 TALLOC_FREE(subreq
);
1517 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1519 tevent_req_error(req
, ret
);
1523 ret
= ctdb_reply_control_get_dbname(reply
, state
, &state
->db_name
);
1525 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1527 tevent_req_error(req
, EPROTO
);
1533 ctdb_req_control_getdbpath(&request
, state
->db_id
);
1534 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
1535 state
->destnode
, TIMEOUT(),
1537 if (tevent_req_nomem(subreq
, req
)) {
1540 tevent_req_set_callback(subreq
, recover_db_path_done
, req
);
1543 static void recover_db_path_done(struct tevent_req
*subreq
)
1545 struct tevent_req
*req
= tevent_req_callback_data(
1546 subreq
, struct tevent_req
);
1547 struct recover_db_state
*state
= tevent_req_data(
1548 req
, struct recover_db_state
);
1549 struct ctdb_reply_control
*reply
;
1550 struct ctdb_req_control request
;
1554 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1555 TALLOC_FREE(subreq
);
1557 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1558 state
->db_name
, ret
);
1559 tevent_req_error(req
, ret
);
1563 ret
= ctdb_reply_control_getdbpath(reply
, state
, &state
->db_path
);
1565 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1566 state
->db_name
, ret
);
1567 tevent_req_error(req
, EPROTO
);
1573 ctdb_req_control_db_freeze(&request
, state
->db_id
);
1574 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1576 state
->pnn_list
, state
->count
,
1577 TIMEOUT(), &request
);
1578 if (tevent_req_nomem(subreq
, req
)) {
1581 tevent_req_set_callback(subreq
, recover_db_freeze_done
, req
);
1584 static void recover_db_freeze_done(struct tevent_req
*subreq
)
1586 struct tevent_req
*req
= tevent_req_callback_data(
1587 subreq
, struct tevent_req
);
1588 struct recover_db_state
*state
= tevent_req_data(
1589 req
, struct recover_db_state
);
1590 struct ctdb_req_control request
;
1595 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1597 TALLOC_FREE(subreq
);
1602 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1603 state
->count
, err_list
,
1606 D_ERR("control FREEZE_DB failed for db %s"
1607 " on node %u, ret=%d\n",
1608 state
->db_name
, pnn
, ret2
);
1609 state
->ban_credits
[pnn
] += 1;
1611 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1612 state
->db_name
, ret
);
1614 tevent_req_error(req
, ret
);
1618 ctdb_req_control_db_transaction_start(&request
, &state
->transdb
);
1619 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1621 state
->pnn_list
, state
->count
,
1622 TIMEOUT(), &request
);
1623 if (tevent_req_nomem(subreq
, req
)) {
1626 tevent_req_set_callback(subreq
, recover_db_transaction_started
, req
);
1629 static void recover_db_transaction_started(struct tevent_req
*subreq
)
1631 struct tevent_req
*req
= tevent_req_callback_data(
1632 subreq
, struct tevent_req
);
1633 struct recover_db_state
*state
= tevent_req_data(
1634 req
, struct recover_db_state
);
1639 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1641 TALLOC_FREE(subreq
);
1646 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1650 D_ERR("control TRANSACTION_DB failed for db=%s"
1651 " on node %u, ret=%d\n",
1652 state
->db_name
, pnn
, ret2
);
1654 D_ERR("control TRANSACTION_DB failed for db=%s,"
1655 " ret=%d\n", state
->db_name
, ret
);
1657 tevent_req_error(req
, ret
);
1661 state
->recdb
= recdb_create(state
, state
->db_id
, state
->db_name
,
1663 state
->tun_list
->database_hash_size
,
1664 state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
);
1665 if (tevent_req_nomem(state
->recdb
, req
)) {
1669 if ((state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) ||
1670 (state
->db_flags
& CTDB_DB_FLAGS_REPLICATED
)) {
1671 subreq
= collect_highseqnum_db_send(
1672 state
, state
->ev
, state
->client
,
1673 state
->pnn_list
, state
->count
, state
->caps
,
1674 state
->ban_credits
, state
->db_id
,
1677 subreq
= collect_all_db_send(
1678 state
, state
->ev
, state
->client
,
1679 state
->pnn_list
, state
->count
, state
->caps
,
1680 state
->ban_credits
, state
->db_id
,
1683 if (tevent_req_nomem(subreq
, req
)) {
1686 tevent_req_set_callback(subreq
, recover_db_collect_done
, req
);
1689 static void recover_db_collect_done(struct tevent_req
*subreq
)
1691 struct tevent_req
*req
= tevent_req_callback_data(
1692 subreq
, struct tevent_req
);
1693 struct recover_db_state
*state
= tevent_req_data(
1694 req
, struct recover_db_state
);
1695 struct ctdb_req_control request
;
1699 if ((state
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) ||
1700 (state
->db_flags
& CTDB_DB_FLAGS_REPLICATED
)) {
1701 status
= collect_highseqnum_db_recv(subreq
, &ret
);
1703 status
= collect_all_db_recv(subreq
, &ret
);
1705 TALLOC_FREE(subreq
);
1707 tevent_req_error(req
, ret
);
1711 ctdb_req_control_wipe_database(&request
, &state
->transdb
);
1712 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1714 state
->pnn_list
, state
->count
,
1715 TIMEOUT(), &request
);
1716 if (tevent_req_nomem(subreq
, req
)) {
1719 tevent_req_set_callback(subreq
, recover_db_wipedb_done
, req
);
1722 static void recover_db_wipedb_done(struct tevent_req
*subreq
)
1724 struct tevent_req
*req
= tevent_req_callback_data(
1725 subreq
, struct tevent_req
);
1726 struct recover_db_state
*state
= tevent_req_data(
1727 req
, struct recover_db_state
);
1732 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1734 TALLOC_FREE(subreq
);
1739 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1743 D_ERR("control WIPEDB failed for db %s on node %u,"
1744 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1746 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1747 state
->db_name
, ret
);
1749 tevent_req_error(req
, ret
);
1753 subreq
= push_database_send(state
, state
->ev
, state
->client
,
1754 state
->pnn_list
, state
->count
,
1755 state
->caps
, state
->tun_list
,
1757 if (tevent_req_nomem(subreq
, req
)) {
1760 tevent_req_set_callback(subreq
, recover_db_pushdb_done
, req
);
1763 static void recover_db_pushdb_done(struct tevent_req
*subreq
)
1765 struct tevent_req
*req
= tevent_req_callback_data(
1766 subreq
, struct tevent_req
);
1767 struct recover_db_state
*state
= tevent_req_data(
1768 req
, struct recover_db_state
);
1769 struct ctdb_req_control request
;
1773 status
= push_database_recv(subreq
, &ret
);
1774 TALLOC_FREE(subreq
);
1776 tevent_req_error(req
, ret
);
1780 TALLOC_FREE(state
->recdb
);
1782 ctdb_req_control_db_transaction_commit(&request
, &state
->transdb
);
1783 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1785 state
->pnn_list
, state
->count
,
1786 TIMEOUT(), &request
);
1787 if (tevent_req_nomem(subreq
, req
)) {
1790 tevent_req_set_callback(subreq
, recover_db_transaction_committed
, req
);
1793 static void recover_db_transaction_committed(struct tevent_req
*subreq
)
1795 struct tevent_req
*req
= tevent_req_callback_data(
1796 subreq
, struct tevent_req
);
1797 struct recover_db_state
*state
= tevent_req_data(
1798 req
, struct recover_db_state
);
1799 struct ctdb_req_control request
;
1804 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1806 TALLOC_FREE(subreq
);
1811 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1815 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1816 " on node %u, ret=%d\n",
1817 state
->db_name
, pnn
, ret2
);
1819 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1820 " ret=%d\n", state
->db_name
, ret
);
1822 tevent_req_error(req
, ret
);
1826 ctdb_req_control_db_thaw(&request
, state
->db_id
);
1827 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1829 state
->pnn_list
, state
->count
,
1830 TIMEOUT(), &request
);
1831 if (tevent_req_nomem(subreq
, req
)) {
1834 tevent_req_set_callback(subreq
, recover_db_thaw_done
, req
);
1837 static void recover_db_thaw_done(struct tevent_req
*subreq
)
1839 struct tevent_req
*req
= tevent_req_callback_data(
1840 subreq
, struct tevent_req
);
1841 struct recover_db_state
*state
= tevent_req_data(
1842 req
, struct recover_db_state
);
1847 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1849 TALLOC_FREE(subreq
);
1854 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1858 D_ERR("control DB_THAW failed for db %s on node %u,"
1859 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1861 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1862 state
->db_name
, ret
);
1864 tevent_req_error(req
, ret
);
1868 tevent_req_done(req
);
/*
 * Collect the result of a recover_db request.  The error code is not
 * reported to the caller (NULL is passed to generic_recv()); only
 * success or failure is returned.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
/*
 * Start database recovery for each database
 *
 * A recover_db request is started for every database in the dbmap.  A
 * database that fails is retried until it has failed NUM_RETRIES times,
 * after which it is counted as failed.
 */

struct db_recovery_state {
	struct tevent_context *ev;
	struct ctdb_dbid_map *dbmap;
	/* Databases that have finished, successfully or not */
	int num_replies;
	/* Databases that ran out of retries */
	int num_failed;
};

/* Per-database bookkeeping; holds what is needed to retry recover_db */
struct db_recovery_one_state {
	/* The parent db_recovery request */
	struct tevent_req *req;
	struct ctdb_client_context *client;
	struct ctdb_dbid_map *dbmap;
	struct ctdb_tunable_list *tun_list;
	uint32_t *pnn_list;
	int count;
	uint32_t *caps;
	uint32_t *ban_credits;
	uint32_t generation;
	uint32_t db_id;
	uint8_t db_flags;
	/* Number of failed recover_db attempts so far */
	int num_fails;
};

static void db_recovery_one_done(struct tevent_req *subreq);
1907 static struct tevent_req
*db_recovery_send(TALLOC_CTX
*mem_ctx
,
1908 struct tevent_context
*ev
,
1909 struct ctdb_client_context
*client
,
1910 struct ctdb_dbid_map
*dbmap
,
1911 struct ctdb_tunable_list
*tun_list
,
1912 uint32_t *pnn_list
, int count
,
1914 uint32_t *ban_credits
,
1915 uint32_t generation
)
1917 struct tevent_req
*req
, *subreq
;
1918 struct db_recovery_state
*state
;
1921 req
= tevent_req_create(mem_ctx
, &state
, struct db_recovery_state
);
1927 state
->dbmap
= dbmap
;
1928 state
->num_replies
= 0;
1929 state
->num_failed
= 0;
1931 if (dbmap
->num
== 0) {
1932 tevent_req_done(req
);
1933 return tevent_req_post(req
, ev
);
1936 for (i
=0; i
<dbmap
->num
; i
++) {
1937 struct db_recovery_one_state
*substate
;
1939 substate
= talloc_zero(state
, struct db_recovery_one_state
);
1940 if (tevent_req_nomem(substate
, req
)) {
1941 return tevent_req_post(req
, ev
);
1944 substate
->req
= req
;
1945 substate
->client
= client
;
1946 substate
->dbmap
= dbmap
;
1947 substate
->tun_list
= tun_list
;
1948 substate
->pnn_list
= pnn_list
;
1949 substate
->count
= count
;
1950 substate
->caps
= caps
;
1951 substate
->ban_credits
= ban_credits
;
1952 substate
->generation
= generation
;
1953 substate
->db_id
= dbmap
->dbs
[i
].db_id
;
1954 substate
->db_flags
= dbmap
->dbs
[i
].flags
;
1956 subreq
= recover_db_send(state
, ev
, client
, tun_list
,
1957 pnn_list
, count
, caps
, ban_credits
,
1958 generation
, substate
->db_id
,
1959 substate
->db_flags
);
1960 if (tevent_req_nomem(subreq
, req
)) {
1961 return tevent_req_post(req
, ev
);
1963 tevent_req_set_callback(subreq
, db_recovery_one_done
,
1965 D_NOTICE("recover database 0x%08x\n", substate
->db_id
);
1971 static void db_recovery_one_done(struct tevent_req
*subreq
)
1973 struct db_recovery_one_state
*substate
= tevent_req_callback_data(
1974 subreq
, struct db_recovery_one_state
);
1975 struct tevent_req
*req
= substate
->req
;
1976 struct db_recovery_state
*state
= tevent_req_data(
1977 req
, struct db_recovery_state
);
1980 status
= recover_db_recv(subreq
);
1981 TALLOC_FREE(subreq
);
1984 talloc_free(substate
);
1988 substate
->num_fails
+= 1;
1989 if (substate
->num_fails
< NUM_RETRIES
) {
1990 subreq
= recover_db_send(state
, state
->ev
, substate
->client
,
1992 substate
->pnn_list
, substate
->count
,
1993 substate
->caps
, substate
->ban_credits
,
1994 substate
->generation
, substate
->db_id
,
1995 substate
->db_flags
);
1996 if (tevent_req_nomem(subreq
, req
)) {
1999 tevent_req_set_callback(subreq
, db_recovery_one_done
, substate
);
2000 D_NOTICE("recover database 0x%08x, attempt %d\n",
2001 substate
->db_id
, substate
->num_fails
+1);
2006 state
->num_failed
+= 1;
2009 state
->num_replies
+= 1;
2011 if (state
->num_replies
== state
->dbmap
->num
) {
2012 tevent_req_done(req
);
2016 static bool db_recovery_recv(struct tevent_req
*req
, int *count
)
2018 struct db_recovery_state
*state
= tevent_req_data(
2019 req
, struct db_recovery_state
);
2022 if (tevent_req_is_unix_error(req
, &err
)) {
2027 *count
= state
->num_replies
- state
->num_failed
;
2029 if (state
->num_failed
> 0) {
/*
 * Run the parallel database recovery
 *
 * The steps below are chained through the recovery_*_done callbacks:
 *
 * - Get tunables
 * - Get nodemap
 * - Get vnnmap
 * - Get capabilities from all nodes
 * - Get dbmap
 * - Set RECOVERY_ACTIVE
 * - Send START_RECOVERY
 * - Update vnnmap on all nodes
 * - Run database recovery
 * - Set RECOVERY_NORMAL
 * - Send END_RECOVERY
 */

struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Generation for the new vnnmap and the recovery transactions */
	uint32_t generation;
	/* Active nodes (count entries), filled in from the nodemap */
	uint32_t *pnn_list;
	int count;
	/* Node this client is connected to */
	uint32_t destnode;
	struct ctdb_node_map *nodemap;
	/* Both arrays have one slot per nodemap entry */
	uint32_t *caps;
	uint32_t *ban_credits;
	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_dbid_map *dbmap;
};

static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2081 static struct tevent_req
*recovery_send(TALLOC_CTX
*mem_ctx
,
2082 struct tevent_context
*ev
,
2083 struct ctdb_client_context
*client
,
2084 uint32_t generation
)
2086 struct tevent_req
*req
, *subreq
;
2087 struct recovery_state
*state
;
2088 struct ctdb_req_control request
;
2090 req
= tevent_req_create(mem_ctx
, &state
, struct recovery_state
);
2096 state
->client
= client
;
2097 state
->generation
= generation
;
2098 state
->destnode
= ctdb_client_pnn(client
);
2100 ctdb_req_control_get_all_tunables(&request
);
2101 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2102 state
->destnode
, TIMEOUT(),
2104 if (tevent_req_nomem(subreq
, req
)) {
2105 return tevent_req_post(req
, ev
);
2107 tevent_req_set_callback(subreq
, recovery_tunables_done
, req
);
2112 static void recovery_tunables_done(struct tevent_req
*subreq
)
2114 struct tevent_req
*req
= tevent_req_callback_data(
2115 subreq
, struct tevent_req
);
2116 struct recovery_state
*state
= tevent_req_data(
2117 req
, struct recovery_state
);
2118 struct ctdb_reply_control
*reply
;
2119 struct ctdb_req_control request
;
2123 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2124 TALLOC_FREE(subreq
);
2126 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2127 tevent_req_error(req
, ret
);
2131 ret
= ctdb_reply_control_get_all_tunables(reply
, state
,
2134 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2135 tevent_req_error(req
, EPROTO
);
2141 recover_timeout
= state
->tun_list
->recover_timeout
;
2143 ctdb_req_control_get_nodemap(&request
);
2144 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2145 state
->destnode
, TIMEOUT(),
2147 if (tevent_req_nomem(subreq
, req
)) {
2150 tevent_req_set_callback(subreq
, recovery_nodemap_done
, req
);
2153 static void recovery_nodemap_done(struct tevent_req
*subreq
)
2155 struct tevent_req
*req
= tevent_req_callback_data(
2156 subreq
, struct tevent_req
);
2157 struct recovery_state
*state
= tevent_req_data(
2158 req
, struct recovery_state
);
2159 struct ctdb_reply_control
*reply
;
2160 struct ctdb_req_control request
;
2164 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2165 TALLOC_FREE(subreq
);
2167 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2168 state
->destnode
, ret
);
2169 tevent_req_error(req
, ret
);
2173 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &state
->nodemap
);
2175 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret
);
2176 tevent_req_error(req
, ret
);
2180 state
->count
= list_of_active_nodes(state
->nodemap
, CTDB_UNKNOWN_PNN
,
2181 state
, &state
->pnn_list
);
2182 if (state
->count
<= 0) {
2183 tevent_req_error(req
, ENOMEM
);
2187 state
->ban_credits
= talloc_zero_array(state
, uint32_t,
2188 state
->nodemap
->num
);
2189 if (tevent_req_nomem(state
->ban_credits
, req
)) {
2193 ctdb_req_control_getvnnmap(&request
);
2194 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2195 state
->destnode
, TIMEOUT(),
2197 if (tevent_req_nomem(subreq
, req
)) {
2200 tevent_req_set_callback(subreq
, recovery_vnnmap_done
, req
);
2203 static void recovery_vnnmap_done(struct tevent_req
*subreq
)
2205 struct tevent_req
*req
= tevent_req_callback_data(
2206 subreq
, struct tevent_req
);
2207 struct recovery_state
*state
= tevent_req_data(
2208 req
, struct recovery_state
);
2209 struct ctdb_reply_control
*reply
;
2210 struct ctdb_req_control request
;
2214 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2215 TALLOC_FREE(subreq
);
2217 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2218 state
->destnode
, ret
);
2219 tevent_req_error(req
, ret
);
2223 ret
= ctdb_reply_control_getvnnmap(reply
, state
, &state
->vnnmap
);
2225 D_ERR("control GETVNNMAP failed, ret=%d\n", ret
);
2226 tevent_req_error(req
, ret
);
2230 ctdb_req_control_get_capabilities(&request
);
2231 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2233 state
->pnn_list
, state
->count
,
2234 TIMEOUT(), &request
);
2235 if (tevent_req_nomem(subreq
, req
)) {
2238 tevent_req_set_callback(subreq
, recovery_capabilities_done
, req
);
2241 static void recovery_capabilities_done(struct tevent_req
*subreq
)
2243 struct tevent_req
*req
= tevent_req_callback_data(
2244 subreq
, struct tevent_req
);
2245 struct recovery_state
*state
= tevent_req_data(
2246 req
, struct recovery_state
);
2247 struct ctdb_reply_control
**reply
;
2248 struct ctdb_req_control request
;
2253 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2255 TALLOC_FREE(subreq
);
2260 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2264 D_ERR("control GET_CAPABILITIES failed on node %u,"
2265 " ret=%d\n", pnn
, ret2
);
2267 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2270 tevent_req_error(req
, ret
);
2274 /* Make the array size same as nodemap */
2275 state
->caps
= talloc_zero_array(state
, uint32_t,
2276 state
->nodemap
->num
);
2277 if (tevent_req_nomem(state
->caps
, req
)) {
2281 for (i
=0; i
<state
->count
; i
++) {
2284 pnn
= state
->pnn_list
[i
];
2285 ret
= ctdb_reply_control_get_capabilities(reply
[i
],
2288 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2290 tevent_req_error(req
, EPROTO
);
2297 ctdb_req_control_get_dbmap(&request
);
2298 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2299 state
->destnode
, TIMEOUT(),
2301 if (tevent_req_nomem(subreq
, req
)) {
2304 tevent_req_set_callback(subreq
, recovery_dbmap_done
, req
);
2307 static void recovery_dbmap_done(struct tevent_req
*subreq
)
2309 struct tevent_req
*req
= tevent_req_callback_data(
2310 subreq
, struct tevent_req
);
2311 struct recovery_state
*state
= tevent_req_data(
2312 req
, struct recovery_state
);
2313 struct ctdb_reply_control
*reply
;
2314 struct ctdb_req_control request
;
2318 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2319 TALLOC_FREE(subreq
);
2321 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2322 state
->destnode
, ret
);
2323 tevent_req_error(req
, ret
);
2327 ret
= ctdb_reply_control_get_dbmap(reply
, state
, &state
->dbmap
);
2329 D_ERR("control GET_DBMAP failed, ret=%d\n", ret
);
2330 tevent_req_error(req
, ret
);
2334 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_ACTIVE
);
2335 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2337 state
->pnn_list
, state
->count
,
2338 TIMEOUT(), &request
);
2339 if (tevent_req_nomem(subreq
, req
)) {
2342 tevent_req_set_callback(subreq
, recovery_active_done
, req
);
2345 static void recovery_active_done(struct tevent_req
*subreq
)
2347 struct tevent_req
*req
= tevent_req_callback_data(
2348 subreq
, struct tevent_req
);
2349 struct recovery_state
*state
= tevent_req_data(
2350 req
, struct recovery_state
);
2351 struct ctdb_req_control request
;
2352 struct ctdb_vnn_map
*vnnmap
;
2357 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2359 TALLOC_FREE(subreq
);
2364 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2368 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2369 " ret=%d\n", pnn
, ret2
);
2371 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2374 tevent_req_error(req
, ret
);
2378 D_ERR("Set recovery mode to ACTIVE\n");
2380 /* Calculate new VNNMAP */
2382 for (i
=0; i
<state
->nodemap
->num
; i
++) {
2383 if (state
->nodemap
->node
[i
].flags
& NODE_FLAGS_INACTIVE
) {
2386 if (!(state
->caps
[i
] & CTDB_CAP_LMASTER
)) {
2393 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2396 vnnmap
= talloc_zero(state
, struct ctdb_vnn_map
);
2397 if (tevent_req_nomem(vnnmap
, req
)) {
2401 vnnmap
->size
= (count
== 0 ? 1 : count
);
2402 vnnmap
->map
= talloc_array(vnnmap
, uint32_t, vnnmap
->size
);
2403 if (tevent_req_nomem(vnnmap
->map
, req
)) {
2408 vnnmap
->map
[0] = state
->destnode
;
2411 for (i
=0; i
<state
->nodemap
->num
; i
++) {
2412 if (state
->nodemap
->node
[i
].flags
&
2413 NODE_FLAGS_INACTIVE
) {
2416 if (!(state
->caps
[i
] & CTDB_CAP_LMASTER
)) {
2420 vnnmap
->map
[count
] = state
->nodemap
->node
[i
].pnn
;
2425 vnnmap
->generation
= state
->generation
;
2427 talloc_free(state
->vnnmap
);
2428 state
->vnnmap
= vnnmap
;
2430 ctdb_req_control_start_recovery(&request
);
2431 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2433 state
->pnn_list
, state
->count
,
2434 TIMEOUT(), &request
);
2435 if (tevent_req_nomem(subreq
, req
)) {
2438 tevent_req_set_callback(subreq
, recovery_start_recovery_done
, req
);
2441 static void recovery_start_recovery_done(struct tevent_req
*subreq
)
2443 struct tevent_req
*req
= tevent_req_callback_data(
2444 subreq
, struct tevent_req
);
2445 struct recovery_state
*state
= tevent_req_data(
2446 req
, struct recovery_state
);
2447 struct ctdb_req_control request
;
2452 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2454 TALLOC_FREE(subreq
);
2459 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2463 D_ERR("failed to run start_recovery event on node %u,"
2464 " ret=%d\n", pnn
, ret2
);
2466 D_ERR("failed to run start_recovery event, ret=%d\n",
2469 tevent_req_error(req
, ret
);
2473 D_ERR("start_recovery event finished\n");
2475 ctdb_req_control_setvnnmap(&request
, state
->vnnmap
);
2476 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2478 state
->pnn_list
, state
->count
,
2479 TIMEOUT(), &request
);
2480 if (tevent_req_nomem(subreq
, req
)) {
2483 tevent_req_set_callback(subreq
, recovery_vnnmap_update_done
, req
);
2486 static void recovery_vnnmap_update_done(struct tevent_req
*subreq
)
2488 struct tevent_req
*req
= tevent_req_callback_data(
2489 subreq
, struct tevent_req
);
2490 struct recovery_state
*state
= tevent_req_data(
2491 req
, struct recovery_state
);
2496 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2498 TALLOC_FREE(subreq
);
2503 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2507 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2510 D_ERR("failed to update VNNMAP, ret=%d\n", ret
);
2512 tevent_req_error(req
, ret
);
2516 D_NOTICE("updated VNNMAP\n");
2518 subreq
= db_recovery_send(state
, state
->ev
, state
->client
,
2519 state
->dbmap
, state
->tun_list
,
2520 state
->pnn_list
, state
->count
,
2521 state
->caps
, state
->ban_credits
,
2522 state
->vnnmap
->generation
);
2523 if (tevent_req_nomem(subreq
, req
)) {
2526 tevent_req_set_callback(subreq
, recovery_db_recovery_done
, req
);
2529 static void recovery_db_recovery_done(struct tevent_req
*subreq
)
2531 struct tevent_req
*req
= tevent_req_callback_data(
2532 subreq
, struct tevent_req
);
2533 struct recovery_state
*state
= tevent_req_data(
2534 req
, struct recovery_state
);
2535 struct ctdb_req_control request
;
2539 status
= db_recovery_recv(subreq
, &count
);
2540 TALLOC_FREE(subreq
);
2542 D_ERR("%d of %d databases recovered\n", count
, state
->dbmap
->num
);
2545 uint32_t max_pnn
= CTDB_UNKNOWN_PNN
, max_credits
= 0;
2548 /* Bans are not enabled */
2549 if (state
->tun_list
->enable_bans
== 0) {
2550 tevent_req_error(req
, EIO
);
2554 for (i
=0; i
<state
->count
; i
++) {
2556 pnn
= state
->pnn_list
[i
];
2557 if (state
->ban_credits
[pnn
] > max_credits
) {
2559 max_credits
= state
->ban_credits
[pnn
];
2563 /* If pulling database fails multiple times */
2564 if (max_credits
>= NUM_RETRIES
) {
2565 struct ctdb_req_message message
;
2567 D_ERR("Assigning banning credits to node %u\n",
2570 message
.srvid
= CTDB_SRVID_BANNING
;
2571 message
.data
.pnn
= max_pnn
;
2573 subreq
= ctdb_client_message_send(
2574 state
, state
->ev
, state
->client
,
2575 ctdb_client_pnn(state
->client
),
2577 if (tevent_req_nomem(subreq
, req
)) {
2580 tevent_req_set_callback(subreq
, recovery_failed_done
,
2583 tevent_req_error(req
, EIO
);
2588 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_NORMAL
);
2589 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2591 state
->pnn_list
, state
->count
,
2592 TIMEOUT(), &request
);
2593 if (tevent_req_nomem(subreq
, req
)) {
2596 tevent_req_set_callback(subreq
, recovery_normal_done
, req
);
2599 static void recovery_failed_done(struct tevent_req
*subreq
)
2601 struct tevent_req
*req
= tevent_req_callback_data(
2602 subreq
, struct tevent_req
);
2606 status
= ctdb_client_message_recv(subreq
, &ret
);
2607 TALLOC_FREE(subreq
);
2609 D_ERR("failed to assign banning credits, ret=%d\n", ret
);
2612 tevent_req_error(req
, EIO
);
2615 static void recovery_normal_done(struct tevent_req
*subreq
)
2617 struct tevent_req
*req
= tevent_req_callback_data(
2618 subreq
, struct tevent_req
);
2619 struct recovery_state
*state
= tevent_req_data(
2620 req
, struct recovery_state
);
2621 struct ctdb_req_control request
;
2626 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2628 TALLOC_FREE(subreq
);
2633 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2637 D_ERR("failed to set recovery mode NORMAL on node %u,"
2638 " ret=%d\n", pnn
, ret2
);
2640 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2643 tevent_req_error(req
, ret
);
2647 D_ERR("Set recovery mode to NORMAL\n");
2649 ctdb_req_control_end_recovery(&request
);
2650 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2652 state
->pnn_list
, state
->count
,
2653 TIMEOUT(), &request
);
2654 if (tevent_req_nomem(subreq
, req
)) {
2657 tevent_req_set_callback(subreq
, recovery_end_recovery_done
, req
);
2660 static void recovery_end_recovery_done(struct tevent_req
*subreq
)
2662 struct tevent_req
*req
= tevent_req_callback_data(
2663 subreq
, struct tevent_req
);
2664 struct recovery_state
*state
= tevent_req_data(
2665 req
, struct recovery_state
);
2670 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2672 TALLOC_FREE(subreq
);
2677 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2681 D_ERR("failed to run recovered event on node %u,"
2682 " ret=%d\n", pnn
, ret2
);
2684 D_ERR("failed to run recovered event, ret=%d\n", ret
);
2686 tevent_req_error(req
, ret
);
2690 D_ERR("recovered event finished\n");
2692 tevent_req_done(req
);
/*
 * Collect the result of the recovery request.  The outcome is reported
 * through *perr only; the boolean result of generic_recv() is
 * intentionally discarded.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void)generic_recv(req, perr);
}
/* Print the command line synopsis for this helper to stderr. */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
2708 * Arguments - log fd, write fd, socket path, generation
2710 int main(int argc
, char *argv
[])
2713 const char *sockpath
;
2714 TALLOC_CTX
*mem_ctx
;
2715 struct tevent_context
*ev
;
2716 struct ctdb_client_context
*client
;
2718 struct tevent_req
*req
;
2719 uint32_t generation
;
2726 write_fd
= atoi(argv
[1]);
2728 generation
= (uint32_t)strtoul(argv
[3], NULL
, 0);
2730 mem_ctx
= talloc_new(NULL
);
2731 if (mem_ctx
== NULL
) {
2732 fprintf(stderr
, "recovery: talloc_new() failed\n");
2736 ret
= logging_init(mem_ctx
, NULL
, NULL
, "ctdb-recovery");
2738 fprintf(stderr
, "recovery: Unable to initialize logging\n");
2742 ev
= tevent_context_init(mem_ctx
);
2744 D_ERR("tevent_context_init() failed\n");
2748 ret
= ctdb_client_init(mem_ctx
, ev
, sockpath
, &client
);
2750 D_ERR("ctdb_client_init() failed, ret=%d\n", ret
);
2754 req
= recovery_send(mem_ctx
, ev
, client
, generation
);
2756 D_ERR("database_recover_send() failed\n");
2760 if (! tevent_req_poll(req
, ev
)) {
2761 D_ERR("tevent_req_poll() failed\n");
2765 recovery_recv(req
, &ret
);
2768 D_ERR("database recovery failed, ret=%d\n", ret
);
2772 sys_write(write_fd
, &ret
, sizeof(ret
));
2776 TALLOC_FREE(mem_ctx
);