wafsamba: fix pidl dependencies to rebuild on pidl changes
[Samba.git] / ctdb / server / ctdb_ltdb_server.c
blob2d1daafb1575ef208f4b530870aa64d85ecbd1c9
1 /*
2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #include "server/ctdb_config.h"
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
48 /**
49 * write a record to a normal database
51 * This is the server-variant of the ctdb_ltdb_store function.
52 * It contains logic to determine whether a record should be
53 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54 * controls to the local ctdb daemon if apporpriate.
56 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
57 TDB_DATA key,
58 struct ctdb_ltdb_header *header,
59 TDB_DATA data)
61 struct ctdb_context *ctdb = ctdb_db->ctdb;
62 TDB_DATA rec[2];
63 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
64 int ret;
65 bool seqnum_suppressed = false;
66 bool keep = false;
67 bool schedule_for_deletion = false;
68 bool remove_from_delete_queue = false;
69 uint32_t lmaster;
71 if (ctdb->flags & CTDB_FLAG_TORTURE) {
72 TDB_DATA old;
73 struct ctdb_ltdb_header *h2;
75 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
76 h2 = (struct ctdb_ltdb_header *)old.dptr;
77 if (old.dptr != NULL &&
78 old.dsize >= hsize &&
79 h2->rsn > header->rsn) {
80 DEBUG(DEBUG_ERR,
81 ("RSN regression! %"PRIu64" %"PRIu64"\n",
82 h2->rsn, header->rsn));
84 if (old.dptr) {
85 free(old.dptr);
89 if (ctdb->vnn_map == NULL) {
91 * Called from a client: always store the record
92 * Also don't call ctdb_lmaster since it uses the vnn_map!
94 keep = true;
95 goto store;
98 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
101 * If we migrate an empty record off to another node
102 * and the record has not been migrated with data,
103 * delete the record instead of storing the empty record.
105 if (data.dsize != 0) {
106 keep = true;
107 } else if (header->flags & CTDB_REC_RO_FLAGS) {
108 keep = true;
109 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
111 * The record is not created by the client but
112 * automatically by the ctdb_ltdb_fetch logic that
113 * creates a record with an initial header in the
114 * ltdb before trying to migrate the record from
115 * the current lmaster. Keep it instead of trying
116 * to delete the non-existing record...
118 keep = true;
119 schedule_for_deletion = true;
120 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
121 keep = true;
122 } else if (ctdb_db->ctdb->pnn == lmaster) {
124 * If we are lmaster, then we usually keep the record.
125 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126 * and the record is empty and has never been migrated
127 * with data, then we should delete it instead of storing it.
128 * This is part of the vacuuming process.
130 * The reason that we usually need to store even empty records
131 * on the lmaster is that a client operating directly on the
132 * lmaster (== dmaster) expects the local copy of the record to
133 * exist after successful ctdb migrate call. If the record does
134 * not exist, the client goes into a migrate loop and eventually
135 * fails. So storing the empty record makes sure that we do not
136 * need to change the client code.
138 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
139 keep = true;
140 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141 keep = true;
143 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
144 keep = true;
147 if (keep) {
148 if (ctdb_db_volatile(ctdb_db) &&
149 (ctdb_db->ctdb->pnn == header->dmaster) &&
150 !(header->flags & CTDB_REC_RO_FLAGS))
152 header->rsn++;
154 if (data.dsize == 0) {
155 schedule_for_deletion = true;
158 remove_from_delete_queue = !schedule_for_deletion;
161 store:
163 * The VACUUM_MIGRATED flag is only set temporarily for
164 * the above logic when the record was retrieved by a
165 * VACUUM_MIGRATE call and should not be stored in the
166 * database.
168 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169 * and there are two cases in which the corresponding record
170 * is stored in the local database:
171 * 1. The record has been migrated with data in the past
172 * (the MIGRATED_WITH_DATA record flag is set).
173 * 2. The record has been filled with data again since it
174 * had been submitted in the VACUUM_FETCH message to the
175 * lmaster.
176 * For such records it is important to not store the
177 * VACUUM_MIGRATED flag in the database.
179 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
182 * Similarly, clear the AUTOMATIC flag which should not enter
183 * the local database copy since this would require client
184 * modifications to clear the flag when the client stores
185 * the record.
187 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
189 rec[0].dsize = hsize;
190 rec[0].dptr = (uint8_t *)header;
192 rec[1].dsize = data.dsize;
193 rec[1].dptr = data.dptr;
195 /* Databases with seqnum updates enabled only get their seqnum
196 changes when/if we modify the data */
197 if (ctdb_db->seqnum_update != NULL) {
198 TDB_DATA old;
199 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
201 if ((old.dsize == hsize + data.dsize) &&
202 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204 seqnum_suppressed = true;
206 if (old.dptr != NULL) {
207 free(old.dptr);
211 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
212 ctdb_db->db_name,
213 keep?"storing":"deleting",
214 ctdb_hash(&key)));
216 if (keep) {
217 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
218 } else {
219 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
222 if (ret != 0) {
223 int lvl = DEBUG_ERR;
225 if (keep == false &&
226 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
228 lvl = DEBUG_DEBUG;
231 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232 "%d - %s\n",
233 ctdb_db->db_name,
234 keep?"store":"delete", ret,
235 tdb_errorstr(ctdb_db->ltdb->tdb)));
237 schedule_for_deletion = false;
238 remove_from_delete_queue = false;
240 if (seqnum_suppressed) {
241 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
244 if (schedule_for_deletion) {
245 int ret2;
246 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
247 if (ret2 != 0) {
248 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
252 if (remove_from_delete_queue) {
253 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
256 return ret;
/*
  state kept while a request waits for a record lock; it is handed to
  lock_fetch_callback() so that the packet can be re-submitted
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* receive function the packet is handed back to, with its context */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* database generation recorded when the request was queued */
	uint32_t generation;
	/* when true, the packet is re-submitted even if the generation changed */
	bool ignore_generation;
};
270 called when we should retry the operation
272 static void lock_fetch_callback(void *p, bool locked)
274 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275 if (!state->ignore_generation &&
276 state->generation != state->ctdb_db->generation) {
277 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278 talloc_free(state->hdr);
279 return;
281 state->recv_pkt(state->recv_context, state->hdr);
282 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
287 do a non-blocking ltdb_lock, deferring this ctdb request until we
288 have the chainlock
290 It does the following:
292 1) tries to get the chainlock. If it succeeds, then it returns 0
294 2) if it fails to get a chainlock immediately then it sets up a
295 non-blocking chainlock via ctdb_lock_record, and when it gets the
296 chainlock it re-submits this ctdb request to the main packet
297 receive function.
299 This effectively queues all ctdb requests that cannot be
300 immediately satisfied until it can get the lock. This means that
301 the main ctdb daemon will not block waiting for a chainlock held by
302 a client
304 There are 3 possible return values:
306 0: means that it got the lock immediately.
307 -1: means that it failed to get the lock, and won't retry
308 -2: means that it failed to get the lock immediately, but will retry
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
311 TDB_DATA key, struct ctdb_req_header *hdr,
312 void (*recv_pkt)(void *, struct ctdb_req_header *),
313 void *recv_context, bool ignore_generation)
315 int ret;
316 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317 struct lock_request *lreq;
318 struct lock_fetch_state *state;
320 ret = tdb_chainlock_nonblock(tdb, key);
322 if (ret != 0 &&
323 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324 /* a hard failure - don't try again */
325 return -1;
328 /* when torturing, ensure we test the contended path */
329 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330 random() % 5 == 0) {
331 ret = -1;
332 tdb_chainunlock(tdb, key);
335 /* first the non-contended path */
336 if (ret == 0) {
337 return 0;
340 state = talloc(hdr, struct lock_fetch_state);
341 state->ctdb = ctdb_db->ctdb;
342 state->ctdb_db = ctdb_db;
343 state->hdr = hdr;
344 state->recv_pkt = recv_pkt;
345 state->recv_context = recv_context;
346 state->generation = ctdb_db->generation;
347 state->ignore_generation = ignore_generation;
349 /* now the contended path */
350 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
351 if (lreq == NULL) {
352 return -1;
355 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356 so it won't be freed yet */
357 talloc_steal(state, hdr);
359 /* now tell the caller than we will retry asynchronously */
360 return -2;
364 a varient of ctdb_ltdb_lock_requeue that also fetches the record
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
367 TDB_DATA key, struct ctdb_ltdb_header *header,
368 struct ctdb_req_header *hdr, TDB_DATA *data,
369 void (*recv_pkt)(void *, struct ctdb_req_header *),
370 void *recv_context, bool ignore_generation)
372 int ret;
374 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
375 recv_context, ignore_generation);
376 if (ret == 0) {
377 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378 if (ret != 0) {
379 int uret;
380 uret = ctdb_ltdb_unlock(ctdb_db, key);
381 if (uret != 0) {
382 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
386 return ret;
391 paraoid check to see if the db is empty
393 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
395 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
396 int count = tdb_traverse_read(tdb, NULL, NULL);
397 if (count != 0) {
398 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
399 ctdb_db->db_path));
400 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
404 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
405 struct ctdb_db_context *ctdb_db)
407 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
408 char *old;
409 char *reason = NULL;
410 TDB_DATA key;
411 TDB_DATA val;
413 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
414 key.dsize = strlen(ctdb_db->db_name);
416 old = ctdb_db->unhealthy_reason;
417 ctdb_db->unhealthy_reason = NULL;
419 val = tdb_fetch(tdb, key);
420 if (val.dsize > 0) {
421 reason = talloc_strndup(ctdb_db,
422 (const char *)val.dptr,
423 val.dsize);
424 if (reason == NULL) {
425 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
426 (int)val.dsize));
427 ctdb_db->unhealthy_reason = old;
428 free(val.dptr);
429 return -1;
433 if (val.dptr) {
434 free(val.dptr);
437 talloc_free(old);
438 ctdb_db->unhealthy_reason = reason;
439 return 0;
442 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
443 struct ctdb_db_context *ctdb_db,
444 const char *given_reason,/* NULL means healthy */
445 int num_healthy_nodes)
447 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
448 int ret;
449 TDB_DATA key;
450 TDB_DATA val;
451 char *new_reason = NULL;
452 char *old_reason = NULL;
454 ret = tdb_transaction_start(tdb);
455 if (ret != 0) {
456 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
457 tdb_name(tdb), ret, tdb_errorstr(tdb)));
458 return -1;
461 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
462 if (ret != 0) {
463 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
464 ctdb_db->db_name, ret));
465 return -1;
467 old_reason = ctdb_db->unhealthy_reason;
469 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
470 key.dsize = strlen(ctdb_db->db_name);
472 if (given_reason) {
473 new_reason = talloc_strdup(ctdb_db, given_reason);
474 if (new_reason == NULL) {
475 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
476 given_reason));
477 return -1;
479 } else if (old_reason && num_healthy_nodes == 0) {
481 * If the reason indicates ok, but there where no healthy nodes
482 * available, that it means, we have not recovered valid content
483 * of the db. So if there's an old reason, prefix it with
484 * "NO-HEALTHY-NODES - "
486 const char *prefix;
488 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
489 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
490 if (ret != 0) {
491 prefix = _TMP_PREFIX;
492 } else {
493 prefix = "";
495 new_reason = talloc_asprintf(ctdb_db, "%s%s",
496 prefix, old_reason);
497 if (new_reason == NULL) {
498 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
499 prefix, old_reason));
500 return -1;
502 #undef _TMP_PREFIX
505 if (new_reason) {
506 val.dptr = discard_const_p(uint8_t, new_reason);
507 val.dsize = strlen(new_reason);
509 ret = tdb_store(tdb, key, val, TDB_REPLACE);
510 if (ret != 0) {
511 tdb_transaction_cancel(tdb);
512 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
513 tdb_name(tdb), ctdb_db->db_name, new_reason,
514 ret, tdb_errorstr(tdb)));
515 talloc_free(new_reason);
516 return -1;
518 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
519 ctdb_db->db_name, new_reason));
520 } else if (old_reason) {
521 ret = tdb_delete(tdb, key);
522 if (ret != 0) {
523 tdb_transaction_cancel(tdb);
524 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
525 tdb_name(tdb), ctdb_db->db_name,
526 ret, tdb_errorstr(tdb)));
527 talloc_free(new_reason);
528 return -1;
530 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
531 ctdb_db->db_name));
534 ret = tdb_transaction_commit(tdb);
535 if (ret != TDB_SUCCESS) {
536 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
537 tdb_name(tdb), ret, tdb_errorstr(tdb)));
538 talloc_free(new_reason);
539 return -1;
542 talloc_free(old_reason);
543 ctdb_db->unhealthy_reason = new_reason;
545 return 0;
548 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
549 struct ctdb_db_context *ctdb_db)
551 time_t now = time(NULL);
552 char *new_path;
553 char *new_reason;
554 int ret;
555 struct tm *tm;
557 tm = gmtime(&now);
559 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
560 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
561 "%04u%02u%02u%02u%02u%02u.0Z",
562 ctdb_db->db_path,
563 tm->tm_year+1900, tm->tm_mon+1,
564 tm->tm_mday, tm->tm_hour, tm->tm_min,
565 tm->tm_sec);
566 if (new_path == NULL) {
567 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
568 return -1;
571 new_reason = talloc_asprintf(ctdb_db,
572 "ERROR - Backup of corrupted TDB in '%s'",
573 new_path);
574 if (new_reason == NULL) {
575 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576 return -1;
578 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
579 talloc_free(new_reason);
580 if (ret != 0) {
581 DEBUG(DEBUG_CRIT,(__location__
582 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
583 ctdb_db->db_path));
584 return -1;
587 ret = rename(ctdb_db->db_path, new_path);
588 if (ret != 0) {
589 DEBUG(DEBUG_CRIT,(__location__
590 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
591 ctdb_db->db_path, new_path,
592 errno, strerror(errno)));
593 talloc_free(new_path);
594 return -1;
597 DEBUG(DEBUG_CRIT,(__location__
598 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
599 ctdb_db->db_path, new_path));
600 talloc_free(new_path);
601 return 0;
604 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
606 struct ctdb_db_context *ctdb_db;
607 int ret;
608 int ok = 0;
609 int fail = 0;
611 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
612 if (!ctdb_db_persistent(ctdb_db)) {
613 continue;
616 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
617 if (ret != 0) {
618 DEBUG(DEBUG_ALERT,(__location__
619 " load persistent health for '%s' failed\n",
620 ctdb_db->db_path));
621 return -1;
624 if (ctdb_db->unhealthy_reason == NULL) {
625 ok++;
626 DEBUG(DEBUG_INFO,(__location__
627 " persistent db '%s' healthy\n",
628 ctdb_db->db_path));
629 continue;
632 fail++;
633 DEBUG(DEBUG_ALERT,(__location__
634 " persistent db '%s' unhealthy: %s\n",
635 ctdb_db->db_path,
636 ctdb_db->unhealthy_reason));
638 DEBUG(DEBUG_NOTICE,
639 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
640 ok, fail));
642 if (fail != 0) {
643 return -1;
646 return 0;
651 mark a database - as healthy
653 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
655 uint32_t db_id = *(uint32_t *)indata.dptr;
656 struct ctdb_db_context *ctdb_db;
657 int ret;
658 bool may_recover = false;
660 ctdb_db = find_ctdb_db(ctdb, db_id);
661 if (!ctdb_db) {
662 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
663 return -1;
666 if (ctdb_db->unhealthy_reason) {
667 may_recover = true;
670 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
671 if (ret != 0) {
672 DEBUG(DEBUG_ERR,(__location__
673 " ctdb_update_persistent_health(%s) failed\n",
674 ctdb_db->db_name));
675 return -1;
678 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
679 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
680 ctdb_db->db_name));
681 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
684 return 0;
687 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
688 TDB_DATA indata,
689 TDB_DATA *outdata)
691 uint32_t db_id = *(uint32_t *)indata.dptr;
692 struct ctdb_db_context *ctdb_db;
693 int ret;
695 ctdb_db = find_ctdb_db(ctdb, db_id);
696 if (!ctdb_db) {
697 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
698 return -1;
701 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
702 if (ret != 0) {
703 DEBUG(DEBUG_ERR,(__location__
704 " ctdb_load_persistent_health(%s) failed\n",
705 ctdb_db->db_name));
706 return -1;
709 *outdata = tdb_null;
710 if (ctdb_db->unhealthy_reason) {
711 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
712 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
715 return 0;
719 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
721 char *ropath;
723 if (ctdb_db_readonly(ctdb_db)) {
724 return 0;
727 if (! ctdb_db_volatile(ctdb_db)) {
728 DEBUG(DEBUG_ERR,
729 ("Non-volatile databases do not support readonly flag\n"));
730 return -1;
733 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
734 if (ropath == NULL) {
735 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
736 return -1;
738 ctdb_db->rottdb = tdb_open(ropath,
739 ctdb->tunable.database_hash_size,
740 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
741 O_CREAT|O_RDWR, 0600);
742 if (ctdb_db->rottdb == NULL) {
743 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
744 talloc_free(ropath);
745 return -1;
748 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
750 ctdb_db_set_readonly(ctdb_db);
752 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
754 talloc_free(ropath);
755 return 0;
759 attach to a database, handling both persistent and non-persistent databases
760 return 0 on success, -1 on failure
762 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
763 uint8_t db_flags, const char *unhealthy_reason)
765 struct ctdb_db_context *ctdb_db, *tmp_db;
766 int ret;
767 struct TDB_DATA key;
768 int tdb_flags;
769 int mode = 0600;
770 int remaining_tries = 0;
772 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
773 CTDB_NO_MEMORY(ctdb, ctdb_db);
775 ctdb_db->ctdb = ctdb;
776 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
777 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
779 key.dsize = strlen(db_name)+1;
780 key.dptr = discard_const(db_name);
781 ctdb_db->db_id = ctdb_hash(&key);
782 ctdb_db->db_flags = db_flags;
784 if (ctdb_db_volatile(ctdb_db)) {
785 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
786 if (ctdb_db->delete_queue == NULL) {
787 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
790 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
793 /* check for hash collisions */
794 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
795 if (tmp_db->db_id == ctdb_db->db_id) {
796 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
797 tmp_db->db_id, db_name, tmp_db->db_name));
798 talloc_free(ctdb_db);
799 return -1;
803 if (ctdb_db_persistent(ctdb_db)) {
804 if (unhealthy_reason) {
805 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
806 unhealthy_reason, 0);
807 if (ret != 0) {
808 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
809 ctdb_db->db_name, unhealthy_reason, ret));
810 talloc_free(ctdb_db);
811 return -1;
815 if (ctdb->max_persistent_check_errors > 0) {
816 remaining_tries = 1;
818 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
819 remaining_tries = 0;
822 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
823 if (ret != 0) {
824 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
825 ctdb_db->db_name, ret));
826 talloc_free(ctdb_db);
827 return -1;
831 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
832 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
833 ctdb_db->db_name, ctdb_db->unhealthy_reason));
834 talloc_free(ctdb_db);
835 return -1;
838 if (ctdb_db->unhealthy_reason) {
839 /* this is just a warning, but we want that in the log file! */
840 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
841 ctdb_db->db_name, ctdb_db->unhealthy_reason));
844 /* open the database */
845 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
846 ctdb_db_persistent(ctdb_db) ?
847 ctdb->db_directory_persistent :
848 ctdb->db_directory,
849 db_name, ctdb->pnn);
851 tdb_flags = ctdb_db_tdb_flags(db_flags,
852 ctdb->valgrinding,
853 ctdb_config.tdb_mutexes);
855 again:
856 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
857 ctdb->tunable.database_hash_size,
858 tdb_flags,
859 O_CREAT|O_RDWR, mode);
860 if (ctdb_db->ltdb == NULL) {
861 struct stat st;
862 int saved_errno = errno;
864 if (! ctdb_db_persistent(ctdb_db)) {
865 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
866 ctdb_db->db_path,
867 saved_errno,
868 strerror(saved_errno)));
869 talloc_free(ctdb_db);
870 return -1;
873 if (remaining_tries == 0) {
874 DEBUG(DEBUG_CRIT,(__location__
875 "Failed to open persistent tdb '%s': %d - %s\n",
876 ctdb_db->db_path,
877 saved_errno,
878 strerror(saved_errno)));
879 talloc_free(ctdb_db);
880 return -1;
883 ret = stat(ctdb_db->db_path, &st);
884 if (ret != 0) {
885 DEBUG(DEBUG_CRIT,(__location__
886 "Failed to open persistent tdb '%s': %d - %s\n",
887 ctdb_db->db_path,
888 saved_errno,
889 strerror(saved_errno)));
890 talloc_free(ctdb_db);
891 return -1;
894 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
895 if (ret != 0) {
896 DEBUG(DEBUG_CRIT,(__location__
897 "Failed to open persistent tdb '%s': %d - %s\n",
898 ctdb_db->db_path,
899 saved_errno,
900 strerror(saved_errno)));
901 talloc_free(ctdb_db);
902 return -1;
905 remaining_tries--;
906 mode = st.st_mode;
907 goto again;
910 if (!ctdb_db_persistent(ctdb_db)) {
911 ctdb_check_db_empty(ctdb_db);
912 } else {
913 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
914 if (ret != 0) {
915 int fd;
916 struct stat st;
918 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
919 ctdb_db->db_path, ret,
920 tdb_errorstr(ctdb_db->ltdb->tdb)));
921 if (remaining_tries == 0) {
922 talloc_free(ctdb_db);
923 return -1;
926 fd = tdb_fd(ctdb_db->ltdb->tdb);
927 ret = fstat(fd, &st);
928 if (ret != 0) {
929 DEBUG(DEBUG_CRIT,(__location__
930 "Failed to fstat() persistent tdb '%s': %d - %s\n",
931 ctdb_db->db_path,
932 errno,
933 strerror(errno)));
934 talloc_free(ctdb_db);
935 return -1;
938 /* close the TDB */
939 talloc_free(ctdb_db->ltdb);
940 ctdb_db->ltdb = NULL;
942 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
943 if (ret != 0) {
944 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
945 ctdb_db->db_path));
946 talloc_free(ctdb_db);
947 return -1;
950 remaining_tries--;
951 mode = st.st_mode;
952 goto again;
956 /* remember the flags the client has specified */
957 tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
960 /* set up a rb tree we can use to track which records we have a
961 fetch-lock in-flight for so we can defer any additional calls
962 for the same record.
964 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
965 if (ctdb_db->deferred_fetch == NULL) {
966 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
967 talloc_free(ctdb_db);
968 return -1;
971 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
972 if (ctdb_db->defer_dmaster == NULL) {
973 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
974 ctdb_db->db_name));
975 talloc_free(ctdb_db);
976 return -1;
979 DLIST_ADD(ctdb->db_list, ctdb_db);
981 /* setting this can help some high churn databases */
982 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
985 all databases support the "null" function. we need this in
986 order to do forced migration of records
988 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
989 if (ret != 0) {
990 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
991 talloc_free(ctdb_db);
992 return -1;
996 all databases support the "fetch" function. we need this
997 for efficient Samba3 ctdb fetch
999 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1000 if (ret != 0) {
1001 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1002 talloc_free(ctdb_db);
1003 return -1;
1007 all databases support the "fetch_with_header" function. we need this
1008 for efficient readonly record fetches
1010 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1011 if (ret != 0) {
1012 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1013 talloc_free(ctdb_db);
1014 return -1;
1017 ret = ctdb_vacuum_init(ctdb_db);
1018 if (ret != 0) {
1019 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1020 "database '%s'\n", ctdb_db->db_name));
1021 talloc_free(ctdb_db);
1022 return -1;
1025 ret = ctdb_migration_init(ctdb_db);
1026 if (ret != 0) {
1027 DEBUG(DEBUG_ERR,
1028 ("Failed to setup migration tracking for db '%s'\n",
1029 ctdb_db->db_name));
1030 talloc_free(ctdb_db);
1031 return -1;
1034 ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1035 &ctdb_db->lock_log);
1036 if (ret != 0) {
1037 DEBUG(DEBUG_ERR,
1038 ("Failed to setup lock logging for db '%s'\n",
1039 ctdb_db->db_name));
1040 talloc_free(ctdb_db);
1041 return -1;
1044 ctdb_db->generation = ctdb->vnn_map->generation;
1046 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1047 ctdb_db->db_path, tdb_flags));
1049 /* success */
1050 return 0;
/*
  one deferred DB attach request, linked on ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* list linkage */
	struct ctdb_deferred_attach_context *next, *prev;
	struct ctdb_context *ctdb;
	/* the control packet that is answered or replayed later */
	struct ctdb_req_control_old *c;
};
1061 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1063 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1065 return 0;
1068 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1069 struct tevent_timer *te,
1070 struct timeval t, void *private_data)
1072 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1073 struct ctdb_context *ctdb = da_ctx->ctdb;
1075 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1076 talloc_free(da_ctx);
1079 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1080 struct tevent_timer *te,
1081 struct timeval t, void *private_data)
1083 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1084 struct ctdb_context *ctdb = da_ctx->ctdb;
1086 /* This talloc-steals the packet ->c */
1087 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1088 talloc_free(da_ctx);
1091 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1093 struct ctdb_deferred_attach_context *da_ctx;
1095 /* call it from the main event loop as soon as the current event
1096 finishes.
1098 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1099 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1100 tevent_add_timer(ctdb->ev, da_ctx,
1101 timeval_current_ofs(1,0),
1102 ctdb_deferred_attach_callback, da_ctx);
1105 return 0;
1109 a client has asked to attach a new database
1111 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
1112 TDB_DATA indata,
1113 TDB_DATA *outdata,
1114 uint8_t db_flags,
1115 uint32_t srcnode,
1116 uint32_t client_id,
1117 struct ctdb_req_control_old *c,
1118 bool *async_reply)
1120 const char *db_name = (const char *)indata.dptr;
1121 struct ctdb_db_context *db;
1122 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1123 struct ctdb_client *client = NULL;
1124 uint32_t opcode;
1126 if (ctdb->tunable.allow_client_db_attach == 0) {
1127 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1128 "AllowClientDBAccess == 0\n", db_name));
1129 return -1;
1132 /* don't allow any local clients to attach while we are in recovery mode
1133 * except for the recovery daemon.
1134 * allow all attach from the network since these are always from remote
1135 * recovery daemons.
1137 if (srcnode == ctdb->pnn && client_id != 0) {
1138 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1140 if (client != NULL) {
1141 /* If the node is inactive it is not part of the cluster
1142 and we should not allow clients to attach to any
1143 databases
1145 if (node->flags & NODE_FLAGS_INACTIVE) {
1146 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1147 return -1;
1150 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1151 client->pid != ctdb->recoverd_pid &&
1152 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1153 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1155 if (da_ctx == NULL) {
1156 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1157 return -1;
1160 da_ctx->ctdb = ctdb;
1161 da_ctx->c = talloc_steal(da_ctx, c);
1162 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1163 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1165 tevent_add_timer(ctdb->ev, da_ctx,
1166 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1167 ctdb_deferred_attach_timeout, da_ctx);
1169 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1170 *async_reply = true;
1171 return 0;
1175 /* see if we already have this name */
1176 db = ctdb_db_handle(ctdb, db_name);
1177 if (db) {
1178 if ((db->db_flags & db_flags) != db_flags) {
1179 DEBUG(DEBUG_ERR,
1180 ("Error: Failed to re-attach with 0x%x flags,"
1181 " database has 0x%x flags\n", db_flags,
1182 db->db_flags));
1183 return -1;
1185 outdata->dptr = (uint8_t *)&db->db_id;
1186 outdata->dsize = sizeof(db->db_id);
1187 return 0;
1190 if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1191 return -1;
1194 db = ctdb_db_handle(ctdb, db_name);
1195 if (!db) {
1196 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1197 return -1;
1200 outdata->dptr = (uint8_t *)&db->db_id;
1201 outdata->dsize = sizeof(db->db_id);
1203 /* Try to ensure it's locked in mem */
1204 lockdown_memory(ctdb->valgrinding);
1206 if (ctdb_db_persistent(db)) {
1207 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1208 } else if (ctdb_db_replicated(db)) {
1209 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1210 } else {
1211 opcode = CTDB_CONTROL_DB_ATTACH;
1214 /* tell all the other nodes about this database */
1215 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1216 0, CTDB_CTRL_FLAG_NOREPLY,
1217 indata, NULL, NULL);
1219 /* success */
1220 return 0;
1224 * a client has asked to detach from a database
1226 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1227 uint32_t client_id)
1229 uint32_t db_id;
1230 struct ctdb_db_context *ctdb_db;
1231 struct ctdb_client *client = NULL;
1233 db_id = *(uint32_t *)indata.dptr;
1234 ctdb_db = find_ctdb_db(ctdb, db_id);
1235 if (ctdb_db == NULL) {
1236 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1237 db_id));
1238 return -1;
1241 if (ctdb->tunable.allow_client_db_attach == 1) {
1242 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1243 "Clients are allowed access to databases "
1244 "(AllowClientDBAccess == 1)\n",
1245 ctdb_db->db_name));
1246 return -1;
1249 if (! ctdb_db_volatile(ctdb_db)) {
1250 DEBUG(DEBUG_ERR,
1251 ("Detaching non-volatile database %s denied\n",
1252 ctdb_db->db_name));
1253 return -1;
1256 /* Cannot detach from database when in recovery */
1257 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1258 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1259 return -1;
1262 /* If a control comes from a client, then broadcast it to all nodes.
1263 * Do the actual detach only if the control comes from other daemons.
1265 if (client_id != 0) {
1266 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1267 if (client != NULL) {
1268 /* forward the control to all the nodes */
1269 ctdb_daemon_send_control(ctdb,
1270 CTDB_BROADCAST_CONNECTED, 0,
1271 CTDB_CONTROL_DB_DETACH, 0,
1272 CTDB_CTRL_FLAG_NOREPLY,
1273 indata, NULL, NULL);
1274 return 0;
1276 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1277 "for database '%s'\n", ctdb_db->db_name));
1278 return -1;
1281 /* Detach database from recoverd */
1282 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1283 CTDB_SRVID_DETACH_DATABASE,
1284 indata) != 0) {
1285 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1286 return -1;
1289 /* Disable vacuuming and drop all vacuuming data */
1290 talloc_free(ctdb_db->vacuum_handle);
1291 talloc_free(ctdb_db->delete_queue);
1293 /* Terminate any deferred fetch */
1294 talloc_free(ctdb_db->deferred_fetch);
1296 /* Terminate any traverses */
1297 while (ctdb_db->traverse) {
1298 talloc_free(ctdb_db->traverse);
1301 /* Terminate any revokes */
1302 while (ctdb_db->revokechild_active) {
1303 talloc_free(ctdb_db->revokechild_active);
1306 /* Free readonly tracking database */
1307 if (ctdb_db_readonly(ctdb_db)) {
1308 talloc_free(ctdb_db->rottdb);
1311 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1313 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1314 ctdb_db->db_name));
1315 talloc_free(ctdb_db);
1317 return 0;
1321 attach to all existing persistent databases
1323 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1324 const char *unhealthy_reason)
1326 DIR *d;
1327 struct dirent *de;
1329 /* open the persistent db directory and scan it for files */
1330 d = opendir(ctdb->db_directory_persistent);
1331 if (d == NULL) {
1332 return 0;
1335 while ((de=readdir(d))) {
1336 char *p, *s, *q;
1337 size_t len = strlen(de->d_name);
1338 uint32_t node;
1339 int invalid_name = 0;
1341 s = talloc_strdup(ctdb, de->d_name);
1342 if (s == NULL) {
1343 closedir(d);
1344 CTDB_NO_MEMORY(ctdb, s);
1347 /* only accept names ending in .tdb */
1348 p = strstr(s, ".tdb.");
1349 if (len < 7 || p == NULL) {
1350 talloc_free(s);
1351 continue;
1354 /* only accept names ending with .tdb. and any number of digits */
1355 q = p+5;
1356 while (*q != 0 && invalid_name == 0) {
1357 if (!isdigit(*q++)) {
1358 invalid_name = 1;
1361 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1362 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1363 talloc_free(s);
1364 continue;
1366 p[4] = 0;
1368 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1369 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1370 closedir(d);
1371 talloc_free(s);
1372 return -1;
1375 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1377 talloc_free(s);
1379 closedir(d);
1380 return 0;
1383 int ctdb_attach_databases(struct ctdb_context *ctdb)
1385 int ret;
1386 char *persistent_health_path = NULL;
1387 char *unhealthy_reason = NULL;
1388 bool first_try = true;
1390 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1391 ctdb->db_directory_state,
1392 PERSISTENT_HEALTH_TDB,
1393 ctdb->pnn);
1394 if (persistent_health_path == NULL) {
1395 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1396 return -1;
1399 again:
1401 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1402 0, TDB_DISALLOW_NESTING,
1403 O_CREAT | O_RDWR, 0600);
1404 if (ctdb->db_persistent_health == NULL) {
1405 struct tdb_wrap *tdb;
1407 if (!first_try) {
1408 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1409 persistent_health_path,
1410 errno,
1411 strerror(errno)));
1412 talloc_free(persistent_health_path);
1413 talloc_free(unhealthy_reason);
1414 return -1;
1416 first_try = false;
1418 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1419 persistent_health_path,
1420 "was cleared after a failure",
1421 "manual verification needed");
1422 if (unhealthy_reason == NULL) {
1423 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1424 talloc_free(persistent_health_path);
1425 return -1;
1428 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1429 persistent_health_path));
1430 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1431 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1432 O_CREAT | O_RDWR, 0600);
1433 if (tdb) {
1434 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1435 persistent_health_path,
1436 errno,
1437 strerror(errno)));
1438 talloc_free(persistent_health_path);
1439 talloc_free(unhealthy_reason);
1440 return -1;
1443 talloc_free(tdb);
1444 goto again;
1446 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1447 if (ret != 0) {
1448 struct tdb_wrap *tdb;
1450 talloc_free(ctdb->db_persistent_health);
1451 ctdb->db_persistent_health = NULL;
1453 if (!first_try) {
1454 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1455 persistent_health_path));
1456 talloc_free(persistent_health_path);
1457 talloc_free(unhealthy_reason);
1458 return -1;
1460 first_try = false;
1462 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1463 persistent_health_path,
1464 "was cleared after a failure",
1465 "manual verification needed");
1466 if (unhealthy_reason == NULL) {
1467 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1468 talloc_free(persistent_health_path);
1469 return -1;
1472 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1473 persistent_health_path));
1474 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1475 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1476 O_CREAT | O_RDWR, 0600);
1477 if (tdb) {
1478 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1479 persistent_health_path,
1480 errno,
1481 strerror(errno)));
1482 talloc_free(persistent_health_path);
1483 talloc_free(unhealthy_reason);
1484 return -1;
1487 talloc_free(tdb);
1488 goto again;
1490 talloc_free(persistent_health_path);
1492 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1493 talloc_free(unhealthy_reason);
1494 if (ret != 0) {
1495 return ret;
1498 return 0;
1502 called when a broadcast seqnum update comes in
1504 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1506 struct ctdb_db_context *ctdb_db;
1507 if (srcnode == ctdb->pnn) {
1508 /* don't update ourselves! */
1509 return 0;
1512 ctdb_db = find_ctdb_db(ctdb, db_id);
1513 if (!ctdb_db) {
1514 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1515 return -1;
1518 if (ctdb_db->unhealthy_reason) {
1519 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1520 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1521 return -1;
1524 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1525 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1526 return 0;
1530 timer to check for seqnum changes in a ltdb and propagate them
1532 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1533 struct tevent_timer *te,
1534 struct timeval t, void *p)
1536 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1537 struct ctdb_context *ctdb = ctdb_db->ctdb;
1538 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1539 if (new_seqnum != ctdb_db->seqnum) {
1540 /* something has changed - propagate it */
1541 TDB_DATA data;
1542 data.dptr = (uint8_t *)&ctdb_db->db_id;
1543 data.dsize = sizeof(uint32_t);
1544 ctdb_daemon_send_control(ctdb,
1545 CTDB_BROADCAST_ACTIVE,
1547 CTDB_CONTROL_UPDATE_SEQNUM,
1549 CTDB_CTRL_FLAG_NOREPLY,
1550 data,
1551 NULL,
1552 NULL);
1554 ctdb_db->seqnum = new_seqnum;
1556 /* setup a new timer */
1557 ctdb_db->seqnum_update =
1558 tevent_add_timer(ctdb->ev, ctdb_db,
1559 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1560 (ctdb->tunable.seqnum_interval%1000)*1000),
1561 ctdb_ltdb_seqnum_check, ctdb_db);
1565 enable seqnum handling on this db
1567 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1569 struct ctdb_db_context *ctdb_db;
1570 ctdb_db = find_ctdb_db(ctdb, db_id);
1571 if (!ctdb_db) {
1572 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1573 return -1;
1576 if (ctdb_db->seqnum_update == NULL) {
1577 ctdb_db->seqnum_update = tevent_add_timer(
1578 ctdb->ev, ctdb_db,
1579 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1580 (ctdb->tunable.seqnum_interval%1000)*1000),
1581 ctdb_ltdb_seqnum_check, ctdb_db);
1584 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1585 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1586 return 0;
1589 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1591 if (ctdb_db_sticky(ctdb_db)) {
1592 return 0;
1595 if (! ctdb_db_volatile(ctdb_db)) {
1596 DEBUG(DEBUG_ERR,
1597 ("Non-volatile databases do not support sticky flag\n"));
1598 return -1;
1601 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1603 ctdb_db_set_sticky(ctdb_db);
1605 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1607 return 0;
1610 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1612 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1613 int i;
1615 for (i=0; i<MAX_HOT_KEYS; i++) {
1616 if (s->hot_keys[i].key.dsize > 0) {
1617 talloc_free(s->hot_keys[i].key.dptr);
1621 ZERO_STRUCT(ctdb_db->statistics);
1624 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1625 uint32_t db_id,
1626 TDB_DATA *outdata)
1628 struct ctdb_db_context *ctdb_db;
1629 struct ctdb_db_statistics_old *stats;
1630 int i;
1631 int len;
1632 char *ptr;
1634 ctdb_db = find_ctdb_db(ctdb, db_id);
1635 if (!ctdb_db) {
1636 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1637 return -1;
1640 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1641 for (i = 0; i < MAX_HOT_KEYS; i++) {
1642 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1645 stats = talloc_size(outdata, len);
1646 if (stats == NULL) {
1647 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1648 return -1;
1651 memcpy(stats, &ctdb_db->statistics,
1652 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1654 stats->num_hot_keys = MAX_HOT_KEYS;
1656 ptr = &stats->hot_keys_wire[0];
1657 for (i = 0; i < MAX_HOT_KEYS; i++) {
1658 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1659 ctdb_db->statistics.hot_keys[i].key.dsize);
1660 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1663 outdata->dptr = (uint8_t *)stats;
1664 outdata->dsize = len;
1666 return 0;