s3:idmap_ad: add support for ADS_AUTH_SASL_{STARTTLS,LDAPS}
[Samba.git] / ctdb / server / ctdb_ltdb_server.c
blobe2cb9165c71ac08a3719329b346c1e6a24e344df
/*
   ctdb ltdb code - server side

   Copyright (C) Andrew Tridgell  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #include "server/ctdb_config.h"
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
48 /**
49 * write a record to a normal database
51 * This is the server-variant of the ctdb_ltdb_store function.
52 * It contains logic to determine whether a record should be
53 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54 * controls to the local ctdb daemon if appropriate.
56 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
57 TDB_DATA key,
58 struct ctdb_ltdb_header *header,
59 TDB_DATA data)
61 struct ctdb_context *ctdb = ctdb_db->ctdb;
62 TDB_DATA rec[2];
63 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
64 int ret;
65 bool keep = false;
66 bool schedule_for_deletion = false;
67 bool remove_from_delete_queue = false;
68 uint32_t lmaster;
70 if (ctdb->flags & CTDB_FLAG_TORTURE) {
71 TDB_DATA old;
72 struct ctdb_ltdb_header *h2;
74 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
75 h2 = (struct ctdb_ltdb_header *)old.dptr;
76 if (old.dptr != NULL &&
77 old.dsize >= hsize &&
78 h2->rsn > header->rsn) {
79 DEBUG(DEBUG_ERR,
80 ("RSN regression! %"PRIu64" %"PRIu64"\n",
81 h2->rsn, header->rsn));
83 if (old.dptr) {
84 free(old.dptr);
88 if (ctdb->vnn_map == NULL) {
90 * Called from a client: always store the record
91 * Also don't call ctdb_lmaster since it uses the vnn_map!
93 keep = true;
94 goto store;
97 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
100 * If we migrate an empty record off to another node
101 * and the record has not been migrated with data,
102 * delete the record instead of storing the empty record.
104 if (data.dsize != 0) {
105 keep = true;
106 } else if (header->flags & CTDB_REC_RO_FLAGS) {
107 keep = true;
108 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
110 * The record is not created by the client but
111 * automatically by the ctdb_ltdb_fetch logic that
112 * creates a record with an initial header in the
113 * ltdb before trying to migrate the record from
114 * the current lmaster. Keep it instead of trying
115 * to delete the non-existing record...
117 keep = true;
118 schedule_for_deletion = true;
119 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
120 keep = true;
121 } else if (ctdb_db->ctdb->pnn == lmaster) {
123 * If we are lmaster, then we usually keep the record.
124 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
125 * and the record is empty and has never been migrated
126 * with data, then we should delete it instead of storing it.
127 * This is part of the vacuuming process.
129 * The reason that we usually need to store even empty records
130 * on the lmaster is that a client operating directly on the
131 * lmaster (== dmaster) expects the local copy of the record to
132 * exist after successful ctdb migrate call. If the record does
133 * not exist, the client goes into a migrate loop and eventually
134 * fails. So storing the empty record makes sure that we do not
135 * need to change the client code.
137 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
138 keep = true;
139 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
140 keep = true;
142 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
143 keep = true;
146 if (keep) {
147 if (ctdb_db_volatile(ctdb_db) &&
148 (ctdb_db->ctdb->pnn == header->dmaster) &&
149 !(header->flags & CTDB_REC_RO_FLAGS))
151 header->rsn++;
153 if (data.dsize == 0) {
154 schedule_for_deletion = true;
157 remove_from_delete_queue = !schedule_for_deletion;
160 store:
162 * The VACUUM_MIGRATED flag is only set temporarily for
163 * the above logic when the record was retrieved by a
164 * VACUUM_MIGRATE call and should not be stored in the
165 * database.
167 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
168 * and there are two cases in which the corresponding record
169 * is stored in the local database:
170 * 1. The record has been migrated with data in the past
171 * (the MIGRATED_WITH_DATA record flag is set).
172 * 2. The record has been filled with data again since it
173 * had been submitted in the VACUUM_FETCH message to the
174 * lmaster.
175 * For such records it is important to not store the
176 * VACUUM_MIGRATED flag in the database.
178 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
181 * Similarly, clear the AUTOMATIC flag which should not enter
182 * the local database copy since this would require client
183 * modifications to clear the flag when the client stores
184 * the record.
186 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
188 rec[0].dsize = hsize;
189 rec[0].dptr = (uint8_t *)header;
191 rec[1].dsize = data.dsize;
192 rec[1].dptr = data.dptr;
194 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
195 ctdb_db->db_name,
196 keep?"storing":"deleting",
197 ctdb_hash(&key)));
199 if (keep) {
200 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
201 } else {
202 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
205 if (ret != 0) {
206 int lvl = DEBUG_ERR;
208 if (keep == false &&
209 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
211 lvl = DEBUG_DEBUG;
214 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
215 "%d - %s\n",
216 ctdb_db->db_name,
217 keep?"store":"delete", ret,
218 tdb_errorstr(ctdb_db->ltdb->tdb)));
220 schedule_for_deletion = false;
221 remove_from_delete_queue = false;
224 if (schedule_for_deletion) {
225 int ret2;
226 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
227 if (ret2 != 0) {
228 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
232 if (remove_from_delete_queue) {
233 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
236 return ret;
/*
  state kept while a request waits for a record chainlock
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* handler the deferred packet is handed back to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	void *recv_context;
	/* the deferred packet itself */
	struct ctdb_req_header *hdr;
	/* database generation at the time the request was queued */
	uint32_t generation;
	/* when true, requeue even if the generation has changed */
	bool ignore_generation;
};
250 called when we should retry the operation
252 static void lock_fetch_callback(void *p, bool locked)
254 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
255 if (!state->ignore_generation &&
256 state->generation != state->ctdb_db->generation) {
257 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
258 talloc_free(state->hdr);
259 return;
261 state->recv_pkt(state->recv_context, state->hdr);
262 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
267 do a non-blocking ltdb_lock, deferring this ctdb request until we
268 have the chainlock
270 It does the following:
272 1) tries to get the chainlock. If it succeeds, then it returns 0
274 2) if it fails to get a chainlock immediately then it sets up a
275 non-blocking chainlock via ctdb_lock_record, and when it gets the
276 chainlock it re-submits this ctdb request to the main packet
277 receive function.
279 This effectively queues all ctdb requests that cannot be
280 immediately satisfied until it can get the lock. This means that
281 the main ctdb daemon will not block waiting for a chainlock held by
282 a client
284 There are 3 possible return values:
286 0: means that it got the lock immediately.
287 -1: means that it failed to get the lock, and won't retry
288 -2: means that it failed to get the lock immediately, but will retry
290 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
291 TDB_DATA key, struct ctdb_req_header *hdr,
292 void (*recv_pkt)(void *, struct ctdb_req_header *),
293 void *recv_context, bool ignore_generation)
295 int ret;
296 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
297 struct lock_request *lreq;
298 struct lock_fetch_state *state;
300 ret = tdb_chainlock_nonblock(tdb, key);
302 if (ret != 0 &&
303 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
304 /* a hard failure - don't try again */
305 return -1;
308 /* when torturing, ensure we test the contended path */
309 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
310 random() % 5 == 0) {
311 ret = -1;
312 tdb_chainunlock(tdb, key);
315 /* first the non-contended path */
316 if (ret == 0) {
317 return 0;
320 state = talloc(hdr, struct lock_fetch_state);
321 state->ctdb = ctdb_db->ctdb;
322 state->ctdb_db = ctdb_db;
323 state->hdr = hdr;
324 state->recv_pkt = recv_pkt;
325 state->recv_context = recv_context;
326 state->generation = ctdb_db->generation;
327 state->ignore_generation = ignore_generation;
329 /* now the contended path */
330 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
331 if (lreq == NULL) {
332 return -1;
335 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
336 so it won't be freed yet */
337 talloc_steal(state, hdr);
339 /* now tell the caller than we will retry asynchronously */
340 return -2;
344 a variant of ctdb_ltdb_lock_requeue that also fetches the record
346 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
347 TDB_DATA key, struct ctdb_ltdb_header *header,
348 struct ctdb_req_header *hdr, TDB_DATA *data,
349 void (*recv_pkt)(void *, struct ctdb_req_header *),
350 void *recv_context, bool ignore_generation)
352 int ret;
354 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
355 recv_context, ignore_generation);
356 if (ret != 0) {
357 return ret;
360 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
361 if (ret != 0) {
362 int uret;
363 uret = ctdb_ltdb_unlock(ctdb_db, key);
364 if (uret != 0) {
365 DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
366 uret);
369 return ret;
374 paranoid check to see if the db is empty
376 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
378 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
379 int count = tdb_traverse_read(tdb, NULL, NULL);
380 if (count != 0) {
381 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
382 ctdb_db->db_path));
383 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
387 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
388 struct ctdb_db_context *ctdb_db)
390 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
391 char *old;
392 char *reason = NULL;
393 TDB_DATA key;
394 TDB_DATA val;
396 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
397 key.dsize = strlen(ctdb_db->db_name);
399 old = ctdb_db->unhealthy_reason;
400 ctdb_db->unhealthy_reason = NULL;
402 val = tdb_fetch(tdb, key);
403 if (val.dsize > 0) {
404 reason = talloc_strndup(ctdb_db,
405 (const char *)val.dptr,
406 val.dsize);
407 if (reason == NULL) {
408 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
409 (int)val.dsize));
410 ctdb_db->unhealthy_reason = old;
411 free(val.dptr);
412 return -1;
416 if (val.dptr) {
417 free(val.dptr);
420 talloc_free(old);
421 ctdb_db->unhealthy_reason = reason;
422 return 0;
425 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
426 struct ctdb_db_context *ctdb_db,
427 const char *given_reason,/* NULL means healthy */
428 unsigned int num_healthy_nodes)
430 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
431 int ret;
432 TDB_DATA key;
433 TDB_DATA val;
434 char *new_reason = NULL;
435 char *old_reason = NULL;
437 ret = tdb_transaction_start(tdb);
438 if (ret != 0) {
439 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
440 tdb_name(tdb), ret, tdb_errorstr(tdb)));
441 return -1;
444 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
445 if (ret != 0) {
446 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
447 ctdb_db->db_name, ret));
448 return -1;
450 old_reason = ctdb_db->unhealthy_reason;
452 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
453 key.dsize = strlen(ctdb_db->db_name);
455 if (given_reason) {
456 new_reason = talloc_strdup(ctdb_db, given_reason);
457 if (new_reason == NULL) {
458 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
459 given_reason));
460 return -1;
462 } else if (old_reason && num_healthy_nodes == 0) {
464 * If the reason indicates ok, but there were no healthy nodes
465 * available, it means that we have not recovered valid content
466 * of the db. So if there's an old reason, prefix it with
467 * "NO-HEALTHY-NODES - "
469 const char *prefix;
471 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
472 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
473 if (ret != 0) {
474 prefix = _TMP_PREFIX;
475 } else {
476 prefix = "";
478 new_reason = talloc_asprintf(ctdb_db, "%s%s",
479 prefix, old_reason);
480 if (new_reason == NULL) {
481 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
482 prefix, old_reason));
483 return -1;
485 #undef _TMP_PREFIX
488 if (new_reason) {
489 val.dptr = discard_const_p(uint8_t, new_reason);
490 val.dsize = strlen(new_reason);
492 ret = tdb_store(tdb, key, val, TDB_REPLACE);
493 if (ret != 0) {
494 tdb_transaction_cancel(tdb);
495 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
496 tdb_name(tdb), ctdb_db->db_name, new_reason,
497 ret, tdb_errorstr(tdb)));
498 talloc_free(new_reason);
499 return -1;
501 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
502 ctdb_db->db_name, new_reason));
503 } else if (old_reason) {
504 ret = tdb_delete(tdb, key);
505 if (ret != 0) {
506 tdb_transaction_cancel(tdb);
507 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
508 tdb_name(tdb), ctdb_db->db_name,
509 ret, tdb_errorstr(tdb)));
510 talloc_free(new_reason);
511 return -1;
513 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
514 ctdb_db->db_name));
517 ret = tdb_transaction_commit(tdb);
518 if (ret != TDB_SUCCESS) {
519 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
520 tdb_name(tdb), ret, tdb_errorstr(tdb)));
521 talloc_free(new_reason);
522 return -1;
525 talloc_free(old_reason);
526 ctdb_db->unhealthy_reason = new_reason;
528 return 0;
531 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
532 struct ctdb_db_context *ctdb_db)
534 time_t now = time(NULL);
535 char *new_path;
536 char *new_reason;
537 int ret;
538 struct tm *tm;
540 tm = gmtime(&now);
542 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
543 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
544 "%04u%02u%02u%02u%02u%02u.0Z",
545 ctdb_db->db_path,
546 tm->tm_year+1900, tm->tm_mon+1,
547 tm->tm_mday, tm->tm_hour, tm->tm_min,
548 tm->tm_sec);
549 if (new_path == NULL) {
550 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
551 return -1;
554 new_reason = talloc_asprintf(ctdb_db,
555 "ERROR - Backup of corrupted TDB in '%s'",
556 new_path);
557 if (new_reason == NULL) {
558 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
559 return -1;
561 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
562 talloc_free(new_reason);
563 if (ret != 0) {
564 DEBUG(DEBUG_CRIT,(__location__
565 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
566 ctdb_db->db_path));
567 return -1;
570 ret = rename(ctdb_db->db_path, new_path);
571 if (ret != 0) {
572 DEBUG(DEBUG_CRIT,(__location__
573 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
574 ctdb_db->db_path, new_path,
575 errno, strerror(errno)));
576 talloc_free(new_path);
577 return -1;
580 DEBUG(DEBUG_CRIT,(__location__
581 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
582 ctdb_db->db_path, new_path));
583 talloc_free(new_path);
584 return 0;
587 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
589 struct ctdb_db_context *ctdb_db;
590 int ret;
591 int ok = 0;
592 int fail = 0;
594 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
595 if (!ctdb_db_persistent(ctdb_db)) {
596 continue;
599 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
600 if (ret != 0) {
601 DEBUG(DEBUG_ALERT,(__location__
602 " load persistent health for '%s' failed\n",
603 ctdb_db->db_path));
604 return -1;
607 if (ctdb_db->unhealthy_reason == NULL) {
608 ok++;
609 DEBUG(DEBUG_INFO,(__location__
610 " persistent db '%s' healthy\n",
611 ctdb_db->db_path));
612 continue;
615 fail++;
616 DEBUG(DEBUG_ALERT,(__location__
617 " persistent db '%s' unhealthy: %s\n",
618 ctdb_db->db_path,
619 ctdb_db->unhealthy_reason));
621 DEBUG(DEBUG_NOTICE,
622 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
623 ok, fail));
625 if (fail != 0) {
626 return -1;
629 return 0;
634 mark a database - as healthy
636 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
638 uint32_t db_id = *(uint32_t *)indata.dptr;
639 struct ctdb_db_context *ctdb_db;
640 int ret;
641 bool may_recover = false;
643 ctdb_db = find_ctdb_db(ctdb, db_id);
644 if (!ctdb_db) {
645 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
646 return -1;
649 if (ctdb_db->unhealthy_reason) {
650 may_recover = true;
653 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
654 if (ret != 0) {
655 DEBUG(DEBUG_ERR,(__location__
656 " ctdb_update_persistent_health(%s) failed\n",
657 ctdb_db->db_name));
658 return -1;
661 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
662 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
663 ctdb_db->db_name));
664 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
667 return 0;
670 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
671 TDB_DATA indata,
672 TDB_DATA *outdata)
674 uint32_t db_id = *(uint32_t *)indata.dptr;
675 struct ctdb_db_context *ctdb_db;
676 int ret;
678 ctdb_db = find_ctdb_db(ctdb, db_id);
679 if (!ctdb_db) {
680 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
681 return -1;
684 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
685 if (ret != 0) {
686 DEBUG(DEBUG_ERR,(__location__
687 " ctdb_load_persistent_health(%s) failed\n",
688 ctdb_db->db_name));
689 return -1;
692 *outdata = tdb_null;
693 if (ctdb_db->unhealthy_reason) {
694 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
695 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
698 return 0;
702 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
704 char *ropath;
706 if (ctdb_db_readonly(ctdb_db)) {
707 return 0;
710 if (! ctdb_db_volatile(ctdb_db)) {
711 DEBUG(DEBUG_ERR,
712 ("Non-volatile databases do not support readonly flag\n"));
713 return -1;
716 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
717 if (ropath == NULL) {
718 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
719 return -1;
721 ctdb_db->rottdb = tdb_open(ropath,
722 ctdb->tunable.database_hash_size,
723 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
724 O_CREAT|O_RDWR, 0600);
725 if (ctdb_db->rottdb == NULL) {
726 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
727 talloc_free(ropath);
728 return -1;
731 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
733 ctdb_db_set_readonly(ctdb_db);
735 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
737 talloc_free(ropath);
738 return 0;
742 attach to a database, handling both persistent and non-persistent databases
743 return 0 on success, -1 on failure
745 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
746 uint8_t db_flags, const char *unhealthy_reason)
748 struct ctdb_db_context *ctdb_db, *tmp_db;
749 int ret;
750 struct TDB_DATA key;
751 int tdb_flags;
752 int mode = 0600;
753 int remaining_tries = 0;
755 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
756 CTDB_NO_MEMORY(ctdb, ctdb_db);
758 ctdb_db->ctdb = ctdb;
759 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
760 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
762 key.dsize = strlen(db_name)+1;
763 key.dptr = discard_const(db_name);
764 ctdb_db->db_id = ctdb_hash(&key);
765 ctdb_db->db_flags = db_flags;
767 if (ctdb_db_volatile(ctdb_db)) {
768 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
769 if (ctdb_db->delete_queue == NULL) {
770 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
773 ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
774 if (ctdb_db->fetch_queue == NULL) {
775 CTDB_NO_MEMORY(ctdb, ctdb_db->fetch_queue);
778 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
781 /* check for hash collisions */
782 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
783 if (tmp_db->db_id == ctdb_db->db_id) {
784 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
785 tmp_db->db_id, db_name, tmp_db->db_name));
786 talloc_free(ctdb_db);
787 return -1;
791 if (ctdb_db_persistent(ctdb_db)) {
792 if (unhealthy_reason) {
793 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
794 unhealthy_reason, 0);
795 if (ret != 0) {
796 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
797 ctdb_db->db_name, unhealthy_reason, ret));
798 talloc_free(ctdb_db);
799 return -1;
803 if (ctdb->max_persistent_check_errors > 0) {
804 remaining_tries = 1;
806 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
807 remaining_tries = 0;
810 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
811 if (ret != 0) {
812 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
813 ctdb_db->db_name, ret));
814 talloc_free(ctdb_db);
815 return -1;
819 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
820 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
821 ctdb_db->db_name, ctdb_db->unhealthy_reason));
822 talloc_free(ctdb_db);
823 return -1;
826 if (ctdb_db->unhealthy_reason) {
827 /* this is just a warning, but we want that in the log file! */
828 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
829 ctdb_db->db_name, ctdb_db->unhealthy_reason));
832 /* open the database */
833 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
834 ctdb_db_persistent(ctdb_db) ?
835 ctdb->db_directory_persistent :
836 ctdb->db_directory,
837 db_name, ctdb->pnn);
839 tdb_flags = ctdb_db_tdb_flags(db_flags,
840 ctdb->valgrinding,
841 ctdb_config.tdb_mutexes);
843 again:
844 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
845 ctdb->tunable.database_hash_size,
846 tdb_flags,
847 O_CREAT|O_RDWR, mode);
848 if (ctdb_db->ltdb == NULL) {
849 struct stat st;
850 int saved_errno = errno;
852 if (! ctdb_db_persistent(ctdb_db)) {
853 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
854 ctdb_db->db_path,
855 saved_errno,
856 strerror(saved_errno)));
857 talloc_free(ctdb_db);
858 return -1;
861 if (remaining_tries == 0) {
862 DEBUG(DEBUG_CRIT,(__location__
863 "Failed to open persistent tdb '%s': %d - %s\n",
864 ctdb_db->db_path,
865 saved_errno,
866 strerror(saved_errno)));
867 talloc_free(ctdb_db);
868 return -1;
871 ret = stat(ctdb_db->db_path, &st);
872 if (ret != 0) {
873 DEBUG(DEBUG_CRIT,(__location__
874 "Failed to open persistent tdb '%s': %d - %s\n",
875 ctdb_db->db_path,
876 saved_errno,
877 strerror(saved_errno)));
878 talloc_free(ctdb_db);
879 return -1;
882 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
883 if (ret != 0) {
884 DEBUG(DEBUG_CRIT,(__location__
885 "Failed to open persistent tdb '%s': %d - %s\n",
886 ctdb_db->db_path,
887 saved_errno,
888 strerror(saved_errno)));
889 talloc_free(ctdb_db);
890 return -1;
893 remaining_tries--;
894 mode = st.st_mode;
895 goto again;
898 if (!ctdb_db_persistent(ctdb_db)) {
899 ctdb_check_db_empty(ctdb_db);
900 } else {
901 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
902 if (ret != 0) {
903 int fd;
904 struct stat st;
906 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
907 ctdb_db->db_path, ret,
908 tdb_errorstr(ctdb_db->ltdb->tdb)));
909 if (remaining_tries == 0) {
910 talloc_free(ctdb_db);
911 return -1;
914 fd = tdb_fd(ctdb_db->ltdb->tdb);
915 ret = fstat(fd, &st);
916 if (ret != 0) {
917 DEBUG(DEBUG_CRIT,(__location__
918 "Failed to fstat() persistent tdb '%s': %d - %s\n",
919 ctdb_db->db_path,
920 errno,
921 strerror(errno)));
922 talloc_free(ctdb_db);
923 return -1;
926 /* close the TDB */
927 talloc_free(ctdb_db->ltdb);
928 ctdb_db->ltdb = NULL;
930 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
931 if (ret != 0) {
932 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
933 ctdb_db->db_path));
934 talloc_free(ctdb_db);
935 return -1;
938 remaining_tries--;
939 mode = st.st_mode;
940 goto again;
944 /* remember the flags the client has specified */
945 tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
948 /* set up a rb tree we can use to track which records we have a
949 fetch-lock in-flight for so we can defer any additional calls
950 for the same record.
952 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
953 if (ctdb_db->deferred_fetch == NULL) {
954 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
955 talloc_free(ctdb_db);
956 return -1;
959 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
960 if (ctdb_db->defer_dmaster == NULL) {
961 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
962 ctdb_db->db_name));
963 talloc_free(ctdb_db);
964 return -1;
967 DLIST_ADD(ctdb->db_list, ctdb_db);
969 /* setting this can help some high churn databases */
970 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
973 all databases support the "null" function. we need this in
974 order to do forced migration of records
976 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
977 if (ret != 0) {
978 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
979 talloc_free(ctdb_db);
980 return -1;
984 all databases support the "fetch" function. we need this
985 for efficient Samba3 ctdb fetch
987 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
988 if (ret != 0) {
989 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
990 talloc_free(ctdb_db);
991 return -1;
995 all databases support the "fetch_with_header" function. we need this
996 for efficient readonly record fetches
998 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
999 if (ret != 0) {
1000 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1001 talloc_free(ctdb_db);
1002 return -1;
1005 ret = ctdb_vacuum_init(ctdb_db);
1006 if (ret != 0) {
1007 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1008 "database '%s'\n", ctdb_db->db_name));
1009 talloc_free(ctdb_db);
1010 return -1;
1013 ret = ctdb_migration_init(ctdb_db);
1014 if (ret != 0) {
1015 DEBUG(DEBUG_ERR,
1016 ("Failed to setup migration tracking for db '%s'\n",
1017 ctdb_db->db_name));
1018 talloc_free(ctdb_db);
1019 return -1;
1022 ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1023 &ctdb_db->lock_log);
1024 if (ret != 0) {
1025 DEBUG(DEBUG_ERR,
1026 ("Failed to setup lock logging for db '%s'\n",
1027 ctdb_db->db_name));
1028 talloc_free(ctdb_db);
1029 return -1;
1032 ctdb_db->generation = ctdb->vnn_map->generation;
1034 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1035 ctdb_db->db_path, tdb_flags));
1037 /* success */
1038 return 0;
/*
  a client attach request that is held back until it may be processed
 */
struct ctdb_deferred_attach_context {
	/* linkage in ctdb->deferred_attach */
	struct ctdb_deferred_attach_context *next, *prev;
	struct ctdb_context *ctdb;
	/* the deferred control packet (talloc child of this context) */
	struct ctdb_req_control_old *c;
};
1049 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1051 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1053 return 0;
1056 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1057 struct tevent_timer *te,
1058 struct timeval t, void *private_data)
1060 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1061 struct ctdb_context *ctdb = da_ctx->ctdb;
1063 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1064 talloc_free(da_ctx);
1067 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1068 struct tevent_timer *te,
1069 struct timeval t, void *private_data)
1071 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1072 struct ctdb_context *ctdb = da_ctx->ctdb;
1074 /* This talloc-steals the packet ->c */
1075 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1076 talloc_free(da_ctx);
1079 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1081 struct ctdb_deferred_attach_context *da_ctx;
1083 /* call it from the main event loop as soon as the current event
1084 finishes.
1086 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1087 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1088 tevent_add_timer(ctdb->ev, da_ctx,
1089 timeval_current_ofs(1,0),
1090 ctdb_deferred_attach_callback, da_ctx);
1093 return 0;
1097 a client has asked to attach a new database
1099 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
1100 TDB_DATA indata,
1101 TDB_DATA *outdata,
1102 uint8_t db_flags,
1103 uint32_t srcnode,
1104 uint32_t client_id,
1105 struct ctdb_req_control_old *c,
1106 bool *async_reply)
1108 const char *db_name = (const char *)indata.dptr;
1109 struct ctdb_db_context *db;
1110 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1111 struct ctdb_client *client = NULL;
1112 uint32_t opcode;
1114 if (ctdb->tunable.allow_client_db_attach == 0) {
1115 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1116 "AllowClientDBAccess == 0\n", db_name));
1117 return -1;
1120 /* don't allow any local clients to attach while we are in recovery mode
1121 * except for the recovery daemon.
1122 * allow all attach from the network since these are always from remote
1123 * recovery daemons.
1125 if (srcnode == ctdb->pnn && client_id != 0) {
1126 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1128 if (client != NULL) {
1129 /* If the node is inactive it is not part of the cluster
1130 and we should not allow clients to attach to any
1131 databases
1133 if (node->flags & NODE_FLAGS_INACTIVE) {
1134 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1135 return -1;
1138 if ((c->flags & CTDB_CTRL_FLAG_ATTACH_RECOVERY) &&
1139 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
1140 DBG_ERR("Attach from recovery refused because "
1141 "recovery is not active\n");
1142 return -1;
1145 if (!(c->flags & CTDB_CTRL_FLAG_ATTACH_RECOVERY) &&
1146 (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
1147 ctdb->runstate < CTDB_RUNSTATE_STARTUP)) {
1148 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1150 if (da_ctx == NULL) {
1151 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1152 return -1;
1155 da_ctx->ctdb = ctdb;
1156 da_ctx->c = talloc_steal(da_ctx, c);
1157 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1158 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1160 tevent_add_timer(ctdb->ev, da_ctx,
1161 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1162 ctdb_deferred_attach_timeout, da_ctx);
1164 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1165 *async_reply = true;
1166 return 0;
1170 /* see if we already have this name */
1171 db = ctdb_db_handle(ctdb, db_name);
1172 if (db) {
1173 if ((db->db_flags & db_flags) != db_flags) {
1174 DEBUG(DEBUG_ERR,
1175 ("Error: Failed to re-attach with 0x%x flags,"
1176 " database has 0x%x flags\n", db_flags,
1177 db->db_flags));
1178 return -1;
1180 outdata->dptr = (uint8_t *)&db->db_id;
1181 outdata->dsize = sizeof(db->db_id);
1182 return 0;
1185 if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1186 return -1;
1189 db = ctdb_db_handle(ctdb, db_name);
1190 if (!db) {
1191 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1192 return -1;
1195 outdata->dptr = (uint8_t *)&db->db_id;
1196 outdata->dsize = sizeof(db->db_id);
1198 /* Try to ensure it's locked in mem */
1199 lockdown_memory(ctdb->valgrinding);
1201 if (ctdb_db_persistent(db)) {
1202 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1203 } else if (ctdb_db_replicated(db)) {
1204 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1205 } else {
1206 opcode = CTDB_CONTROL_DB_ATTACH;
1209 /* tell all the other nodes about this database */
1210 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1211 0, CTDB_CTRL_FLAG_NOREPLY,
1212 indata, NULL, NULL);
1214 /* success */
1215 return 0;
1219 * a client has asked to detach from a database
1221 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1222 uint32_t client_id)
1224 uint32_t db_id;
1225 struct ctdb_db_context *ctdb_db;
1226 struct ctdb_client *client = NULL;
1228 db_id = *(uint32_t *)indata.dptr;
1229 ctdb_db = find_ctdb_db(ctdb, db_id);
1230 if (ctdb_db == NULL) {
1231 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1232 db_id));
1233 return -1;
1236 if (ctdb->tunable.allow_client_db_attach == 1) {
1237 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1238 "Clients are allowed access to databases "
1239 "(AllowClientDBAccess == 1)\n",
1240 ctdb_db->db_name));
1241 return -1;
1244 if (! ctdb_db_volatile(ctdb_db)) {
1245 DEBUG(DEBUG_ERR,
1246 ("Detaching non-volatile database %s denied\n",
1247 ctdb_db->db_name));
1248 return -1;
1251 /* Cannot detach from database when in recovery */
1252 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1253 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1254 return -1;
1257 /* If a control comes from a client, then broadcast it to all nodes.
1258 * Do the actual detach only if the control comes from other daemons.
1260 if (client_id != 0) {
1261 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1262 if (client != NULL) {
1263 /* forward the control to all the nodes */
1264 ctdb_daemon_send_control(ctdb,
1265 CTDB_BROADCAST_CONNECTED, 0,
1266 CTDB_CONTROL_DB_DETACH, 0,
1267 CTDB_CTRL_FLAG_NOREPLY,
1268 indata, NULL, NULL);
1269 return 0;
1271 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1272 "for database '%s'\n", ctdb_db->db_name));
1273 return -1;
1276 /* Disable vacuuming and drop all vacuuming data */
1277 talloc_free(ctdb_db->vacuum_handle);
1278 talloc_free(ctdb_db->delete_queue);
1279 talloc_free(ctdb_db->fetch_queue);
1281 /* Terminate any deferred fetch */
1282 talloc_free(ctdb_db->deferred_fetch);
1284 /* Terminate any traverses */
1285 while (ctdb_db->traverse) {
1286 talloc_free(ctdb_db->traverse);
1289 /* Terminate any revokes */
1290 while (ctdb_db->revokechild_active) {
1291 talloc_free(ctdb_db->revokechild_active);
1294 /* Free readonly tracking database */
1295 if (ctdb_db_readonly(ctdb_db)) {
1296 talloc_free(ctdb_db->rottdb);
1299 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1301 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1302 ctdb_db->db_name));
1303 talloc_free(ctdb_db);
1305 return 0;
1309 attach to all existing persistent databases
1311 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1312 const char *unhealthy_reason)
1314 DIR *d;
1315 struct dirent *de;
1317 /* open the persistent db directory and scan it for files */
1318 d = opendir(ctdb->db_directory_persistent);
1319 if (d == NULL) {
1320 return 0;
1323 while ((de=readdir(d))) {
1324 char *p, *s, *q;
1325 size_t len = strlen(de->d_name);
1326 uint32_t node;
1327 int invalid_name = 0;
1329 s = talloc_strdup(ctdb, de->d_name);
1330 if (s == NULL) {
1331 closedir(d);
1332 CTDB_NO_MEMORY(ctdb, s);
1335 /* only accept names ending in .tdb */
1336 p = strstr(s, ".tdb.");
1337 if (len < 7 || p == NULL) {
1338 talloc_free(s);
1339 continue;
1342 /* only accept names ending with .tdb. and any number of digits */
1343 q = p+5;
1344 while (*q != 0 && invalid_name == 0) {
1345 if (!isdigit(*q++)) {
1346 invalid_name = 1;
1349 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1350 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1351 talloc_free(s);
1352 continue;
1354 p[4] = 0;
1356 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1357 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1358 closedir(d);
1359 talloc_free(s);
1360 return -1;
1363 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1365 talloc_free(s);
1367 closedir(d);
1368 return 0;
1371 int ctdb_attach_databases(struct ctdb_context *ctdb)
1373 int ret;
1374 char *persistent_health_path = NULL;
1375 char *unhealthy_reason = NULL;
1376 bool first_try = true;
1378 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1379 ctdb->db_directory_state,
1380 PERSISTENT_HEALTH_TDB,
1381 ctdb->pnn);
1382 if (persistent_health_path == NULL) {
1383 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1384 return -1;
1387 again:
1389 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1390 0, TDB_DISALLOW_NESTING,
1391 O_CREAT | O_RDWR, 0600);
1392 if (ctdb->db_persistent_health == NULL) {
1393 struct tdb_wrap *tdb;
1395 if (!first_try) {
1396 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1397 persistent_health_path,
1398 errno,
1399 strerror(errno)));
1400 talloc_free(persistent_health_path);
1401 talloc_free(unhealthy_reason);
1402 return -1;
1404 first_try = false;
1406 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1407 persistent_health_path,
1408 "was cleared after a failure",
1409 "manual verification needed");
1410 if (unhealthy_reason == NULL) {
1411 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1412 talloc_free(persistent_health_path);
1413 return -1;
1416 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1417 persistent_health_path));
1418 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1419 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1420 O_CREAT | O_RDWR, 0600);
1421 if (tdb) {
1422 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1423 persistent_health_path,
1424 errno,
1425 strerror(errno)));
1426 talloc_free(persistent_health_path);
1427 talloc_free(unhealthy_reason);
1428 return -1;
1431 talloc_free(tdb);
1432 goto again;
1434 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1435 if (ret != 0) {
1436 struct tdb_wrap *tdb;
1438 talloc_free(ctdb->db_persistent_health);
1439 ctdb->db_persistent_health = NULL;
1441 if (!first_try) {
1442 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1443 persistent_health_path));
1444 talloc_free(persistent_health_path);
1445 talloc_free(unhealthy_reason);
1446 return -1;
1448 first_try = false;
1450 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1451 persistent_health_path,
1452 "was cleared after a failure",
1453 "manual verification needed");
1454 if (unhealthy_reason == NULL) {
1455 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1456 talloc_free(persistent_health_path);
1457 return -1;
1460 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1461 persistent_health_path));
1462 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1463 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1464 O_CREAT | O_RDWR, 0600);
1465 if (tdb) {
1466 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1467 persistent_health_path,
1468 errno,
1469 strerror(errno)));
1470 talloc_free(persistent_health_path);
1471 talloc_free(unhealthy_reason);
1472 return -1;
1475 talloc_free(tdb);
1476 goto again;
1478 talloc_free(persistent_health_path);
1480 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1481 talloc_free(unhealthy_reason);
1482 if (ret != 0) {
1483 return ret;
1486 return 0;
1490 called when a broadcast seqnum update comes in
1492 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1494 struct ctdb_db_context *ctdb_db;
1495 if (srcnode == ctdb->pnn) {
1496 /* don't update ourselves! */
1497 return 0;
1500 ctdb_db = find_ctdb_db(ctdb, db_id);
1501 if (!ctdb_db) {
1502 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1503 return -1;
1506 if (ctdb_db->unhealthy_reason) {
1507 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1508 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1509 return -1;
1512 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1513 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1514 return 0;
1518 timer to check for seqnum changes in a ltdb and propagate them
1520 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1521 struct tevent_timer *te,
1522 struct timeval t, void *p)
1524 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1525 struct ctdb_context *ctdb = ctdb_db->ctdb;
1526 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1527 if (new_seqnum != ctdb_db->seqnum) {
1528 /* something has changed - propagate it */
1529 TDB_DATA data;
1530 data.dptr = (uint8_t *)&ctdb_db->db_id;
1531 data.dsize = sizeof(uint32_t);
1532 ctdb_daemon_send_control(ctdb,
1533 CTDB_BROADCAST_ACTIVE,
1535 CTDB_CONTROL_UPDATE_SEQNUM,
1537 CTDB_CTRL_FLAG_NOREPLY,
1538 data,
1539 NULL,
1540 NULL);
1542 ctdb_db->seqnum = new_seqnum;
1544 /* setup a new timer */
1545 ctdb_db->seqnum_update =
1546 tevent_add_timer(ctdb->ev, ctdb_db,
1547 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1548 (ctdb->tunable.seqnum_interval%1000)*1000),
1549 ctdb_ltdb_seqnum_check, ctdb_db);
1553 enable seqnum handling on this db
1555 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1557 struct ctdb_db_context *ctdb_db;
1558 ctdb_db = find_ctdb_db(ctdb, db_id);
1559 if (!ctdb_db) {
1560 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1561 return -1;
1564 if (ctdb_db->seqnum_update == NULL) {
1565 ctdb_db->seqnum_update = tevent_add_timer(
1566 ctdb->ev, ctdb_db,
1567 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1568 (ctdb->tunable.seqnum_interval%1000)*1000),
1569 ctdb_ltdb_seqnum_check, ctdb_db);
1572 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1573 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1574 return 0;
1577 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1579 if (ctdb_db_sticky(ctdb_db)) {
1580 return 0;
1583 if (! ctdb_db_volatile(ctdb_db)) {
1584 DEBUG(DEBUG_ERR,
1585 ("Non-volatile databases do not support sticky flag\n"));
1586 return -1;
1589 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1591 ctdb_db_set_sticky(ctdb_db);
1593 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1595 return 0;
1598 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1600 unsigned int i;
1602 for (i=0; i<MAX_HOT_KEYS; i++) {
1603 if (ctdb_db->hot_keys[i].key.dsize > 0) {
1604 TALLOC_FREE(ctdb_db->hot_keys[i].key.dptr);
1605 ctdb_db->hot_keys[i].key.dsize = 0;
1607 ctdb_db->hot_keys[i].count = 0;
1608 ctdb_db->hot_keys[i].last_logged_count = 0;
1611 ZERO_STRUCT(ctdb_db->statistics);
1614 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1615 uint32_t db_id,
1616 TDB_DATA *outdata)
1618 struct ctdb_db_context *ctdb_db;
1619 struct ctdb_db_statistics_old *stats;
1620 unsigned int i;
1621 size_t len;
1622 char *ptr;
1624 ctdb_db = find_ctdb_db(ctdb, db_id);
1625 if (!ctdb_db) {
1626 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1627 return -1;
1630 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1631 for (i = 0; i < MAX_HOT_KEYS; i++) {
1632 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1634 s->hot_keys[i].key.dsize = ctdb_db->hot_keys[i].key.dsize;
1635 s->hot_keys[i].key.dptr = ctdb_db->hot_keys[i].key.dptr;
1636 s->hot_keys[i].count = ctdb_db->hot_keys[i].count;
1638 len += s->hot_keys[i].key.dsize;
1641 stats = talloc_size(outdata, len);
1642 if (stats == NULL) {
1643 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1644 return -1;
1647 memcpy(stats, &ctdb_db->statistics,
1648 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1650 stats->num_hot_keys = MAX_HOT_KEYS;
1652 ptr = &stats->hot_keys_wire[0];
1653 for (i = 0; i < MAX_HOT_KEYS; i++) {
1654 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1655 ctdb_db->statistics.hot_keys[i].key.dsize);
1656 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1659 outdata->dptr = (uint8_t *)stats;
1660 outdata->dsize = len;
1662 return 0;