4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
46 ctdb_control_getvnnmap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
48 struct ctdb_vnn_map_wire
*map
;
51 CHECK_CONTROL_DATA_SIZE(0);
53 len
= offsetof(struct ctdb_vnn_map_wire
, map
) + sizeof(uint32_t)*ctdb
->vnn_map
->size
;
54 map
= talloc_size(outdata
, len
);
55 CTDB_NO_MEMORY(ctdb
, map
);
57 map
->generation
= ctdb
->vnn_map
->generation
;
58 map
->size
= ctdb
->vnn_map
->size
;
59 memcpy(map
->map
, ctdb
->vnn_map
->map
, sizeof(uint32_t)*map
->size
);
62 outdata
->dptr
= (uint8_t *)map
;
68 ctdb_control_setvnnmap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
70 struct ctdb_vnn_map_wire
*map
= (struct ctdb_vnn_map_wire
*)indata
.dptr
;
72 if (ctdb
->recovery_mode
!= CTDB_RECOVERY_ACTIVE
) {
73 DEBUG(DEBUG_ERR
, ("Attempt to set vnnmap when not in recovery\n"));
77 talloc_free(ctdb
->vnn_map
);
79 ctdb
->vnn_map
= talloc(ctdb
, struct ctdb_vnn_map
);
80 CTDB_NO_MEMORY(ctdb
, ctdb
->vnn_map
);
82 ctdb
->vnn_map
->generation
= map
->generation
;
83 ctdb
->vnn_map
->size
= map
->size
;
84 ctdb
->vnn_map
->map
= talloc_array(ctdb
->vnn_map
, uint32_t, map
->size
);
85 CTDB_NO_MEMORY(ctdb
, ctdb
->vnn_map
->map
);
87 memcpy(ctdb
->vnn_map
->map
, map
->map
, sizeof(uint32_t)*map
->size
);
93 ctdb_control_getdbmap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
96 struct ctdb_db_context
*ctdb_db
;
97 struct ctdb_dbid_map_old
*dbid_map
;
99 CHECK_CONTROL_DATA_SIZE(0);
102 for(ctdb_db
=ctdb
->db_list
;ctdb_db
;ctdb_db
=ctdb_db
->next
){
107 outdata
->dsize
= offsetof(struct ctdb_dbid_map_old
, dbs
) + sizeof(dbid_map
->dbs
[0])*len
;
108 outdata
->dptr
= (unsigned char *)talloc_zero_size(outdata
, outdata
->dsize
);
109 if (!outdata
->dptr
) {
110 DEBUG(DEBUG_ALERT
, (__location__
" Failed to allocate dbmap array\n"));
114 dbid_map
= (struct ctdb_dbid_map_old
*)outdata
->dptr
;
116 for (i
=0,ctdb_db
=ctdb
->db_list
;ctdb_db
;i
++,ctdb_db
=ctdb_db
->next
){
117 dbid_map
->dbs
[i
].db_id
= ctdb_db
->db_id
;
118 dbid_map
->dbs
[i
].flags
= ctdb_db
->db_flags
;
125 ctdb_control_getnodemap(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
, TDB_DATA
*outdata
)
127 CHECK_CONTROL_DATA_SIZE(0);
129 outdata
->dptr
= (unsigned char *)ctdb_node_list_to_map(ctdb
->nodes
,
132 if (outdata
->dptr
== NULL
) {
136 outdata
->dsize
= talloc_get_size(outdata
->dptr
);
142 reload the nodes file
145 ctdb_control_reload_nodes_file(struct ctdb_context
*ctdb
, uint32_t opcode
)
149 struct ctdb_node
**nodes
;
151 tmp_ctx
= talloc_new(ctdb
);
153 /* steal the old nodes file for a while */
154 talloc_steal(tmp_ctx
, ctdb
->nodes
);
157 num_nodes
= ctdb
->num_nodes
;
160 /* load the new nodes file */
161 ctdb_load_nodes_file(ctdb
);
163 for (i
=0; i
<ctdb
->num_nodes
; i
++) {
164 /* keep any identical pre-existing nodes and connections */
165 if ((i
< num_nodes
) && ctdb_same_address(&ctdb
->nodes
[i
]->address
, &nodes
[i
]->address
)) {
166 talloc_free(ctdb
->nodes
[i
]);
167 ctdb
->nodes
[i
] = talloc_steal(ctdb
->nodes
, nodes
[i
]);
171 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DELETED
) {
175 /* any new or different nodes must be added */
176 if (ctdb
->methods
->add_node(ctdb
->nodes
[i
]) != 0) {
177 DEBUG(DEBUG_CRIT
, (__location__
" methods->add_node failed at %d\n", i
));
178 ctdb_fatal(ctdb
, "failed to add node. shutting down\n");
180 if (ctdb
->methods
->connect_node(ctdb
->nodes
[i
]) != 0) {
181 DEBUG(DEBUG_CRIT
, (__location__
" methods->add_connect failed at %d\n", i
));
182 ctdb_fatal(ctdb
, "failed to connect to node. shutting down\n");
186 /* tell the recovery daemon to reload the nodes file too */
187 ctdb_daemon_send_message(ctdb
, ctdb
->pnn
, CTDB_SRVID_RELOAD_NODES
, tdb_null
);
189 talloc_free(tmp_ctx
);
195 a traverse function for pulling all relevant records from pulldb
198 struct ctdb_context
*ctdb
;
199 struct ctdb_db_context
*ctdb_db
;
200 struct ctdb_marshall_buffer
*pulldata
;
202 uint32_t allocated_len
;
206 static int traverse_pulldb(struct tdb_context
*tdb
, TDB_DATA key
, TDB_DATA data
, void *p
)
208 struct pulldb_data
*params
= (struct pulldb_data
*)p
;
209 struct ctdb_rec_data_old
*rec
;
210 struct ctdb_context
*ctdb
= params
->ctdb
;
211 struct ctdb_db_context
*ctdb_db
= params
->ctdb_db
;
213 /* add the record to the blob */
214 rec
= ctdb_marshall_record(params
->pulldata
, 0, key
, NULL
, data
);
216 params
->failed
= true;
219 if (params
->len
+ rec
->length
>= params
->allocated_len
) {
220 params
->allocated_len
= rec
->length
+ params
->len
+ ctdb
->tunable
.pulldb_preallocation_size
;
221 params
->pulldata
= talloc_realloc_size(NULL
, params
->pulldata
, params
->allocated_len
);
223 if (params
->pulldata
== NULL
) {
224 DEBUG(DEBUG_CRIT
,(__location__
" Failed to expand pulldb_data to %u\n", rec
->length
+ params
->len
));
225 ctdb_fatal(params
->ctdb
, "failed to allocate memory for recovery. shutting down\n");
227 params
->pulldata
->count
++;
228 memcpy(params
->len
+(uint8_t *)params
->pulldata
, rec
, rec
->length
);
229 params
->len
+= rec
->length
;
231 if (ctdb
->tunable
.db_record_size_warn
!= 0 && rec
->length
> ctdb
->tunable
.db_record_size_warn
) {
232 DEBUG(DEBUG_ERR
,("Data record in %s is big. Record size is %d bytes\n", ctdb_db
->db_name
, (int)rec
->length
));
241 pull a bunch of records from a ltdb, filtering by lmaster
243 int32_t ctdb_control_pull_db(struct ctdb_context
*ctdb
, TDB_DATA indata
, TDB_DATA
*outdata
)
245 struct ctdb_pulldb
*pull
;
246 struct ctdb_db_context
*ctdb_db
;
247 struct pulldb_data params
;
248 struct ctdb_marshall_buffer
*reply
;
250 pull
= (struct ctdb_pulldb
*)indata
.dptr
;
252 ctdb_db
= find_ctdb_db(ctdb
, pull
->db_id
);
254 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n", pull
->db_id
));
258 if (!ctdb_db_frozen(ctdb_db
)) {
260 ("rejecting ctdb_control_pull_db when not frozen\n"));
264 reply
= talloc_zero(outdata
, struct ctdb_marshall_buffer
);
265 CTDB_NO_MEMORY(ctdb
, reply
);
267 reply
->db_id
= pull
->db_id
;
270 params
.ctdb_db
= ctdb_db
;
271 params
.pulldata
= reply
;
272 params
.len
= offsetof(struct ctdb_marshall_buffer
, data
);
273 params
.allocated_len
= params
.len
;
274 params
.failed
= false;
276 if (ctdb_db
->unhealthy_reason
) {
277 /* this is just a warning, as the tdb should be empty anyway */
278 DEBUG(DEBUG_WARNING
,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
279 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
282 if (ctdb_lockdb_mark(ctdb_db
) != 0) {
283 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock on entire db - failing\n"));
287 if (tdb_traverse_read(ctdb_db
->ltdb
->tdb
, traverse_pulldb
, &params
) == -1) {
288 DEBUG(DEBUG_ERR
,(__location__
" Failed to get traverse db '%s'\n", ctdb_db
->db_name
));
289 ctdb_lockdb_unmark(ctdb_db
);
290 talloc_free(params
.pulldata
);
294 ctdb_lockdb_unmark(ctdb_db
);
296 outdata
->dptr
= (uint8_t *)params
.pulldata
;
297 outdata
->dsize
= params
.len
;
299 if (ctdb
->tunable
.db_record_count_warn
!= 0 && params
.pulldata
->count
> ctdb
->tunable
.db_record_count_warn
) {
300 DEBUG(DEBUG_ERR
,("Database %s is big. Contains %d records\n", ctdb_db
->db_name
, params
.pulldata
->count
));
302 if (ctdb
->tunable
.db_size_warn
!= 0 && outdata
->dsize
> ctdb
->tunable
.db_size_warn
) {
303 DEBUG(DEBUG_ERR
,("Database %s is big. Contains %d bytes\n", ctdb_db
->db_name
, (int)outdata
->dsize
));
310 struct db_pull_state
{
311 struct ctdb_context
*ctdb
;
312 struct ctdb_db_context
*ctdb_db
;
313 struct ctdb_marshall_buffer
*recs
;
316 uint32_t num_records
;
319 static int traverse_db_pull(struct tdb_context
*tdb
, TDB_DATA key
,
320 TDB_DATA data
, void *private_data
)
322 struct db_pull_state
*state
= (struct db_pull_state
*)private_data
;
323 struct ctdb_marshall_buffer
*recs
;
325 recs
= ctdb_marshall_add(state
->ctdb
, state
->recs
,
326 state
->ctdb_db
->db_id
, 0, key
, NULL
, data
);
328 TALLOC_FREE(state
->recs
);
333 if (talloc_get_size(state
->recs
) >=
334 state
->ctdb
->tunable
.rec_buffer_size_limit
) {
338 buffer
= ctdb_marshall_finish(state
->recs
);
339 ret
= ctdb_daemon_send_message(state
->ctdb
, state
->pnn
,
340 state
->srvid
, buffer
);
342 TALLOC_FREE(state
->recs
);
346 state
->num_records
+= state
->recs
->count
;
347 TALLOC_FREE(state
->recs
);
353 int32_t ctdb_control_db_pull(struct ctdb_context
*ctdb
,
354 struct ctdb_req_control_old
*c
,
355 TDB_DATA indata
, TDB_DATA
*outdata
)
357 struct ctdb_pulldb_ext
*pulldb_ext
;
358 struct ctdb_db_context
*ctdb_db
;
359 struct db_pull_state state
;
362 pulldb_ext
= (struct ctdb_pulldb_ext
*)indata
.dptr
;
364 ctdb_db
= find_ctdb_db(ctdb
, pulldb_ext
->db_id
);
365 if (ctdb_db
== NULL
) {
366 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n",
371 if (!ctdb_db_frozen(ctdb_db
)) {
373 ("rejecting ctdb_control_pull_db when not frozen\n"));
377 if (ctdb_db
->unhealthy_reason
) {
378 /* this is just a warning, as the tdb should be empty anyway */
380 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
381 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
385 state
.ctdb_db
= ctdb_db
;
387 state
.pnn
= c
->hdr
.srcnode
;
388 state
.srvid
= pulldb_ext
->srvid
;
389 state
.num_records
= 0;
391 if (ctdb_lockdb_mark(ctdb_db
) != 0) {
393 (__location__
" Failed to get lock on entire db - failing\n"));
397 ret
= tdb_traverse_read(ctdb_db
->ltdb
->tdb
, traverse_db_pull
, &state
);
400 (__location__
" Failed to get traverse db '%s'\n",
402 ctdb_lockdb_unmark(ctdb_db
);
406 /* Last few records */
407 if (state
.recs
!= NULL
) {
410 buffer
= ctdb_marshall_finish(state
.recs
);
411 ret
= ctdb_daemon_send_message(state
.ctdb
, state
.pnn
,
412 state
.srvid
, buffer
);
414 TALLOC_FREE(state
.recs
);
415 ctdb_lockdb_unmark(ctdb_db
);
419 state
.num_records
+= state
.recs
->count
;
420 TALLOC_FREE(state
.recs
);
423 ctdb_lockdb_unmark(ctdb_db
);
425 outdata
->dptr
= talloc_size(outdata
, sizeof(uint32_t));
426 if (outdata
->dptr
== NULL
) {
427 DEBUG(DEBUG_ERR
, (__location__
" Memory allocation error\n"));
431 memcpy(outdata
->dptr
, (uint8_t *)&state
.num_records
, sizeof(uint32_t));
432 outdata
->dsize
= sizeof(uint32_t);
438 push a bunch of records into a ltdb, filtering by rsn
440 int32_t ctdb_control_push_db(struct ctdb_context
*ctdb
, TDB_DATA indata
)
442 struct ctdb_marshall_buffer
*reply
= (struct ctdb_marshall_buffer
*)indata
.dptr
;
443 struct ctdb_db_context
*ctdb_db
;
445 struct ctdb_rec_data_old
*rec
;
447 if (indata
.dsize
< offsetof(struct ctdb_marshall_buffer
, data
)) {
448 DEBUG(DEBUG_ERR
,(__location__
" invalid data in pulldb reply\n"));
452 ctdb_db
= find_ctdb_db(ctdb
, reply
->db_id
);
454 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n", reply
->db_id
));
458 if (!ctdb_db_frozen(ctdb_db
)) {
460 ("rejecting ctdb_control_push_db when not frozen\n"));
464 if (ctdb_lockdb_mark(ctdb_db
) != 0) {
465 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock on entire db - failing\n"));
469 rec
= (struct ctdb_rec_data_old
*)&reply
->data
[0];
471 DEBUG(DEBUG_INFO
,("starting push of %u records for dbid 0x%x\n",
472 reply
->count
, reply
->db_id
));
474 for (i
=0;i
<reply
->count
;i
++) {
476 struct ctdb_ltdb_header
*hdr
;
478 key
.dptr
= &rec
->data
[0];
479 key
.dsize
= rec
->keylen
;
480 data
.dptr
= &rec
->data
[key
.dsize
];
481 data
.dsize
= rec
->datalen
;
483 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
484 DEBUG(DEBUG_CRIT
,(__location__
" bad ltdb record\n"));
487 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
488 /* strip off any read only record flags. All readonly records
489 are revoked implicitly by a recovery */
491 hdr
->flags
&= ~CTDB_REC_RO_FLAGS
;
493 data
.dptr
+= sizeof(*hdr
);
494 data
.dsize
-= sizeof(*hdr
);
496 ret
= ctdb_ltdb_store(ctdb_db
, key
, hdr
, data
);
498 DEBUG(DEBUG_CRIT
, (__location__
" Unable to store record\n"));
502 rec
= (struct ctdb_rec_data_old
*)(rec
->length
+ (uint8_t *)rec
);
505 DEBUG(DEBUG_DEBUG
,("finished push of %u records for dbid 0x%x\n",
506 reply
->count
, reply
->db_id
));
508 if (ctdb_db_readonly(ctdb_db
)) {
509 DEBUG(DEBUG_CRIT
,("Clearing the tracking database for dbid 0x%x\n",
511 if (tdb_wipe_all(ctdb_db
->rottdb
) != 0) {
512 DEBUG(DEBUG_ERR
,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db
->db_id
));
513 tdb_close(ctdb_db
->rottdb
);
514 ctdb_db
->rottdb
= NULL
;
515 ctdb_db_reset_readonly(ctdb_db
);
517 while (ctdb_db
->revokechild_active
!= NULL
) {
518 talloc_free(ctdb_db
->revokechild_active
);
522 ctdb_lockdb_unmark(ctdb_db
);
526 ctdb_lockdb_unmark(ctdb_db
);
530 struct db_push_state
{
531 struct ctdb_context
*ctdb
;
532 struct ctdb_db_context
*ctdb_db
;
534 uint32_t num_records
;
538 static void db_push_msg_handler(uint64_t srvid
, TDB_DATA indata
,
541 struct db_push_state
*state
= talloc_get_type(
542 private_data
, struct db_push_state
);
543 struct ctdb_marshall_buffer
*recs
;
544 struct ctdb_rec_data_old
*rec
;
551 recs
= (struct ctdb_marshall_buffer
*)indata
.dptr
;
552 rec
= (struct ctdb_rec_data_old
*)&recs
->data
[0];
554 DEBUG(DEBUG_INFO
, ("starting push of %u records for dbid 0x%x\n",
555 recs
->count
, recs
->db_id
));
557 for (i
=0; i
<recs
->count
; i
++) {
559 struct ctdb_ltdb_header
*hdr
;
561 key
.dptr
= &rec
->data
[0];
562 key
.dsize
= rec
->keylen
;
563 data
.dptr
= &rec
->data
[key
.dsize
];
564 data
.dsize
= rec
->datalen
;
566 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
567 DEBUG(DEBUG_CRIT
,(__location__
" bad ltdb record\n"));
571 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
572 /* Strip off any read only record flags.
573 * All readonly records are revoked implicitly by a recovery. */
575 hdr
->flags
&= ~CTDB_REC_RO_FLAGS
;
577 data
.dptr
+= sizeof(*hdr
);
578 data
.dsize
-= sizeof(*hdr
);
580 ret
= ctdb_ltdb_store(state
->ctdb_db
, key
, hdr
, data
);
583 (__location__
" Unable to store record\n"));
587 rec
= (struct ctdb_rec_data_old
*)(rec
->length
+ (uint8_t *)rec
);
590 DEBUG(DEBUG_DEBUG
, ("finished push of %u records for dbid 0x%x\n",
591 recs
->count
, recs
->db_id
));
593 state
->num_records
+= recs
->count
;
597 state
->failed
= true;
600 int32_t ctdb_control_db_push_start(struct ctdb_context
*ctdb
, TDB_DATA indata
)
602 struct ctdb_pulldb_ext
*pulldb_ext
;
603 struct ctdb_db_context
*ctdb_db
;
604 struct db_push_state
*state
;
607 pulldb_ext
= (struct ctdb_pulldb_ext
*)indata
.dptr
;
609 ctdb_db
= find_ctdb_db(ctdb
, pulldb_ext
->db_id
);
610 if (ctdb_db
== NULL
) {
612 (__location__
" Unknown db 0x%08x\n", pulldb_ext
->db_id
));
616 if (!ctdb_db_frozen(ctdb_db
)) {
618 ("rejecting ctdb_control_db_push_start when not frozen\n"));
622 if (ctdb_db
->push_started
) {
624 (__location__
" DB push already started for %s\n",
627 /* De-register old state */
628 state
= (struct db_push_state
*)ctdb_db
->push_state
;
630 srvid_deregister(ctdb
->srv
, state
->srvid
, state
);
632 ctdb_db
->push_state
= NULL
;
636 state
= talloc_zero(ctdb_db
, struct db_push_state
);
638 DEBUG(DEBUG_ERR
, (__location__
" Memory allocation error\n"));
643 state
->ctdb_db
= ctdb_db
;
644 state
->srvid
= pulldb_ext
->srvid
;
645 state
->failed
= false;
647 ret
= srvid_register(ctdb
->srv
, state
, state
->srvid
,
648 db_push_msg_handler
, state
);
651 (__location__
" Failed to register srvid for db push\n"));
656 if (ctdb_lockdb_mark(ctdb_db
) != 0) {
658 (__location__
" Failed to get lock on entire db - failing\n"));
659 srvid_deregister(ctdb
->srv
, state
->srvid
, state
);
664 ctdb_db
->push_started
= true;
665 ctdb_db
->push_state
= state
;
670 int32_t ctdb_control_db_push_confirm(struct ctdb_context
*ctdb
,
671 TDB_DATA indata
, TDB_DATA
*outdata
)
674 struct ctdb_db_context
*ctdb_db
;
675 struct db_push_state
*state
;
677 db_id
= *(uint32_t *)indata
.dptr
;
679 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
680 if (ctdb_db
== NULL
) {
681 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n", db_id
));
685 if (!ctdb_db_frozen(ctdb_db
)) {
687 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
691 if (!ctdb_db
->push_started
) {
692 DEBUG(DEBUG_ERR
, (__location__
" DB push not started\n"));
696 if (ctdb_db_readonly(ctdb_db
)) {
698 ("Clearing the tracking database for dbid 0x%x\n",
700 if (tdb_wipe_all(ctdb_db
->rottdb
) != 0) {
702 ("Failed to wipe tracking database for 0x%x."
703 " Dropping read-only delegation support\n",
705 tdb_close(ctdb_db
->rottdb
);
706 ctdb_db
->rottdb
= NULL
;
707 ctdb_db_reset_readonly(ctdb_db
);
710 while (ctdb_db
->revokechild_active
!= NULL
) {
711 talloc_free(ctdb_db
->revokechild_active
);
715 ctdb_lockdb_unmark(ctdb_db
);
717 state
= (struct db_push_state
*)ctdb_db
->push_state
;
719 DEBUG(DEBUG_ERR
, (__location__
" Missing push db state\n"));
723 srvid_deregister(ctdb
->srv
, state
->srvid
, state
);
725 outdata
->dptr
= talloc_size(outdata
, sizeof(uint32_t));
726 if (outdata
->dptr
== NULL
) {
727 DEBUG(DEBUG_ERR
, (__location__
" Memory allocation error\n"));
729 ctdb_db
->push_state
= NULL
;
733 memcpy(outdata
->dptr
, (uint8_t *)&state
->num_records
, sizeof(uint32_t));
734 outdata
->dsize
= sizeof(uint32_t);
737 ctdb_db
->push_started
= false;
738 ctdb_db
->push_state
= NULL
;
743 struct set_recmode_state
{
744 struct ctdb_context
*ctdb
;
745 struct ctdb_req_control_old
*c
;
748 static void set_recmode_handler(char status
,
752 struct set_recmode_state
*state
= talloc_get_type_abort(
753 private_data
, struct set_recmode_state
);
755 const char *err
= NULL
;
761 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
762 state
->ctdb
->recovery_lock
));
764 err
= "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
769 DEBUG(DEBUG_DEBUG
, (__location__
" Recovery lock check OK\n"));
770 state
->ctdb
->recovery_mode
= CTDB_RECOVERY_NORMAL
;
771 ctdb_process_deferred_attach(state
->ctdb
);
775 CTDB_UPDATE_RECLOCK_LATENCY(state
->ctdb
, "daemon reclock",
776 reclock
.ctdbd
, latency
);
780 /* Timeout. Consider this a success, not a failure,
781 * as we failed to set the recovery lock which is what
782 * we wanted. This can be caused by the cluster
783 * filesystem being very slow to arbitrate locks
784 * immediately after a node failure. */
787 "Time out getting recovery lock, allowing recmode set anyway\n"));
788 state
->ctdb
->recovery_mode
= CTDB_RECOVERY_NORMAL
;
789 ctdb_process_deferred_attach(state
->ctdb
);
796 ("Unexpected error when testing recovery lock\n"));
798 err
= "Unexpected error when testing recovery lock";
801 ctdb_request_control_reply(state
->ctdb
, state
->c
, NULL
, s
, err
);
806 ctdb_drop_all_ips_event(struct tevent_context
*ev
, struct tevent_timer
*te
,
807 struct timeval t
, void *private_data
)
809 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
811 DEBUG(DEBUG_ERR
,(__location__
" Been in recovery mode for too long. Dropping all IPS\n"));
812 talloc_free(ctdb
->release_ips_ctx
);
813 ctdb
->release_ips_ctx
= NULL
;
815 ctdb_release_all_ips(ctdb
);
819 * Set up an event to drop all public ips if we remain in recovery for too long
822 int ctdb_deferred_drop_all_ips(struct ctdb_context
*ctdb
)
824 if (ctdb
->release_ips_ctx
!= NULL
) {
825 talloc_free(ctdb
->release_ips_ctx
);
827 ctdb
->release_ips_ctx
= talloc_new(ctdb
);
828 CTDB_NO_MEMORY(ctdb
, ctdb
->release_ips_ctx
);
830 tevent_add_timer(ctdb
->ev
, ctdb
->release_ips_ctx
,
831 timeval_current_ofs(ctdb
->tunable
.recovery_drop_all_ips
, 0),
832 ctdb_drop_all_ips_event
, ctdb
);
837 set the recovery mode
839 int32_t ctdb_control_set_recmode(struct ctdb_context
*ctdb
,
840 struct ctdb_req_control_old
*c
,
841 TDB_DATA indata
, bool *async_reply
,
842 const char **errormsg
)
844 uint32_t recmode
= *(uint32_t *)indata
.dptr
;
845 struct ctdb_db_context
*ctdb_db
;
846 struct set_recmode_state
*state
;
847 struct ctdb_cluster_mutex_handle
*h
;
849 if (recmode
== ctdb
->recovery_mode
) {
850 D_INFO("Recovery mode already set to %s\n",
851 recmode
== CTDB_RECOVERY_NORMAL
? "NORMAL" : "ACTIVE");
855 D_NOTICE("Recovery mode set to %s\n",
856 recmode
== CTDB_RECOVERY_NORMAL
? "NORMAL" : "ACTIVE");
858 /* if we enter recovery but stay in recovery for too long
859 we will eventually drop all our ip addresses
861 if (recmode
== CTDB_RECOVERY_ACTIVE
) {
862 if (ctdb_deferred_drop_all_ips(ctdb
) != 0) {
863 D_ERR("Failed to set up deferred drop all ips\n");
866 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
870 /* From this point: recmode == CTDB_RECOVERY_NORMAL
872 * Therefore, what follows is special handling when setting
873 * recovery mode back to normal */
875 TALLOC_FREE(ctdb
->release_ips_ctx
);
877 for (ctdb_db
= ctdb
->db_list
; ctdb_db
!= NULL
; ctdb_db
= ctdb_db
->next
) {
878 if (ctdb_db
->generation
!= ctdb
->vnn_map
->generation
) {
880 ("Inconsistent DB generation %u for %s\n",
881 ctdb_db
->generation
, ctdb_db
->db_name
));
882 DEBUG(DEBUG_ERR
, ("Recovery mode set to ACTIVE\n"));
887 /* force the databases to thaw */
888 if (ctdb_db_all_frozen(ctdb
)) {
889 ctdb_control_thaw(ctdb
, false);
892 if (ctdb
->recovery_lock
== NULL
) {
893 /* Not using recovery lock file */
894 ctdb
->recovery_mode
= CTDB_RECOVERY_NORMAL
;
895 ctdb_process_deferred_attach(ctdb
);
899 state
= talloc_zero(ctdb
, struct set_recmode_state
);
901 DEBUG(DEBUG_ERR
, (__location__
" out of memory\n"));
907 h
= ctdb_cluster_mutex(state
, ctdb
, ctdb
->recovery_lock
, 5,
908 set_recmode_handler
, state
, NULL
, NULL
);
914 state
->c
= talloc_steal(state
, c
);
922 delete a record as part of the vacuum process
923 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
924 use non-blocking locks
926 return 0 if the record was successfully deleted (i.e. it does not exist
927 when the function returns)
928 or !0 if the record still exists in the tdb after returning.
930 static int delete_tdb_record(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, struct ctdb_rec_data_old
*rec
)
932 TDB_DATA key
, data
, data2
;
933 struct ctdb_ltdb_header
*hdr
, *hdr2
;
935 /* these are really internal tdb functions - but we need them here for
936 non-blocking lock of the freelist */
937 int tdb_lock_nonblock(struct tdb_context
*tdb
, int list
, int ltype
);
938 int tdb_unlock(struct tdb_context
*tdb
, int list
, int ltype
);
941 key
.dsize
= rec
->keylen
;
942 key
.dptr
= &rec
->data
[0];
943 data
.dsize
= rec
->datalen
;
944 data
.dptr
= &rec
->data
[rec
->keylen
];
946 if (ctdb_lmaster(ctdb
, &key
) == ctdb
->pnn
) {
947 DEBUG(DEBUG_INFO
,(__location__
" Called delete on record where we are lmaster\n"));
951 if (data
.dsize
!= sizeof(struct ctdb_ltdb_header
)) {
952 DEBUG(DEBUG_ERR
,(__location__
" Bad record size\n"));
956 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
958 /* use a non-blocking lock */
959 if (tdb_chainlock_nonblock(ctdb_db
->ltdb
->tdb
, key
) != 0) {
963 data2
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
964 if (data2
.dptr
== NULL
) {
965 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
969 if (data2
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
970 if (tdb_lock_nonblock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
) == 0) {
971 if (tdb_delete(ctdb_db
->ltdb
->tdb
, key
) != 0) {
972 DEBUG(DEBUG_CRIT
,(__location__
" Failed to delete corrupt record\n"));
974 tdb_unlock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
);
975 DEBUG(DEBUG_CRIT
,(__location__
" Deleted corrupt record\n"));
977 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
982 hdr2
= (struct ctdb_ltdb_header
*)data2
.dptr
;
984 if (hdr2
->rsn
> hdr
->rsn
) {
985 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
986 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with rsn=%llu - called with rsn=%llu\n",
987 (unsigned long long)hdr2
->rsn
, (unsigned long long)hdr
->rsn
));
992 /* do not allow deleting record that have readonly flags set. */
993 if (hdr
->flags
& CTDB_REC_RO_FLAGS
) {
994 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
995 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly flags set\n"));
999 if (hdr2
->flags
& CTDB_REC_RO_FLAGS
) {
1000 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
1001 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly flags set\n"));
1006 if (hdr2
->dmaster
== ctdb
->pnn
) {
1007 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
1008 DEBUG(DEBUG_INFO
,(__location__
" Attempted delete record where we are the dmaster\n"));
1013 if (tdb_lock_nonblock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
) != 0) {
1014 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
1019 if (tdb_delete(ctdb_db
->ltdb
->tdb
, key
) != 0) {
1020 tdb_unlock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
);
1021 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
1022 DEBUG(DEBUG_INFO
,(__location__
" Failed to delete record\n"));
1027 tdb_unlock(ctdb_db
->ltdb
->tdb
, -1, F_WRLCK
);
1028 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
1035 struct recovery_callback_state
{
1036 struct ctdb_req_control_old
*c
;
1041 called when the 'recovered' event script has finished
1043 static void ctdb_end_recovery_callback(struct ctdb_context
*ctdb
, int status
, void *p
)
1045 struct recovery_callback_state
*state
= talloc_get_type(p
, struct recovery_callback_state
);
1047 CTDB_INCREMENT_STAT(ctdb
, num_recoveries
);
1050 DEBUG(DEBUG_ERR
,(__location__
" recovered event script failed (status %d)\n", status
));
1051 if (status
== -ETIME
) {
1052 ctdb_ban_self(ctdb
);
1056 ctdb_request_control_reply(ctdb
, state
->c
, NULL
, status
, NULL
);
1059 gettimeofday(&ctdb
->last_recovery_finished
, NULL
);
1061 if (ctdb
->runstate
== CTDB_RUNSTATE_FIRST_RECOVERY
) {
1062 ctdb_set_runstate(ctdb
, CTDB_RUNSTATE_STARTUP
);
1067 recovery has finished
1069 int32_t ctdb_control_end_recovery(struct ctdb_context
*ctdb
,
1070 struct ctdb_req_control_old
*c
,
1074 struct recovery_callback_state
*state
;
1076 DEBUG(DEBUG_ERR
,("Recovery has finished\n"));
1078 ctdb_persistent_finish_trans3_commits(ctdb
);
1080 state
= talloc(ctdb
, struct recovery_callback_state
);
1081 CTDB_NO_MEMORY(ctdb
, state
);
1085 ret
= ctdb_event_script_callback(ctdb
, state
,
1086 ctdb_end_recovery_callback
,
1088 CTDB_EVENT_RECOVERED
, "%s", "");
1091 DEBUG(DEBUG_ERR
,(__location__
" Failed to end recovery\n"));
1096 /* tell the control that we will reply asynchronously */
1097 state
->c
= talloc_steal(state
, c
);
1098 *async_reply
= true;
1103 called when the 'startrecovery' event script has finished
1105 static void ctdb_start_recovery_callback(struct ctdb_context
*ctdb
, int status
, void *p
)
1107 struct recovery_callback_state
*state
= talloc_get_type(p
, struct recovery_callback_state
);
1110 DEBUG(DEBUG_ERR
,(__location__
" startrecovery event script failed (status %d)\n", status
));
1113 ctdb_request_control_reply(ctdb
, state
->c
, NULL
, status
, NULL
);
1117 static void run_start_recovery_event(struct ctdb_context
*ctdb
,
1118 struct recovery_callback_state
*state
)
1122 ret
= ctdb_event_script_callback(ctdb
, state
,
1123 ctdb_start_recovery_callback
,
1125 CTDB_EVENT_START_RECOVERY
,
1129 DEBUG(DEBUG_ERR
,("Unable to run startrecovery event\n"));
1130 ctdb_request_control_reply(ctdb
, state
->c
, NULL
, -1, NULL
);
/*
 * Compare two recovery lock settings for equality.
 *
 * Either argument may be NULL, which the callers in this file use to mean
 * that no recovery lock is configured.
 *
 * Returns true when both are NULL, or when both are non-NULL and strcmp()
 * reports them identical. A NULL setting never matches a non-NULL one.
 */
static bool reclock_strings_equal(const char *a, const char *b)
{
	if (a == NULL || b == NULL) {
		/* At least one side is unset: equal only if both are unset */
		return a == b;
	}

	return strcmp(a, b) == 0;
}
1144 static void start_recovery_reclock_callback(struct ctdb_context
*ctdb
,
1147 const char *errormsg
,
1150 struct recovery_callback_state
*state
= talloc_get_type_abort(
1151 private_data
, struct recovery_callback_state
);
1152 const char *local
= ctdb
->recovery_lock
;
1153 const char *remote
= NULL
;
1156 DEBUG(DEBUG_ERR
, (__location__
" GET_RECLOCK failed\n"));
1157 ctdb_request_control_reply(ctdb
, state
->c
, NULL
,
1163 /* Check reclock consistency */
1164 if (data
.dsize
> 0) {
1165 /* Ensure NUL-termination */
1166 data
.dptr
[data
.dsize
-1] = '\0';
1167 remote
= (const char *)data
.dptr
;
1169 if (! reclock_strings_equal(local
, remote
)) {
1171 ctdb_request_control_reply(ctdb
, state
->c
, NULL
, -1, NULL
);
1173 ("Recovery lock configuration inconsistent: "
1174 "recmaster has %s, this node has %s, shutting down\n",
1175 remote
== NULL
? "NULL" : remote
,
1176 local
== NULL
? "NULL" : local
));
1178 ctdb_shutdown_sequence(ctdb
, 1);
1181 ("Recovery lock consistency check successful\n"));
1183 run_start_recovery_event(ctdb
, state
);
1186 /* Check recovery lock consistency and run eventscripts for the
1187 * "startrecovery" event */
1188 int32_t ctdb_control_start_recovery(struct ctdb_context
*ctdb
,
1189 struct ctdb_req_control_old
*c
,
1193 struct recovery_callback_state
*state
;
1194 uint32_t recmaster
= c
->hdr
.srcnode
;
1196 DEBUG(DEBUG_ERR
, ("Recovery has started\n"));
1197 gettimeofday(&ctdb
->last_recovery_started
, NULL
);
1199 state
= talloc(ctdb
, struct recovery_callback_state
);
1200 CTDB_NO_MEMORY(ctdb
, state
);
1204 /* Although the recovery master sent this node a start
1205 * recovery control, this node might still think the recovery
1206 * master is disconnected. In this case defer the recovery
1207 * lock consistency check. */
1208 if (ctdb
->nodes
[recmaster
]->flags
& NODE_FLAGS_DISCONNECTED
) {
1209 run_start_recovery_event(ctdb
, state
);
1211 /* Ask the recovery master about its reclock setting */
1212 ret
= ctdb_daemon_send_control(ctdb
,
1215 CTDB_CONTROL_GET_RECLOCK_FILE
,
1218 start_recovery_reclock_callback
,
1222 DEBUG(DEBUG_ERR
, (__location__
" GET_RECLOCK failed\n"));
1228 /* tell the control that we will reply asynchronously */
1229 state
->c
= talloc_steal(state
, c
);
1230 *async_reply
= true;
1236 try to delete all these records as part of the vacuuming process
1237 and return the records we failed to delete
1239 int32_t ctdb_control_try_delete_records(struct ctdb_context
*ctdb
, TDB_DATA indata
, TDB_DATA
*outdata
)
1241 struct ctdb_marshall_buffer
*reply
= (struct ctdb_marshall_buffer
*)indata
.dptr
;
1242 struct ctdb_db_context
*ctdb_db
;
1244 struct ctdb_rec_data_old
*rec
;
1245 struct ctdb_marshall_buffer
*records
;
1247 if (indata
.dsize
< offsetof(struct ctdb_marshall_buffer
, data
)) {
1248 DEBUG(DEBUG_ERR
,(__location__
" invalid data in try_delete_records\n"));
1252 ctdb_db
= find_ctdb_db(ctdb
, reply
->db_id
);
1254 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%08x\n", reply
->db_id
));
1259 DEBUG(DEBUG_DEBUG
,("starting try_delete_records of %u records for dbid 0x%x\n",
1260 reply
->count
, reply
->db_id
));
1263 /* create a blob to send back the records we couldn't delete */
1264 records
= (struct ctdb_marshall_buffer
*)
1265 talloc_zero_size(outdata
,
1266 offsetof(struct ctdb_marshall_buffer
, data
));
1267 if (records
== NULL
) {
1268 DEBUG(DEBUG_ERR
,(__location__
" Out of memory\n"));
1271 records
->db_id
= ctdb_db
->db_id
;
1274 rec
= (struct ctdb_rec_data_old
*)&reply
->data
[0];
1275 for (i
=0;i
<reply
->count
;i
++) {
1278 key
.dptr
= &rec
->data
[0];
1279 key
.dsize
= rec
->keylen
;
1280 data
.dptr
= &rec
->data
[key
.dsize
];
1281 data
.dsize
= rec
->datalen
;
1283 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1284 DEBUG(DEBUG_CRIT
,(__location__
" bad ltdb record in indata\n"));
1285 talloc_free(records
);
1289 /* If we cant delete the record we must add it to the reply
1290 so the lmaster knows it may not purge this record
1292 if (delete_tdb_record(ctdb
, ctdb_db
, rec
) != 0) {
1294 struct ctdb_ltdb_header
*hdr
;
1296 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
1297 data
.dptr
+= sizeof(*hdr
);
1298 data
.dsize
-= sizeof(*hdr
);
1300 DEBUG(DEBUG_INFO
, (__location__
" Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key
)));
1302 old_size
= talloc_get_size(records
);
1303 records
= talloc_realloc_size(outdata
, records
, old_size
+ rec
->length
);
1304 if (records
== NULL
) {
1305 DEBUG(DEBUG_ERR
,(__location__
" Failed to expand\n"));
1309 memcpy(old_size
+(uint8_t *)records
, rec
, rec
->length
);
1312 rec
= (struct ctdb_rec_data_old
*)(rec
->length
+ (uint8_t *)rec
);
1316 *outdata
= ctdb_marshall_finish(records
);
1322 * Store a record as part of the vacuum process:
1323 * This is called from the RECEIVE_RECORD control which
1324 * the lmaster uses to send the current empty copy
1325 * to all nodes for storing, before it lets the other
1326 * nodes delete the records in the second phase with
1327 * the TRY_DELETE_RECORDS control.
1329 * Only store if we are not lmaster or dmaster, and our
1330 * rsn is <= the provided rsn. Use non-blocking locks.
1332 * return 0 if the record was successfully stored.
1333 * return !0 if the record still exists in the tdb after returning.
1335 static int store_tdb_record(struct ctdb_context
*ctdb
,
1336 struct ctdb_db_context
*ctdb_db
,
1337 struct ctdb_rec_data_old
*rec
)
1339 TDB_DATA key
, data
, data2
;
1340 struct ctdb_ltdb_header
*hdr
, *hdr2
;
1343 key
.dsize
= rec
->keylen
;
1344 key
.dptr
= &rec
->data
[0];
1345 data
.dsize
= rec
->datalen
;
1346 data
.dptr
= &rec
->data
[rec
->keylen
];
1348 if (ctdb_lmaster(ctdb
, &key
) == ctdb
->pnn
) {
1349 DEBUG(DEBUG_INFO
, (__location__
" Called store_tdb_record "
1350 "where we are lmaster\n"));
1354 if (data
.dsize
!= sizeof(struct ctdb_ltdb_header
)) {
1355 DEBUG(DEBUG_ERR
, (__location__
" Bad record size\n"));
1359 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
1361 /* use a non-blocking lock */
1362 if (tdb_chainlock_nonblock(ctdb_db
->ltdb
->tdb
, key
) != 0) {
1363 DEBUG(DEBUG_INFO
, (__location__
" Failed to lock chain in non-blocking mode\n"));
1367 data2
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
1368 if (data2
.dptr
== NULL
|| data2
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1369 if (tdb_store(ctdb_db
->ltdb
->tdb
, key
, data
, 0) == -1) {
1370 DEBUG(DEBUG_ERR
, (__location__
"Failed to store record\n"));
1374 DEBUG(DEBUG_INFO
, (__location__
" Stored record\n"));
1379 hdr2
= (struct ctdb_ltdb_header
*)data2
.dptr
;
1381 if (hdr2
->rsn
> hdr
->rsn
) {
1382 DEBUG(DEBUG_INFO
, (__location__
" Skipping record with "
1383 "rsn=%llu - called with rsn=%llu\n",
1384 (unsigned long long)hdr2
->rsn
,
1385 (unsigned long long)hdr
->rsn
));
1390 /* do not allow vacuuming of records that have readonly flags set. */
1391 if (hdr
->flags
& CTDB_REC_RO_FLAGS
) {
1392 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly "
1397 if (hdr2
->flags
& CTDB_REC_RO_FLAGS
) {
1398 DEBUG(DEBUG_INFO
,(__location__
" Skipping record with readonly "
1404 if (hdr2
->dmaster
== ctdb
->pnn
) {
1405 DEBUG(DEBUG_INFO
, (__location__
" Attempted to store record "
1406 "where we are the dmaster\n"));
1411 if (tdb_store(ctdb_db
->ltdb
->tdb
, key
, data
, 0) != 0) {
1412 DEBUG(DEBUG_INFO
,(__location__
" Failed to store record\n"));
1420 tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
1428 * Try to store all these records as part of the vacuuming process
1429 * and return the records we failed to store.
1431 int32_t ctdb_control_receive_records(struct ctdb_context
*ctdb
,
1432 TDB_DATA indata
, TDB_DATA
*outdata
)
1434 struct ctdb_marshall_buffer
*reply
= (struct ctdb_marshall_buffer
*)indata
.dptr
;
1435 struct ctdb_db_context
*ctdb_db
;
1437 struct ctdb_rec_data_old
*rec
;
1438 struct ctdb_marshall_buffer
*records
;
1440 if (indata
.dsize
< offsetof(struct ctdb_marshall_buffer
, data
)) {
1442 (__location__
" invalid data in receive_records\n"));
1446 ctdb_db
= find_ctdb_db(ctdb
, reply
->db_id
);
1448 DEBUG(DEBUG_ERR
, (__location__
" Unknown db 0x%08x\n",
1453 DEBUG(DEBUG_DEBUG
, ("starting receive_records of %u records for "
1454 "dbid 0x%x\n", reply
->count
, reply
->db_id
));
1456 /* create a blob to send back the records we could not store */
1457 records
= (struct ctdb_marshall_buffer
*)
1458 talloc_zero_size(outdata
,
1459 offsetof(struct ctdb_marshall_buffer
, data
));
1460 if (records
== NULL
) {
1461 DEBUG(DEBUG_ERR
, (__location__
" Out of memory\n"));
1464 records
->db_id
= ctdb_db
->db_id
;
1466 rec
= (struct ctdb_rec_data_old
*)&reply
->data
[0];
1467 for (i
=0; i
<reply
->count
; i
++) {
1470 key
.dptr
= &rec
->data
[0];
1471 key
.dsize
= rec
->keylen
;
1472 data
.dptr
= &rec
->data
[key
.dsize
];
1473 data
.dsize
= rec
->datalen
;
1475 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1476 DEBUG(DEBUG_CRIT
, (__location__
" bad ltdb record "
1478 talloc_free(records
);
1483 * If we can not store the record we must add it to the reply
1484 * so the lmaster knows it may not purge this record.
1486 if (store_tdb_record(ctdb
, ctdb_db
, rec
) != 0) {
1488 struct ctdb_ltdb_header
*hdr
;
1490 hdr
= (struct ctdb_ltdb_header
*)data
.dptr
;
1491 data
.dptr
+= sizeof(*hdr
);
1492 data
.dsize
-= sizeof(*hdr
);
1494 DEBUG(DEBUG_INFO
, (__location__
" Failed to store "
1495 "record with hash 0x%08x in vacuum "
1496 "via RECEIVE_RECORDS\n",
1499 old_size
= talloc_get_size(records
);
1500 records
= talloc_realloc_size(outdata
, records
,
1501 old_size
+ rec
->length
);
1502 if (records
== NULL
) {
1503 DEBUG(DEBUG_ERR
, (__location__
" Failed to "
1508 memcpy(old_size
+(uint8_t *)records
, rec
, rec
->length
);
1511 rec
= (struct ctdb_rec_data_old
*)(rec
->length
+ (uint8_t *)rec
);
1514 *outdata
= ctdb_marshall_finish(records
);
1523 int32_t ctdb_control_get_capabilities(struct ctdb_context
*ctdb
, TDB_DATA
*outdata
)
1525 uint32_t *capabilities
= NULL
;
1527 capabilities
= talloc(outdata
, uint32_t);
1528 CTDB_NO_MEMORY(ctdb
, capabilities
);
1529 *capabilities
= ctdb
->capabilities
;
1531 outdata
->dsize
= sizeof(uint32_t);
1532 outdata
->dptr
= (uint8_t *)capabilities
;
1537 /* The recovery daemon will ping us at regular intervals.
1538 If we havent been pinged for a while we assume the recovery
1539 daemon is inoperable and we restart.
1541 static void ctdb_recd_ping_timeout(struct tevent_context
*ev
,
1542 struct tevent_timer
*te
,
1543 struct timeval t
, void *p
)
1545 struct ctdb_context
*ctdb
= talloc_get_type(p
, struct ctdb_context
);
1546 uint32_t *count
= talloc_get_type(ctdb
->recd_ping_count
, uint32_t);
1548 DEBUG(DEBUG_ERR
, ("Recovery daemon ping timeout. Count : %u\n", *count
));
1550 if (*count
< ctdb
->tunable
.recd_ping_failcount
) {
1552 tevent_add_timer(ctdb
->ev
, ctdb
->recd_ping_count
,
1553 timeval_current_ofs(ctdb
->tunable
.recd_ping_timeout
, 0),
1554 ctdb_recd_ping_timeout
, ctdb
);
1558 DEBUG(DEBUG_ERR
, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1560 ctdb_stop_recoverd(ctdb
);
1561 ctdb_start_recoverd(ctdb
);
1564 int32_t ctdb_control_recd_ping(struct ctdb_context
*ctdb
)
1566 talloc_free(ctdb
->recd_ping_count
);
1568 ctdb
->recd_ping_count
= talloc_zero(ctdb
, uint32_t);
1569 CTDB_NO_MEMORY(ctdb
, ctdb
->recd_ping_count
);
1571 if (ctdb
->tunable
.recd_ping_timeout
!= 0) {
1572 tevent_add_timer(ctdb
->ev
, ctdb
->recd_ping_count
,
1573 timeval_current_ofs(ctdb
->tunable
.recd_ping_timeout
, 0),
1574 ctdb_recd_ping_timeout
, ctdb
);
1582 int32_t ctdb_control_set_recmaster(struct ctdb_context
*ctdb
, uint32_t opcode
, TDB_DATA indata
)
1584 uint32_t new_recmaster
;
1586 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1587 new_recmaster
= ((uint32_t *)(&indata
.dptr
[0]))[0];
1589 if (ctdb
->pnn
!= new_recmaster
&& ctdb
->recovery_master
== ctdb
->pnn
) {
1591 ("Remote node (%u) is now the recovery master\n",
1595 if (ctdb
->pnn
== new_recmaster
&& ctdb
->recovery_master
!= new_recmaster
) {
1597 ("This node (%u) is now the recovery master\n",
1601 ctdb
->recovery_master
= new_recmaster
;
1606 int32_t ctdb_control_stop_node(struct ctdb_context
*ctdb
)
1608 DEBUG(DEBUG_ERR
, ("Stopping node\n"));
1609 ctdb
->nodes
[ctdb
->pnn
]->flags
|= NODE_FLAGS_STOPPED
;
1614 int32_t ctdb_control_continue_node(struct ctdb_context
*ctdb
)
1616 DEBUG(DEBUG_ERR
, ("Continue node\n"));
1617 ctdb
->nodes
[ctdb
->pnn
]->flags
&= ~NODE_FLAGS_STOPPED
;