ctdb-util: Rename db_wrap to tdb_wrap and make it a build subsystem
[Samba.git] / ctdb / server / ctdb_ltdb_server.c
blob8fb2bc7ce910900ee4e2892b3e26f70e339558d9
1 /*
2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34 /**
35 * write a record to a normal database
37 * This is the server-variant of the ctdb_ltdb_store function.
38 * It contains logic to determine whether a record should be
39 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40 * controls to the local ctdb daemon if apporpriate.
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
43 TDB_DATA key,
44 struct ctdb_ltdb_header *header,
45 TDB_DATA data)
47 struct ctdb_context *ctdb = ctdb_db->ctdb;
48 TDB_DATA rec;
49 int ret;
50 bool seqnum_suppressed = false;
51 bool keep = false;
52 bool schedule_for_deletion = false;
53 bool remove_from_delete_queue = false;
54 uint32_t lmaster;
56 if (ctdb->flags & CTDB_FLAG_TORTURE) {
57 struct ctdb_ltdb_header *h2;
58 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64 if (rec.dptr) free(rec.dptr);
67 if (ctdb->vnn_map == NULL) {
69 * Called from a client: always store the record
70 * Also don't call ctdb_lmaster since it uses the vnn_map!
72 keep = true;
73 goto store;
76 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
79 * If we migrate an empty record off to another node
80 * and the record has not been migrated with data,
81 * delete the record instead of storing the empty record.
83 if (data.dsize != 0) {
84 keep = true;
85 } else if (header->flags & CTDB_REC_RO_FLAGS) {
86 keep = true;
87 } else if (ctdb_db->persistent) {
88 keep = true;
89 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
91 * The record is not created by the client but
92 * automatically by the ctdb_ltdb_fetch logic that
93 * creates a record with an initial header in the
94 * ltdb before trying to migrate the record from
95 * the current lmaster. Keep it instead of trying
96 * to delete the non-existing record...
98 keep = true;
99 schedule_for_deletion = true;
100 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101 keep = true;
102 } else if (ctdb_db->ctdb->pnn == lmaster) {
104 * If we are lmaster, then we usually keep the record.
105 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106 * and the record is empty and has never been migrated
107 * with data, then we should delete it instead of storing it.
108 * This is part of the vacuuming process.
110 * The reason that we usually need to store even empty records
111 * on the lmaster is that a client operating directly on the
112 * lmaster (== dmaster) expects the local copy of the record to
113 * exist after successful ctdb migrate call. If the record does
114 * not exist, the client goes into a migrate loop and eventually
115 * fails. So storing the empty record makes sure that we do not
116 * need to change the client code.
118 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
119 keep = true;
120 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
121 keep = true;
123 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
124 keep = true;
127 if (keep) {
128 if (!ctdb_db->persistent &&
129 (ctdb_db->ctdb->pnn == header->dmaster) &&
130 !(header->flags & CTDB_REC_RO_FLAGS))
132 header->rsn++;
134 if (data.dsize == 0) {
135 schedule_for_deletion = true;
138 remove_from_delete_queue = !schedule_for_deletion;
141 store:
143 * The VACUUM_MIGRATED flag is only set temporarily for
144 * the above logic when the record was retrieved by a
145 * VACUUM_MIGRATE call and should not be stored in the
146 * database.
148 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
149 * and there are two cases in which the corresponding record
150 * is stored in the local database:
151 * 1. The record has been migrated with data in the past
152 * (the MIGRATED_WITH_DATA record flag is set).
153 * 2. The record has been filled with data again since it
154 * had been submitted in the VACUUM_FETCH message to the
155 * lmaster.
156 * For such records it is important to not store the
157 * VACUUM_MIGRATED flag in the database.
159 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
162 * Similarly, clear the AUTOMATIC flag which should not enter
163 * the local database copy since this would require client
164 * modifications to clear the flag when the client stores
165 * the record.
167 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
169 rec.dsize = sizeof(*header) + data.dsize;
170 rec.dptr = talloc_size(ctdb, rec.dsize);
171 CTDB_NO_MEMORY(ctdb, rec.dptr);
173 memcpy(rec.dptr, header, sizeof(*header));
174 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
176 /* Databases with seqnum updates enabled only get their seqnum
177 changes when/if we modify the data */
178 if (ctdb_db->seqnum_update != NULL) {
179 TDB_DATA old;
180 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
182 if ( (old.dsize == rec.dsize)
183 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
184 rec.dptr+sizeof(struct ctdb_ltdb_header),
185 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
186 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
187 seqnum_suppressed = true;
189 if (old.dptr) free(old.dptr);
192 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
193 ctdb_db->db_name,
194 keep?"storing":"deleting",
195 ctdb_hash(&key)));
197 if (keep) {
198 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
199 } else {
200 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
203 if (ret != 0) {
204 int lvl = DEBUG_ERR;
206 if (keep == false &&
207 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
209 lvl = DEBUG_DEBUG;
212 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
213 "%d - %s\n",
214 ctdb_db->db_name,
215 keep?"store":"delete", ret,
216 tdb_errorstr(ctdb_db->ltdb->tdb)));
218 schedule_for_deletion = false;
219 remove_from_delete_queue = false;
221 if (seqnum_suppressed) {
222 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
225 talloc_free(rec.dptr);
227 if (schedule_for_deletion) {
228 int ret2;
229 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
230 if (ret2 != 0) {
231 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
235 if (remove_from_delete_queue) {
236 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
239 return ret;
/*
  state kept for a request that is parked until its chainlock is obtained
 */
struct lock_fetch_state {
	/* daemon context, used to read the current vnn_map generation */
	struct ctdb_context *ctdb;
	/* function the parked packet is handed back to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the parked request packet */
	struct ctdb_req_header *hdr;
	/* vnn_map generation recorded when the request was parked */
	uint32_t generation;
	/* if true, resubmit the packet even when the generation changed */
	bool ignore_generation;
};
252 called when we should retry the operation
254 static void lock_fetch_callback(void *p, bool locked)
256 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
257 if (!state->ignore_generation &&
258 state->generation != state->ctdb->vnn_map->generation) {
259 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
260 talloc_free(state->hdr);
261 return;
263 state->recv_pkt(state->recv_context, state->hdr);
264 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
269 do a non-blocking ltdb_lock, deferring this ctdb request until we
270 have the chainlock
272 It does the following:
274 1) tries to get the chainlock. If it succeeds, then it returns 0
276 2) if it fails to get a chainlock immediately then it sets up a
277 non-blocking chainlock via ctdb_lock_record, and when it gets the
278 chainlock it re-submits this ctdb request to the main packet
279 receive function.
281 This effectively queues all ctdb requests that cannot be
282 immediately satisfied until it can get the lock. This means that
283 the main ctdb daemon will not block waiting for a chainlock held by
284 a client
286 There are 3 possible return values:
288 0: means that it got the lock immediately.
289 -1: means that it failed to get the lock, and won't retry
290 -2: means that it failed to get the lock immediately, but will retry
292 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
293 TDB_DATA key, struct ctdb_req_header *hdr,
294 void (*recv_pkt)(void *, struct ctdb_req_header *),
295 void *recv_context, bool ignore_generation)
297 int ret;
298 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
299 struct lock_request *lreq;
300 struct lock_fetch_state *state;
302 ret = tdb_chainlock_nonblock(tdb, key);
304 if (ret != 0 &&
305 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
306 /* a hard failure - don't try again */
307 return -1;
310 /* when torturing, ensure we test the contended path */
311 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
312 random() % 5 == 0) {
313 ret = -1;
314 tdb_chainunlock(tdb, key);
317 /* first the non-contended path */
318 if (ret == 0) {
319 return 0;
322 state = talloc(hdr, struct lock_fetch_state);
323 state->ctdb = ctdb_db->ctdb;
324 state->hdr = hdr;
325 state->recv_pkt = recv_pkt;
326 state->recv_context = recv_context;
327 state->generation = ctdb_db->ctdb->vnn_map->generation;
328 state->ignore_generation = ignore_generation;
330 /* now the contended path */
331 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
332 if (lreq == NULL) {
333 return -1;
336 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
337 so it won't be freed yet */
338 talloc_steal(state, hdr);
340 /* now tell the caller than we will retry asynchronously */
341 return -2;
345 a varient of ctdb_ltdb_lock_requeue that also fetches the record
347 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
348 TDB_DATA key, struct ctdb_ltdb_header *header,
349 struct ctdb_req_header *hdr, TDB_DATA *data,
350 void (*recv_pkt)(void *, struct ctdb_req_header *),
351 void *recv_context, bool ignore_generation)
353 int ret;
355 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
356 recv_context, ignore_generation);
357 if (ret == 0) {
358 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
359 if (ret != 0) {
360 int uret;
361 uret = ctdb_ltdb_unlock(ctdb_db, key);
362 if (uret != 0) {
363 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
367 return ret;
372 paraoid check to see if the db is empty
374 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
376 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
377 int count = tdb_traverse_read(tdb, NULL, NULL);
378 if (count != 0) {
379 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
380 ctdb_db->db_path));
381 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
385 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
386 struct ctdb_db_context *ctdb_db)
388 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
389 char *old;
390 char *reason = NULL;
391 TDB_DATA key;
392 TDB_DATA val;
394 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
395 key.dsize = strlen(ctdb_db->db_name);
397 old = ctdb_db->unhealthy_reason;
398 ctdb_db->unhealthy_reason = NULL;
400 val = tdb_fetch(tdb, key);
401 if (val.dsize > 0) {
402 reason = talloc_strndup(ctdb_db,
403 (const char *)val.dptr,
404 val.dsize);
405 if (reason == NULL) {
406 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
407 (int)val.dsize));
408 ctdb_db->unhealthy_reason = old;
409 free(val.dptr);
410 return -1;
414 if (val.dptr) {
415 free(val.dptr);
418 talloc_free(old);
419 ctdb_db->unhealthy_reason = reason;
420 return 0;
423 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
424 struct ctdb_db_context *ctdb_db,
425 const char *given_reason,/* NULL means healthy */
426 int num_healthy_nodes)
428 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
429 int ret;
430 TDB_DATA key;
431 TDB_DATA val;
432 char *new_reason = NULL;
433 char *old_reason = NULL;
435 ret = tdb_transaction_start(tdb);
436 if (ret != 0) {
437 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
438 tdb_name(tdb), ret, tdb_errorstr(tdb)));
439 return -1;
442 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
443 if (ret != 0) {
444 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
445 ctdb_db->db_name, ret));
446 return -1;
448 old_reason = ctdb_db->unhealthy_reason;
450 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
451 key.dsize = strlen(ctdb_db->db_name);
453 if (given_reason) {
454 new_reason = talloc_strdup(ctdb_db, given_reason);
455 if (new_reason == NULL) {
456 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
457 given_reason));
458 return -1;
460 } else if (old_reason && num_healthy_nodes == 0) {
462 * If the reason indicates ok, but there where no healthy nodes
463 * available, that it means, we have not recovered valid content
464 * of the db. So if there's an old reason, prefix it with
465 * "NO-HEALTHY-NODES - "
467 const char *prefix;
469 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
470 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
471 if (ret != 0) {
472 prefix = _TMP_PREFIX;
473 } else {
474 prefix = "";
476 new_reason = talloc_asprintf(ctdb_db, "%s%s",
477 prefix, old_reason);
478 if (new_reason == NULL) {
479 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
480 prefix, old_reason));
481 return -1;
483 #undef _TMP_PREFIX
486 if (new_reason) {
487 val.dptr = discard_const_p(uint8_t, new_reason);
488 val.dsize = strlen(new_reason);
490 ret = tdb_store(tdb, key, val, TDB_REPLACE);
491 if (ret != 0) {
492 tdb_transaction_cancel(tdb);
493 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
494 tdb_name(tdb), ctdb_db->db_name, new_reason,
495 ret, tdb_errorstr(tdb)));
496 talloc_free(new_reason);
497 return -1;
499 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
500 ctdb_db->db_name, new_reason));
501 } else if (old_reason) {
502 ret = tdb_delete(tdb, key);
503 if (ret != 0) {
504 tdb_transaction_cancel(tdb);
505 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
506 tdb_name(tdb), ctdb_db->db_name,
507 ret, tdb_errorstr(tdb)));
508 talloc_free(new_reason);
509 return -1;
511 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
512 ctdb_db->db_name));
515 ret = tdb_transaction_commit(tdb);
516 if (ret != TDB_SUCCESS) {
517 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
518 tdb_name(tdb), ret, tdb_errorstr(tdb)));
519 talloc_free(new_reason);
520 return -1;
523 talloc_free(old_reason);
524 ctdb_db->unhealthy_reason = new_reason;
526 return 0;
529 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
530 struct ctdb_db_context *ctdb_db)
532 time_t now = time(NULL);
533 char *new_path;
534 char *new_reason;
535 int ret;
536 struct tm *tm;
538 tm = gmtime(&now);
540 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
541 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
542 "%04u%02u%02u%02u%02u%02u.0Z",
543 ctdb_db->db_path,
544 tm->tm_year+1900, tm->tm_mon+1,
545 tm->tm_mday, tm->tm_hour, tm->tm_min,
546 tm->tm_sec);
547 if (new_path == NULL) {
548 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
549 return -1;
552 new_reason = talloc_asprintf(ctdb_db,
553 "ERROR - Backup of corrupted TDB in '%s'",
554 new_path);
555 if (new_reason == NULL) {
556 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
557 return -1;
559 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
560 talloc_free(new_reason);
561 if (ret != 0) {
562 DEBUG(DEBUG_CRIT,(__location__
563 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
564 ctdb_db->db_path));
565 return -1;
568 ret = rename(ctdb_db->db_path, new_path);
569 if (ret != 0) {
570 DEBUG(DEBUG_CRIT,(__location__
571 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
572 ctdb_db->db_path, new_path,
573 errno, strerror(errno)));
574 talloc_free(new_path);
575 return -1;
578 DEBUG(DEBUG_CRIT,(__location__
579 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
580 ctdb_db->db_path, new_path));
581 talloc_free(new_path);
582 return 0;
585 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
587 struct ctdb_db_context *ctdb_db;
588 int ret;
589 int ok = 0;
590 int fail = 0;
592 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
593 if (!ctdb_db->persistent) {
594 continue;
597 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
598 if (ret != 0) {
599 DEBUG(DEBUG_ALERT,(__location__
600 " load persistent health for '%s' failed\n",
601 ctdb_db->db_path));
602 return -1;
605 if (ctdb_db->unhealthy_reason == NULL) {
606 ok++;
607 DEBUG(DEBUG_INFO,(__location__
608 " persistent db '%s' healthy\n",
609 ctdb_db->db_path));
610 continue;
613 fail++;
614 DEBUG(DEBUG_ALERT,(__location__
615 " persistent db '%s' unhealthy: %s\n",
616 ctdb_db->db_path,
617 ctdb_db->unhealthy_reason));
619 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
620 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
621 ok, fail));
623 if (fail != 0) {
624 return -1;
627 return 0;
632 mark a database - as healthy
634 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
636 uint32_t db_id = *(uint32_t *)indata.dptr;
637 struct ctdb_db_context *ctdb_db;
638 int ret;
639 bool may_recover = false;
641 ctdb_db = find_ctdb_db(ctdb, db_id);
642 if (!ctdb_db) {
643 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
644 return -1;
647 if (ctdb_db->unhealthy_reason) {
648 may_recover = true;
651 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
652 if (ret != 0) {
653 DEBUG(DEBUG_ERR,(__location__
654 " ctdb_update_persistent_health(%s) failed\n",
655 ctdb_db->db_name));
656 return -1;
659 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
660 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
661 ctdb_db->db_name));
662 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
665 return 0;
668 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
669 TDB_DATA indata,
670 TDB_DATA *outdata)
672 uint32_t db_id = *(uint32_t *)indata.dptr;
673 struct ctdb_db_context *ctdb_db;
674 int ret;
676 ctdb_db = find_ctdb_db(ctdb, db_id);
677 if (!ctdb_db) {
678 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
679 return -1;
682 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
683 if (ret != 0) {
684 DEBUG(DEBUG_ERR,(__location__
685 " ctdb_load_persistent_health(%s) failed\n",
686 ctdb_db->db_name));
687 return -1;
690 *outdata = tdb_null;
691 if (ctdb_db->unhealthy_reason) {
692 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
693 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
696 return 0;
700 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
702 char *ropath;
704 if (ctdb_db->readonly) {
705 return 0;
708 if (ctdb_db->persistent) {
709 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
710 return -1;
713 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
714 if (ropath == NULL) {
715 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
716 return -1;
718 ctdb_db->rottdb = tdb_open(ropath,
719 ctdb->tunable.database_hash_size,
720 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
721 O_CREAT|O_RDWR, 0);
722 if (ctdb_db->rottdb == NULL) {
723 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
724 talloc_free(ropath);
725 return -1;
728 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
730 ctdb_db->readonly = true;
732 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
734 talloc_free(ropath);
735 return 0;
739 attach to a database, handling both persistent and non-persistent databases
740 return 0 on success, -1 on failure
742 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
743 bool persistent, const char *unhealthy_reason,
744 bool jenkinshash, bool mutexes)
746 struct ctdb_db_context *ctdb_db, *tmp_db;
747 int ret;
748 struct TDB_DATA key;
749 unsigned tdb_flags;
750 int mode = 0600;
751 int remaining_tries = 0;
753 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
754 CTDB_NO_MEMORY(ctdb, ctdb_db);
756 ctdb_db->priority = 1;
757 ctdb_db->ctdb = ctdb;
758 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
759 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
761 key.dsize = strlen(db_name)+1;
762 key.dptr = discard_const(db_name);
763 ctdb_db->db_id = ctdb_hash(&key);
764 ctdb_db->persistent = persistent;
766 if (!ctdb_db->persistent) {
767 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
768 if (ctdb_db->delete_queue == NULL) {
769 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
772 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
775 /* check for hash collisions */
776 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
777 if (tmp_db->db_id == ctdb_db->db_id) {
778 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
779 tmp_db->db_id, db_name, tmp_db->db_name));
780 talloc_free(ctdb_db);
781 return -1;
785 if (persistent) {
786 if (unhealthy_reason) {
787 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
788 unhealthy_reason, 0);
789 if (ret != 0) {
790 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
791 ctdb_db->db_name, unhealthy_reason, ret));
792 talloc_free(ctdb_db);
793 return -1;
797 if (ctdb->max_persistent_check_errors > 0) {
798 remaining_tries = 1;
800 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
801 remaining_tries = 0;
804 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
805 if (ret != 0) {
806 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
807 ctdb_db->db_name, ret));
808 talloc_free(ctdb_db);
809 return -1;
813 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
814 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
815 ctdb_db->db_name, ctdb_db->unhealthy_reason));
816 talloc_free(ctdb_db);
817 return -1;
820 if (ctdb_db->unhealthy_reason) {
821 /* this is just a warning, but we want that in the log file! */
822 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
823 ctdb_db->db_name, ctdb_db->unhealthy_reason));
826 /* open the database */
827 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
828 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
829 db_name, ctdb->pnn);
831 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
832 if (ctdb->valgrinding) {
833 tdb_flags |= TDB_NOMMAP;
835 tdb_flags |= TDB_DISALLOW_NESTING;
836 if (jenkinshash) {
837 tdb_flags |= TDB_INCOMPATIBLE_HASH;
839 #ifdef TDB_MUTEX_LOCKING
840 if (ctdb->tunable.mutex_enabled && mutexes &&
841 tdb_runtime_check_for_robust_mutexes()) {
842 tdb_flags |= TDB_MUTEX_LOCKING;
844 #endif
846 again:
847 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
848 ctdb->tunable.database_hash_size,
849 tdb_flags,
850 O_CREAT|O_RDWR, mode);
851 if (ctdb_db->ltdb == NULL) {
852 struct stat st;
853 int saved_errno = errno;
855 if (!persistent) {
856 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
857 ctdb_db->db_path,
858 saved_errno,
859 strerror(saved_errno)));
860 talloc_free(ctdb_db);
861 return -1;
864 if (remaining_tries == 0) {
865 DEBUG(DEBUG_CRIT,(__location__
866 "Failed to open persistent tdb '%s': %d - %s\n",
867 ctdb_db->db_path,
868 saved_errno,
869 strerror(saved_errno)));
870 talloc_free(ctdb_db);
871 return -1;
874 ret = stat(ctdb_db->db_path, &st);
875 if (ret != 0) {
876 DEBUG(DEBUG_CRIT,(__location__
877 "Failed to open persistent tdb '%s': %d - %s\n",
878 ctdb_db->db_path,
879 saved_errno,
880 strerror(saved_errno)));
881 talloc_free(ctdb_db);
882 return -1;
885 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
886 if (ret != 0) {
887 DEBUG(DEBUG_CRIT,(__location__
888 "Failed to open persistent tdb '%s': %d - %s\n",
889 ctdb_db->db_path,
890 saved_errno,
891 strerror(saved_errno)));
892 talloc_free(ctdb_db);
893 return -1;
896 remaining_tries--;
897 mode = st.st_mode;
898 goto again;
901 if (!persistent) {
902 ctdb_check_db_empty(ctdb_db);
903 } else {
904 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
905 if (ret != 0) {
906 int fd;
907 struct stat st;
909 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
910 ctdb_db->db_path, ret,
911 tdb_errorstr(ctdb_db->ltdb->tdb)));
912 if (remaining_tries == 0) {
913 talloc_free(ctdb_db);
914 return -1;
917 fd = tdb_fd(ctdb_db->ltdb->tdb);
918 ret = fstat(fd, &st);
919 if (ret != 0) {
920 DEBUG(DEBUG_CRIT,(__location__
921 "Failed to fstat() persistent tdb '%s': %d - %s\n",
922 ctdb_db->db_path,
923 errno,
924 strerror(errno)));
925 talloc_free(ctdb_db);
926 return -1;
929 /* close the TDB */
930 talloc_free(ctdb_db->ltdb);
931 ctdb_db->ltdb = NULL;
933 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
934 if (ret != 0) {
935 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
936 ctdb_db->db_path));
937 talloc_free(ctdb_db);
938 return -1;
941 remaining_tries--;
942 mode = st.st_mode;
943 goto again;
947 /* set up a rb tree we can use to track which records we have a
948 fetch-lock in-flight for so we can defer any additional calls
949 for the same record.
951 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
952 if (ctdb_db->deferred_fetch == NULL) {
953 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
954 talloc_free(ctdb_db);
955 return -1;
958 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
959 if (ctdb_db->defer_dmaster == NULL) {
960 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
961 ctdb_db->db_name));
962 talloc_free(ctdb_db);
963 return -1;
966 DLIST_ADD(ctdb->db_list, ctdb_db);
968 /* setting this can help some high churn databases */
969 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
972 all databases support the "null" function. we need this in
973 order to do forced migration of records
975 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
976 if (ret != 0) {
977 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
978 talloc_free(ctdb_db);
979 return -1;
983 all databases support the "fetch" function. we need this
984 for efficient Samba3 ctdb fetch
986 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
987 if (ret != 0) {
988 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
989 talloc_free(ctdb_db);
990 return -1;
994 all databases support the "fetch_with_header" function. we need this
995 for efficient readonly record fetches
997 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
998 if (ret != 0) {
999 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1000 talloc_free(ctdb_db);
1001 return -1;
1004 ret = ctdb_vacuum_init(ctdb_db);
1005 if (ret != 0) {
1006 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1007 "database '%s'\n", ctdb_db->db_name));
1008 talloc_free(ctdb_db);
1009 return -1;
1013 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1014 ctdb_db->db_path, tdb_flags));
1016 /* success */
1017 return 0;
/*
  an attach request from a client that is held back, linked into
  ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* list linkage */
	struct ctdb_deferred_attach_context *next, *prev;
	/* daemon context owning the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the deferred control request packet */
	struct ctdb_req_control *c;
};
1028 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1030 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1032 return 0;
1035 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1037 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1038 struct ctdb_context *ctdb = da_ctx->ctdb;
1040 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1041 talloc_free(da_ctx);
1044 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1046 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1047 struct ctdb_context *ctdb = da_ctx->ctdb;
1049 /* This talloc-steals the packet ->c */
1050 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1051 talloc_free(da_ctx);
1054 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1056 struct ctdb_deferred_attach_context *da_ctx;
1058 /* call it from the main event loop as soon as the current event
1059 finishes.
1061 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1062 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1063 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1066 return 0;
1070 a client has asked to attach a new database
1072 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1073 TDB_DATA *outdata, uint64_t tdb_flags,
1074 bool persistent, uint32_t client_id,
1075 struct ctdb_req_control *c,
1076 bool *async_reply)
1078 const char *db_name = (const char *)indata.dptr;
1079 struct ctdb_db_context *db;
1080 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1081 struct ctdb_client *client = NULL;
1082 bool with_jenkinshash, with_mutexes;
1084 if (ctdb->tunable.allow_client_db_attach == 0) {
1085 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1086 "AllowClientDBAccess == 0\n", db_name));
1087 return -1;
1090 /* dont allow any local clients to attach while we are in recovery mode
1091 * except for the recovery daemon.
1092 * allow all attach from the network since these are always from remote
1093 * recovery daemons.
1095 if (client_id != 0) {
1096 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1098 if (client != NULL) {
1099 /* If the node is inactive it is not part of the cluster
1100 and we should not allow clients to attach to any
1101 databases
1103 if (node->flags & NODE_FLAGS_INACTIVE) {
1104 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1105 return -1;
1108 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1109 client->pid != ctdb->recoverd_pid &&
1110 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1111 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1113 if (da_ctx == NULL) {
1114 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1115 return -1;
1118 da_ctx->ctdb = ctdb;
1119 da_ctx->c = talloc_steal(da_ctx, c);
1120 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1121 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1123 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1125 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1126 *async_reply = true;
1127 return 0;
1131 /* the client can optionally pass additional tdb flags, but we
1132 only allow a subset of those on the database in ctdb. Note
1133 that tdb_flags is passed in via the (otherwise unused)
1134 srvid to the attach control */
1135 #ifdef TDB_MUTEX_LOCKING
1136 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING);
1137 #else
1138 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1139 #endif
1141 /* see if we already have this name */
1142 db = ctdb_db_handle(ctdb, db_name);
1143 if (db) {
1144 if (db->persistent != persistent) {
1145 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1146 "database %s\n", persistent ? "" : "non-",
1147 db-> persistent ? "" : "non-", db_name));
1148 return -1;
1150 outdata->dptr = (uint8_t *)&db->db_id;
1151 outdata->dsize = sizeof(db->db_id);
1152 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1153 return 0;
1156 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1157 #ifdef TDB_MUTEX_LOCKING
1158 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1159 #else
1160 with_mutexes = false;
1161 #endif
1163 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1164 with_jenkinshash, with_mutexes) != 0) {
1165 return -1;
1168 db = ctdb_db_handle(ctdb, db_name);
1169 if (!db) {
1170 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1171 return -1;
1174 /* remember the flags the client has specified */
1175 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1177 outdata->dptr = (uint8_t *)&db->db_id;
1178 outdata->dsize = sizeof(db->db_id);
1180 /* Try to ensure it's locked in mem */
1181 lockdown_memory(ctdb->valgrinding);
1183 /* tell all the other nodes about this database */
1184 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1185 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1186 CTDB_CONTROL_DB_ATTACH,
1187 0, CTDB_CTRL_FLAG_NOREPLY,
1188 indata, NULL, NULL);
1190 /* success */
1191 return 0;
1195 * a client has asked to detach from a database
1197 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1198 uint32_t client_id)
1200 uint32_t db_id;
1201 struct ctdb_db_context *ctdb_db;
1202 struct ctdb_client *client = NULL;
1204 db_id = *(uint32_t *)indata.dptr;
1205 ctdb_db = find_ctdb_db(ctdb, db_id);
1206 if (ctdb_db == NULL) {
1207 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1208 db_id));
1209 return -1;
1212 if (ctdb->tunable.allow_client_db_attach == 1) {
1213 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1214 "Clients are allowed access to databases "
1215 "(AllowClientDBAccess == 1)\n",
1216 ctdb_db->db_name));
1217 return -1;
1220 if (ctdb_db->persistent) {
1221 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1222 "denied\n", ctdb_db->db_name));
1223 return -1;
1226 /* Cannot detach from database when in recovery */
1227 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1228 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1229 return -1;
1232 /* If a control comes from a client, then broadcast it to all nodes.
1233 * Do the actual detach only if the control comes from other daemons.
1235 if (client_id != 0) {
1236 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1237 if (client != NULL) {
1238 /* forward the control to all the nodes */
1239 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1240 CTDB_CONTROL_DB_DETACH, 0,
1241 CTDB_CTRL_FLAG_NOREPLY,
1242 indata, NULL, NULL);
1243 return 0;
1245 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1246 "for database '%s'\n", ctdb_db->db_name));
1247 return -1;
1250 /* Detach database from recoverd */
1251 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1252 CTDB_SRVID_DETACH_DATABASE,
1253 indata) != 0) {
1254 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1255 return -1;
1258 /* Disable vacuuming and drop all vacuuming data */
1259 talloc_free(ctdb_db->vacuum_handle);
1260 talloc_free(ctdb_db->delete_queue);
1262 /* Terminate any deferred fetch */
1263 talloc_free(ctdb_db->deferred_fetch);
1265 /* Terminate any traverses */
1266 while (ctdb_db->traverse) {
1267 talloc_free(ctdb_db->traverse);
1270 /* Terminate any revokes */
1271 while (ctdb_db->revokechild_active) {
1272 talloc_free(ctdb_db->revokechild_active);
1275 /* Free readonly tracking database */
1276 if (ctdb_db->readonly) {
1277 talloc_free(ctdb_db->rottdb);
1280 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1282 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1283 ctdb_db->db_name));
1284 talloc_free(ctdb_db);
1286 return 0;
1290 attach to all existing persistent databases
1292 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1293 const char *unhealthy_reason)
1295 DIR *d;
1296 struct dirent *de;
1298 /* open the persistent db directory and scan it for files */
1299 d = opendir(ctdb->db_directory_persistent);
1300 if (d == NULL) {
1301 return 0;
1304 while ((de=readdir(d))) {
1305 char *p, *s, *q;
1306 size_t len = strlen(de->d_name);
1307 uint32_t node;
1308 int invalid_name = 0;
1310 s = talloc_strdup(ctdb, de->d_name);
1311 if (s == NULL) {
1312 closedir(d);
1313 CTDB_NO_MEMORY(ctdb, s);
1316 /* only accept names ending in .tdb */
1317 p = strstr(s, ".tdb.");
1318 if (len < 7 || p == NULL) {
1319 talloc_free(s);
1320 continue;
1323 /* only accept names ending with .tdb. and any number of digits */
1324 q = p+5;
1325 while (*q != 0 && invalid_name == 0) {
1326 if (!isdigit(*q++)) {
1327 invalid_name = 1;
1330 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1331 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1332 talloc_free(s);
1333 continue;
1335 p[4] = 0;
1337 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1338 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1339 closedir(d);
1340 talloc_free(s);
1341 return -1;
1344 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1346 talloc_free(s);
1348 closedir(d);
1349 return 0;
1352 int ctdb_attach_databases(struct ctdb_context *ctdb)
1354 int ret;
1355 char *persistent_health_path = NULL;
1356 char *unhealthy_reason = NULL;
1357 bool first_try = true;
1359 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1360 ctdb->db_directory_state,
1361 PERSISTENT_HEALTH_TDB,
1362 ctdb->pnn);
1363 if (persistent_health_path == NULL) {
1364 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1365 return -1;
1368 again:
1370 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1371 0, TDB_DISALLOW_NESTING,
1372 O_CREAT | O_RDWR, 0600);
1373 if (ctdb->db_persistent_health == NULL) {
1374 struct tdb_wrap *tdb;
1376 if (!first_try) {
1377 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1378 persistent_health_path,
1379 errno,
1380 strerror(errno)));
1381 talloc_free(persistent_health_path);
1382 talloc_free(unhealthy_reason);
1383 return -1;
1385 first_try = false;
1387 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1388 persistent_health_path,
1389 "was cleared after a failure",
1390 "manual verification needed");
1391 if (unhealthy_reason == NULL) {
1392 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1393 talloc_free(persistent_health_path);
1394 return -1;
1397 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1398 persistent_health_path));
1399 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1400 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1401 O_CREAT | O_RDWR, 0600);
1402 if (tdb) {
1403 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1404 persistent_health_path,
1405 errno,
1406 strerror(errno)));
1407 talloc_free(persistent_health_path);
1408 talloc_free(unhealthy_reason);
1409 return -1;
1412 talloc_free(tdb);
1413 goto again;
1415 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1416 if (ret != 0) {
1417 struct tdb_wrap *tdb;
1419 talloc_free(ctdb->db_persistent_health);
1420 ctdb->db_persistent_health = NULL;
1422 if (!first_try) {
1423 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1424 persistent_health_path));
1425 talloc_free(persistent_health_path);
1426 talloc_free(unhealthy_reason);
1427 return -1;
1429 first_try = false;
1431 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1432 persistent_health_path,
1433 "was cleared after a failure",
1434 "manual verification needed");
1435 if (unhealthy_reason == NULL) {
1436 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1437 talloc_free(persistent_health_path);
1438 return -1;
1441 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1442 persistent_health_path));
1443 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1444 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1445 O_CREAT | O_RDWR, 0600);
1446 if (tdb) {
1447 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1448 persistent_health_path,
1449 errno,
1450 strerror(errno)));
1451 talloc_free(persistent_health_path);
1452 talloc_free(unhealthy_reason);
1453 return -1;
1456 talloc_free(tdb);
1457 goto again;
1459 talloc_free(persistent_health_path);
1461 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1462 talloc_free(unhealthy_reason);
1463 if (ret != 0) {
1464 return ret;
1467 return 0;
1471 called when a broadcast seqnum update comes in
1473 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1475 struct ctdb_db_context *ctdb_db;
1476 if (srcnode == ctdb->pnn) {
1477 /* don't update ourselves! */
1478 return 0;
1481 ctdb_db = find_ctdb_db(ctdb, db_id);
1482 if (!ctdb_db) {
1483 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1484 return -1;
1487 if (ctdb_db->unhealthy_reason) {
1488 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1489 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1490 return -1;
1493 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1494 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1495 return 0;
1499 timer to check for seqnum changes in a ltdb and propogate them
1501 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1502 struct timeval t, void *p)
1504 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1505 struct ctdb_context *ctdb = ctdb_db->ctdb;
1506 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1507 if (new_seqnum != ctdb_db->seqnum) {
1508 /* something has changed - propogate it */
1509 TDB_DATA data;
1510 data.dptr = (uint8_t *)&ctdb_db->db_id;
1511 data.dsize = sizeof(uint32_t);
1512 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1513 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1514 data, NULL, NULL);
1516 ctdb_db->seqnum = new_seqnum;
1518 /* setup a new timer */
1519 ctdb_db->seqnum_update =
1520 event_add_timed(ctdb->ev, ctdb_db,
1521 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1522 ctdb_ltdb_seqnum_check, ctdb_db);
1526 enable seqnum handling on this db
1528 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1530 struct ctdb_db_context *ctdb_db;
1531 ctdb_db = find_ctdb_db(ctdb, db_id);
1532 if (!ctdb_db) {
1533 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1534 return -1;
1537 if (ctdb_db->seqnum_update == NULL) {
1538 ctdb_db->seqnum_update =
1539 event_add_timed(ctdb->ev, ctdb_db,
1540 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1541 ctdb_ltdb_seqnum_check, ctdb_db);
1544 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1545 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1546 return 0;
1549 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1550 uint32_t client_id)
1552 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1553 struct ctdb_db_context *ctdb_db;
1555 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1556 if (!ctdb_db) {
1557 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1558 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1559 db_prio->db_id));
1561 return 0;
1564 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1565 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1566 return 0;
1569 ctdb_db->priority = db_prio->priority;
1570 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1572 if (client_id != 0) {
1573 /* Broadcast the update to the rest of the cluster */
1574 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1575 CTDB_CONTROL_SET_DB_PRIORITY, 0,
1576 CTDB_CTRL_FLAG_NOREPLY, indata,
1577 NULL, NULL);
1579 return 0;
1583 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1585 if (ctdb_db->sticky) {
1586 return 0;
1589 if (ctdb_db->persistent) {
1590 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1591 return -1;
1594 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1596 ctdb_db->sticky = true;
1598 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1600 return 0;
1603 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1604 uint32_t db_id,
1605 TDB_DATA *outdata)
1607 struct ctdb_db_context *ctdb_db;
1608 struct ctdb_db_statistics *stats;
1609 int i;
1610 int len;
1611 char *ptr;
1613 ctdb_db = find_ctdb_db(ctdb, db_id);
1614 if (!ctdb_db) {
1615 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1616 return -1;
1619 len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1620 for (i = 0; i < MAX_HOT_KEYS; i++) {
1621 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1624 stats = talloc_size(outdata, len);
1625 if (stats == NULL) {
1626 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1627 return -1;
1630 *stats = ctdb_db->statistics;
1632 stats->num_hot_keys = MAX_HOT_KEYS;
1634 ptr = &stats->hot_keys_wire[0];
1635 for (i = 0; i < MAX_HOT_KEYS; i++) {
1636 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1637 ctdb_db->statistics.hot_keys[i].key.dsize);
1638 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1641 outdata->dptr = (uint8_t *)stats;
1642 outdata->dsize = len;
1644 return 0;