2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/time.h"
31 #include "lib/util/tevent_unix.h"
33 #include "protocol/protocol.h"
34 #include "protocol/protocol_api.h"
35 #include "client/client.h"
/*
 * recover_timeout is the value TIMEOUT() passes as the first argument of
 * timeval_current_ofs() (presumably seconds -- confirm).  TIMEOUT() is the
 * timeout argument given to the client control calls in this file.
 */
37 static int recover_timeout
= 30;
41 #define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * printf-style logging helper: hands fmt and the argument list ap to
 * vfprintf() on stderr.  The va_list setup/teardown lines are not visible
 * in this view.
 */
43 static void LOG(const char *fmt
, ...)
48 vfprintf(stderr
, fmt
, ap
);
/*
 * write() wrapper that repeats the call while it returns -1 with errno set
 * to EINTR or EAGAIN (and also EWOULDBLOCK when that macro is defined).
 * Any other result ends the loop; the return statement itself is not
 * visible in this view.
 */
56 static ssize_t
sys_write(int fd
, const void *buf
, size_t count
)
61 ret
= write(fd
, buf
, count
);
62 #if defined(EWOULDBLOCK)
63 } while (ret
== -1 && (errno
== EINTR
|| errno
== EAGAIN
|| errno
== EWOULDBLOCK
));
65 } while (ret
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
/*
 * Common receive helper that the _recv functions in this file delegate to.
 * It checks the request with tevent_req_is_unix_error(); how perr is filled
 * and what is returned is not visible in this view.
 */
70 static bool generic_recv(struct tevent_req
*req
, int *perr
)
74 if (tevent_req_is_unix_error(req
, &err
)) {
/*
 * File-scope service id, initialised to CTDB_SRVID_RECOVERY.
 * NOTE(review): presumably the counter behind srvid_next() -- confirm.
 */
84 static uint64_t rec_srvid
= CTDB_SRVID_RECOVERY
;
/*
 * Returns a service id.  pull_database_send() and push_database_new_send()
 * store the result in state->srvid.  The body is not visible in this view.
 */
86 static uint64_t srvid_next(void)
93 * Recovery database functions
/*
 * Handle for the temporary recovery database.  The member declarations are
 * not visible in this view; the functions in this file access db_name,
 * db_id, db_path, db and persistent.
 */
96 struct recdb_context
{
/*
 * Create the temporary recovery database used while recovering one
 * database.
 *
 * Allocates a recdb_context on mem_ctx and records db_name, db_id and
 * persistent in it.  db_name and db_path are used below, but their
 * parameter lines are not visible in this view.  The file path is formatted
 * as "%s/recdb.%s"; the directory choice depends on whether the
 * CTDB_DBDIR_STATE environment variable is set, with dirname() of db_path
 * as one of the alternatives.  The getenv() result is cached in a
 * function-local static.  Any file already at that path is unlinked before
 * the tdb is opened with O_RDWR|O_CREAT|O_EXCL and mode 0600, using the
 * flags TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING.
 * A failed open is logged.
 *
 * NOTE(review): tdb_wrap_open() is given mem_ctx rather than recdb as its
 * talloc parent -- confirm the intended lifetime of recdb->db.
 */
104 static struct recdb_context
*recdb_create(TALLOC_CTX
*mem_ctx
, uint32_t db_id
,
107 uint32_t hash_size
, bool persistent
)
109 static char *db_dir_state
= NULL
;
110 struct recdb_context
*recdb
;
111 unsigned int tdb_flags
;
113 recdb
= talloc(mem_ctx
, struct recdb_context
);
118 if (db_dir_state
== NULL
) {
119 db_dir_state
= getenv("CTDB_DBDIR_STATE");
122 recdb
->db_name
= db_name
;
123 recdb
->db_id
= db_id
;
124 recdb
->db_path
= talloc_asprintf(recdb
, "%s/recdb.%s",
125 db_dir_state
!= NULL
?
127 dirname(discard_const(db_path
)),
129 if (recdb
->db_path
== NULL
) {
133 unlink(recdb
->db_path
);
135 tdb_flags
= TDB_NOLOCK
| TDB_INCOMPATIBLE_HASH
| TDB_DISALLOW_NESTING
;
136 recdb
->db
= tdb_wrap_open(mem_ctx
, recdb
->db_path
, hash_size
,
137 tdb_flags
, O_RDWR
|O_CREAT
|O_EXCL
, 0600);
138 if (recdb
->db
== NULL
) {
140 LOG("failed to create recovery db %s\n", recdb
->db_path
);
143 recdb
->persistent
= persistent
;
/*
 * Accessor returning a uint32_t for the recovery database; callers use the
 * value as the database id.  The body is not visible in this view
 * (presumably returns recdb->db_id -- confirm).
 */
148 static uint32_t recdb_id(struct recdb_context
*recdb
)
/* Accessor: returns the db_name stored in the recovery database handle. */
153 static const char *recdb_name(struct recdb_context
*recdb
)
155 return recdb
->db_name
;
/* Accessor: returns the db_path stored in the recovery database handle. */
158 static const char *recdb_path(struct recdb_context
*recdb
)
160 return recdb
->db_path
;
/* Accessor: returns the tdb context held by the wrapped db handle. */
163 static struct tdb_context
*recdb_tdb(struct recdb_context
*recdb
)
165 return recdb
->db
->tdb
;
/* Accessor: returns the persistent flag recorded by recdb_create(). */
168 static bool recdb_persistent(struct recdb_context
*recdb
)
170 return recdb
->persistent
;
/*
 * Private data for recdb_add_traverse(): the target recovery database.
 * The callback also reads state->mypnn; that member's declaration is not
 * visible in this view.
 */
173 struct recdb_add_traverse_state
{
174 struct recdb_context
*recdb
;
/*
 * ctdb_rec_buffer_traverse() callback that merges one pulled record into
 * the recovery database.
 *
 * The ltdb header is read from the front of data, after a size check
 * against sizeof(struct ctdb_ltdb_header).  If a record with the same key
 * already exists in the recovery db, its header is copied out and the
 * fetched buffer freed.  The new record is then tested against it: the
 * condition holds when the new RSN is lower, or when the RSNs are equal and
 * the existing dmaster is not state->mypnn.  The body of that branch is not
 * visible in this view (presumably the existing record is kept --
 * confirm).  The visible store call replaces the record with TDB_REPLACE.
 */
178 static int recdb_add_traverse(uint32_t reqid
, struct ctdb_ltdb_header
*header
,
179 TDB_DATA key
, TDB_DATA data
,
182 struct recdb_add_traverse_state
*state
=
183 (struct recdb_add_traverse_state
*)private_data
;
184 struct ctdb_ltdb_header
*hdr
;
188 /* header is not marshalled separately in the pulldb control */
189 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
193 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
195 /* fetch the existing record, if any */
196 prev_data
= tdb_fetch(recdb_tdb(state
->recdb
), key
);
198 if (prev_data
.dptr
!= NULL
) {
199 struct ctdb_ltdb_header prev_hdr
;
201 prev_hdr
= *(struct ctdb_ltdb_header
*)prev_data
.dptr
;
202 free(prev_data
.dptr
);
203 if (hdr
->rsn
< prev_hdr
.rsn
||
204 (hdr
->rsn
== prev_hdr
.rsn
&&
205 prev_hdr
.dmaster
!= state
->mypnn
)) {
210 ret
= tdb_store(recdb_tdb(state
->recdb
), key
, data
, TDB_REPLACE
);
/*
 * Merge every record of recbuf into the recovery database by traversing
 * the buffer with recdb_add_traverse().  Returns a bool; how the traverse
 * result maps to it is not visible in this view.
 */
217 static bool recdb_add(struct recdb_context
*recdb
, int mypnn
,
218 struct ctdb_rec_buffer
*recbuf
)
220 struct recdb_add_traverse_state state
;
226 ret
= ctdb_rec_buffer_traverse(recbuf
, recdb_add_traverse
, &state
);
/*
 * Filter one recovery-db record and append it to recbuf.
 *
 * For non-persistent databases a record whose data is no larger than the
 * ltdb header takes the "skip empty records" branch explained in the long
 * comment below.  Otherwise the header inside data is modified in place:
 * dmaster is set to the given dmaster and CTDB_REC_FLAG_MIGRATED_WITH_DATA
 * is OR-ed into flags.  The record is then added with
 * ctdb_rec_buffer_add().
 */
234 /* This function decides which records from recdb are retained */
235 static int recbuf_filter_add(struct ctdb_rec_buffer
*recbuf
, bool persistent
,
236 uint32_t reqid
, uint32_t dmaster
,
237 TDB_DATA key
, TDB_DATA data
)
239 struct ctdb_ltdb_header
*header
;
243 * skip empty records - but NOT for persistent databases:
245 * The record-by-record mode of recovery deletes empty records.
246 * For persistent databases, this can lead to data corruption
247 * by deleting records that should be there:
249 * - Assume the cluster has been running for a while.
251 * - A record R in a persistent database has been created and
252 * deleted a couple of times, the last operation being deletion,
253 * leaving an empty record with a high RSN, say 10.
255 * - Now a node N is turned off.
257 * - This leaves the local database copy of D on N with the empty
258 * copy of R and RSN 10. On all other nodes, the recovery has deleted
259 * the copy of record R.
261 * - Now the record is created again while node N is turned off.
262 * This creates R with RSN = 1 on all nodes except for N.
264 * - Now node N is turned on again. The following recovery will chose
265 * the older empty copy of R due to RSN 10 > RSN 1.
267 * ==> Hence the record is gone after the recovery.
269 * On databases like Samba's registry, this can damage the higher-level
270 * data structures built from the various tdb-level records.
272 if (!persistent
&& data
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
276 /* update the dmaster field to point to us */
277 header
= (struct ctdb_ltdb_header
*)data
.dptr
;
279 header
->dmaster
= dmaster
;
280 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
283 ret
= ctdb_rec_buffer_add(recbuf
, recbuf
, reqid
, NULL
, key
, data
);
/*
 * Private data for recdb_records_traverse(): the buffer being filled.  The
 * callback also uses persistent, reqid, dmaster and failed; those member
 * declarations are not visible in this view.
 */
291 struct recdb_records_traverse_state
{
292 struct ctdb_rec_buffer
*recbuf
;
/*
 * tdb traverse callback: passes each record to recbuf_filter_add() with the
 * settings held in state.  state->failed is set to true in a branch whose
 * condition is not visible (presumably on a filter/add failure -- confirm).
 */
299 static int recdb_records_traverse(struct tdb_context
*tdb
,
300 TDB_DATA key
, TDB_DATA data
,
303 struct recdb_records_traverse_state
*state
=
304 (struct recdb_records_traverse_state
*)private_data
;
307 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
308 state
->reqid
, state
->dmaster
, key
, data
);
310 state
->failed
= true;
/*
 * Marshal the retained records of the recovery database into one
 * ctdb_rec_buffer.
 *
 * A buffer is initialised with the database id, then the tdb is walked
 * read-only with recdb_records_traverse().  If the traverse returns -1 or
 * the callback flagged a failure, the problem is logged and the buffer is
 * freed.  Only the first parameter is visible in this view; mem_ctx and
 * dmaster are used below.
 */
317 static struct ctdb_rec_buffer
*recdb_records(struct recdb_context
*recdb
,
321 struct recdb_records_traverse_state state
;
324 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
325 if (state
.recbuf
== NULL
) {
328 state
.dmaster
= dmaster
;
330 state
.persistent
= recdb_persistent(recdb
);
331 state
.failed
= false;
333 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_records_traverse
,
335 if (ret
== -1 || state
.failed
) {
336 LOG("Failed to marshall recovery records for %s\n",
338 TALLOC_FREE(state
.recbuf
);
/*
 * Private data for recdb_file_traverse(): the buffer currently being filled
 * and the recovery database being dumped.  The callback also uses mem_ctx,
 * dmaster, reqid, persistent, failed, fd, max_size and num_buffers; those
 * member declarations are not visible in this view.
 */
345 struct recdb_file_traverse_state
{
346 struct ctdb_rec_buffer
*recbuf
;
347 struct recdb_context
*recdb
;
/*
 * tdb traverse callback that spools records to a file in size-limited
 * buffers.
 *
 * Each record goes through recbuf_filter_add().  Once the buffer length
 * exceeds state->max_size, the buffer is written to state->fd with
 * ctdb_rec_buffer_write(), num_buffers is incremented, and the buffer is
 * freed and re-initialised on state->mem_ctx.  state->failed is set in the
 * visible error branches (a write failure is also logged).
 */
358 static int recdb_file_traverse(struct tdb_context
*tdb
,
359 TDB_DATA key
, TDB_DATA data
,
362 struct recdb_file_traverse_state
*state
=
363 (struct recdb_file_traverse_state
*)private_data
;
366 ret
= recbuf_filter_add(state
->recbuf
, state
->persistent
,
367 state
->reqid
, state
->dmaster
, key
, data
);
369 state
->failed
= true;
373 if (ctdb_rec_buffer_len(state
->recbuf
) > state
->max_size
) {
374 ret
= ctdb_rec_buffer_write(state
->recbuf
, state
->fd
);
376 LOG("Failed to collect recovery records for %s\n",
377 recdb_name(state
->recdb
));
378 state
->failed
= true;
382 state
->num_buffers
+= 1;
384 TALLOC_FREE(state
->recbuf
);
385 state
->recbuf
= ctdb_rec_buffer_init(state
->mem_ctx
,
386 recdb_id(state
->recdb
));
387 if (state
->recbuf
== NULL
) {
388 state
->failed
= true;
/*
 * Write the retained records of the recovery database to fd as a sequence
 * of record buffers, using max_size as the per-buffer limit.
 *
 * The tdb is walked read-only with recdb_file_traverse().  After the walk
 * the last, partially filled buffer is written and counted as well.
 * Returns state.num_buffers, the number of buffers written.  The caller in
 * this file treats -1 as failure; the failure return statements are not
 * visible in this view.
 */
396 static int recdb_file(struct recdb_context
*recdb
, TALLOC_CTX
*mem_ctx
,
397 uint32_t dmaster
, int fd
, int max_size
)
399 struct recdb_file_traverse_state state
;
402 state
.recbuf
= ctdb_rec_buffer_init(mem_ctx
, recdb_id(recdb
));
403 if (state
.recbuf
== NULL
) {
407 state
.mem_ctx
= mem_ctx
;
408 state
.dmaster
= dmaster
;
410 state
.persistent
= recdb_persistent(recdb
);
411 state
.failed
= false;
413 state
.max_size
= max_size
;
414 state
.num_buffers
= 0;
416 ret
= tdb_traverse_read(recdb_tdb(recdb
), recdb_file_traverse
, &state
);
417 if (ret
== -1 || state
.failed
) {
418 TALLOC_FREE(state
.recbuf
);
422 ret
= ctdb_rec_buffer_write(state
.recbuf
, fd
);
424 LOG("Failed to collect recovery records for %s\n",
426 TALLOC_FREE(state
.recbuf
);
429 state
.num_buffers
+= 1;
431 LOG("Wrote %d buffers of recovery records for %s\n",
432 state
.num_buffers
, recdb_name(recdb
));
434 return state
.num_buffers
;
438 * Pull database from a single node
/*
 * Request state for pulling one database from one node.  Besides ev,
 * client and recdb, the code uses pnn, srvid and num_records; those member
 * declarations are not visible in this view.
 */
441 struct pull_database_state
{
442 struct tevent_context
*ev
;
443 struct ctdb_client_context
*client
;
444 struct recdb_context
*recdb
;
/*
 * Forward declarations for the message handler and the completion
 * callbacks of the pull_database request.
 */
450 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
452 static void pull_database_register_done(struct tevent_req
*subreq
);
453 static void pull_database_old_done(struct tevent_req
*subreq
);
454 static void pull_database_unregister_done(struct tevent_req
*subreq
);
455 static void pull_database_new_done(struct tevent_req
*subreq
);
/*
 * Start pulling the database behind recdb from node pnn.
 *
 * A fresh srvid is taken from srvid_next().  If caps contains
 * CTDB_CAP_FRAGMENTED_CONTROLS, pull_database_handler is first registered
 * for that srvid and the request continues in
 * pull_database_register_done().  Otherwise the other visible branch builds
 * a ctdb_pulldb request with lmaster CTDB_LMASTER_ANY via
 * ctdb_req_control_pull_db(), sends it, and continues in
 * pull_database_old_done().
 */
457 static struct tevent_req
*pull_database_send(
459 struct tevent_context
*ev
,
460 struct ctdb_client_context
*client
,
461 uint32_t pnn
, uint32_t caps
,
462 struct recdb_context
*recdb
)
464 struct tevent_req
*req
, *subreq
;
465 struct pull_database_state
*state
;
466 struct ctdb_req_control request
;
468 req
= tevent_req_create(mem_ctx
, &state
, struct pull_database_state
);
474 state
->client
= client
;
475 state
->recdb
= recdb
;
477 state
->srvid
= srvid_next();
479 if (caps
& CTDB_CAP_FRAGMENTED_CONTROLS
) {
480 subreq
= ctdb_client_set_message_handler_send(
481 state
, state
->ev
, state
->client
,
482 state
->srvid
, pull_database_handler
,
484 if (tevent_req_nomem(subreq
, req
)) {
485 return tevent_req_post(req
, ev
);
488 tevent_req_set_callback(subreq
, pull_database_register_done
,
492 struct ctdb_pulldb pulldb
;
494 pulldb
.db_id
= recdb_id(recdb
);
495 pulldb
.lmaster
= CTDB_LMASTER_ANY
;
497 ctdb_req_control_pull_db(&request
, &pulldb
);
498 subreq
= ctdb_client_control_send(state
, state
->ev
,
502 if (tevent_req_nomem(subreq
, req
)) {
503 return tevent_req_post(req
, ev
);
505 tevent_req_set_callback(subreq
, pull_database_old_done
, req
);
/*
 * Message handler for the record buffers sent in response to DB_PULL.
 *
 * private_data is the tevent request.  The message srvid is compared with
 * state->srvid (branch body not visible).  The payload is unmarshalled with
 * ctdb_rec_buffer_pull(), its db_id is checked against the recovery
 * database, the records are merged with recdb_add(), and recbuf->count is
 * added to state->num_records.  Failures are logged.
 */
511 static void pull_database_handler(uint64_t srvid
, TDB_DATA data
,
514 struct tevent_req
*req
= talloc_get_type_abort(
515 private_data
, struct tevent_req
);
516 struct pull_database_state
*state
= tevent_req_data(
517 req
, struct pull_database_state
);
518 struct ctdb_rec_buffer
*recbuf
;
522 if (srvid
!= state
->srvid
) {
526 ret
= ctdb_rec_buffer_pull(data
.dptr
, data
.dsize
, state
, &recbuf
);
528 LOG("Invalid data received for DB_PULL messages\n");
532 if (recbuf
->db_id
!= recdb_id(state
->recdb
)) {
534 LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
535 recbuf
->db_id
, recdb_name(state
->recdb
));
539 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
543 LOG("Failed to add records to recdb for %s\n",
544 recdb_name(state
->recdb
));
548 state
->num_records
+= recbuf
->count
;
/*
 * Completion of the message-handler registration.  A failure is logged and
 * the request fails with ret.  Otherwise a ctdb_pulldb_ext request (db id,
 * CTDB_LMASTER_ANY, the request's srvid) is sent as a DB_PULL control to
 * state->pnn with TIMEOUT(); the request continues in
 * pull_database_new_done().
 */
552 static void pull_database_register_done(struct tevent_req
*subreq
)
554 struct tevent_req
*req
= tevent_req_callback_data(
555 subreq
, struct tevent_req
);
556 struct pull_database_state
*state
= tevent_req_data(
557 req
, struct pull_database_state
);
558 struct ctdb_req_control request
;
559 struct ctdb_pulldb_ext pulldb_ext
;
563 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
566 LOG("failed to set message handler for DB_PULL for %s\n",
567 recdb_name(state
->recdb
));
568 tevent_req_error(req
, ret
);
572 pulldb_ext
.db_id
= recdb_id(state
->recdb
);
573 pulldb_ext
.lmaster
= CTDB_LMASTER_ANY
;
574 pulldb_ext
.srvid
= state
->srvid
;
576 ctdb_req_control_db_pull(&request
, &pulldb_ext
);
577 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
578 state
->pnn
, TIMEOUT(), &request
);
579 if (tevent_req_nomem(subreq
, req
)) {
582 tevent_req_set_callback(subreq
, pull_database_new_done
, req
);
/*
 * Completion of the PULL_DB control.  The reply is parsed into a record
 * buffer with ctdb_reply_control_pull_db(), the records are merged into the
 * recovery database with recdb_add(), and state->num_records is set to
 * recbuf->count.  The request fails with ret after the control and parse
 * steps and with EIO after recdb_add(); the conditions guarding those error
 * calls are not visible in this view.  On success the pulled count is
 * logged and the request completes.
 */
585 static void pull_database_old_done(struct tevent_req
*subreq
)
587 struct tevent_req
*req
= tevent_req_callback_data(
588 subreq
, struct tevent_req
);
589 struct pull_database_state
*state
= tevent_req_data(
590 req
, struct pull_database_state
);
591 struct ctdb_reply_control
*reply
;
592 struct ctdb_rec_buffer
*recbuf
;
596 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
599 LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
600 recdb_name(state
->recdb
), state
->pnn
, ret
);
601 tevent_req_error(req
, ret
);
605 ret
= ctdb_reply_control_pull_db(reply
, state
, &recbuf
);
608 tevent_req_error(req
, ret
);
612 status
= recdb_add(state
->recdb
, ctdb_client_pnn(state
->client
),
616 tevent_req_error(req
, EIO
);
620 state
->num_records
= recbuf
->count
;
623 LOG("Pulled %d records for db %s from node %d\n",
624 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
626 tevent_req_done(req
);
/*
 * Completion of the DB_PULL control.  The record count is read from the
 * reply with ctdb_reply_control_db_pull() and compared with the count
 * accumulated in state->num_records; a mismatch is logged and fails the
 * request with EIO.  Otherwise the pulled count is logged and removal of
 * the message handler is started, continuing in
 * pull_database_unregister_done().
 */
629 static void pull_database_new_done(struct tevent_req
*subreq
)
631 struct tevent_req
*req
= tevent_req_callback_data(
632 subreq
, struct tevent_req
);
633 struct pull_database_state
*state
= tevent_req_data(
634 req
, struct pull_database_state
);
635 struct ctdb_reply_control
*reply
;
636 uint32_t num_records
;
640 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
643 LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
644 recdb_name(state
->recdb
), state
->pnn
, ret
);
645 tevent_req_error(req
, ret
);
649 ret
= ctdb_reply_control_db_pull(reply
, &num_records
);
651 if (num_records
!= state
->num_records
) {
652 LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
653 num_records
, state
->num_records
, recdb_name(state
->recdb
));
654 tevent_req_error(req
, EIO
);
658 LOG("Pulled %d records for db %s from node %d\n",
659 state
->num_records
, recdb_name(state
->recdb
), state
->pnn
);
661 subreq
= ctdb_client_remove_message_handler_send(
662 state
, state
->ev
, state
->client
,
664 if (tevent_req_nomem(subreq
, req
)) {
667 tevent_req_set_callback(subreq
, pull_database_unregister_done
, req
);
/*
 * Completion of the message-handler removal.  A failure is logged and fails
 * the request with ret; otherwise the pull request completes.
 */
670 static void pull_database_unregister_done(struct tevent_req
*subreq
)
672 struct tevent_req
*req
= tevent_req_callback_data(
673 subreq
, struct tevent_req
);
674 struct pull_database_state
*state
= tevent_req_data(
675 req
, struct pull_database_state
);
679 status
= ctdb_client_remove_message_handler_recv(subreq
, &ret
);
682 LOG("failed to remove message handler for DB_PULL for %s\n",
683 recdb_name(state
->recdb
));
684 tevent_req_error(req
, ret
);
688 tevent_req_done(req
);
/* Receive the result of pull_database_send(); delegates to generic_recv(). */
691 static bool pull_database_recv(struct tevent_req
*req
, int *perr
)
693 return generic_recv(req
, perr
);
697 * Push database to specified nodes (old style)
/*
 * Request state for the old-style push, where one record buffer is sent to
 * each node in turn.  The code also uses pnn_list, count and index; those
 * member declarations are not visible in this view.
 */
700 struct push_database_old_state
{
701 struct tevent_context
*ev
;
702 struct ctdb_client_context
*client
;
703 struct recdb_context
*recdb
;
706 struct ctdb_rec_buffer
*recbuf
;
/* Forward declaration of the per-node completion callback. */
710 static void push_database_old_push_done(struct tevent_req
*subreq
);
/*
 * Start an old-style push of the recovery database to the nodes in
 * pnn_list.
 *
 * All retained records are marshalled once with recdb_records(), using this
 * node's pnn as dmaster.  A PUSH_DB control carrying that buffer is then
 * sent to pnn_list[state->index]; the remaining nodes are handled one at a
 * time from push_database_old_push_done().
 *
 * NOTE(review): a NULL result from recdb_records() is routed through
 * tevent_req_nomem(), so any marshalling failure appears to be reported as
 * an out-of-memory error -- confirm this is intended.
 */
712 static struct tevent_req
*push_database_old_send(
714 struct tevent_context
*ev
,
715 struct ctdb_client_context
*client
,
716 uint32_t *pnn_list
, int count
,
717 struct recdb_context
*recdb
)
719 struct tevent_req
*req
, *subreq
;
720 struct push_database_old_state
*state
;
721 struct ctdb_req_control request
;
724 req
= tevent_req_create(mem_ctx
, &state
,
725 struct push_database_old_state
);
731 state
->client
= client
;
732 state
->recdb
= recdb
;
733 state
->pnn_list
= pnn_list
;
734 state
->count
= count
;
737 state
->recbuf
= recdb_records(recdb
, state
,
738 ctdb_client_pnn(client
));
739 if (tevent_req_nomem(state
->recbuf
, req
)) {
740 return tevent_req_post(req
, ev
);
743 pnn
= state
->pnn_list
[state
->index
];
745 ctdb_req_control_push_db(&request
, state
->recbuf
);
746 subreq
= ctdb_client_control_send(state
, ev
, client
, pnn
,
747 TIMEOUT(), &request
);
748 if (tevent_req_nomem(subreq
, req
)) {
749 return tevent_req_post(req
, ev
);
751 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
/*
 * Completion of one PUSH_DB control.  A failure is logged with the node
 * number and fails the request with ret.  When state->index has reached
 * state->count, the record buffer is freed and the request completes;
 * otherwise the same buffer is sent to the next node in pnn_list.  The
 * statement that advances state->index is not visible in this view.
 */
756 static void push_database_old_push_done(struct tevent_req
*subreq
)
758 struct tevent_req
*req
= tevent_req_callback_data(
759 subreq
, struct tevent_req
);
760 struct push_database_old_state
*state
= tevent_req_data(
761 req
, struct push_database_old_state
);
762 struct ctdb_req_control request
;
767 status
= ctdb_client_control_recv(subreq
, &ret
, NULL
, NULL
);
770 LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
771 recdb_name(state
->recdb
), state
->pnn_list
[state
->index
],
773 tevent_req_error(req
, ret
);
778 if (state
->index
== state
->count
) {
779 TALLOC_FREE(state
->recbuf
);
780 tevent_req_done(req
);
784 pnn
= state
->pnn_list
[state
->index
];
786 ctdb_req_control_push_db(&request
, state
->recbuf
);
787 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
788 pnn
, TIMEOUT(), &request
);
789 if (tevent_req_nomem(subreq
, req
)) {
792 tevent_req_set_callback(subreq
, push_database_old_push_done
, req
);
/* Receive the result of push_database_old_send(); delegates to generic_recv(). */
795 static bool push_database_old_recv(struct tevent_req
*req
, int *perr
)
797 return generic_recv(req
, perr
);
801 * Push database to specified nodes (new style)
/*
 * Request state for the new-style push, where records are spooled to a
 * file and sent as a series of messages.  The code also uses pnn_list,
 * count, srvid, dmaster, fd, num_buffers and num_records; those member
 * declarations are not visible in this view.
 */
804 struct push_database_new_state
{
805 struct tevent_context
*ev
;
806 struct ctdb_client_context
*client
;
807 struct recdb_context
*recdb
;
814 int num_buffers_sent
;
/* Forward declarations for the steps of the new-style push request. */
818 static void push_database_new_started(struct tevent_req
*subreq
);
819 static void push_database_new_send_msg(struct tevent_req
*req
);
820 static void push_database_new_send_done(struct tevent_req
*subreq
);
821 static void push_database_new_confirmed(struct tevent_req
*subreq
);
/*
 * Start a new-style push of the recovery database to the nodes in
 * pnn_list.
 *
 * A fresh srvid is taken and this node's pnn is used as dmaster.  The
 * retained records are spooled to the file "<recdb path>.dat" by
 * recdb_file(), using max_size as the buffer limit (its parameter line is
 * not visible in this view).  The file offset is then rewound to the start.
 * Finally a DB_PUSH_START control carrying the db id and srvid is sent with
 * ctdb_client_control_multi_send(); the request continues in
 * push_database_new_started().
 *
 * Visible error returns: errno from open(), ENOMEM when recdb_file()
 * reports -1, EIO after the lseek().
 *
 * NOTE(review): no close() of state->fd is visible on the error returns
 * after open() -- confirm the descriptor is released elsewhere.
 */
823 static struct tevent_req
*push_database_new_send(
825 struct tevent_context
*ev
,
826 struct ctdb_client_context
*client
,
827 uint32_t *pnn_list
, int count
,
828 struct recdb_context
*recdb
,
831 struct tevent_req
*req
, *subreq
;
832 struct push_database_new_state
*state
;
833 struct ctdb_req_control request
;
834 struct ctdb_pulldb_ext pulldb_ext
;
838 req
= tevent_req_create(mem_ctx
, &state
,
839 struct push_database_new_state
);
845 state
->client
= client
;
846 state
->recdb
= recdb
;
847 state
->pnn_list
= pnn_list
;
848 state
->count
= count
;
850 state
->srvid
= srvid_next();
851 state
->dmaster
= ctdb_client_pnn(client
);
852 state
->num_buffers_sent
= 0;
853 state
->num_records
= 0;
855 filename
= talloc_asprintf(state
, "%s.dat", recdb_path(recdb
));
856 if (tevent_req_nomem(filename
, req
)) {
857 return tevent_req_post(req
, ev
);
860 state
->fd
= open(filename
, O_RDWR
|O_CREAT
, 0644);
861 if (state
->fd
== -1) {
862 tevent_req_error(req
, errno
);
863 return tevent_req_post(req
, ev
);
866 talloc_free(filename
);
868 state
->num_buffers
= recdb_file(recdb
, state
, state
->dmaster
,
869 state
->fd
, max_size
);
870 if (state
->num_buffers
== -1) {
871 tevent_req_error(req
, ENOMEM
);
872 return tevent_req_post(req
, ev
);
875 offset
= lseek(state
->fd
, 0, SEEK_SET
);
877 tevent_req_error(req
, EIO
);
878 return tevent_req_post(req
, ev
);
881 pulldb_ext
.db_id
= recdb_id(recdb
);
882 pulldb_ext
.srvid
= state
->srvid
;
884 ctdb_req_control_db_push_start(&request
, &pulldb_ext
);
885 subreq
= ctdb_client_control_multi_send(state
, ev
, client
,
887 TIMEOUT(), &request
);
888 if (tevent_req_nomem(subreq
, req
)) {
889 return tevent_req_post(req
, ev
);
891 tevent_req_set_callback(subreq
, push_database_new_started
, req
);
/*
 * Completion of DB_PUSH_START on all nodes.  On failure the per-node error
 * list is inspected with ctdb_client_control_multi_error(), and one of two
 * messages is logged (with or without a node number).  err_list is then
 * freed and the request fails with ret.  On success the first record buffer
 * is sent via push_database_new_send_msg().
 */
896 static void push_database_new_started(struct tevent_req
*subreq
)
898 struct tevent_req
*req
= tevent_req_callback_data(
899 subreq
, struct tevent_req
);
900 struct push_database_new_state
*state
= tevent_req_data(
901 req
, struct push_database_new_state
);
906 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
913 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
917 LOG("control DB_PUSH_START failed for db %s "
918 "on node %u, ret=%d\n",
919 recdb_name(state
->recdb
), pnn
, ret2
);
921 LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
922 recdb_name(state
->recdb
), ret
);
924 talloc_free(err_list
);
926 tevent_req_error(req
, ret
);
930 push_database_new_send_msg(req
);
/*
 * Send the next spooled record buffer, or finish the push.
 *
 * Once num_buffers_sent equals num_buffers, a DB_PUSH_CONFIRM control for
 * the database is sent and the request continues in
 * push_database_new_confirmed().  Otherwise the next buffer is read from
 * state->fd with ctdb_rec_buffer_read() and marshalled into a freshly
 * allocated data blob with ctdb_rec_buffer_push().  The blob is sent as a
 * message on state->srvid to the nodes in pnn_list; the request continues
 * in push_database_new_send_done().  The record count of the buffer is
 * added to state->num_records and the temporary blob is freed.
 */
933 static void push_database_new_send_msg(struct tevent_req
*req
)
935 struct push_database_new_state
*state
= tevent_req_data(
936 req
, struct push_database_new_state
);
937 struct tevent_req
*subreq
;
938 struct ctdb_rec_buffer
*recbuf
;
939 struct ctdb_req_message message
;
943 if (state
->num_buffers_sent
== state
->num_buffers
) {
944 struct ctdb_req_control request
;
946 ctdb_req_control_db_push_confirm(&request
,
947 recdb_id(state
->recdb
));
948 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
952 TIMEOUT(), &request
);
953 if (tevent_req_nomem(subreq
, req
)) {
956 tevent_req_set_callback(subreq
, push_database_new_confirmed
,
961 ret
= ctdb_rec_buffer_read(state
->fd
, state
, &recbuf
);
963 tevent_req_error(req
, ret
);
967 data
.dsize
= ctdb_rec_buffer_len(recbuf
);
968 data
.dptr
= talloc_size(state
, data
.dsize
);
969 if (tevent_req_nomem(data
.dptr
, req
)) {
973 ctdb_rec_buffer_push(recbuf
, data
.dptr
);
975 message
.srvid
= state
->srvid
;
976 message
.data
.data
= data
;
978 LOG("Pushing buffer %d with %d records for %s\n",
979 state
->num_buffers_sent
, recbuf
->count
, recdb_name(state
->recdb
));
981 subreq
= ctdb_client_message_multi_send(state
, state
->ev
,
983 state
->pnn_list
, state
->count
,
985 if (tevent_req_nomem(subreq
, req
)) {
988 tevent_req_set_callback(subreq
, push_database_new_send_done
, req
);
990 state
->num_records
+= recbuf
->count
;
992 talloc_free(data
.dptr
);
/*
 * Completion of one record-buffer message.  The subrequest is freed.  A
 * failure is logged and fails the request with ret.  Otherwise
 * num_buffers_sent is incremented and the next buffer is sent via
 * push_database_new_send_msg().
 */
996 static void push_database_new_send_done(struct tevent_req
*subreq
)
998 struct tevent_req
*req
= tevent_req_callback_data(
999 subreq
, struct tevent_req
);
1000 struct push_database_new_state
*state
= tevent_req_data(
1001 req
, struct push_database_new_state
);
1005 status
= ctdb_client_message_multi_recv(subreq
, &ret
, NULL
, NULL
);
1006 TALLOC_FREE(subreq
);
1008 LOG("Sending recovery records failed for %s\n",
1009 recdb_name(state
->recdb
));
1010 tevent_req_error(req
, ret
);
1014 state
->num_buffers_sent
+= 1;
1016 push_database_new_send_msg(req
);
/*
 * Completion of DB_PUSH_CONFIRM on all nodes.  A control failure is logged
 * (with the node number when ctdb_client_control_multi_error() supplies
 * one) and fails the request with ret.  Each node's reply is then parsed
 * with ctdb_reply_control_db_push_confirm().  A record count that differs
 * from state->num_records is logged and fails the request with EPROTO;
 * EPROTO is also raised next to the parse call.  If every node matches, the
 * total is logged and the request completes.
 */
1019 static void push_database_new_confirmed(struct tevent_req
*subreq
)
1021 struct tevent_req
*req
= tevent_req_callback_data(
1022 subreq
, struct tevent_req
);
1023 struct push_database_new_state
*state
= tevent_req_data(
1024 req
, struct push_database_new_state
);
1025 struct ctdb_reply_control
**reply
;
1029 uint32_t num_records
;
1031 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1033 TALLOC_FREE(subreq
);
1038 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1039 state
->count
, err_list
,
1042 LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
1043 " ret=%d\n", recdb_name(state
->recdb
), pnn
, ret2
);
1045 LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
1046 recdb_name(state
->recdb
), ret
);
1048 tevent_req_error(req
, ret
);
1052 for (i
=0; i
<state
->count
; i
++) {
1053 ret
= ctdb_reply_control_db_push_confirm(reply
[i
],
1056 tevent_req_error(req
, EPROTO
);
1060 if (num_records
!= state
->num_records
) {
1061 LOG("Node %u received %d of %d records for %s\n",
1062 state
->pnn_list
[i
], num_records
,
1063 state
->num_records
, recdb_name(state
->recdb
));
1064 tevent_req_error(req
, EPROTO
);
1071 LOG("Pushed %d records for db %s\n",
1072 state
->num_records
, recdb_name(state
->recdb
));
1074 tevent_req_done(req
);
/* Receive the result of push_database_new_send(); delegates to generic_recv(). */
1077 static bool push_database_new_recv(struct tevent_req
*req
, int *perr
)
1079 return generic_recv(req
, perr
);
1083 * wrapper for push_database_old and push_database_new
/*
 * State of the wrapper request: completion flags for the old-style and the
 * new-style push; the request finishes once both are true.
 */
1086 struct push_database_state
{
1087 bool old_done
, new_done
;
/* Forward declarations for the two completion callbacks of the wrapper. */
1090 static void push_database_old_done(struct tevent_req
*subreq
);
1091 static void push_database_new_done(struct tevent_req
*subreq
);
/*
 * Push the recovery database to all nodes in pnn_list, choosing the push
 * style per node.
 *
 * The nodes are split into two lists by testing caps[pnn] for
 * CTDB_CAP_FRAGMENTED_CONTROLS: nodes with the capability go to new_list,
 * the others to old_list.  If old_list is non-empty, an old-style push is
 * started; the old_done = true assignment next to it presumably covers the
 * empty case (the else line is not visible -- confirm).  The new-style push
 * is handled the same way, with tun_list->rec_buffer_size_limit as the
 * buffer size limit.
 *
 * NOTE(review): old_count and new_count are declared without initialisers
 * here, and the lines that initialise and increment them are not visible in
 * this view -- confirm they start at 0.
 */
1093 static struct tevent_req
*push_database_send(
1094 TALLOC_CTX
*mem_ctx
,
1095 struct tevent_context
*ev
,
1096 struct ctdb_client_context
*client
,
1097 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1098 struct ctdb_tunable_list
*tun_list
,
1099 struct recdb_context
*recdb
)
1101 struct tevent_req
*req
, *subreq
;
1102 struct push_database_state
*state
;
1103 uint32_t *old_list
, *new_list
;
1104 int old_count
, new_count
;
1107 req
= tevent_req_create(mem_ctx
, &state
, struct push_database_state
);
1112 state
->old_done
= false;
1113 state
->new_done
= false;
1117 old_list
= talloc_array(state
, uint32_t, count
);
1118 new_list
= talloc_array(state
, uint32_t, count
);
1119 if (tevent_req_nomem(old_list
, req
) ||
1120 tevent_req_nomem(new_list
,req
)) {
1121 return tevent_req_post(req
, ev
);
1124 for (i
=0; i
<count
; i
++) {
1125 uint32_t pnn
= pnn_list
[i
];
1127 if (caps
[pnn
] & CTDB_CAP_FRAGMENTED_CONTROLS
) {
1128 new_list
[new_count
] = pnn
;
1131 old_list
[old_count
] = pnn
;
1136 if (old_count
> 0) {
1137 subreq
= push_database_old_send(state
, ev
, client
,
1138 old_list
, old_count
, recdb
);
1139 if (tevent_req_nomem(subreq
, req
)) {
1140 return tevent_req_post(req
, ev
);
1142 tevent_req_set_callback(subreq
, push_database_old_done
, req
);
1144 state
->old_done
= true;
1147 if (new_count
> 0) {
1148 subreq
= push_database_new_send(state
, ev
, client
,
1149 new_list
, new_count
, recdb
,
1150 tun_list
->rec_buffer_size_limit
);
1151 if (tevent_req_nomem(subreq
, req
)) {
1152 return tevent_req_post(req
, ev
);
1154 tevent_req_set_callback(subreq
, push_database_new_done
, req
);
1156 state
->new_done
= true;
/*
 * Completion of the old-style push.  The visible error call fails the
 * wrapper request with ret.  Otherwise old_done is set, and the wrapper
 * completes once new_done is set as well.
 */
1162 static void push_database_old_done(struct tevent_req
*subreq
)
1164 struct tevent_req
*req
= tevent_req_callback_data(
1165 subreq
, struct tevent_req
);
1166 struct push_database_state
*state
= tevent_req_data(
1167 req
, struct push_database_state
);
1171 status
= push_database_old_recv(subreq
, &ret
);
1173 tevent_req_error(req
, ret
);
1177 state
->old_done
= true;
1179 if (state
->old_done
&& state
->new_done
) {
1180 tevent_req_done(req
);
/*
 * Completion of the new-style push.  The visible error call fails the
 * wrapper request with ret.  Otherwise new_done is set, and the wrapper
 * completes once old_done is set as well.
 */
1184 static void push_database_new_done(struct tevent_req
*subreq
)
1186 struct tevent_req
*req
= tevent_req_callback_data(
1187 subreq
, struct tevent_req
);
1188 struct push_database_state
*state
= tevent_req_data(
1189 req
, struct push_database_state
);
1193 status
= push_database_new_recv(subreq
, &ret
);
1195 tevent_req_error(req
, ret
);
1199 state
->new_done
= true;
1201 if (state
->old_done
&& state
->new_done
) {
1202 tevent_req_done(req
);
/* Receive the result of push_database_send(); delegates to generic_recv(). */
1206 static bool push_database_recv(struct tevent_req
*req
, int *perr
)
1208 return generic_recv(req
, perr
);
1212 * Collect databases using highest sequence number
/*
 * Request state for collecting a database from the node with the highest
 * sequence number.  The code also uses pnn_list, count, caps, db_id and
 * max_pnn; those member declarations are not visible in this view.
 */
1215 struct collect_highseqnum_db_state
{
1216 struct tevent_context
*ev
;
1217 struct ctdb_client_context
*client
;
1221 uint32_t *ban_credits
;
1223 struct recdb_context
*recdb
;
/* Forward declarations for the two steps of the highseqnum collection. */
1227 static void collect_highseqnum_db_seqnum_done(struct tevent_req
*subreq
);
1228 static void collect_highseqnum_db_pulldb_done(struct tevent_req
*subreq
);
/*
 * Start collecting database db_id from the node holding the highest
 * sequence number.  A GET_DB_SEQNUM control is sent to every node in
 * pnn_list with TIMEOUT(); the request continues in
 * collect_highseqnum_db_seqnum_done().
 *
 * NOTE(review): the subrequest is allocated on mem_ctx here, whereas the
 * other multi_send calls in this file pass state -- confirm this is
 * intentional.
 */
1230 static struct tevent_req
*collect_highseqnum_db_send(
1231 TALLOC_CTX
*mem_ctx
,
1232 struct tevent_context
*ev
,
1233 struct ctdb_client_context
*client
,
1234 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1235 uint32_t *ban_credits
, uint32_t db_id
,
1236 struct recdb_context
*recdb
)
1238 struct tevent_req
*req
, *subreq
;
1239 struct collect_highseqnum_db_state
*state
;
1240 struct ctdb_req_control request
;
1242 req
= tevent_req_create(mem_ctx
, &state
,
1243 struct collect_highseqnum_db_state
);
1249 state
->client
= client
;
1250 state
->pnn_list
= pnn_list
;
1251 state
->count
= count
;
1253 state
->ban_credits
= ban_credits
;
1254 state
->db_id
= db_id
;
1255 state
->recdb
= recdb
;
1257 ctdb_req_control_get_db_seqnum(&request
, db_id
);
1258 subreq
= ctdb_client_control_multi_send(mem_ctx
, ev
, client
,
1259 state
->pnn_list
, state
->count
,
1260 TIMEOUT(), &request
);
1261 if (tevent_req_nomem(subreq
, req
)) {
1262 return tevent_req_post(req
, ev
);
1264 tevent_req_set_callback(subreq
, collect_highseqnum_db_seqnum_done
,
/*
 * Completion of GET_DB_SEQNUM on all nodes.  A control failure is logged
 * (with the node number when ctdb_client_control_multi_error() supplies
 * one) and fails the request with ret.  Each reply is then parsed with
 * ctdb_reply_control_get_db_seqnum(), with EPROTO raised next to the parse
 * call.  The node with the largest sequence number is remembered in
 * state->max_pnn, which starts as pnn_list[0].  The choice is logged and
 * the database is pulled from that node with pull_database_send(),
 * continuing in collect_highseqnum_db_pulldb_done().
 *
 * NOTE(review): max_seqnum is declared without an initialiser, and the line
 * that sets it before the comparison is not visible in this view --
 * confirm it is initialised.
 */
1270 static void collect_highseqnum_db_seqnum_done(struct tevent_req
*subreq
)
1272 struct tevent_req
*req
= tevent_req_callback_data(
1273 subreq
, struct tevent_req
);
1274 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1275 req
, struct collect_highseqnum_db_state
);
1276 struct ctdb_reply_control
**reply
;
1280 uint64_t seqnum
, max_seqnum
;
1282 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
,
1284 TALLOC_FREE(subreq
);
1289 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1290 state
->count
, err_list
,
1293 LOG("control GET_DB_SEQNUM failed for %s on node %u,"
1294 " ret=%d\n", recdb_name(state
->recdb
), pnn
, ret2
);
1296 LOG("control GET_DB_SEQNUM failed for %s, ret=%d\n",
1297 recdb_name(state
->recdb
), ret
);
1299 tevent_req_error(req
, ret
);
1304 state
->max_pnn
= state
->pnn_list
[0];
1305 for (i
=0; i
<state
->count
; i
++) {
1306 ret
= ctdb_reply_control_get_db_seqnum(reply
[i
], &seqnum
);
1308 tevent_req_error(req
, EPROTO
);
1312 if (max_seqnum
< seqnum
) {
1313 max_seqnum
= seqnum
;
1314 state
->max_pnn
= state
->pnn_list
[i
];
1320 LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64
"\n",
1321 recdb_name(state
->recdb
), state
->max_pnn
, max_seqnum
);
1323 subreq
= pull_database_send(state
, state
->ev
, state
->client
,
1325 state
->caps
[state
->max_pnn
],
1327 if (tevent_req_nomem(subreq
, req
)) {
1330 tevent_req_set_callback(subreq
, collect_highseqnum_db_pulldb_done
,
/*
 * Completion of the pull from the chosen node.  The subrequest is freed.
 * In the visible error branch the ban credit of state->max_pnn is
 * incremented and the request fails with ret; otherwise the request
 * completes.
 */
1334 static void collect_highseqnum_db_pulldb_done(struct tevent_req
*subreq
)
1336 struct tevent_req
*req
= tevent_req_callback_data(
1337 subreq
, struct tevent_req
);
1338 struct collect_highseqnum_db_state
*state
= tevent_req_data(
1339 req
, struct collect_highseqnum_db_state
);
1343 status
= pull_database_recv(subreq
, &ret
);
1344 TALLOC_FREE(subreq
);
1346 state
->ban_credits
[state
->max_pnn
] += 1;
1347 tevent_req_error(req
, ret
);
1351 tevent_req_done(req
);
/* Receive the result of collect_highseqnum_db_send(); delegates to generic_recv(). */
1354 static bool collect_highseqnum_db_recv(struct tevent_req
*req
, int *perr
)
1356 return generic_recv(req
, perr
);
1360 * Collect all databases
/*
 * Request state for collecting a database from every node in turn.  The
 * code also uses pnn_list, count, caps, db_id and index; those member
 * declarations are not visible in this view.
 */
1363 struct collect_all_db_state
{
1364 struct tevent_context
*ev
;
1365 struct ctdb_client_context
*client
;
1369 uint32_t *ban_credits
;
1371 struct recdb_context
*recdb
;
1372 struct ctdb_pulldb pulldb
;
/* Forward declaration of the per-node pull completion callback. */
1376 static void collect_all_db_pulldb_done(struct tevent_req
*subreq
);
/*
 * Start collecting database db_id from all nodes in pnn_list, one node at
 * a time.  The pull from pnn_list[state->index] is started here with that
 * node's capabilities; the remaining nodes are handled from
 * collect_all_db_pulldb_done().
 */
1378 static struct tevent_req
*collect_all_db_send(
1379 TALLOC_CTX
*mem_ctx
,
1380 struct tevent_context
*ev
,
1381 struct ctdb_client_context
*client
,
1382 uint32_t *pnn_list
, int count
, uint32_t *caps
,
1383 uint32_t *ban_credits
, uint32_t db_id
,
1384 struct recdb_context
*recdb
)
1386 struct tevent_req
*req
, *subreq
;
1387 struct collect_all_db_state
*state
;
1390 req
= tevent_req_create(mem_ctx
, &state
,
1391 struct collect_all_db_state
);
1397 state
->client
= client
;
1398 state
->pnn_list
= pnn_list
;
1399 state
->count
= count
;
1401 state
->db_id
= db_id
;
1402 state
->recdb
= recdb
;
1405 pnn
= state
->pnn_list
[state
->index
];
1407 subreq
= pull_database_send(state
, ev
, client
, pnn
, caps
[pnn
], recdb
);
1408 if (tevent_req_nomem(subreq
, req
)) {
1409 return tevent_req_post(req
, ev
);
1411 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
/*
 * Completion of one node's pull.  The subrequest is freed.  In the visible
 * error branch the ban credit of the current node is incremented and the
 * request fails with ret.  When state->index has reached state->count the
 * request completes; otherwise the pull from the next node in pnn_list is
 * started.  The statement that advances state->index is not visible in this
 * view.
 */
1416 static void collect_all_db_pulldb_done(struct tevent_req
*subreq
)
1418 struct tevent_req
*req
= tevent_req_callback_data(
1419 subreq
, struct tevent_req
);
1420 struct collect_all_db_state
*state
= tevent_req_data(
1421 req
, struct collect_all_db_state
);
1426 status
= pull_database_recv(subreq
, &ret
);
1427 TALLOC_FREE(subreq
);
1429 pnn
= state
->pnn_list
[state
->index
];
1430 state
->ban_credits
[pnn
] += 1;
1431 tevent_req_error(req
, ret
);
1436 if (state
->index
== state
->count
) {
1437 tevent_req_done(req
);
1441 pnn
= state
->pnn_list
[state
->index
];
1442 subreq
= pull_database_send(state
, state
->ev
, state
->client
,
1443 pnn
, state
->caps
[pnn
], state
->recdb
);
1444 if (tevent_req_nomem(subreq
, req
)) {
1447 tevent_req_set_callback(subreq
, collect_all_db_pulldb_done
, req
);
/*
 * Fetch the outcome of a collect_all_db request; result and *perr are
 * whatever generic_recv() reports.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok;

	ok = generic_recv(req, perr);
	return ok;
}
/*
 * For each database do the following:
 *
 * - Get DB name
 * - Get DB path
 * - Freeze database on all nodes
 * - Start transaction on all nodes
 * - Collect database from all nodes
 * - Wipe database on all nodes
 * - Push database to all nodes
 * - Commit transaction on all nodes
 * - Thaw database on all nodes
 */
1469 struct recover_db_state
{
1470 struct tevent_context
*ev
;
1471 struct ctdb_client_context
*client
;
1472 struct ctdb_tunable_list
*tun_list
;
1476 uint32_t *ban_credits
;
1481 struct ctdb_transdb transdb
;
1483 const char *db_name
, *db_path
;
1484 struct recdb_context
*recdb
;
/*
 * Completion callbacks of the recover_db request, declared in the order in
 * which the steps are chained.
 */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);
1497 static struct tevent_req
*recover_db_send(TALLOC_CTX
*mem_ctx
,
1498 struct tevent_context
*ev
,
1499 struct ctdb_client_context
*client
,
1500 struct ctdb_tunable_list
*tun_list
,
1501 uint32_t *pnn_list
, int count
,
1503 uint32_t *ban_credits
,
1504 uint32_t generation
,
1505 uint32_t db_id
, bool persistent
)
1507 struct tevent_req
*req
, *subreq
;
1508 struct recover_db_state
*state
;
1509 struct ctdb_req_control request
;
1511 req
= tevent_req_create(mem_ctx
, &state
, struct recover_db_state
);
1517 state
->client
= client
;
1518 state
->tun_list
= tun_list
;
1519 state
->pnn_list
= pnn_list
;
1520 state
->count
= count
;
1522 state
->ban_credits
= ban_credits
;
1523 state
->db_id
= db_id
;
1524 state
->persistent
= persistent
;
1526 state
->destnode
= ctdb_client_pnn(client
);
1527 state
->transdb
.db_id
= db_id
;
1528 state
->transdb
.tid
= generation
;
1530 ctdb_req_control_get_dbname(&request
, db_id
);
1531 subreq
= ctdb_client_control_send(state
, ev
, client
, state
->destnode
,
1532 TIMEOUT(), &request
);
1533 if (tevent_req_nomem(subreq
, req
)) {
1534 return tevent_req_post(req
, ev
);
1536 tevent_req_set_callback(subreq
, recover_db_name_done
, req
);
1541 static void recover_db_name_done(struct tevent_req
*subreq
)
1543 struct tevent_req
*req
= tevent_req_callback_data(
1544 subreq
, struct tevent_req
);
1545 struct recover_db_state
*state
= tevent_req_data(
1546 req
, struct recover_db_state
);
1547 struct ctdb_reply_control
*reply
;
1548 struct ctdb_req_control request
;
1552 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1553 TALLOC_FREE(subreq
);
1555 LOG("control GET_DBNAME failed for db=0x%x\n, ret=%d",
1557 tevent_req_error(req
, ret
);
1561 ret
= ctdb_reply_control_get_dbname(reply
, state
, &state
->db_name
);
1563 LOG("control GET_DBNAME failed for db=0x%x\n, ret=%d\n",
1565 tevent_req_error(req
, EPROTO
);
1571 ctdb_req_control_getdbpath(&request
, state
->db_id
);
1572 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
1573 state
->destnode
, TIMEOUT(),
1575 if (tevent_req_nomem(subreq
, req
)) {
1578 tevent_req_set_callback(subreq
, recover_db_path_done
, req
);
1581 static void recover_db_path_done(struct tevent_req
*subreq
)
1583 struct tevent_req
*req
= tevent_req_callback_data(
1584 subreq
, struct tevent_req
);
1585 struct recover_db_state
*state
= tevent_req_data(
1586 req
, struct recover_db_state
);
1587 struct ctdb_reply_control
*reply
;
1588 struct ctdb_req_control request
;
1592 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
1593 TALLOC_FREE(subreq
);
1595 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1596 state
->db_name
, ret
);
1597 tevent_req_error(req
, ret
);
1601 ret
= ctdb_reply_control_getdbpath(reply
, state
, &state
->db_path
);
1603 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1604 state
->db_name
, ret
);
1605 tevent_req_error(req
, EPROTO
);
1611 ctdb_req_control_db_freeze(&request
, state
->db_id
);
1612 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1614 state
->pnn_list
, state
->count
,
1615 TIMEOUT(), &request
);
1616 if (tevent_req_nomem(subreq
, req
)) {
1619 tevent_req_set_callback(subreq
, recover_db_freeze_done
, req
);
1622 static void recover_db_freeze_done(struct tevent_req
*subreq
)
1624 struct tevent_req
*req
= tevent_req_callback_data(
1625 subreq
, struct tevent_req
);
1626 struct recover_db_state
*state
= tevent_req_data(
1627 req
, struct recover_db_state
);
1628 struct ctdb_req_control request
;
1633 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1635 TALLOC_FREE(subreq
);
1640 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1641 state
->count
, err_list
,
1644 LOG("control FREEZE_DB failed for db %s on node %u,"
1645 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1647 LOG("control FREEZE_DB failed for db %s, ret=%d\n",
1648 state
->db_name
, ret
);
1650 tevent_req_error(req
, ret
);
1654 ctdb_req_control_db_transaction_start(&request
, &state
->transdb
);
1655 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1657 state
->pnn_list
, state
->count
,
1658 TIMEOUT(), &request
);
1659 if (tevent_req_nomem(subreq
, req
)) {
1662 tevent_req_set_callback(subreq
, recover_db_transaction_started
, req
);
1665 static void recover_db_transaction_started(struct tevent_req
*subreq
)
1667 struct tevent_req
*req
= tevent_req_callback_data(
1668 subreq
, struct tevent_req
);
1669 struct recover_db_state
*state
= tevent_req_data(
1670 req
, struct recover_db_state
);
1675 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1677 TALLOC_FREE(subreq
);
1682 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1686 LOG("control TRANSACTION_DB failed for db=%s,"
1687 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1689 LOG("control TRANSACTION_DB failed for db=%s,"
1690 " ret=%d\n", state
->db_name
, ret
);
1692 tevent_req_error(req
, ret
);
1696 state
->recdb
= recdb_create(state
, state
->db_id
, state
->db_name
,
1698 state
->tun_list
->database_hash_size
,
1700 if (tevent_req_nomem(state
->recdb
, req
)) {
1704 if (state
->persistent
&& state
->tun_list
->recover_pdb_by_seqnum
!= 0) {
1705 subreq
= collect_highseqnum_db_send(
1706 state
, state
->ev
, state
->client
,
1707 state
->pnn_list
, state
->count
, state
->caps
,
1708 state
->ban_credits
, state
->db_id
,
1711 subreq
= collect_all_db_send(
1712 state
, state
->ev
, state
->client
,
1713 state
->pnn_list
, state
->count
, state
->caps
,
1714 state
->ban_credits
, state
->db_id
,
1717 if (tevent_req_nomem(subreq
, req
)) {
1720 tevent_req_set_callback(subreq
, recover_db_collect_done
, req
);
1723 static void recover_db_collect_done(struct tevent_req
*subreq
)
1725 struct tevent_req
*req
= tevent_req_callback_data(
1726 subreq
, struct tevent_req
);
1727 struct recover_db_state
*state
= tevent_req_data(
1728 req
, struct recover_db_state
);
1729 struct ctdb_req_control request
;
1733 if (state
->persistent
&& state
->tun_list
->recover_pdb_by_seqnum
!= 0) {
1734 status
= collect_highseqnum_db_recv(subreq
, &ret
);
1736 status
= collect_all_db_recv(subreq
, &ret
);
1738 TALLOC_FREE(subreq
);
1740 tevent_req_error(req
, ret
);
1744 ctdb_req_control_wipe_database(&request
, &state
->transdb
);
1745 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1747 state
->pnn_list
, state
->count
,
1748 TIMEOUT(), &request
);
1749 if (tevent_req_nomem(subreq
, req
)) {
1752 tevent_req_set_callback(subreq
, recover_db_wipedb_done
, req
);
1755 static void recover_db_wipedb_done(struct tevent_req
*subreq
)
1757 struct tevent_req
*req
= tevent_req_callback_data(
1758 subreq
, struct tevent_req
);
1759 struct recover_db_state
*state
= tevent_req_data(
1760 req
, struct recover_db_state
);
1765 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1767 TALLOC_FREE(subreq
);
1772 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1776 LOG("control WIPEDB failed for db %s on node %u,"
1777 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1779 LOG("control WIPEDB failed for db %s, ret=%d\n",
1780 state
->db_name
, pnn
, ret
);
1782 tevent_req_error(req
, ret
);
1786 subreq
= push_database_send(state
, state
->ev
, state
->client
,
1787 state
->pnn_list
, state
->count
,
1788 state
->caps
, state
->tun_list
,
1790 if (tevent_req_nomem(subreq
, req
)) {
1793 tevent_req_set_callback(subreq
, recover_db_pushdb_done
, req
);
1796 static void recover_db_pushdb_done(struct tevent_req
*subreq
)
1798 struct tevent_req
*req
= tevent_req_callback_data(
1799 subreq
, struct tevent_req
);
1800 struct recover_db_state
*state
= tevent_req_data(
1801 req
, struct recover_db_state
);
1802 struct ctdb_req_control request
;
1806 status
= push_database_recv(subreq
, &ret
);
1807 TALLOC_FREE(subreq
);
1809 tevent_req_error(req
, ret
);
1813 TALLOC_FREE(state
->recdb
);
1815 ctdb_req_control_db_transaction_commit(&request
, &state
->transdb
);
1816 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1818 state
->pnn_list
, state
->count
,
1819 TIMEOUT(), &request
);
1820 if (tevent_req_nomem(subreq
, req
)) {
1823 tevent_req_set_callback(subreq
, recover_db_transaction_committed
, req
);
1826 static void recover_db_transaction_committed(struct tevent_req
*subreq
)
1828 struct tevent_req
*req
= tevent_req_callback_data(
1829 subreq
, struct tevent_req
);
1830 struct recover_db_state
*state
= tevent_req_data(
1831 req
, struct recover_db_state
);
1832 struct ctdb_req_control request
;
1837 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1839 TALLOC_FREE(subreq
);
1844 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1848 LOG("control DB_TRANSACTION_COMMIT failed for db %s"
1849 " on node %u, ret=%d", state
->db_name
, pnn
, ret2
);
1851 LOG("control DB_TRANSACTION_COMMIT failed for db %s\n,"
1852 " ret=%d", state
->db_name
, ret
);
1854 tevent_req_error(req
, ret
);
1858 ctdb_req_control_db_thaw(&request
, state
->db_id
);
1859 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
1861 state
->pnn_list
, state
->count
,
1862 TIMEOUT(), &request
);
1863 if (tevent_req_nomem(subreq
, req
)) {
1866 tevent_req_set_callback(subreq
, recover_db_thaw_done
, req
);
1869 static void recover_db_thaw_done(struct tevent_req
*subreq
)
1871 struct tevent_req
*req
= tevent_req_callback_data(
1872 subreq
, struct tevent_req
);
1873 struct recover_db_state
*state
= tevent_req_data(
1874 req
, struct recover_db_state
);
1879 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
1881 TALLOC_FREE(subreq
);
1886 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
1890 LOG("control DB_THAW failed for db %s on node %u,"
1891 " ret=%d\n", state
->db_name
, pnn
, ret2
);
1893 LOG("control DB_THAW failed for db %s, ret=%d\n",
1894 state
->db_name
, ret
);
1896 tevent_req_error(req
, ret
);
1900 tevent_req_done(req
);
/*
 * Fetch the outcome of a recover_db request.  The error code is not
 * wanted by callers, so NULL is handed to generic_recv().
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok;

	ok = generic_recv(req, NULL);
	return ok;
}
/*
 * Start database recovery for each database
 *
 * Try to recover each database 5 times before failing recovery.
 */
/*
 * State of the request that recovers all databases.
 *
 * num_replies and num_failed are the counters updated by
 * db_recovery_one_done() and read by db_recovery_recv().
 */
struct db_recovery_state {
	struct tevent_context *ev;
	struct ctdb_dbid_map *dbmap;
	int num_replies;	/* databases whose recovery has finished */
	int num_failed;		/* of those, how many ultimately failed */
};
/*
 * Per-database bookkeeping kept while that database is being recovered, so
 * that a failed attempt can be retried with the same parameters.
 *
 * pnn_list, count, caps, db_id, persistent and num_fails are all set or
 * read by db_recovery_send() and db_recovery_one_done().
 */
struct db_recovery_one_state {
	struct tevent_req *req;		/* the overall db_recovery request */
	struct ctdb_client_context *client;
	struct ctdb_dbid_map *dbmap;
	struct ctdb_tunable_list *tun_list;
	uint32_t *pnn_list;
	int count;
	uint32_t *caps;
	uint32_t *ban_credits;
	uint32_t generation;
	uint32_t db_id;
	bool persistent;
	int num_fails;			/* failed attempts so far */
};
/* Completion callback for each per-database recover_db request. */
static void db_recovery_one_done(struct tevent_req *subreq);
1939 static struct tevent_req
*db_recovery_send(TALLOC_CTX
*mem_ctx
,
1940 struct tevent_context
*ev
,
1941 struct ctdb_client_context
*client
,
1942 struct ctdb_dbid_map
*dbmap
,
1943 struct ctdb_tunable_list
*tun_list
,
1944 uint32_t *pnn_list
, int count
,
1946 uint32_t *ban_credits
,
1947 uint32_t generation
)
1949 struct tevent_req
*req
, *subreq
;
1950 struct db_recovery_state
*state
;
1953 req
= tevent_req_create(mem_ctx
, &state
, struct db_recovery_state
);
1959 state
->dbmap
= dbmap
;
1960 state
->num_replies
= 0;
1961 state
->num_failed
= 0;
1963 if (dbmap
->num
== 0) {
1964 tevent_req_done(req
);
1965 return tevent_req_post(req
, ev
);
1968 for (i
=0; i
<dbmap
->num
; i
++) {
1969 struct db_recovery_one_state
*substate
;
1971 substate
= talloc_zero(state
, struct db_recovery_one_state
);
1972 if (tevent_req_nomem(substate
, req
)) {
1973 return tevent_req_post(req
, ev
);
1976 substate
->req
= req
;
1977 substate
->client
= client
;
1978 substate
->dbmap
= dbmap
;
1979 substate
->tun_list
= tun_list
;
1980 substate
->pnn_list
= pnn_list
;
1981 substate
->count
= count
;
1982 substate
->caps
= caps
;
1983 substate
->ban_credits
= ban_credits
;
1984 substate
->generation
= generation
;
1985 substate
->db_id
= dbmap
->dbs
[i
].db_id
;
1986 substate
->persistent
= dbmap
->dbs
[i
].flags
&
1987 CTDB_DB_FLAGS_PERSISTENT
;
1989 subreq
= recover_db_send(state
, ev
, client
, tun_list
,
1990 pnn_list
, count
, caps
, ban_credits
,
1991 generation
, substate
->db_id
,
1992 substate
->persistent
);
1993 if (tevent_req_nomem(subreq
, req
)) {
1994 return tevent_req_post(req
, ev
);
1996 tevent_req_set_callback(subreq
, db_recovery_one_done
,
1998 LOG("recover database 0x%08x\n", substate
->db_id
);
2004 static void db_recovery_one_done(struct tevent_req
*subreq
)
2006 struct db_recovery_one_state
*substate
= tevent_req_callback_data(
2007 subreq
, struct db_recovery_one_state
);
2008 struct tevent_req
*req
= substate
->req
;
2009 struct db_recovery_state
*state
= tevent_req_data(
2010 req
, struct db_recovery_state
);
2013 status
= recover_db_recv(subreq
);
2014 TALLOC_FREE(subreq
);
2017 talloc_free(substate
);
2021 substate
->num_fails
+= 1;
2022 if (substate
->num_fails
< NUM_RETRIES
) {
2023 subreq
= recover_db_send(state
, state
->ev
, substate
->client
,
2025 substate
->pnn_list
, substate
->count
,
2026 substate
->caps
, substate
->ban_credits
,
2027 substate
->generation
, substate
->db_id
,
2028 substate
->persistent
);
2029 if (tevent_req_nomem(subreq
, req
)) {
2032 tevent_req_set_callback(subreq
, db_recovery_one_done
, substate
);
2033 LOG("recover database 0x%08x, attempt %d\n", substate
->db_id
,
2034 substate
->num_fails
+1);
2039 state
->num_failed
+= 1;
2042 state
->num_replies
+= 1;
2044 if (state
->num_replies
== state
->dbmap
->num
) {
2045 tevent_req_done(req
);
2049 static bool db_recovery_recv(struct tevent_req
*req
, int *count
)
2051 struct db_recovery_state
*state
= tevent_req_data(
2052 req
, struct db_recovery_state
);
2055 if (tevent_req_is_unix_error(req
, &err
)) {
2060 *count
= state
->num_replies
- state
->num_failed
;
2062 if (state
->num_failed
> 0) {
/*
 * Run the parallel database recovery
 *
 * - Get tunables
 * - Get nodemap
 * - Get vnnmap
 * - Get capabilities from all nodes
 * - Get dbmap
 * - Set RECOVERY_ACTIVE
 * - Send START_RECOVERY
 * - Update vnnmap on all nodes
 * - Run database recovery
 * - Set RECOVERY_NORMAL
 * - Send END_RECOVERY
 */
/*
 * State of the top-level recovery request.
 *
 * pnn_list, count, destnode and caps are all used by the recovery_*
 * callbacks; count is the int result of list_of_active_nodes() and
 * destnode holds the value of ctdb_client_pnn().
 */
struct recovery_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	uint32_t generation;
	uint32_t *pnn_list;		/* active nodes */
	int count;			/* number of entries in pnn_list */
	uint32_t destnode;		/* node used for single-node controls */
	struct ctdb_node_map *nodemap;
	uint32_t *caps;			/* capabilities, sized like nodemap */
	uint32_t *ban_credits;		/* failure counters, sized like nodemap */
	struct ctdb_tunable_list *tun_list;
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_dbid_map *dbmap;
};
/* Completion callbacks for the individual steps of the recovery request. */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2114 static struct tevent_req
*recovery_send(TALLOC_CTX
*mem_ctx
,
2115 struct tevent_context
*ev
,
2116 struct ctdb_client_context
*client
,
2117 uint32_t generation
)
2119 struct tevent_req
*req
, *subreq
;
2120 struct recovery_state
*state
;
2121 struct ctdb_req_control request
;
2123 req
= tevent_req_create(mem_ctx
, &state
, struct recovery_state
);
2129 state
->client
= client
;
2130 state
->generation
= generation
;
2131 state
->destnode
= ctdb_client_pnn(client
);
2133 ctdb_req_control_get_all_tunables(&request
);
2134 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2135 state
->destnode
, TIMEOUT(),
2137 if (tevent_req_nomem(subreq
, req
)) {
2138 return tevent_req_post(req
, ev
);
2140 tevent_req_set_callback(subreq
, recovery_tunables_done
, req
);
2145 static void recovery_tunables_done(struct tevent_req
*subreq
)
2147 struct tevent_req
*req
= tevent_req_callback_data(
2148 subreq
, struct tevent_req
);
2149 struct recovery_state
*state
= tevent_req_data(
2150 req
, struct recovery_state
);
2151 struct ctdb_reply_control
*reply
;
2152 struct ctdb_req_control request
;
2156 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2157 TALLOC_FREE(subreq
);
2159 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2160 tevent_req_error(req
, ret
);
2164 ret
= ctdb_reply_control_get_all_tunables(reply
, state
,
2167 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret
);
2168 tevent_req_error(req
, EPROTO
);
2174 recover_timeout
= state
->tun_list
->recover_timeout
;
2176 ctdb_req_control_get_nodemap(&request
);
2177 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2178 state
->destnode
, TIMEOUT(),
2180 if (tevent_req_nomem(subreq
, req
)) {
2183 tevent_req_set_callback(subreq
, recovery_nodemap_done
, req
);
2186 static void recovery_nodemap_done(struct tevent_req
*subreq
)
2188 struct tevent_req
*req
= tevent_req_callback_data(
2189 subreq
, struct tevent_req
);
2190 struct recovery_state
*state
= tevent_req_data(
2191 req
, struct recovery_state
);
2192 struct ctdb_reply_control
*reply
;
2193 struct ctdb_req_control request
;
2197 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2198 TALLOC_FREE(subreq
);
2200 LOG("control GET_NODEMAP failed to node %u, ret=%d\n",
2201 state
->destnode
, ret
);
2202 tevent_req_error(req
, ret
);
2206 ret
= ctdb_reply_control_get_nodemap(reply
, state
, &state
->nodemap
);
2208 LOG("control GET_NODEMAP failed, ret=%d\n", ret
);
2209 tevent_req_error(req
, ret
);
2213 state
->count
= list_of_active_nodes(state
->nodemap
, CTDB_UNKNOWN_PNN
,
2214 state
, &state
->pnn_list
);
2215 if (state
->count
<= 0) {
2216 tevent_req_error(req
, ENOMEM
);
2220 state
->ban_credits
= talloc_zero_array(state
, uint32_t,
2221 state
->nodemap
->num
);
2222 if (tevent_req_nomem(state
->ban_credits
, req
)) {
2226 ctdb_req_control_getvnnmap(&request
);
2227 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2228 state
->destnode
, TIMEOUT(),
2230 if (tevent_req_nomem(subreq
, req
)) {
2233 tevent_req_set_callback(subreq
, recovery_vnnmap_done
, req
);
2236 static void recovery_vnnmap_done(struct tevent_req
*subreq
)
2238 struct tevent_req
*req
= tevent_req_callback_data(
2239 subreq
, struct tevent_req
);
2240 struct recovery_state
*state
= tevent_req_data(
2241 req
, struct recovery_state
);
2242 struct ctdb_reply_control
*reply
;
2243 struct ctdb_req_control request
;
2247 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2248 TALLOC_FREE(subreq
);
2250 LOG("control GETVNNMAP failed to node %u, ret=%d\n",
2251 state
->destnode
, ret
);
2252 tevent_req_error(req
, ret
);
2256 ret
= ctdb_reply_control_getvnnmap(reply
, state
, &state
->vnnmap
);
2258 LOG("control GETVNNMAP failed, ret=%d\n", ret
);
2259 tevent_req_error(req
, ret
);
2263 ctdb_req_control_get_capabilities(&request
);
2264 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2266 state
->pnn_list
, state
->count
,
2267 TIMEOUT(), &request
);
2268 if (tevent_req_nomem(subreq
, req
)) {
2271 tevent_req_set_callback(subreq
, recovery_capabilities_done
, req
);
2274 static void recovery_capabilities_done(struct tevent_req
*subreq
)
2276 struct tevent_req
*req
= tevent_req_callback_data(
2277 subreq
, struct tevent_req
);
2278 struct recovery_state
*state
= tevent_req_data(
2279 req
, struct recovery_state
);
2280 struct ctdb_reply_control
**reply
;
2281 struct ctdb_req_control request
;
2286 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2288 TALLOC_FREE(subreq
);
2293 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2297 LOG("control GET_CAPABILITIES failed on node %u,"
2298 " ret=%d\n", pnn
, ret2
);
2300 LOG("control GET_CAPABILITIES failed, ret=%d\n", ret
);
2302 tevent_req_error(req
, ret
);
2306 /* Make the array size same as nodemap */
2307 state
->caps
= talloc_zero_array(state
, uint32_t,
2308 state
->nodemap
->num
);
2309 if (tevent_req_nomem(state
->caps
, req
)) {
2313 for (i
=0; i
<state
->count
; i
++) {
2316 pnn
= state
->pnn_list
[i
];
2317 ret
= ctdb_reply_control_get_capabilities(reply
[i
],
2320 LOG("control GET_CAPABILITIES failed on node %u\n", pnn
);
2321 tevent_req_error(req
, EPROTO
);
2328 ctdb_req_control_get_dbmap(&request
);
2329 subreq
= ctdb_client_control_send(state
, state
->ev
, state
->client
,
2330 state
->destnode
, TIMEOUT(),
2332 if (tevent_req_nomem(subreq
, req
)) {
2335 tevent_req_set_callback(subreq
, recovery_dbmap_done
, req
);
2338 static void recovery_dbmap_done(struct tevent_req
*subreq
)
2340 struct tevent_req
*req
= tevent_req_callback_data(
2341 subreq
, struct tevent_req
);
2342 struct recovery_state
*state
= tevent_req_data(
2343 req
, struct recovery_state
);
2344 struct ctdb_reply_control
*reply
;
2345 struct ctdb_req_control request
;
2349 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
2350 TALLOC_FREE(subreq
);
2352 LOG("control GET_DBMAP failed to node %u, ret=%d\n",
2353 state
->destnode
, ret
);
2354 tevent_req_error(req
, ret
);
2358 ret
= ctdb_reply_control_get_dbmap(reply
, state
, &state
->dbmap
);
2360 LOG("control GET_DBMAP failed, ret=%d\n", ret
);
2361 tevent_req_error(req
, ret
);
2365 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_ACTIVE
);
2366 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2368 state
->pnn_list
, state
->count
,
2369 TIMEOUT(), &request
);
2370 if (tevent_req_nomem(subreq
, req
)) {
2373 tevent_req_set_callback(subreq
, recovery_active_done
, req
);
2376 static void recovery_active_done(struct tevent_req
*subreq
)
2378 struct tevent_req
*req
= tevent_req_callback_data(
2379 subreq
, struct tevent_req
);
2380 struct recovery_state
*state
= tevent_req_data(
2381 req
, struct recovery_state
);
2382 struct ctdb_req_control request
;
2383 struct ctdb_vnn_map
*vnnmap
;
2388 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2390 TALLOC_FREE(subreq
);
2395 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2399 LOG("failed to set recovery mode to ACTIVE on node %u,"
2400 " ret=%d\n", pnn
, ret2
);
2402 LOG("failed to set recovery mode to ACTIVE, ret=%d\n",
2405 tevent_req_error(req
, ret
);
2409 LOG("set recovery mode to ACTIVE\n");
2411 /* Calculate new VNNMAP */
2413 for (i
=0; i
<state
->nodemap
->num
; i
++) {
2414 if (state
->nodemap
->node
[i
].flags
& NODE_FLAGS_INACTIVE
) {
2417 if (!(state
->caps
[i
] & CTDB_CAP_LMASTER
)) {
2424 LOG("no active lmasters found. Adding recmaster anyway\n");
2427 vnnmap
= talloc_zero(state
, struct ctdb_vnn_map
);
2428 if (tevent_req_nomem(vnnmap
, req
)) {
2432 vnnmap
->size
= (count
== 0 ? 1 : count
);
2433 vnnmap
->map
= talloc_array(vnnmap
, uint32_t, vnnmap
->size
);
2434 if (tevent_req_nomem(vnnmap
->map
, req
)) {
2439 vnnmap
->map
[0] = state
->destnode
;
2442 for (i
=0; i
<state
->nodemap
->num
; i
++) {
2443 if (state
->nodemap
->node
[i
].flags
&
2444 NODE_FLAGS_INACTIVE
) {
2447 if (!(state
->caps
[i
] & CTDB_CAP_LMASTER
)) {
2451 vnnmap
->map
[count
] = state
->nodemap
->node
[i
].pnn
;
2456 vnnmap
->generation
= state
->generation
;
2458 talloc_free(state
->vnnmap
);
2459 state
->vnnmap
= vnnmap
;
2461 ctdb_req_control_start_recovery(&request
);
2462 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2464 state
->pnn_list
, state
->count
,
2465 TIMEOUT(), &request
);
2466 if (tevent_req_nomem(subreq
, req
)) {
2469 tevent_req_set_callback(subreq
, recovery_start_recovery_done
, req
);
2472 static void recovery_start_recovery_done(struct tevent_req
*subreq
)
2474 struct tevent_req
*req
= tevent_req_callback_data(
2475 subreq
, struct tevent_req
);
2476 struct recovery_state
*state
= tevent_req_data(
2477 req
, struct recovery_state
);
2478 struct ctdb_req_control request
;
2483 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2485 TALLOC_FREE(subreq
);
2490 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2494 LOG("failed to run start_recovery event on node %u,"
2495 " ret=%d\n", pnn
, ret2
);
2497 LOG("failed to run start_recovery event, ret=%d\n",
2500 tevent_req_error(req
, ret
);
2504 LOG("start_recovery event finished\n");
2506 ctdb_req_control_setvnnmap(&request
, state
->vnnmap
);
2507 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2509 state
->pnn_list
, state
->count
,
2510 TIMEOUT(), &request
);
2511 if (tevent_req_nomem(subreq
, req
)) {
2514 tevent_req_set_callback(subreq
, recovery_vnnmap_update_done
, req
);
2517 static void recovery_vnnmap_update_done(struct tevent_req
*subreq
)
2519 struct tevent_req
*req
= tevent_req_callback_data(
2520 subreq
, struct tevent_req
);
2521 struct recovery_state
*state
= tevent_req_data(
2522 req
, struct recovery_state
);
2527 status
= ctdb_client_control_multi_recv(subreq
, &ret
, NULL
, &err_list
,
2529 TALLOC_FREE(subreq
);
2534 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2538 LOG("failed to update VNNMAP on node %u, ret=%d\n",
2541 LOG("failed to update VNNMAP, ret=%d\n", ret
);
2543 tevent_req_error(req
, ret
);
2547 LOG("updated VNNMAP\n");
2549 subreq
= db_recovery_send(state
, state
->ev
, state
->client
,
2550 state
->dbmap
, state
->tun_list
,
2551 state
->pnn_list
, state
->count
,
2552 state
->caps
, state
->ban_credits
,
2553 state
->vnnmap
->generation
);
2554 if (tevent_req_nomem(subreq
, req
)) {
2557 tevent_req_set_callback(subreq
, recovery_db_recovery_done
, req
);
2560 static void recovery_db_recovery_done(struct tevent_req
*subreq
)
2562 struct tevent_req
*req
= tevent_req_callback_data(
2563 subreq
, struct tevent_req
);
2564 struct recovery_state
*state
= tevent_req_data(
2565 req
, struct recovery_state
);
2566 struct ctdb_req_control request
;
2570 status
= db_recovery_recv(subreq
, &count
);
2571 TALLOC_FREE(subreq
);
2573 LOG("%d of %d databases recovered\n", count
, state
->dbmap
->num
);
2576 uint32_t max_pnn
= CTDB_UNKNOWN_PNN
, max_credits
= 0;
2579 /* Bans are not enabled */
2580 if (state
->tun_list
->enable_bans
== 0) {
2581 tevent_req_error(req
, EIO
);
2585 for (i
=0; i
<state
->count
; i
++) {
2587 pnn
= state
->pnn_list
[i
];
2588 if (state
->ban_credits
[pnn
] > max_credits
) {
2590 max_credits
= state
->ban_credits
[pnn
];
2594 /* If pulling database fails multiple times */
2595 if (max_credits
>= NUM_RETRIES
) {
2596 struct ctdb_req_message message
;
2598 LOG("Assigning banning credits to node %u\n", max_pnn
);
2600 message
.srvid
= CTDB_SRVID_BANNING
;
2601 message
.data
.pnn
= max_pnn
;
2603 subreq
= ctdb_client_message_send(
2604 state
, state
->ev
, state
->client
,
2605 ctdb_client_pnn(state
->client
),
2607 if (tevent_req_nomem(subreq
, req
)) {
2610 tevent_req_set_callback(subreq
, recovery_failed_done
,
2613 tevent_req_error(req
, EIO
);
2618 ctdb_req_control_set_recmode(&request
, CTDB_RECOVERY_NORMAL
);
2619 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2621 state
->pnn_list
, state
->count
,
2622 TIMEOUT(), &request
);
2623 if (tevent_req_nomem(subreq
, req
)) {
2626 tevent_req_set_callback(subreq
, recovery_normal_done
, req
);
2629 static void recovery_failed_done(struct tevent_req
*subreq
)
2631 struct tevent_req
*req
= tevent_req_callback_data(
2632 subreq
, struct tevent_req
);
2636 status
= ctdb_client_message_recv(subreq
, &ret
);
2637 TALLOC_FREE(subreq
);
2639 LOG("failed to assign banning credits, ret=%d\n", ret
);
2642 tevent_req_error(req
, EIO
);
2645 static void recovery_normal_done(struct tevent_req
*subreq
)
2647 struct tevent_req
*req
= tevent_req_callback_data(
2648 subreq
, struct tevent_req
);
2649 struct recovery_state
*state
= tevent_req_data(
2650 req
, struct recovery_state
);
2651 struct ctdb_req_control request
;
2656 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2658 TALLOC_FREE(subreq
);
2663 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2667 LOG("failed to set recovery mode to NORMAL on node %u,"
2668 " ret=%d\n", pnn
, ret2
);
2670 LOG("failed to set recovery mode to NORMAL, ret=%d\n",
2673 tevent_req_error(req
, ret
);
2677 LOG("set recovery mode to NORMAL\n");
2679 ctdb_req_control_end_recovery(&request
);
2680 subreq
= ctdb_client_control_multi_send(state
, state
->ev
,
2682 state
->pnn_list
, state
->count
,
2683 TIMEOUT(), &request
);
2684 if (tevent_req_nomem(subreq
, req
)) {
2687 tevent_req_set_callback(subreq
, recovery_end_recovery_done
, req
);
2690 static void recovery_end_recovery_done(struct tevent_req
*subreq
)
2692 struct tevent_req
*req
= tevent_req_callback_data(
2693 subreq
, struct tevent_req
);
2694 struct recovery_state
*state
= tevent_req_data(
2695 req
, struct recovery_state
);
2700 status
= ctdb_client_control_multi_recv(subreq
, &ret
, state
, &err_list
,
2702 TALLOC_FREE(subreq
);
2707 ret2
= ctdb_client_control_multi_error(state
->pnn_list
,
2711 LOG("failed to run recovered event on node %u,"
2712 " ret=%d\n", pnn
, ret2
);
2714 LOG("failed to run recovered event, ret=%d\n", ret
);
2716 tevent_req_error(req
, ret
);
2720 LOG("recovered event finished\n");
2722 tevent_req_done(req
);
/*
 * Fetch the outcome of the recovery request.
 *
 * *perr (if non-NULL) is set to 0 first and then overwritten by
 * generic_recv() when the request failed, so the caller always gets a
 * defined value.  NOTE(review): generic_recv() appears to write *perr only
 * on failure; without the reset the caller would read whatever the
 * variable held before.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	if (perr != NULL) {
		*perr = 0;
	}

	generic_recv(req, perr);
}
/* Print the expected command line to stderr. */
static void usage(const char *progname)
{
	static const char fmt[] =
		"\nUsage: %s <log-fd> <output-fd> <ctdb-socket-path> <generation>\n";

	fprintf(stderr, fmt, progname);
}
/*
 * Arguments - log fd, write fd, socket path, generation
 */
2740 int main(int argc
, char *argv
[])
2742 int log_fd
, write_fd
;
2743 const char *sockpath
;
2744 TALLOC_CTX
*mem_ctx
;
2745 struct tevent_context
*ev
;
2746 struct ctdb_client_context
*client
;
2748 struct tevent_req
*req
;
2749 uint32_t generation
;
2756 log_fd
= atoi(argv
[1]);
2757 if (log_fd
!= STDOUT_FILENO
&& log_fd
!= STDERR_FILENO
) {
2758 close(STDOUT_FILENO
);
2759 close(STDERR_FILENO
);
2760 dup2(log_fd
, STDOUT_FILENO
);
2761 dup2(log_fd
, STDERR_FILENO
);
2765 write_fd
= atoi(argv
[2]);
2767 generation
= (uint32_t)strtoul(argv
[4], NULL
, 0);
2769 mem_ctx
= talloc_new(NULL
);
2770 if (mem_ctx
== NULL
) {
2771 LOG("talloc_new() failed\n");
2775 ev
= tevent_context_init(mem_ctx
);
2777 LOG("tevent_context_init() failed\n");
2781 ret
= ctdb_client_init(mem_ctx
, ev
, sockpath
, &client
);
2783 LOG("ctdb_client_init() failed, ret=%d\n", ret
);
2787 req
= recovery_send(mem_ctx
, ev
, client
, generation
);
2789 LOG("database_recover_send() failed\n");
2793 if (! tevent_req_poll(req
, ev
)) {
2794 LOG("tevent_req_poll() failed\n");
2798 recovery_recv(req
, &ret
);
2801 LOG("database recovery failed, ret=%d\n", ret
);
2805 sys_write(write_fd
, &ret
, sizeof(ret
));
2809 talloc_free(mem_ctx
);