ctdb/server/ctdb_recover.c
/*
   ctdb recovery code

   Copyright (C) Andrew Tridgell  2007
   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "tdb.h"
#include "system/time.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "../include/ctdb_private.h"
#include "lib/util/dlinklist.h"
#include "lib/tdb_wrap/tdb_wrap.h"
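/*
  return the current vnn map (generation and map array) to the caller
  as a flat wire blob
 */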
int
ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
        struct ctdb_vnn_map_wire *map;
        size_t len;

        CHECK_CONTROL_DATA_SIZE(0);

        len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
        map = talloc_size(outdata, len);
        CTDB_NO_MEMORY(ctdb, map);

        map->generation = ctdb->vnn_map->generation;
        map->size = ctdb->vnn_map->size;
        memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);

        outdata->dsize = len;
        outdata->dptr = (uint8_t *)map;

        return 0;
}
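/*
  install a new vnn map supplied by the caller; only allowed while all
  database priorities are frozen
 */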
int
ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
        struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
        int i;

        for(i=1; i<=NUM_DB_PRIORITIES; i++) {
                if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
                        DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
                        return -1;
                }
        }

        talloc_free(ctdb->vnn_map);

        ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
        CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);

        ctdb->vnn_map->generation = map->generation;
        ctdb->vnn_map->size = map->size;
        ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
        CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);

        memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);

        return 0;
}
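/*
  return the list of attached databases together with their
  persistent/readonly/sticky flags
 */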
int
ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
        uint32_t i, len;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_dbid_map *dbid_map;

        CHECK_CONTROL_DATA_SIZE(0);

        len = 0;
        for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
                len++;
        }

        outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
        outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
        if (!outdata->dptr) {
                DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
                exit(1);
        }

        dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
        dbid_map->num = len;
        for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
                dbid_map->dbs[i].dbid = ctdb_db->db_id;
                if (ctdb_db->persistent != 0) {
                        dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
                }
                if (ctdb_db->readonly != 0) {
                        dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
                }
                if (ctdb_db->sticky != 0) {
                        dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
                }
        }

        return 0;
}
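/*
  return the current node map to the caller
 */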
int
ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
        CHECK_CONTROL_DATA_SIZE(0);

        outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
                                                               ctdb->num_nodes,
                                                               outdata);
        if (outdata->dptr == NULL) {
                return -1;
        }

        outdata->dsize = talloc_get_size(outdata->dptr);

        return 0;
}
/*
  reload the nodes file
 */
int
ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
{
        int i, num_nodes;
        TALLOC_CTX *tmp_ctx;
        struct ctdb_node **nodes;

        tmp_ctx = talloc_new(ctdb);

        /* steal the old nodes file for a while */
        talloc_steal(tmp_ctx, ctdb->nodes);
        nodes = ctdb->nodes;
        ctdb->nodes = NULL;
        num_nodes = ctdb->num_nodes;
        ctdb->num_nodes = 0;

        /* load the new nodes file */
        ctdb_load_nodes_file(ctdb);

        for (i=0; i<ctdb->num_nodes; i++) {
                /* keep any identical pre-existing nodes and connections */
                if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
                        talloc_free(ctdb->nodes[i]);
                        ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
                        continue;
                }

                if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
                        continue;
                }

                /* any new or different nodes must be added */
                if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
                        ctdb_fatal(ctdb, "failed to add node. shutting down\n");
                }
                if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ " methods->connect_node failed at %d\n", i));
                        ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
                }
        }

        /* tell the recovery daemon to reload the nodes file too */
        ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);

        talloc_free(tmp_ctx);

        return 0;
}
/*
  a traverse function for pulling all relevant records from pulldb
 */
struct pulldb_data {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_marshall_buffer *pulldata;
        uint32_t len;
        uint32_t allocated_len;
        bool failed;
};

static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
        struct pulldb_data *params = (struct pulldb_data *)p;
        struct ctdb_rec_data *rec;
        struct ctdb_context *ctdb = params->ctdb;
        struct ctdb_db_context *ctdb_db = params->ctdb_db;

        /* add the record to the blob */
        rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
        if (rec == NULL) {
                params->failed = true;
                return -1;
        }
        if (params->len + rec->length >= params->allocated_len) {
                params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
                params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
        }
        if (params->pulldata == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
                ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
        }
        params->pulldata->count++;
        memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
        params->len += rec->length;

        if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
                DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
        }

        talloc_free(rec);

        return 0;
}
/*
  pull a bunch of records from a ltdb, filtering by lmaster
 */
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
        struct ctdb_control_pulldb *pull;
        struct ctdb_db_context *ctdb_db;
        struct pulldb_data params;
        struct ctdb_marshall_buffer *reply;

        pull = (struct ctdb_control_pulldb *)indata.dptr;

        ctdb_db = find_ctdb_db(ctdb, pull->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
                return -1;
        }

        if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
                DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
                return -1;
        }

        reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
        CTDB_NO_MEMORY(ctdb, reply);

        reply->db_id = pull->db_id;

        params.ctdb = ctdb;
        params.ctdb_db = ctdb_db;
        params.pulldata = reply;
        params.len = offsetof(struct ctdb_marshall_buffer, data);
        params.allocated_len = params.len;
        params.failed = false;

        if (ctdb_db->unhealthy_reason) {
                /* this is just a warning, as the tdb should be empty anyway */
                DEBUG(DEBUG_WARNING,("db(%s) unhealthy in ctdb_control_pull_db: %s\n",
                                     ctdb_db->db_name, ctdb_db->unhealthy_reason));
        }

        if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
                return -1;
        }

        if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to traverse db '%s'\n", ctdb_db->db_name));
                ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
                talloc_free(params.pulldata);
                return -1;
        }

        ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);

        outdata->dptr = (uint8_t *)params.pulldata;
        outdata->dsize = params.len;

        if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
                DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
        }
        if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
                DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
        }

        return 0;
}
/*
  push a bunch of records into a ltdb, filtering by rsn
 */
int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
{
        struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
        struct ctdb_db_context *ctdb_db;
        int i, ret;
        struct ctdb_rec_data *rec;

        if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                return -1;
        }

        ctdb_db = find_ctdb_db(ctdb, reply->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
                return -1;
        }

        if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
                DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
                return -1;
        }

        if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
                return -1;
        }

        rec = (struct ctdb_rec_data *)&reply->data[0];

        DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
                 reply->count, reply->db_id));

        for (i=0;i<reply->count;i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        goto failed;
                }
                hdr = (struct ctdb_ltdb_header *)data.dptr;
                /* strip off any read only record flags. All readonly records
                   are revoked implicitly by a recovery
                */
                hdr->flags &= ~CTDB_REC_RO_FLAGS;

                data.dptr += sizeof(*hdr);
                data.dsize -= sizeof(*hdr);

                ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
                if (ret != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
                        goto failed;
                }

                rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
        }

        DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
                 reply->count, reply->db_id));

        if (ctdb_db->readonly) {
                DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
                                  ctdb_db->db_id));
                if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
                        ctdb_db->readonly = false;
                        tdb_close(ctdb_db->rottdb);
                        ctdb_db->rottdb = NULL;
                        ctdb_db->readonly = false;
                }
                while (ctdb_db->revokechild_active != NULL) {
                        talloc_free(ctdb_db->revokechild_active);
                }
        }

        ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
        return 0;

failed:
        ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
        return -1;
}
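/*
  state for an in-flight SET_RECMODE control; the reply is sent
  asynchronously once the child process reports back over the pipe
  or the timeout fires
 */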
struct ctdb_set_recmode_state {
        struct ctdb_context *ctdb;
        struct ctdb_req_control *c;
        uint32_t recmode;
        int fd[2];
        struct timed_event *te;
        struct fd_event *fde;
        pid_t child;
        struct timeval start_time;
};
/*
  called if our set_recmode child times out. this would happen if
  ctdb_recovery_lock() would block.
 */
static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
                                     struct timeval t, void *private_data)
{
        struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
                                               struct ctdb_set_recmode_state);

        /* we consider this a success, not a failure, as we failed to
           set the recovery lock which is what we wanted.  This can be
           caused by the cluster filesystem being very slow to
           arbitrate locks immediately after a node failure.
         */
        DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timed out - CFS slow to grant locks? (allowing recmode set anyway)\n"));
        state->ctdb->recovery_mode = state->recmode;
        ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
        talloc_free(state);
}
/* when we free the recmode state we must kill any child process.
*/
static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
{
        double l = timeval_elapsed(&state->start_time);

        CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);

        if (state->fd[0] != -1) {
                state->fd[0] = -1;
        }
        if (state->fd[1] != -1) {
                state->fd[1] = -1;
        }
        ctdb_kill(state->ctdb, state->child, SIGKILL);
        return 0;
}
/* this is called when the client process has completed ctdb_recovery_lock()
   and has written data back to us through the pipe.
*/
static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
                                uint16_t flags, void *private_data)
{
        struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
                                               struct ctdb_set_recmode_state);
        char c = 0;
        int ret;

        /* we got a response from our child process so we can abort the
           timeout.
        */
        talloc_free(state->te);
        state->te = NULL;

        /* If, as expected, the child was unable to take the recovery
         * lock then it will have written 0 into the pipe, so
         * continue.  However, any other value (e.g. 1) indicates that
         * it was able to take the recovery lock when it should have
         * been held by the recovery daemon on the recovery master.
         */
        ret = sys_read(state->fd[0], &c, 1);
        if (ret != 1 || c != 0) {
                ctdb_request_control_reply(
                        state->ctdb, state->c, NULL, -1,
                        "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
                talloc_free(state);
                return;
        }

        state->ctdb->recovery_mode = state->recmode;

        /* release any deferred attach calls from clients */
        if (state->recmode == CTDB_RECOVERY_NORMAL) {
                ctdb_process_deferred_attach(state->ctdb);
        }

        ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
        talloc_free(state);
        return;
}
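/*
  timed event: we have stayed in recovery mode for too long, so drop
  all public IP addresses held by this node
 */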
static void
ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
                        struct timeval t, void *private_data)
{
        struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

        DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPs\n"));
        talloc_free(ctdb->release_ips_ctx);
        ctdb->release_ips_ctx = NULL;

        ctdb_release_all_ips(ctdb);
}
/**
 * Set up an event to drop all public ips if we remain in recovery for too
 * long
 */
int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
{
        if (ctdb->release_ips_ctx != NULL) {
                talloc_free(ctdb->release_ips_ctx);
        }
        ctdb->release_ips_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);

        event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
        return 0;
}
/*
  set the recovery mode
 */
int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
                                 struct ctdb_req_control *c,
                                 TDB_DATA indata, bool *async_reply,
                                 const char **errormsg)
{
        uint32_t recmode = *(uint32_t *)indata.dptr;
        int i, ret;
        struct ctdb_set_recmode_state *state;
        pid_t parent = getpid();

        /* if we enter recovery but stay in recovery for too long
           we will eventually drop all our ip addresses
        */
        if (recmode == CTDB_RECOVERY_NORMAL) {
                talloc_free(ctdb->release_ips_ctx);
                ctdb->release_ips_ctx = NULL;
        } else {
                if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
                }
        }

        if (recmode != ctdb->recovery_mode) {
                DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
                         recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
        }

        if (recmode != CTDB_RECOVERY_NORMAL ||
            ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
                ctdb->recovery_mode = recmode;
                return 0;
        }

        /* some special handling when ending recovery mode */

        /* force the databases to thaw */
        for (i=1; i<=NUM_DB_PRIORITIES; i++) {
                if (ctdb->freeze_handles[i] != NULL) {
                        ctdb_control_thaw(ctdb, i, false);
                }
        }

        state = talloc(ctdb, struct ctdb_set_recmode_state);
        CTDB_NO_MEMORY(ctdb, state);

        state->start_time = timeval_current();
        state->fd[0] = -1;
        state->fd[1] = -1;

        /* release any deferred attach calls from clients */
        if (recmode == CTDB_RECOVERY_NORMAL) {
                ctdb_process_deferred_attach(ctdb);
        }

        if (ctdb->recovery_lock_file == NULL) {
                /* Not using recovery lock file */
                ctdb->recovery_mode = recmode;
                return 0;
        }

        /* For the rest of what needs to be done, we need to do this in
           a child process since
           1, the call to ctdb_recovery_lock() can block if the cluster
              filesystem is in the process of recovery.
        */
        ret = pipe(state->fd);
        if (ret != 0) {
                talloc_free(state);
                DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
                return -1;
        }

        state->child = ctdb_fork(ctdb);
        if (state->child == (pid_t)-1) {
                close(state->fd[0]);
                close(state->fd[1]);
                talloc_free(state);
                return -1;
        }

        if (state->child == 0) {
                char cc = 0;
                close(state->fd[0]);

                ctdb_set_process_name("ctdb_recmode");
                debug_extra = talloc_asprintf(NULL, "set_recmode:");
                /* Daemon should not be able to get the recover lock,
                 * as it should be held by the recovery master */
                if (ctdb_recovery_lock(ctdb)) {
                        DEBUG(DEBUG_ERR,
                              ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
                               ctdb->recovery_lock_file));
                        ctdb_recovery_unlock(ctdb);
                        cc = 1;
                }

                sys_write(state->fd[1], &cc, 1);
                /* make sure we die when our parent dies */
                while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
                        sleep(5);
                        sys_write(state->fd[1], &cc, 1);
                }
                _exit(0);
        }
        close(state->fd[1]);
        set_close_on_exec(state->fd[0]);

        state->fd[1] = -1;

        talloc_set_destructor(state, set_recmode_destructor);

        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));

        state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
                                    ctdb_set_recmode_timeout, state);

        state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
                                  EVENT_FD_READ,
                                  set_recmode_handler,
                                  (void *)state);

        if (state->fde == NULL) {
                talloc_free(state);
                return -1;
        }
        tevent_fd_set_auto_close(state->fde);

        state->ctdb    = ctdb;
        state->recmode = recmode;
        state->c       = talloc_steal(state, c);

        *async_reply = true;

        return 0;
}
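/*
  report whether this process currently holds the recovery lock
 */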
bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
{
        return ctdb->recovery_lock_fd != -1;
}
/*
  try and get the recovery lock in shared storage - should only work
  on the recovery master recovery daemon. Anywhere else is a bug
 */
bool ctdb_recovery_lock(struct ctdb_context *ctdb)
{
        struct flock lock;

        ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
                                      O_RDWR|O_CREAT, 0600);
        if (ctdb->recovery_lock_fd == -1) {
                DEBUG(DEBUG_ERR,
                      ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
                       ctdb->recovery_lock_file, strerror(errno)));
                return false;
        }

        set_close_on_exec(ctdb->recovery_lock_fd);

        lock.l_type = F_WRLCK;
        lock.l_whence = SEEK_SET;
        lock.l_start = 0;
        lock.l_len = 1;
        lock.l_pid = 0;

        if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
                int saved_errno = errno;
                close(ctdb->recovery_lock_fd);
                ctdb->recovery_lock_fd = -1;
                /* Fail silently on these errors, since they indicate
                 * lock contention, but log an error for any other
                 * failure. */
                if (saved_errno != EACCES &&
                    saved_errno != EAGAIN) {
                        DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
                                         "recovery lock on '%s' - (%s)\n",
                                         ctdb->recovery_lock_file,
                                         strerror(saved_errno)));
                }
                return false;
        }

        return true;
}
void ctdb_recovery_unlock(struct ctdb_context *ctdb)
{
        if (ctdb->recovery_lock_fd != -1) {
                DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
                close(ctdb->recovery_lock_fd);
                ctdb->recovery_lock_fd = -1;
        }
}
/*
  delete a record as part of the vacuum process
  only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
  use non-blocking locks

  return 0 if the record was successfully deleted (i.e. it does not exist
  when the function returns)
  or !0 if the record still exists in the tdb after returning.
 */
static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
{
        TDB_DATA key, data, data2;
        struct ctdb_ltdb_header *hdr, *hdr2;

        /* these are really internal tdb functions - but we need them here for
           non-blocking lock of the freelist */
        int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
        int tdb_unlock(struct tdb_context *tdb, int list, int ltype);

        key.dsize = rec->keylen;
        key.dptr  = &rec->data[0];
        data.dsize = rec->datalen;
        data.dptr = &rec->data[rec->keylen];

        if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
                DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
                return -1;
        }

        if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
                DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
                return -1;
        }

        hdr = (struct ctdb_ltdb_header *)data.dptr;

        /* use a non-blocking lock */
        if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
                return -1;
        }

        data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
        if (data2.dptr == NULL) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                return 0;
        }

        if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
                if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
                        if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
                                DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
                        }
                        tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
                        DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
                }
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                free(data2.dptr);
                return 0;
        }

        hdr2 = (struct ctdb_ltdb_header *)data2.dptr;

        if (hdr2->rsn > hdr->rsn) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
                         (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
                free(data2.dptr);
                return -1;
        }

        /* do not allow deleting records that have readonly flags set. */
        if (hdr->flags & CTDB_REC_RO_FLAGS) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
                free(data2.dptr);
                return -1;
        }
        if (hdr2->flags & CTDB_REC_RO_FLAGS) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
                free(data2.dptr);
                return -1;
        }

        if (hdr2->dmaster == ctdb->pnn) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
                free(data2.dptr);
                return -1;
        }

        if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                free(data2.dptr);
                return -1;
        }

        if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
                tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
                tdb_chainunlock(ctdb_db->ltdb->tdb, key);
                DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
                free(data2.dptr);
                return -1;
        }

        tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
        tdb_chainunlock(ctdb_db->ltdb->tdb, key);
        free(data2.dptr);
        return 0;
}
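/*
  state kept while an event script runs on behalf of a recovery
  control, so the control can be replied to from the callback
 */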
struct recovery_callback_state {
        struct ctdb_req_control *c;
};

/*
  called when the 'recovered' event script has finished
 */
static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
{
        struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);

        ctdb_enable_monitoring(ctdb);
        CTDB_INCREMENT_STAT(ctdb, num_recoveries);

        if (status != 0) {
                DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
                if (status == -ETIME) {
                        ctdb_ban_self(ctdb);
                }
        }

        ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
        talloc_free(state);

        gettimeofday(&ctdb->last_recovery_finished, NULL);

        if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
                ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
        }
}
/*
  recovery has finished
 */
int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
                                  struct ctdb_req_control *c,
                                  bool *async_reply)
{
        int ret;
        struct recovery_callback_state *state;

        DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));

        ctdb_persistent_finish_trans3_commits(ctdb);

        state = talloc(ctdb, struct recovery_callback_state);
        CTDB_NO_MEMORY(ctdb, state);

        state->c = c;

        ctdb_disable_monitoring(ctdb);

        ret = ctdb_event_script_callback(ctdb, state,
                                         ctdb_end_recovery_callback,
                                         state,
                                         CTDB_EVENT_RECOVERED, "%s", "");

        if (ret != 0) {
                ctdb_enable_monitoring(ctdb);

                DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
                talloc_free(state);
                return -1;
        }

        /* tell the control that we will reply asynchronously */
        state->c = talloc_steal(state, c);
        *async_reply = true;
        return 0;
}
/*
  called when the 'startrecovery' event script has finished
 */
static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
{
        struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);

        if (status != 0) {
                DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
        }

        ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
        talloc_free(state);
}
/*
  run the startrecovery eventscript
 */
int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
                                    struct ctdb_req_control *c,
                                    bool *async_reply)
{
        int ret;
        struct recovery_callback_state *state;

        DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
        gettimeofday(&ctdb->last_recovery_started, NULL);

        state = talloc(ctdb, struct recovery_callback_state);
        CTDB_NO_MEMORY(ctdb, state);

        state->c = talloc_steal(state, c);

        ctdb_disable_monitoring(ctdb);

        ret = ctdb_event_script_callback(ctdb, state,
                                         ctdb_start_recovery_callback,
                                         state,
                                         CTDB_EVENT_START_RECOVERY,
                                         "%s", "");

        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
                talloc_free(state);
                return -1;
        }

        /* tell the control that we will reply asynchronously */
        *async_reply = true;
        return 0;
}
/*
  try to delete all these records as part of the vacuuming process
  and return the records we failed to delete
 */
int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
        struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
        struct ctdb_db_context *ctdb_db;
        int i;
        struct ctdb_rec_data *rec;
        struct ctdb_marshall_buffer *records;

        if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
                return -1;
        }

        ctdb_db = find_ctdb_db(ctdb, reply->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
                return -1;
        }

        DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
                 reply->count, reply->db_id));

        /* create a blob to send back the records we could not delete */
        records = (struct ctdb_marshall_buffer *)
                        talloc_zero_size(outdata,
                                         offsetof(struct ctdb_marshall_buffer, data));
        if (records == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                return -1;
        }
        records->db_id = ctdb_db->db_id;

        rec = (struct ctdb_rec_data *)&reply->data[0];
        for (i=0;i<reply->count;i++) {
                TDB_DATA key, data;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
                        return -1;
                }

                /* If we cannot delete the record we must add it to the reply
                   so the lmaster knows it may not purge this record
                */
                if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
                        size_t old_size;
                        struct ctdb_ltdb_header *hdr;

                        hdr = (struct ctdb_ltdb_header *)data.dptr;
                        data.dptr += sizeof(*hdr);
                        data.dsize -= sizeof(*hdr);

                        DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));

                        old_size = talloc_get_size(records);
                        records = talloc_realloc_size(outdata, records, old_size + rec->length);
                        if (records == NULL) {
                                DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
                                return -1;
                        }
                        records->count++;
                        memcpy(old_size+(uint8_t *)records, rec, rec->length);
                }

                rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
        }

        *outdata = ctdb_marshall_finish(records);

        return 0;
}
/**
 * Store a record as part of the vacuum process:
 * This is called from the RECEIVE_RECORD control which
 * the lmaster uses to send the current empty copy
 * to all nodes for storing, before it lets the other
 * nodes delete the records in the second phase with
 * the TRY_DELETE_RECORDS control.
 *
 * Only store if we are not lmaster or dmaster, and our
 * rsn is <= the provided rsn. Use non-blocking locks.
 *
 * return 0 if the record was successfully stored.
 * return !0 if the record still exists in the tdb after returning.
 */
static int store_tdb_record(struct ctdb_context *ctdb,
                            struct ctdb_db_context *ctdb_db,
                            struct ctdb_rec_data *rec)
{
        TDB_DATA key, data, data2;
        struct ctdb_ltdb_header *hdr, *hdr2;
        int ret;

        key.dsize = rec->keylen;
        key.dptr = &rec->data[0];
        data.dsize = rec->datalen;
        data.dptr = &rec->data[rec->keylen];

        if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
                DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
                                   "where we are lmaster\n"));
                return -1;
        }

        if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
                DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
                return -1;
        }

        hdr = (struct ctdb_ltdb_header *)data.dptr;

        /* use a non-blocking lock */
        if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
                DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
                return -1;
        }

        data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
        if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
                if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to store record\n"));
                        ret = -1;
                        goto done;
                }
                DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
                ret = 0;
                goto done;
        }

        hdr2 = (struct ctdb_ltdb_header *)data2.dptr;

        if (hdr2->rsn > hdr->rsn) {
                DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
                                   "rsn=%llu - called with rsn=%llu\n",
                                   (unsigned long long)hdr2->rsn,
                                   (unsigned long long)hdr->rsn));
                ret = -1;
                goto done;
        }

        /* do not allow vacuuming of records that have readonly flags set. */
        if (hdr->flags & CTDB_REC_RO_FLAGS) {
                DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
                                  "flags set\n"));
                ret = -1;
                goto done;
        }
        if (hdr2->flags & CTDB_REC_RO_FLAGS) {
                DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
                                  "flags set\n"));
                ret = -1;
                goto done;
        }

        if (hdr2->dmaster == ctdb->pnn) {
                DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
                                   "where we are the dmaster\n"));
                ret = -1;
                goto done;
        }

        if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
                DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
                ret = -1;
                goto done;
        }

        ret = 0;

done:
        tdb_chainunlock(ctdb_db->ltdb->tdb, key);
        free(data2.dptr);
        return ret;
}
/**
 * Try to store all these records as part of the vacuuming process
 * and return the records we failed to store.
 */
int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
                                     TDB_DATA indata, TDB_DATA *outdata)
{
        struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
        struct ctdb_db_context *ctdb_db;
        int i;
        struct ctdb_rec_data *rec;
        struct ctdb_marshall_buffer *records;

        if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,
                      (__location__ " invalid data in receive_records\n"));
                return -1;
        }

        ctdb_db = find_ctdb_db(ctdb, reply->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
                                  reply->db_id));
                return -1;
        }

        DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
                            "dbid 0x%x\n", reply->count, reply->db_id));

        /* create a blob to send back the records we could not store */
        records = (struct ctdb_marshall_buffer *)
                  talloc_zero_size(outdata,
                                   offsetof(struct ctdb_marshall_buffer, data));
        if (records == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
                return -1;
        }
        records->db_id = ctdb_db->db_id;

        rec = (struct ctdb_rec_data *)&reply->data[0];
        for (i=0; i<reply->count; i++) {
                TDB_DATA key, data;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
                                           "in indata\n"));
                        return -1;
                }

                /*
                 * If we can not store the record we must add it to the reply
                 * so the lmaster knows it may not purge this record.
                 */
                if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
                        size_t old_size;
                        struct ctdb_ltdb_header *hdr;

                        hdr = (struct ctdb_ltdb_header *)data.dptr;
                        data.dptr += sizeof(*hdr);
                        data.dsize -= sizeof(*hdr);

                        DEBUG(DEBUG_INFO, (__location__ " Failed to store "
                                           "record with hash 0x%08x in vacuum "
                                           "via RECEIVE_RECORDS\n",
                                           ctdb_hash(&key)));

                        old_size = talloc_get_size(records);
                        records = talloc_realloc_size(outdata, records,
                                                      old_size + rec->length);
                        if (records == NULL) {
                                DEBUG(DEBUG_ERR, (__location__ " Failed to "
                                                  "expand\n"));
                                return -1;
                        }
                        records->count++;
                        memcpy(old_size+(uint8_t *)records, rec, rec->length);
                }

                rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
        }

        *outdata = ctdb_marshall_finish(records);

        return 0;
}
/*
  report capabilities
 */
int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
{
        uint32_t *capabilities = NULL;

        capabilities = talloc(outdata, uint32_t);
        CTDB_NO_MEMORY(ctdb, capabilities);
        *capabilities = ctdb->capabilities;

        outdata->dsize = sizeof(uint32_t);
        outdata->dptr = (uint8_t *)capabilities;

        return 0;
}
/* The recovery daemon will ping us at regular intervals.
   If we haven't been pinged for a while we assume the recovery
   daemon is inoperable and we restart.
*/
static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
{
        struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
        uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);

        DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));

        if (*count < ctdb->tunable.recd_ping_failcount) {
                (*count)++;
                event_add_timed(ctdb->ev, ctdb->recd_ping_count,
                        timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
                        ctdb_recd_ping_timeout, ctdb);
                return;
        }

        DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));

        ctdb_stop_recoverd(ctdb);
        ctdb_start_recoverd(ctdb);
}
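/*
  called when the recovery daemon pings us: reset the ping count and
  re-arm the ping timeout
 */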
int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
{
        talloc_free(ctdb->recd_ping_count);

        ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
        CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);

        if (ctdb->tunable.recd_ping_timeout != 0) {
                event_add_timed(ctdb->ev, ctdb->recd_ping_count,
                        timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
                        ctdb_recd_ping_timeout, ctdb);
        }

        return 0;
}
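/*
  set the recovery master, logging when this node gains or loses the
  recovery master role
 */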
int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
{
        uint32_t new_recmaster;

        CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
        new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];

        if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
                DEBUG(DEBUG_NOTICE,
                      ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
        }

        if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
                DEBUG(DEBUG_NOTICE,
                      ("This node (%u) is now the recovery master\n", ctdb->pnn));
        }

        ctdb->recovery_master = new_recmaster;
        return 0;
}
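/*
  administratively stop this node: disable monitoring and set the
  STOPPED flag
 */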
int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
{
        DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
        ctdb_disable_monitoring(ctdb);
        ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;

        return 0;
}
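/*
  clear the STOPPED flag so this node can be continued
 */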
int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
{
        DEBUG(DEBUG_NOTICE, ("Continue node\n"));
        ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;

        return 0;
}