ldb_tdb: Use mem_ctx and so avoid leak onto long-term memory on duplicated add.
[Samba.git] / ctdb / server / ctdb_ltdb_server.c
blobd94d942a9fae3fb41a09a913958af67bfea5728f
1 /*
2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
46 /**
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if apporpriate.
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55 TDB_DATA key,
56 struct ctdb_ltdb_header *header,
57 TDB_DATA data)
59 struct ctdb_context *ctdb = ctdb_db->ctdb;
60 TDB_DATA rec[2];
61 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62 int ret;
63 bool seqnum_suppressed = false;
64 bool keep = false;
65 bool schedule_for_deletion = false;
66 bool remove_from_delete_queue = false;
67 uint32_t lmaster;
69 if (ctdb->flags & CTDB_FLAG_TORTURE) {
70 TDB_DATA old;
71 struct ctdb_ltdb_header *h2;
73 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74 h2 = (struct ctdb_ltdb_header *)old.dptr;
75 if (old.dptr != NULL &&
76 old.dsize >= hsize &&
77 h2->rsn > header->rsn) {
78 DEBUG(DEBUG_ERR,
79 ("RSN regression! %"PRIu64" %"PRIu64"\n",
80 h2->rsn, header->rsn));
82 if (old.dptr) {
83 free(old.dptr);
87 if (ctdb->vnn_map == NULL) {
89 * Called from a client: always store the record
90 * Also don't call ctdb_lmaster since it uses the vnn_map!
92 keep = true;
93 goto store;
96 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99 * If we migrate an empty record off to another node
100 * and the record has not been migrated with data,
101 * delete the record instead of storing the empty record.
103 if (data.dsize != 0) {
104 keep = true;
105 } else if (header->flags & CTDB_REC_RO_FLAGS) {
106 keep = true;
107 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
109 * The record is not created by the client but
110 * automatically by the ctdb_ltdb_fetch logic that
111 * creates a record with an initial header in the
112 * ltdb before trying to migrate the record from
113 * the current lmaster. Keep it instead of trying
114 * to delete the non-existing record...
116 keep = true;
117 schedule_for_deletion = true;
118 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
119 keep = true;
120 } else if (ctdb_db->ctdb->pnn == lmaster) {
122 * If we are lmaster, then we usually keep the record.
123 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124 * and the record is empty and has never been migrated
125 * with data, then we should delete it instead of storing it.
126 * This is part of the vacuuming process.
128 * The reason that we usually need to store even empty records
129 * on the lmaster is that a client operating directly on the
130 * lmaster (== dmaster) expects the local copy of the record to
131 * exist after successful ctdb migrate call. If the record does
132 * not exist, the client goes into a migrate loop and eventually
133 * fails. So storing the empty record makes sure that we do not
134 * need to change the client code.
136 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
137 keep = true;
138 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139 keep = true;
141 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
142 keep = true;
145 if (keep) {
146 if (ctdb_db_volatile(ctdb_db) &&
147 (ctdb_db->ctdb->pnn == header->dmaster) &&
148 !(header->flags & CTDB_REC_RO_FLAGS))
150 header->rsn++;
152 if (data.dsize == 0) {
153 schedule_for_deletion = true;
156 remove_from_delete_queue = !schedule_for_deletion;
159 store:
161 * The VACUUM_MIGRATED flag is only set temporarily for
162 * the above logic when the record was retrieved by a
163 * VACUUM_MIGRATE call and should not be stored in the
164 * database.
166 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167 * and there are two cases in which the corresponding record
168 * is stored in the local database:
169 * 1. The record has been migrated with data in the past
170 * (the MIGRATED_WITH_DATA record flag is set).
171 * 2. The record has been filled with data again since it
172 * had been submitted in the VACUUM_FETCH message to the
173 * lmaster.
174 * For such records it is important to not store the
175 * VACUUM_MIGRATED flag in the database.
177 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180 * Similarly, clear the AUTOMATIC flag which should not enter
181 * the local database copy since this would require client
182 * modifications to clear the flag when the client stores
183 * the record.
185 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
187 rec[0].dsize = hsize;
188 rec[0].dptr = (uint8_t *)header;
190 rec[1].dsize = data.dsize;
191 rec[1].dptr = data.dptr;
193 /* Databases with seqnum updates enabled only get their seqnum
194 changes when/if we modify the data */
195 if (ctdb_db->seqnum_update != NULL) {
196 TDB_DATA old;
197 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
199 if ((old.dsize == hsize + data.dsize) &&
200 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202 seqnum_suppressed = true;
204 if (old.dptr != NULL) {
205 free(old.dptr);
209 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
210 ctdb_db->db_name,
211 keep?"storing":"deleting",
212 ctdb_hash(&key)));
214 if (keep) {
215 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
216 } else {
217 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
220 if (ret != 0) {
221 int lvl = DEBUG_ERR;
223 if (keep == false &&
224 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
226 lvl = DEBUG_DEBUG;
229 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
230 "%d - %s\n",
231 ctdb_db->db_name,
232 keep?"store":"delete", ret,
233 tdb_errorstr(ctdb_db->ltdb->tdb)));
235 schedule_for_deletion = false;
236 remove_from_delete_queue = false;
238 if (seqnum_suppressed) {
239 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242 if (schedule_for_deletion) {
243 int ret2;
244 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
245 if (ret2 != 0) {
246 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
250 if (remove_from_delete_queue) {
251 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
254 return ret;
/*
  state carried from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
  while a request waits for a record chainlock
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* function (and its first argument) the packet is handed back to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* ctdb_db->generation at the time the request was deferred */
	uint32_t generation;
	/* when true, requeue even if the db generation has changed */
	bool ignore_generation;
};
268 called when we should retry the operation
270 static void lock_fetch_callback(void *p, bool locked)
272 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273 if (!state->ignore_generation &&
274 state->generation != state->ctdb_db->generation) {
275 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276 talloc_free(state->hdr);
277 return;
279 state->recv_pkt(state->recv_context, state->hdr);
280 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
285 do a non-blocking ltdb_lock, deferring this ctdb request until we
286 have the chainlock
288 It does the following:
290 1) tries to get the chainlock. If it succeeds, then it returns 0
292 2) if it fails to get a chainlock immediately then it sets up a
293 non-blocking chainlock via ctdb_lock_record, and when it gets the
294 chainlock it re-submits this ctdb request to the main packet
295 receive function.
297 This effectively queues all ctdb requests that cannot be
298 immediately satisfied until it can get the lock. This means that
299 the main ctdb daemon will not block waiting for a chainlock held by
300 a client
302 There are 3 possible return values:
304 0: means that it got the lock immediately.
305 -1: means that it failed to get the lock, and won't retry
306 -2: means that it failed to get the lock immediately, but will retry
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
309 TDB_DATA key, struct ctdb_req_header *hdr,
310 void (*recv_pkt)(void *, struct ctdb_req_header *),
311 void *recv_context, bool ignore_generation)
313 int ret;
314 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315 struct lock_request *lreq;
316 struct lock_fetch_state *state;
318 ret = tdb_chainlock_nonblock(tdb, key);
320 if (ret != 0 &&
321 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322 /* a hard failure - don't try again */
323 return -1;
326 /* when torturing, ensure we test the contended path */
327 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
328 random() % 5 == 0) {
329 ret = -1;
330 tdb_chainunlock(tdb, key);
333 /* first the non-contended path */
334 if (ret == 0) {
335 return 0;
338 state = talloc(hdr, struct lock_fetch_state);
339 state->ctdb = ctdb_db->ctdb;
340 state->ctdb_db = ctdb_db;
341 state->hdr = hdr;
342 state->recv_pkt = recv_pkt;
343 state->recv_context = recv_context;
344 state->generation = ctdb_db->generation;
345 state->ignore_generation = ignore_generation;
347 /* now the contended path */
348 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
349 if (lreq == NULL) {
350 return -1;
353 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354 so it won't be freed yet */
355 talloc_steal(state, hdr);
357 /* now tell the caller than we will retry asynchronously */
358 return -2;
362 a varient of ctdb_ltdb_lock_requeue that also fetches the record
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
365 TDB_DATA key, struct ctdb_ltdb_header *header,
366 struct ctdb_req_header *hdr, TDB_DATA *data,
367 void (*recv_pkt)(void *, struct ctdb_req_header *),
368 void *recv_context, bool ignore_generation)
370 int ret;
372 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
373 recv_context, ignore_generation);
374 if (ret == 0) {
375 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
376 if (ret != 0) {
377 int uret;
378 uret = ctdb_ltdb_unlock(ctdb_db, key);
379 if (uret != 0) {
380 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
384 return ret;
389 paraoid check to see if the db is empty
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
393 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394 int count = tdb_traverse_read(tdb, NULL, NULL);
395 if (count != 0) {
396 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
397 ctdb_db->db_path));
398 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403 struct ctdb_db_context *ctdb_db)
405 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
406 char *old;
407 char *reason = NULL;
408 TDB_DATA key;
409 TDB_DATA val;
411 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412 key.dsize = strlen(ctdb_db->db_name);
414 old = ctdb_db->unhealthy_reason;
415 ctdb_db->unhealthy_reason = NULL;
417 val = tdb_fetch(tdb, key);
418 if (val.dsize > 0) {
419 reason = talloc_strndup(ctdb_db,
420 (const char *)val.dptr,
421 val.dsize);
422 if (reason == NULL) {
423 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
424 (int)val.dsize));
425 ctdb_db->unhealthy_reason = old;
426 free(val.dptr);
427 return -1;
431 if (val.dptr) {
432 free(val.dptr);
435 talloc_free(old);
436 ctdb_db->unhealthy_reason = reason;
437 return 0;
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441 struct ctdb_db_context *ctdb_db,
442 const char *given_reason,/* NULL means healthy */
443 int num_healthy_nodes)
445 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
446 int ret;
447 TDB_DATA key;
448 TDB_DATA val;
449 char *new_reason = NULL;
450 char *old_reason = NULL;
452 ret = tdb_transaction_start(tdb);
453 if (ret != 0) {
454 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455 tdb_name(tdb), ret, tdb_errorstr(tdb)));
456 return -1;
459 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
460 if (ret != 0) {
461 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462 ctdb_db->db_name, ret));
463 return -1;
465 old_reason = ctdb_db->unhealthy_reason;
467 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468 key.dsize = strlen(ctdb_db->db_name);
470 if (given_reason) {
471 new_reason = talloc_strdup(ctdb_db, given_reason);
472 if (new_reason == NULL) {
473 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
474 given_reason));
475 return -1;
477 } else if (old_reason && num_healthy_nodes == 0) {
479 * If the reason indicates ok, but there where no healthy nodes
480 * available, that it means, we have not recovered valid content
481 * of the db. So if there's an old reason, prefix it with
482 * "NO-HEALTHY-NODES - "
484 const char *prefix;
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
488 if (ret != 0) {
489 prefix = _TMP_PREFIX;
490 } else {
491 prefix = "";
493 new_reason = talloc_asprintf(ctdb_db, "%s%s",
494 prefix, old_reason);
495 if (new_reason == NULL) {
496 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497 prefix, old_reason));
498 return -1;
500 #undef _TMP_PREFIX
503 if (new_reason) {
504 val.dptr = discard_const_p(uint8_t, new_reason);
505 val.dsize = strlen(new_reason);
507 ret = tdb_store(tdb, key, val, TDB_REPLACE);
508 if (ret != 0) {
509 tdb_transaction_cancel(tdb);
510 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511 tdb_name(tdb), ctdb_db->db_name, new_reason,
512 ret, tdb_errorstr(tdb)));
513 talloc_free(new_reason);
514 return -1;
516 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517 ctdb_db->db_name, new_reason));
518 } else if (old_reason) {
519 ret = tdb_delete(tdb, key);
520 if (ret != 0) {
521 tdb_transaction_cancel(tdb);
522 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523 tdb_name(tdb), ctdb_db->db_name,
524 ret, tdb_errorstr(tdb)));
525 talloc_free(new_reason);
526 return -1;
528 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
529 ctdb_db->db_name));
532 ret = tdb_transaction_commit(tdb);
533 if (ret != TDB_SUCCESS) {
534 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535 tdb_name(tdb), ret, tdb_errorstr(tdb)));
536 talloc_free(new_reason);
537 return -1;
540 talloc_free(old_reason);
541 ctdb_db->unhealthy_reason = new_reason;
543 return 0;
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547 struct ctdb_db_context *ctdb_db)
549 time_t now = time(NULL);
550 char *new_path;
551 char *new_reason;
552 int ret;
553 struct tm *tm;
555 tm = gmtime(&now);
557 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559 "%04u%02u%02u%02u%02u%02u.0Z",
560 ctdb_db->db_path,
561 tm->tm_year+1900, tm->tm_mon+1,
562 tm->tm_mday, tm->tm_hour, tm->tm_min,
563 tm->tm_sec);
564 if (new_path == NULL) {
565 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
566 return -1;
569 new_reason = talloc_asprintf(ctdb_db,
570 "ERROR - Backup of corrupted TDB in '%s'",
571 new_path);
572 if (new_reason == NULL) {
573 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574 return -1;
576 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577 talloc_free(new_reason);
578 if (ret != 0) {
579 DEBUG(DEBUG_CRIT,(__location__
580 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
581 ctdb_db->db_path));
582 return -1;
585 ret = rename(ctdb_db->db_path, new_path);
586 if (ret != 0) {
587 DEBUG(DEBUG_CRIT,(__location__
588 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589 ctdb_db->db_path, new_path,
590 errno, strerror(errno)));
591 talloc_free(new_path);
592 return -1;
595 DEBUG(DEBUG_CRIT,(__location__
596 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597 ctdb_db->db_path, new_path));
598 talloc_free(new_path);
599 return 0;
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
604 struct ctdb_db_context *ctdb_db;
605 int ret;
606 int ok = 0;
607 int fail = 0;
609 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610 if (!ctdb_db_persistent(ctdb_db)) {
611 continue;
614 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
615 if (ret != 0) {
616 DEBUG(DEBUG_ALERT,(__location__
617 " load persistent health for '%s' failed\n",
618 ctdb_db->db_path));
619 return -1;
622 if (ctdb_db->unhealthy_reason == NULL) {
623 ok++;
624 DEBUG(DEBUG_INFO,(__location__
625 " persistent db '%s' healthy\n",
626 ctdb_db->db_path));
627 continue;
630 fail++;
631 DEBUG(DEBUG_ALERT,(__location__
632 " persistent db '%s' unhealthy: %s\n",
633 ctdb_db->db_path,
634 ctdb_db->unhealthy_reason));
636 DEBUG(DEBUG_NOTICE,
637 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
638 ok, fail));
640 if (fail != 0) {
641 return -1;
644 return 0;
649 mark a database - as healthy
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
653 uint32_t db_id = *(uint32_t *)indata.dptr;
654 struct ctdb_db_context *ctdb_db;
655 int ret;
656 bool may_recover = false;
658 ctdb_db = find_ctdb_db(ctdb, db_id);
659 if (!ctdb_db) {
660 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
661 return -1;
664 if (ctdb_db->unhealthy_reason) {
665 may_recover = true;
668 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
669 if (ret != 0) {
670 DEBUG(DEBUG_ERR,(__location__
671 " ctdb_update_persistent_health(%s) failed\n",
672 ctdb_db->db_name));
673 return -1;
676 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
678 ctdb_db->db_name));
679 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
682 return 0;
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
686 TDB_DATA indata,
687 TDB_DATA *outdata)
689 uint32_t db_id = *(uint32_t *)indata.dptr;
690 struct ctdb_db_context *ctdb_db;
691 int ret;
693 ctdb_db = find_ctdb_db(ctdb, db_id);
694 if (!ctdb_db) {
695 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
696 return -1;
699 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
700 if (ret != 0) {
701 DEBUG(DEBUG_ERR,(__location__
702 " ctdb_load_persistent_health(%s) failed\n",
703 ctdb_db->db_name));
704 return -1;
707 *outdata = tdb_null;
708 if (ctdb_db->unhealthy_reason) {
709 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
713 return 0;
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
719 char *ropath;
721 if (ctdb_db_readonly(ctdb_db)) {
722 return 0;
725 if (! ctdb_db_volatile(ctdb_db)) {
726 DEBUG(DEBUG_ERR,
727 ("Non-volatile databases do not support readonly flag\n"));
728 return -1;
731 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
732 if (ropath == NULL) {
733 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
734 return -1;
736 ctdb_db->rottdb = tdb_open(ropath,
737 ctdb->tunable.database_hash_size,
738 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
739 O_CREAT|O_RDWR, 0600);
740 if (ctdb_db->rottdb == NULL) {
741 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
742 talloc_free(ropath);
743 return -1;
746 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
748 ctdb_db_set_readonly(ctdb_db);
750 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
752 talloc_free(ropath);
753 return 0;
757 attach to a database, handling both persistent and non-persistent databases
758 return 0 on success, -1 on failure
760 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
761 uint8_t db_flags, const char *unhealthy_reason)
763 struct ctdb_db_context *ctdb_db, *tmp_db;
764 int ret;
765 struct TDB_DATA key;
766 int tdb_flags;
767 int mode = 0600;
768 int remaining_tries = 0;
770 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771 CTDB_NO_MEMORY(ctdb, ctdb_db);
773 ctdb_db->ctdb = ctdb;
774 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
777 key.dsize = strlen(db_name)+1;
778 key.dptr = discard_const(db_name);
779 ctdb_db->db_id = ctdb_hash(&key);
780 ctdb_db->db_flags = db_flags;
782 if (ctdb_db_volatile(ctdb_db)) {
783 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784 if (ctdb_db->delete_queue == NULL) {
785 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
788 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
791 /* check for hash collisions */
792 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793 if (tmp_db->db_id == ctdb_db->db_id) {
794 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795 tmp_db->db_id, db_name, tmp_db->db_name));
796 talloc_free(ctdb_db);
797 return -1;
801 if (ctdb_db_persistent(ctdb_db)) {
802 if (unhealthy_reason) {
803 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804 unhealthy_reason, 0);
805 if (ret != 0) {
806 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807 ctdb_db->db_name, unhealthy_reason, ret));
808 talloc_free(ctdb_db);
809 return -1;
813 if (ctdb->max_persistent_check_errors > 0) {
814 remaining_tries = 1;
816 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
817 remaining_tries = 0;
820 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
821 if (ret != 0) {
822 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823 ctdb_db->db_name, ret));
824 talloc_free(ctdb_db);
825 return -1;
829 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831 ctdb_db->db_name, ctdb_db->unhealthy_reason));
832 talloc_free(ctdb_db);
833 return -1;
836 if (ctdb_db->unhealthy_reason) {
837 /* this is just a warning, but we want that in the log file! */
838 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839 ctdb_db->db_name, ctdb_db->unhealthy_reason));
842 /* open the database */
843 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
844 ctdb_db_persistent(ctdb_db) ?
845 ctdb->db_directory_persistent :
846 ctdb->db_directory,
847 db_name, ctdb->pnn);
849 tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
850 ctdb->tunable.mutex_enabled);
852 again:
853 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
854 ctdb->tunable.database_hash_size,
855 tdb_flags,
856 O_CREAT|O_RDWR, mode);
857 if (ctdb_db->ltdb == NULL) {
858 struct stat st;
859 int saved_errno = errno;
861 if (! ctdb_db_persistent(ctdb_db)) {
862 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
863 ctdb_db->db_path,
864 saved_errno,
865 strerror(saved_errno)));
866 talloc_free(ctdb_db);
867 return -1;
870 if (remaining_tries == 0) {
871 DEBUG(DEBUG_CRIT,(__location__
872 "Failed to open persistent tdb '%s': %d - %s\n",
873 ctdb_db->db_path,
874 saved_errno,
875 strerror(saved_errno)));
876 talloc_free(ctdb_db);
877 return -1;
880 ret = stat(ctdb_db->db_path, &st);
881 if (ret != 0) {
882 DEBUG(DEBUG_CRIT,(__location__
883 "Failed to open persistent tdb '%s': %d - %s\n",
884 ctdb_db->db_path,
885 saved_errno,
886 strerror(saved_errno)));
887 talloc_free(ctdb_db);
888 return -1;
891 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
892 if (ret != 0) {
893 DEBUG(DEBUG_CRIT,(__location__
894 "Failed to open persistent tdb '%s': %d - %s\n",
895 ctdb_db->db_path,
896 saved_errno,
897 strerror(saved_errno)));
898 talloc_free(ctdb_db);
899 return -1;
902 remaining_tries--;
903 mode = st.st_mode;
904 goto again;
907 if (!ctdb_db_persistent(ctdb_db)) {
908 ctdb_check_db_empty(ctdb_db);
909 } else {
910 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
911 if (ret != 0) {
912 int fd;
913 struct stat st;
915 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
916 ctdb_db->db_path, ret,
917 tdb_errorstr(ctdb_db->ltdb->tdb)));
918 if (remaining_tries == 0) {
919 talloc_free(ctdb_db);
920 return -1;
923 fd = tdb_fd(ctdb_db->ltdb->tdb);
924 ret = fstat(fd, &st);
925 if (ret != 0) {
926 DEBUG(DEBUG_CRIT,(__location__
927 "Failed to fstat() persistent tdb '%s': %d - %s\n",
928 ctdb_db->db_path,
929 errno,
930 strerror(errno)));
931 talloc_free(ctdb_db);
932 return -1;
935 /* close the TDB */
936 talloc_free(ctdb_db->ltdb);
937 ctdb_db->ltdb = NULL;
939 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
940 if (ret != 0) {
941 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
942 ctdb_db->db_path));
943 talloc_free(ctdb_db);
944 return -1;
947 remaining_tries--;
948 mode = st.st_mode;
949 goto again;
953 /* remember the flags the client has specified */
954 tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
957 /* set up a rb tree we can use to track which records we have a
958 fetch-lock in-flight for so we can defer any additional calls
959 for the same record.
961 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
962 if (ctdb_db->deferred_fetch == NULL) {
963 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
964 talloc_free(ctdb_db);
965 return -1;
968 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
969 if (ctdb_db->defer_dmaster == NULL) {
970 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
971 ctdb_db->db_name));
972 talloc_free(ctdb_db);
973 return -1;
976 DLIST_ADD(ctdb->db_list, ctdb_db);
978 /* setting this can help some high churn databases */
979 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
982 all databases support the "null" function. we need this in
983 order to do forced migration of records
985 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
986 if (ret != 0) {
987 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
988 talloc_free(ctdb_db);
989 return -1;
993 all databases support the "fetch" function. we need this
994 for efficient Samba3 ctdb fetch
996 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
997 if (ret != 0) {
998 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
999 talloc_free(ctdb_db);
1000 return -1;
1004 all databases support the "fetch_with_header" function. we need this
1005 for efficient readonly record fetches
1007 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1008 if (ret != 0) {
1009 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1010 talloc_free(ctdb_db);
1011 return -1;
1014 ret = ctdb_vacuum_init(ctdb_db);
1015 if (ret != 0) {
1016 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1017 "database '%s'\n", ctdb_db->db_name));
1018 talloc_free(ctdb_db);
1019 return -1;
1022 ret = ctdb_migration_init(ctdb_db);
1023 if (ret != 0) {
1024 DEBUG(DEBUG_ERR,
1025 ("Failed to setup migration tracking for db '%s'\n",
1026 ctdb_db->db_name));
1027 talloc_free(ctdb_db);
1028 return -1;
1031 ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1032 &ctdb_db->lock_log);
1033 if (ret != 0) {
1034 DEBUG(DEBUG_ERR,
1035 ("Failed to setup lock logging for db '%s'\n",
1036 ctdb_db->db_name));
1037 talloc_free(ctdb_db);
1038 return -1;
1041 ctdb_db->generation = ctdb->vnn_map->generation;
1043 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1044 ctdb_db->db_path, tdb_flags));
1046 /* success */
1047 return 0;
/*
  one attach control that has been put on hold; entries are linked
  into ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* list linkage for ctdb->deferred_attach */
	struct ctdb_deferred_attach_context *next, *prev;
	struct ctdb_context *ctdb;
	/* the deferred control request packet */
	struct ctdb_req_control_old *c;
};
1058 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1060 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1062 return 0;
1065 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1066 struct tevent_timer *te,
1067 struct timeval t, void *private_data)
1069 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1070 struct ctdb_context *ctdb = da_ctx->ctdb;
1072 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1073 talloc_free(da_ctx);
1076 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1077 struct tevent_timer *te,
1078 struct timeval t, void *private_data)
1080 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1081 struct ctdb_context *ctdb = da_ctx->ctdb;
1083 /* This talloc-steals the packet ->c */
1084 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1085 talloc_free(da_ctx);
1088 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1090 struct ctdb_deferred_attach_context *da_ctx;
1092 /* call it from the main event loop as soon as the current event
1093 finishes.
1095 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1096 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1097 tevent_add_timer(ctdb->ev, da_ctx,
1098 timeval_current_ofs(1,0),
1099 ctdb_deferred_attach_callback, da_ctx);
1102 return 0;
1106 a client has asked to attach a new database
1108 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1109 TDB_DATA *outdata,
1110 uint8_t db_flags, uint32_t client_id,
1111 struct ctdb_req_control_old *c,
1112 bool *async_reply)
1114 const char *db_name = (const char *)indata.dptr;
1115 struct ctdb_db_context *db;
1116 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1117 struct ctdb_client *client = NULL;
1118 uint32_t opcode;
1120 if (ctdb->tunable.allow_client_db_attach == 0) {
1121 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1122 "AllowClientDBAccess == 0\n", db_name));
1123 return -1;
1126 /* don't allow any local clients to attach while we are in recovery mode
1127 * except for the recovery daemon.
1128 * allow all attach from the network since these are always from remote
1129 * recovery daemons.
1131 if (client_id != 0) {
1132 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1134 if (client != NULL) {
1135 /* If the node is inactive it is not part of the cluster
1136 and we should not allow clients to attach to any
1137 databases
1139 if (node->flags & NODE_FLAGS_INACTIVE) {
1140 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1141 return -1;
1144 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1145 client->pid != ctdb->recoverd_pid &&
1146 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1147 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1149 if (da_ctx == NULL) {
1150 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1151 return -1;
1154 da_ctx->ctdb = ctdb;
1155 da_ctx->c = talloc_steal(da_ctx, c);
1156 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1157 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1159 tevent_add_timer(ctdb->ev, da_ctx,
1160 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1161 ctdb_deferred_attach_timeout, da_ctx);
1163 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1164 *async_reply = true;
1165 return 0;
1169 /* see if we already have this name */
1170 db = ctdb_db_handle(ctdb, db_name);
1171 if (db) {
1172 if ((db->db_flags & db_flags) != db_flags) {
1173 DEBUG(DEBUG_ERR,
1174 ("Error: Failed to re-attach with 0x%x flags,"
1175 " database has 0x%x flags\n", db_flags,
1176 db->db_flags));
1177 return -1;
1179 outdata->dptr = (uint8_t *)&db->db_id;
1180 outdata->dsize = sizeof(db->db_id);
1181 return 0;
1184 if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1185 return -1;
1188 db = ctdb_db_handle(ctdb, db_name);
1189 if (!db) {
1190 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1191 return -1;
1194 outdata->dptr = (uint8_t *)&db->db_id;
1195 outdata->dsize = sizeof(db->db_id);
1197 /* Try to ensure it's locked in mem */
1198 lockdown_memory(ctdb->valgrinding);
1200 if (ctdb_db_persistent(db)) {
1201 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1202 } else if (ctdb_db_replicated(db)) {
1203 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1204 } else {
1205 opcode = CTDB_CONTROL_DB_ATTACH;
1208 /* tell all the other nodes about this database */
1209 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1210 0, CTDB_CTRL_FLAG_NOREPLY,
1211 indata, NULL, NULL);
1213 /* success */
1214 return 0;
1218 * a client has asked to detach from a database
1220 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1221 uint32_t client_id)
1223 uint32_t db_id;
1224 struct ctdb_db_context *ctdb_db;
1225 struct ctdb_client *client = NULL;
1227 db_id = *(uint32_t *)indata.dptr;
1228 ctdb_db = find_ctdb_db(ctdb, db_id);
1229 if (ctdb_db == NULL) {
1230 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1231 db_id));
1232 return -1;
1235 if (ctdb->tunable.allow_client_db_attach == 1) {
1236 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1237 "Clients are allowed access to databases "
1238 "(AllowClientDBAccess == 1)\n",
1239 ctdb_db->db_name));
1240 return -1;
1243 if (! ctdb_db_volatile(ctdb_db)) {
1244 DEBUG(DEBUG_ERR,
1245 ("Detaching non-volatile database %s denied\n",
1246 ctdb_db->db_name));
1247 return -1;
1250 /* Cannot detach from database when in recovery */
1251 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1252 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1253 return -1;
1256 /* If a control comes from a client, then broadcast it to all nodes.
1257 * Do the actual detach only if the control comes from other daemons.
1259 if (client_id != 0) {
1260 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1261 if (client != NULL) {
1262 /* forward the control to all the nodes */
1263 ctdb_daemon_send_control(ctdb,
1264 CTDB_BROADCAST_CONNECTED, 0,
1265 CTDB_CONTROL_DB_DETACH, 0,
1266 CTDB_CTRL_FLAG_NOREPLY,
1267 indata, NULL, NULL);
1268 return 0;
1270 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1271 "for database '%s'\n", ctdb_db->db_name));
1272 return -1;
1275 /* Detach database from recoverd */
1276 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1277 CTDB_SRVID_DETACH_DATABASE,
1278 indata) != 0) {
1279 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1280 return -1;
1283 /* Disable vacuuming and drop all vacuuming data */
1284 talloc_free(ctdb_db->vacuum_handle);
1285 talloc_free(ctdb_db->delete_queue);
1287 /* Terminate any deferred fetch */
1288 talloc_free(ctdb_db->deferred_fetch);
1290 /* Terminate any traverses */
1291 while (ctdb_db->traverse) {
1292 talloc_free(ctdb_db->traverse);
1295 /* Terminate any revokes */
1296 while (ctdb_db->revokechild_active) {
1297 talloc_free(ctdb_db->revokechild_active);
1300 /* Free readonly tracking database */
1301 if (ctdb_db_readonly(ctdb_db)) {
1302 talloc_free(ctdb_db->rottdb);
1305 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1307 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1308 ctdb_db->db_name));
1309 talloc_free(ctdb_db);
1311 return 0;
1315 attach to all existing persistent databases
1317 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1318 const char *unhealthy_reason)
1320 DIR *d;
1321 struct dirent *de;
1323 /* open the persistent db directory and scan it for files */
1324 d = opendir(ctdb->db_directory_persistent);
1325 if (d == NULL) {
1326 return 0;
1329 while ((de=readdir(d))) {
1330 char *p, *s, *q;
1331 size_t len = strlen(de->d_name);
1332 uint32_t node;
1333 int invalid_name = 0;
1335 s = talloc_strdup(ctdb, de->d_name);
1336 if (s == NULL) {
1337 closedir(d);
1338 CTDB_NO_MEMORY(ctdb, s);
1341 /* only accept names ending in .tdb */
1342 p = strstr(s, ".tdb.");
1343 if (len < 7 || p == NULL) {
1344 talloc_free(s);
1345 continue;
1348 /* only accept names ending with .tdb. and any number of digits */
1349 q = p+5;
1350 while (*q != 0 && invalid_name == 0) {
1351 if (!isdigit(*q++)) {
1352 invalid_name = 1;
1355 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1356 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1357 talloc_free(s);
1358 continue;
1360 p[4] = 0;
1362 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1363 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1364 closedir(d);
1365 talloc_free(s);
1366 return -1;
1369 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1371 talloc_free(s);
1373 closedir(d);
1374 return 0;
1377 int ctdb_attach_databases(struct ctdb_context *ctdb)
1379 int ret;
1380 char *persistent_health_path = NULL;
1381 char *unhealthy_reason = NULL;
1382 bool first_try = true;
1384 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1385 ctdb->db_directory_state,
1386 PERSISTENT_HEALTH_TDB,
1387 ctdb->pnn);
1388 if (persistent_health_path == NULL) {
1389 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1390 return -1;
1393 again:
1395 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1396 0, TDB_DISALLOW_NESTING,
1397 O_CREAT | O_RDWR, 0600);
1398 if (ctdb->db_persistent_health == NULL) {
1399 struct tdb_wrap *tdb;
1401 if (!first_try) {
1402 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1403 persistent_health_path,
1404 errno,
1405 strerror(errno)));
1406 talloc_free(persistent_health_path);
1407 talloc_free(unhealthy_reason);
1408 return -1;
1410 first_try = false;
1412 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1413 persistent_health_path,
1414 "was cleared after a failure",
1415 "manual verification needed");
1416 if (unhealthy_reason == NULL) {
1417 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1418 talloc_free(persistent_health_path);
1419 return -1;
1422 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1423 persistent_health_path));
1424 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1425 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1426 O_CREAT | O_RDWR, 0600);
1427 if (tdb) {
1428 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1429 persistent_health_path,
1430 errno,
1431 strerror(errno)));
1432 talloc_free(persistent_health_path);
1433 talloc_free(unhealthy_reason);
1434 return -1;
1437 talloc_free(tdb);
1438 goto again;
1440 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1441 if (ret != 0) {
1442 struct tdb_wrap *tdb;
1444 talloc_free(ctdb->db_persistent_health);
1445 ctdb->db_persistent_health = NULL;
1447 if (!first_try) {
1448 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1449 persistent_health_path));
1450 talloc_free(persistent_health_path);
1451 talloc_free(unhealthy_reason);
1452 return -1;
1454 first_try = false;
1456 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1457 persistent_health_path,
1458 "was cleared after a failure",
1459 "manual verification needed");
1460 if (unhealthy_reason == NULL) {
1461 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1462 talloc_free(persistent_health_path);
1463 return -1;
1466 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1467 persistent_health_path));
1468 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1469 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1470 O_CREAT | O_RDWR, 0600);
1471 if (tdb) {
1472 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1473 persistent_health_path,
1474 errno,
1475 strerror(errno)));
1476 talloc_free(persistent_health_path);
1477 talloc_free(unhealthy_reason);
1478 return -1;
1481 talloc_free(tdb);
1482 goto again;
1484 talloc_free(persistent_health_path);
1486 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1487 talloc_free(unhealthy_reason);
1488 if (ret != 0) {
1489 return ret;
1492 return 0;
1496 called when a broadcast seqnum update comes in
1498 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1500 struct ctdb_db_context *ctdb_db;
1501 if (srcnode == ctdb->pnn) {
1502 /* don't update ourselves! */
1503 return 0;
1506 ctdb_db = find_ctdb_db(ctdb, db_id);
1507 if (!ctdb_db) {
1508 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1509 return -1;
1512 if (ctdb_db->unhealthy_reason) {
1513 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1514 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1515 return -1;
1518 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1519 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1520 return 0;
1524 timer to check for seqnum changes in a ltdb and propagate them
1526 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1527 struct tevent_timer *te,
1528 struct timeval t, void *p)
1530 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1531 struct ctdb_context *ctdb = ctdb_db->ctdb;
1532 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1533 if (new_seqnum != ctdb_db->seqnum) {
1534 /* something has changed - propagate it */
1535 TDB_DATA data;
1536 data.dptr = (uint8_t *)&ctdb_db->db_id;
1537 data.dsize = sizeof(uint32_t);
1538 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1539 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1540 data, NULL, NULL);
1542 ctdb_db->seqnum = new_seqnum;
1544 /* setup a new timer */
1545 ctdb_db->seqnum_update =
1546 tevent_add_timer(ctdb->ev, ctdb_db,
1547 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1548 (ctdb->tunable.seqnum_interval%1000)*1000),
1549 ctdb_ltdb_seqnum_check, ctdb_db);
1553 enable seqnum handling on this db
1555 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1557 struct ctdb_db_context *ctdb_db;
1558 ctdb_db = find_ctdb_db(ctdb, db_id);
1559 if (!ctdb_db) {
1560 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1561 return -1;
1564 if (ctdb_db->seqnum_update == NULL) {
1565 ctdb_db->seqnum_update = tevent_add_timer(
1566 ctdb->ev, ctdb_db,
1567 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1568 (ctdb->tunable.seqnum_interval%1000)*1000),
1569 ctdb_ltdb_seqnum_check, ctdb_db);
1572 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1573 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1574 return 0;
1577 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1579 if (ctdb_db_sticky(ctdb_db)) {
1580 return 0;
1583 if (! ctdb_db_volatile(ctdb_db)) {
1584 DEBUG(DEBUG_ERR,
1585 ("Non-volatile databases do not support sticky flag\n"));
1586 return -1;
1589 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1591 ctdb_db_set_sticky(ctdb_db);
1593 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1595 return 0;
1598 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1600 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1601 int i;
1603 for (i=0; i<MAX_HOT_KEYS; i++) {
1604 if (s->hot_keys[i].key.dsize > 0) {
1605 talloc_free(s->hot_keys[i].key.dptr);
1609 ZERO_STRUCT(ctdb_db->statistics);
1612 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1613 uint32_t db_id,
1614 TDB_DATA *outdata)
1616 struct ctdb_db_context *ctdb_db;
1617 struct ctdb_db_statistics_old *stats;
1618 int i;
1619 int len;
1620 char *ptr;
1622 ctdb_db = find_ctdb_db(ctdb, db_id);
1623 if (!ctdb_db) {
1624 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1625 return -1;
1628 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1629 for (i = 0; i < MAX_HOT_KEYS; i++) {
1630 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1633 stats = talloc_size(outdata, len);
1634 if (stats == NULL) {
1635 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1636 return -1;
1639 memcpy(stats, &ctdb_db->statistics,
1640 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1642 stats->num_hot_keys = MAX_HOT_KEYS;
1644 ptr = &stats->hot_keys_wire[0];
1645 for (i = 0; i < MAX_HOT_KEYS; i++) {
1646 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1647 ctdb_db->statistics.hot_keys[i].key.dsize);
1648 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1651 outdata->dptr = (uint8_t *)stats;
1652 outdata->dsize = len;
1654 return 0;