ldap_server: Move a variable into a smaller scope
[Samba.git] / ctdb / server / ctdb_ltdb_server.c
blob8ff963419f15197fd0236d6bedad26fc40098b23
1 /*
2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
46 /**
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if apporpriate.
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55 TDB_DATA key,
56 struct ctdb_ltdb_header *header,
57 TDB_DATA data)
59 struct ctdb_context *ctdb = ctdb_db->ctdb;
60 TDB_DATA rec[2];
61 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62 int ret;
63 bool seqnum_suppressed = false;
64 bool keep = false;
65 bool schedule_for_deletion = false;
66 bool remove_from_delete_queue = false;
67 uint32_t lmaster;
69 if (ctdb->flags & CTDB_FLAG_TORTURE) {
70 TDB_DATA old;
71 struct ctdb_ltdb_header *h2;
73 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74 h2 = (struct ctdb_ltdb_header *)old.dptr;
75 if (old.dptr != NULL &&
76 old.dsize >= hsize &&
77 h2->rsn > header->rsn) {
78 DEBUG(DEBUG_ERR,
79 ("RSN regression! %"PRIu64" %"PRIu64"\n",
80 h2->rsn, header->rsn));
82 if (old.dptr) {
83 free(old.dptr);
87 if (ctdb->vnn_map == NULL) {
89 * Called from a client: always store the record
90 * Also don't call ctdb_lmaster since it uses the vnn_map!
92 keep = true;
93 goto store;
96 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99 * If we migrate an empty record off to another node
100 * and the record has not been migrated with data,
101 * delete the record instead of storing the empty record.
103 if (data.dsize != 0) {
104 keep = true;
105 } else if (header->flags & CTDB_REC_RO_FLAGS) {
106 keep = true;
107 } else if (ctdb_db->persistent) {
108 keep = true;
109 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
111 * The record is not created by the client but
112 * automatically by the ctdb_ltdb_fetch logic that
113 * creates a record with an initial header in the
114 * ltdb before trying to migrate the record from
115 * the current lmaster. Keep it instead of trying
116 * to delete the non-existing record...
118 keep = true;
119 schedule_for_deletion = true;
120 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
121 keep = true;
122 } else if (ctdb_db->ctdb->pnn == lmaster) {
124 * If we are lmaster, then we usually keep the record.
125 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126 * and the record is empty and has never been migrated
127 * with data, then we should delete it instead of storing it.
128 * This is part of the vacuuming process.
130 * The reason that we usually need to store even empty records
131 * on the lmaster is that a client operating directly on the
132 * lmaster (== dmaster) expects the local copy of the record to
133 * exist after successful ctdb migrate call. If the record does
134 * not exist, the client goes into a migrate loop and eventually
135 * fails. So storing the empty record makes sure that we do not
136 * need to change the client code.
138 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
139 keep = true;
140 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141 keep = true;
143 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
144 keep = true;
147 if (keep) {
148 if (!ctdb_db->persistent &&
149 (ctdb_db->ctdb->pnn == header->dmaster) &&
150 !(header->flags & CTDB_REC_RO_FLAGS))
152 header->rsn++;
154 if (data.dsize == 0) {
155 schedule_for_deletion = true;
158 remove_from_delete_queue = !schedule_for_deletion;
161 store:
163 * The VACUUM_MIGRATED flag is only set temporarily for
164 * the above logic when the record was retrieved by a
165 * VACUUM_MIGRATE call and should not be stored in the
166 * database.
168 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169 * and there are two cases in which the corresponding record
170 * is stored in the local database:
171 * 1. The record has been migrated with data in the past
172 * (the MIGRATED_WITH_DATA record flag is set).
173 * 2. The record has been filled with data again since it
174 * had been submitted in the VACUUM_FETCH message to the
175 * lmaster.
176 * For such records it is important to not store the
177 * VACUUM_MIGRATED flag in the database.
179 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
182 * Similarly, clear the AUTOMATIC flag which should not enter
183 * the local database copy since this would require client
184 * modifications to clear the flag when the client stores
185 * the record.
187 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
189 rec[0].dsize = hsize;
190 rec[0].dptr = (uint8_t *)header;
192 rec[1].dsize = data.dsize;
193 rec[1].dptr = data.dptr;
195 /* Databases with seqnum updates enabled only get their seqnum
196 changes when/if we modify the data */
197 if (ctdb_db->seqnum_update != NULL) {
198 TDB_DATA old;
199 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
201 if ((old.dsize == hsize + data.dsize) &&
202 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204 seqnum_suppressed = true;
206 if (old.dptr != NULL) {
207 free(old.dptr);
211 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
212 ctdb_db->db_name,
213 keep?"storing":"deleting",
214 ctdb_hash(&key)));
216 if (keep) {
217 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
218 } else {
219 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
222 if (ret != 0) {
223 int lvl = DEBUG_ERR;
225 if (keep == false &&
226 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
228 lvl = DEBUG_DEBUG;
231 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232 "%d - %s\n",
233 ctdb_db->db_name,
234 keep?"store":"delete", ret,
235 tdb_errorstr(ctdb_db->ltdb->tdb)));
237 schedule_for_deletion = false;
238 remove_from_delete_queue = false;
240 if (seqnum_suppressed) {
241 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
244 if (schedule_for_deletion) {
245 int ret2;
246 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
247 if (ret2 != 0) {
248 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
252 if (remove_from_delete_queue) {
253 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
256 return ret;
/*
 * State kept for a request that is waiting for a chainlock so it can
 * be re-submitted by lock_fetch_callback().
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* receive function and its context used to re-submit the packet */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	void *recv_context;
	/* the deferred packet */
	struct ctdb_req_header *hdr;
	/* database generation at the time the request was queued */
	uint32_t generation;
	/* when true, re-submit even if the generation has changed */
	bool ignore_generation;
};
270 called when we should retry the operation
272 static void lock_fetch_callback(void *p, bool locked)
274 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275 if (!state->ignore_generation &&
276 state->generation != state->ctdb_db->generation) {
277 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278 talloc_free(state->hdr);
279 return;
281 state->recv_pkt(state->recv_context, state->hdr);
282 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
287 do a non-blocking ltdb_lock, deferring this ctdb request until we
288 have the chainlock
290 It does the following:
292 1) tries to get the chainlock. If it succeeds, then it returns 0
294 2) if it fails to get a chainlock immediately then it sets up a
295 non-blocking chainlock via ctdb_lock_record, and when it gets the
296 chainlock it re-submits this ctdb request to the main packet
297 receive function.
299 This effectively queues all ctdb requests that cannot be
300 immediately satisfied until it can get the lock. This means that
301 the main ctdb daemon will not block waiting for a chainlock held by
302 a client
304 There are 3 possible return values:
306 0: means that it got the lock immediately.
307 -1: means that it failed to get the lock, and won't retry
308 -2: means that it failed to get the lock immediately, but will retry
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
311 TDB_DATA key, struct ctdb_req_header *hdr,
312 void (*recv_pkt)(void *, struct ctdb_req_header *),
313 void *recv_context, bool ignore_generation)
315 int ret;
316 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317 struct lock_request *lreq;
318 struct lock_fetch_state *state;
320 ret = tdb_chainlock_nonblock(tdb, key);
322 if (ret != 0 &&
323 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324 /* a hard failure - don't try again */
325 return -1;
328 /* when torturing, ensure we test the contended path */
329 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330 random() % 5 == 0) {
331 ret = -1;
332 tdb_chainunlock(tdb, key);
335 /* first the non-contended path */
336 if (ret == 0) {
337 return 0;
340 state = talloc(hdr, struct lock_fetch_state);
341 state->ctdb = ctdb_db->ctdb;
342 state->ctdb_db = ctdb_db;
343 state->hdr = hdr;
344 state->recv_pkt = recv_pkt;
345 state->recv_context = recv_context;
346 state->generation = ctdb_db->generation;
347 state->ignore_generation = ignore_generation;
349 /* now the contended path */
350 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
351 if (lreq == NULL) {
352 return -1;
355 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356 so it won't be freed yet */
357 talloc_steal(state, hdr);
359 /* now tell the caller than we will retry asynchronously */
360 return -2;
364 a varient of ctdb_ltdb_lock_requeue that also fetches the record
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
367 TDB_DATA key, struct ctdb_ltdb_header *header,
368 struct ctdb_req_header *hdr, TDB_DATA *data,
369 void (*recv_pkt)(void *, struct ctdb_req_header *),
370 void *recv_context, bool ignore_generation)
372 int ret;
374 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
375 recv_context, ignore_generation);
376 if (ret == 0) {
377 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378 if (ret != 0) {
379 int uret;
380 uret = ctdb_ltdb_unlock(ctdb_db, key);
381 if (uret != 0) {
382 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
386 return ret;
391 paraoid check to see if the db is empty
393 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
395 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
396 int count = tdb_traverse_read(tdb, NULL, NULL);
397 if (count != 0) {
398 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
399 ctdb_db->db_path));
400 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
404 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
405 struct ctdb_db_context *ctdb_db)
407 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
408 char *old;
409 char *reason = NULL;
410 TDB_DATA key;
411 TDB_DATA val;
413 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
414 key.dsize = strlen(ctdb_db->db_name);
416 old = ctdb_db->unhealthy_reason;
417 ctdb_db->unhealthy_reason = NULL;
419 val = tdb_fetch(tdb, key);
420 if (val.dsize > 0) {
421 reason = talloc_strndup(ctdb_db,
422 (const char *)val.dptr,
423 val.dsize);
424 if (reason == NULL) {
425 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
426 (int)val.dsize));
427 ctdb_db->unhealthy_reason = old;
428 free(val.dptr);
429 return -1;
433 if (val.dptr) {
434 free(val.dptr);
437 talloc_free(old);
438 ctdb_db->unhealthy_reason = reason;
439 return 0;
442 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
443 struct ctdb_db_context *ctdb_db,
444 const char *given_reason,/* NULL means healthy */
445 int num_healthy_nodes)
447 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
448 int ret;
449 TDB_DATA key;
450 TDB_DATA val;
451 char *new_reason = NULL;
452 char *old_reason = NULL;
454 ret = tdb_transaction_start(tdb);
455 if (ret != 0) {
456 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
457 tdb_name(tdb), ret, tdb_errorstr(tdb)));
458 return -1;
461 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
462 if (ret != 0) {
463 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
464 ctdb_db->db_name, ret));
465 return -1;
467 old_reason = ctdb_db->unhealthy_reason;
469 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
470 key.dsize = strlen(ctdb_db->db_name);
472 if (given_reason) {
473 new_reason = talloc_strdup(ctdb_db, given_reason);
474 if (new_reason == NULL) {
475 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
476 given_reason));
477 return -1;
479 } else if (old_reason && num_healthy_nodes == 0) {
481 * If the reason indicates ok, but there where no healthy nodes
482 * available, that it means, we have not recovered valid content
483 * of the db. So if there's an old reason, prefix it with
484 * "NO-HEALTHY-NODES - "
486 const char *prefix;
488 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
489 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
490 if (ret != 0) {
491 prefix = _TMP_PREFIX;
492 } else {
493 prefix = "";
495 new_reason = talloc_asprintf(ctdb_db, "%s%s",
496 prefix, old_reason);
497 if (new_reason == NULL) {
498 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
499 prefix, old_reason));
500 return -1;
502 #undef _TMP_PREFIX
505 if (new_reason) {
506 val.dptr = discard_const_p(uint8_t, new_reason);
507 val.dsize = strlen(new_reason);
509 ret = tdb_store(tdb, key, val, TDB_REPLACE);
510 if (ret != 0) {
511 tdb_transaction_cancel(tdb);
512 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
513 tdb_name(tdb), ctdb_db->db_name, new_reason,
514 ret, tdb_errorstr(tdb)));
515 talloc_free(new_reason);
516 return -1;
518 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
519 ctdb_db->db_name, new_reason));
520 } else if (old_reason) {
521 ret = tdb_delete(tdb, key);
522 if (ret != 0) {
523 tdb_transaction_cancel(tdb);
524 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
525 tdb_name(tdb), ctdb_db->db_name,
526 ret, tdb_errorstr(tdb)));
527 talloc_free(new_reason);
528 return -1;
530 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
531 ctdb_db->db_name));
534 ret = tdb_transaction_commit(tdb);
535 if (ret != TDB_SUCCESS) {
536 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
537 tdb_name(tdb), ret, tdb_errorstr(tdb)));
538 talloc_free(new_reason);
539 return -1;
542 talloc_free(old_reason);
543 ctdb_db->unhealthy_reason = new_reason;
545 return 0;
548 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
549 struct ctdb_db_context *ctdb_db)
551 time_t now = time(NULL);
552 char *new_path;
553 char *new_reason;
554 int ret;
555 struct tm *tm;
557 tm = gmtime(&now);
559 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
560 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
561 "%04u%02u%02u%02u%02u%02u.0Z",
562 ctdb_db->db_path,
563 tm->tm_year+1900, tm->tm_mon+1,
564 tm->tm_mday, tm->tm_hour, tm->tm_min,
565 tm->tm_sec);
566 if (new_path == NULL) {
567 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
568 return -1;
571 new_reason = talloc_asprintf(ctdb_db,
572 "ERROR - Backup of corrupted TDB in '%s'",
573 new_path);
574 if (new_reason == NULL) {
575 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576 return -1;
578 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
579 talloc_free(new_reason);
580 if (ret != 0) {
581 DEBUG(DEBUG_CRIT,(__location__
582 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
583 ctdb_db->db_path));
584 return -1;
587 ret = rename(ctdb_db->db_path, new_path);
588 if (ret != 0) {
589 DEBUG(DEBUG_CRIT,(__location__
590 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
591 ctdb_db->db_path, new_path,
592 errno, strerror(errno)));
593 talloc_free(new_path);
594 return -1;
597 DEBUG(DEBUG_CRIT,(__location__
598 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
599 ctdb_db->db_path, new_path));
600 talloc_free(new_path);
601 return 0;
604 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
606 struct ctdb_db_context *ctdb_db;
607 int ret;
608 int ok = 0;
609 int fail = 0;
611 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
612 if (!ctdb_db->persistent) {
613 continue;
616 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
617 if (ret != 0) {
618 DEBUG(DEBUG_ALERT,(__location__
619 " load persistent health for '%s' failed\n",
620 ctdb_db->db_path));
621 return -1;
624 if (ctdb_db->unhealthy_reason == NULL) {
625 ok++;
626 DEBUG(DEBUG_INFO,(__location__
627 " persistent db '%s' healthy\n",
628 ctdb_db->db_path));
629 continue;
632 fail++;
633 DEBUG(DEBUG_ALERT,(__location__
634 " persistent db '%s' unhealthy: %s\n",
635 ctdb_db->db_path,
636 ctdb_db->unhealthy_reason));
638 DEBUG(DEBUG_NOTICE,
639 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
640 ok, fail));
642 if (fail != 0) {
643 return -1;
646 return 0;
651 mark a database - as healthy
653 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
655 uint32_t db_id = *(uint32_t *)indata.dptr;
656 struct ctdb_db_context *ctdb_db;
657 int ret;
658 bool may_recover = false;
660 ctdb_db = find_ctdb_db(ctdb, db_id);
661 if (!ctdb_db) {
662 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
663 return -1;
666 if (ctdb_db->unhealthy_reason) {
667 may_recover = true;
670 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
671 if (ret != 0) {
672 DEBUG(DEBUG_ERR,(__location__
673 " ctdb_update_persistent_health(%s) failed\n",
674 ctdb_db->db_name));
675 return -1;
678 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
679 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
680 ctdb_db->db_name));
681 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
684 return 0;
687 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
688 TDB_DATA indata,
689 TDB_DATA *outdata)
691 uint32_t db_id = *(uint32_t *)indata.dptr;
692 struct ctdb_db_context *ctdb_db;
693 int ret;
695 ctdb_db = find_ctdb_db(ctdb, db_id);
696 if (!ctdb_db) {
697 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
698 return -1;
701 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
702 if (ret != 0) {
703 DEBUG(DEBUG_ERR,(__location__
704 " ctdb_load_persistent_health(%s) failed\n",
705 ctdb_db->db_name));
706 return -1;
709 *outdata = tdb_null;
710 if (ctdb_db->unhealthy_reason) {
711 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
712 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
715 return 0;
719 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
721 char *ropath;
723 if (ctdb_db->readonly) {
724 return 0;
727 if (ctdb_db->persistent) {
728 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
729 return -1;
732 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
733 if (ropath == NULL) {
734 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
735 return -1;
737 ctdb_db->rottdb = tdb_open(ropath,
738 ctdb->tunable.database_hash_size,
739 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
740 O_CREAT|O_RDWR, 0600);
741 if (ctdb_db->rottdb == NULL) {
742 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
743 talloc_free(ropath);
744 return -1;
747 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
749 ctdb_db->readonly = true;
751 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
753 talloc_free(ropath);
754 return 0;
758 attach to a database, handling both persistent and non-persistent databases
759 return 0 on success, -1 on failure
761 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
762 bool persistent, const char *unhealthy_reason,
763 bool jenkinshash, bool mutexes)
765 struct ctdb_db_context *ctdb_db, *tmp_db;
766 int ret;
767 struct TDB_DATA key;
768 unsigned tdb_flags;
769 int mode = 0600;
770 int remaining_tries = 0;
772 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
773 CTDB_NO_MEMORY(ctdb, ctdb_db);
775 ctdb_db->ctdb = ctdb;
776 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
777 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
779 key.dsize = strlen(db_name)+1;
780 key.dptr = discard_const(db_name);
781 ctdb_db->db_id = ctdb_hash(&key);
782 ctdb_db->persistent = persistent;
784 if (!ctdb_db->persistent) {
785 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
786 if (ctdb_db->delete_queue == NULL) {
787 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
790 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
793 /* check for hash collisions */
794 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
795 if (tmp_db->db_id == ctdb_db->db_id) {
796 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
797 tmp_db->db_id, db_name, tmp_db->db_name));
798 talloc_free(ctdb_db);
799 return -1;
803 if (persistent) {
804 if (unhealthy_reason) {
805 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
806 unhealthy_reason, 0);
807 if (ret != 0) {
808 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
809 ctdb_db->db_name, unhealthy_reason, ret));
810 talloc_free(ctdb_db);
811 return -1;
815 if (ctdb->max_persistent_check_errors > 0) {
816 remaining_tries = 1;
818 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
819 remaining_tries = 0;
822 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
823 if (ret != 0) {
824 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
825 ctdb_db->db_name, ret));
826 talloc_free(ctdb_db);
827 return -1;
831 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
832 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
833 ctdb_db->db_name, ctdb_db->unhealthy_reason));
834 talloc_free(ctdb_db);
835 return -1;
838 if (ctdb_db->unhealthy_reason) {
839 /* this is just a warning, but we want that in the log file! */
840 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
841 ctdb_db->db_name, ctdb_db->unhealthy_reason));
844 /* open the database */
845 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
846 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
847 db_name, ctdb->pnn);
849 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
850 if (ctdb->valgrinding) {
851 tdb_flags |= TDB_NOMMAP;
853 tdb_flags |= TDB_DISALLOW_NESTING;
854 if (jenkinshash) {
855 tdb_flags |= TDB_INCOMPATIBLE_HASH;
857 #ifdef TDB_MUTEX_LOCKING
858 if (ctdb->tunable.mutex_enabled && mutexes &&
859 tdb_runtime_check_for_robust_mutexes()) {
860 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
862 #endif
864 again:
865 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
866 ctdb->tunable.database_hash_size,
867 tdb_flags,
868 O_CREAT|O_RDWR, mode);
869 if (ctdb_db->ltdb == NULL) {
870 struct stat st;
871 int saved_errno = errno;
873 if (!persistent) {
874 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
875 ctdb_db->db_path,
876 saved_errno,
877 strerror(saved_errno)));
878 talloc_free(ctdb_db);
879 return -1;
882 if (remaining_tries == 0) {
883 DEBUG(DEBUG_CRIT,(__location__
884 "Failed to open persistent tdb '%s': %d - %s\n",
885 ctdb_db->db_path,
886 saved_errno,
887 strerror(saved_errno)));
888 talloc_free(ctdb_db);
889 return -1;
892 ret = stat(ctdb_db->db_path, &st);
893 if (ret != 0) {
894 DEBUG(DEBUG_CRIT,(__location__
895 "Failed to open persistent tdb '%s': %d - %s\n",
896 ctdb_db->db_path,
897 saved_errno,
898 strerror(saved_errno)));
899 talloc_free(ctdb_db);
900 return -1;
903 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
904 if (ret != 0) {
905 DEBUG(DEBUG_CRIT,(__location__
906 "Failed to open persistent tdb '%s': %d - %s\n",
907 ctdb_db->db_path,
908 saved_errno,
909 strerror(saved_errno)));
910 talloc_free(ctdb_db);
911 return -1;
914 remaining_tries--;
915 mode = st.st_mode;
916 goto again;
919 if (!persistent) {
920 ctdb_check_db_empty(ctdb_db);
921 } else {
922 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
923 if (ret != 0) {
924 int fd;
925 struct stat st;
927 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
928 ctdb_db->db_path, ret,
929 tdb_errorstr(ctdb_db->ltdb->tdb)));
930 if (remaining_tries == 0) {
931 talloc_free(ctdb_db);
932 return -1;
935 fd = tdb_fd(ctdb_db->ltdb->tdb);
936 ret = fstat(fd, &st);
937 if (ret != 0) {
938 DEBUG(DEBUG_CRIT,(__location__
939 "Failed to fstat() persistent tdb '%s': %d - %s\n",
940 ctdb_db->db_path,
941 errno,
942 strerror(errno)));
943 talloc_free(ctdb_db);
944 return -1;
947 /* close the TDB */
948 talloc_free(ctdb_db->ltdb);
949 ctdb_db->ltdb = NULL;
951 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
952 if (ret != 0) {
953 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
954 ctdb_db->db_path));
955 talloc_free(ctdb_db);
956 return -1;
959 remaining_tries--;
960 mode = st.st_mode;
961 goto again;
965 /* set up a rb tree we can use to track which records we have a
966 fetch-lock in-flight for so we can defer any additional calls
967 for the same record.
969 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
970 if (ctdb_db->deferred_fetch == NULL) {
971 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
972 talloc_free(ctdb_db);
973 return -1;
976 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
977 if (ctdb_db->defer_dmaster == NULL) {
978 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
979 ctdb_db->db_name));
980 talloc_free(ctdb_db);
981 return -1;
984 DLIST_ADD(ctdb->db_list, ctdb_db);
986 /* setting this can help some high churn databases */
987 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
990 all databases support the "null" function. we need this in
991 order to do forced migration of records
993 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
994 if (ret != 0) {
995 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
996 talloc_free(ctdb_db);
997 return -1;
1001 all databases support the "fetch" function. we need this
1002 for efficient Samba3 ctdb fetch
1004 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1005 if (ret != 0) {
1006 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1007 talloc_free(ctdb_db);
1008 return -1;
1012 all databases support the "fetch_with_header" function. we need this
1013 for efficient readonly record fetches
1015 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1016 if (ret != 0) {
1017 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1018 talloc_free(ctdb_db);
1019 return -1;
1022 ret = ctdb_vacuum_init(ctdb_db);
1023 if (ret != 0) {
1024 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1025 "database '%s'\n", ctdb_db->db_name));
1026 talloc_free(ctdb_db);
1027 return -1;
1030 ctdb_db->generation = ctdb->vnn_map->generation;
1032 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1033 ctdb_db->db_path, tdb_flags));
1035 /* success */
1036 return 0;
/*
 * One deferred attach request, linked into ctdb->deferred_attach.
 */
struct ctdb_deferred_attach_context {
	/* list linkage used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next, *prev;
	struct ctdb_context *ctdb;
	/* the deferred attach control packet */
	struct ctdb_req_control_old *c;
};
1047 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1049 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1051 return 0;
1054 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1055 struct tevent_timer *te,
1056 struct timeval t, void *private_data)
1058 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1059 struct ctdb_context *ctdb = da_ctx->ctdb;
1061 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1062 talloc_free(da_ctx);
1065 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1066 struct tevent_timer *te,
1067 struct timeval t, void *private_data)
1069 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1070 struct ctdb_context *ctdb = da_ctx->ctdb;
1072 /* This talloc-steals the packet ->c */
1073 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1074 talloc_free(da_ctx);
1077 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1079 struct ctdb_deferred_attach_context *da_ctx;
1081 /* call it from the main event loop as soon as the current event
1082 finishes.
1084 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1085 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1086 tevent_add_timer(ctdb->ev, da_ctx,
1087 timeval_current_ofs(1,0),
1088 ctdb_deferred_attach_callback, da_ctx);
1091 return 0;
1095 a client has asked to attach a new database
1097 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1098 TDB_DATA *outdata, uint64_t tdb_flags,
1099 bool persistent, uint32_t client_id,
1100 struct ctdb_req_control_old *c,
1101 bool *async_reply)
1103 const char *db_name = (const char *)indata.dptr;
1104 struct ctdb_db_context *db;
1105 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1106 struct ctdb_client *client = NULL;
1107 bool with_jenkinshash, with_mutexes;
1109 if (ctdb->tunable.allow_client_db_attach == 0) {
1110 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1111 "AllowClientDBAccess == 0\n", db_name));
1112 return -1;
1115 /* don't allow any local clients to attach while we are in recovery mode
1116 * except for the recovery daemon.
1117 * allow all attach from the network since these are always from remote
1118 * recovery daemons.
1120 if (client_id != 0) {
1121 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1123 if (client != NULL) {
1124 /* If the node is inactive it is not part of the cluster
1125 and we should not allow clients to attach to any
1126 databases
1128 if (node->flags & NODE_FLAGS_INACTIVE) {
1129 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1130 return -1;
1133 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1134 client->pid != ctdb->recoverd_pid &&
1135 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1136 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1138 if (da_ctx == NULL) {
1139 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1140 return -1;
1143 da_ctx->ctdb = ctdb;
1144 da_ctx->c = talloc_steal(da_ctx, c);
1145 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1146 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1148 tevent_add_timer(ctdb->ev, da_ctx,
1149 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1150 ctdb_deferred_attach_timeout, da_ctx);
1152 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1153 *async_reply = true;
1154 return 0;
1158 /* the client can optionally pass additional tdb flags, but we
1159 only allow a subset of those on the database in ctdb. Note
1160 that tdb_flags is passed in via the (otherwise unused)
1161 srvid to the attach control */
1162 #ifdef TDB_MUTEX_LOCKING
1163 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1164 #else
1165 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1166 #endif
1168 /* see if we already have this name */
1169 db = ctdb_db_handle(ctdb, db_name);
1170 if (db) {
1171 if (db->persistent != persistent) {
1172 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1173 "database %s\n", persistent ? "" : "non-",
1174 db-> persistent ? "" : "non-", db_name));
1175 return -1;
1177 outdata->dptr = (uint8_t *)&db->db_id;
1178 outdata->dsize = sizeof(db->db_id);
1179 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1180 return 0;
1183 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1184 #ifdef TDB_MUTEX_LOCKING
1185 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1186 #else
1187 with_mutexes = false;
1188 #endif
1190 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1191 with_jenkinshash, with_mutexes) != 0) {
1192 return -1;
1195 db = ctdb_db_handle(ctdb, db_name);
1196 if (!db) {
1197 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1198 return -1;
1201 /* remember the flags the client has specified */
1202 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1204 outdata->dptr = (uint8_t *)&db->db_id;
1205 outdata->dsize = sizeof(db->db_id);
1207 /* Try to ensure it's locked in mem */
1208 lockdown_memory(ctdb->valgrinding);
1210 /* tell all the other nodes about this database */
1211 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1212 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1213 CTDB_CONTROL_DB_ATTACH,
1214 0, CTDB_CTRL_FLAG_NOREPLY,
1215 indata, NULL, NULL);
1217 /* success */
1218 return 0;
1222 * a client has asked to detach from a database
1224 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1225 uint32_t client_id)
1227 uint32_t db_id;
1228 struct ctdb_db_context *ctdb_db;
1229 struct ctdb_client *client = NULL;
1231 db_id = *(uint32_t *)indata.dptr;
1232 ctdb_db = find_ctdb_db(ctdb, db_id);
1233 if (ctdb_db == NULL) {
1234 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1235 db_id));
1236 return -1;
1239 if (ctdb->tunable.allow_client_db_attach == 1) {
1240 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1241 "Clients are allowed access to databases "
1242 "(AllowClientDBAccess == 1)\n",
1243 ctdb_db->db_name));
1244 return -1;
1247 if (ctdb_db->persistent) {
1248 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1249 "denied\n", ctdb_db->db_name));
1250 return -1;
1253 /* Cannot detach from database when in recovery */
1254 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1255 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1256 return -1;
1259 /* If a control comes from a client, then broadcast it to all nodes.
1260 * Do the actual detach only if the control comes from other daemons.
1262 if (client_id != 0) {
1263 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1264 if (client != NULL) {
1265 /* forward the control to all the nodes */
1266 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1267 CTDB_CONTROL_DB_DETACH, 0,
1268 CTDB_CTRL_FLAG_NOREPLY,
1269 indata, NULL, NULL);
1270 return 0;
1272 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1273 "for database '%s'\n", ctdb_db->db_name));
1274 return -1;
1277 /* Detach database from recoverd */
1278 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1279 CTDB_SRVID_DETACH_DATABASE,
1280 indata) != 0) {
1281 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1282 return -1;
1285 /* Disable vacuuming and drop all vacuuming data */
1286 talloc_free(ctdb_db->vacuum_handle);
1287 talloc_free(ctdb_db->delete_queue);
1289 /* Terminate any deferred fetch */
1290 talloc_free(ctdb_db->deferred_fetch);
1292 /* Terminate any traverses */
1293 while (ctdb_db->traverse) {
1294 talloc_free(ctdb_db->traverse);
1297 /* Terminate any revokes */
1298 while (ctdb_db->revokechild_active) {
1299 talloc_free(ctdb_db->revokechild_active);
1302 /* Free readonly tracking database */
1303 if (ctdb_db->readonly) {
1304 talloc_free(ctdb_db->rottdb);
1307 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1309 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1310 ctdb_db->db_name));
1311 talloc_free(ctdb_db);
1313 return 0;
1317 attach to all existing persistent databases
1319 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1320 const char *unhealthy_reason)
1322 DIR *d;
1323 struct dirent *de;
1325 /* open the persistent db directory and scan it for files */
1326 d = opendir(ctdb->db_directory_persistent);
1327 if (d == NULL) {
1328 return 0;
1331 while ((de=readdir(d))) {
1332 char *p, *s, *q;
1333 size_t len = strlen(de->d_name);
1334 uint32_t node;
1335 int invalid_name = 0;
1337 s = talloc_strdup(ctdb, de->d_name);
1338 if (s == NULL) {
1339 closedir(d);
1340 CTDB_NO_MEMORY(ctdb, s);
1343 /* only accept names ending in .tdb */
1344 p = strstr(s, ".tdb.");
1345 if (len < 7 || p == NULL) {
1346 talloc_free(s);
1347 continue;
1350 /* only accept names ending with .tdb. and any number of digits */
1351 q = p+5;
1352 while (*q != 0 && invalid_name == 0) {
1353 if (!isdigit(*q++)) {
1354 invalid_name = 1;
1357 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1358 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1359 talloc_free(s);
1360 continue;
1362 p[4] = 0;
1364 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1365 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1366 closedir(d);
1367 talloc_free(s);
1368 return -1;
1371 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1373 talloc_free(s);
1375 closedir(d);
1376 return 0;
1379 int ctdb_attach_databases(struct ctdb_context *ctdb)
1381 int ret;
1382 char *persistent_health_path = NULL;
1383 char *unhealthy_reason = NULL;
1384 bool first_try = true;
1386 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1387 ctdb->db_directory_state,
1388 PERSISTENT_HEALTH_TDB,
1389 ctdb->pnn);
1390 if (persistent_health_path == NULL) {
1391 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1392 return -1;
1395 again:
1397 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1398 0, TDB_DISALLOW_NESTING,
1399 O_CREAT | O_RDWR, 0600);
1400 if (ctdb->db_persistent_health == NULL) {
1401 struct tdb_wrap *tdb;
1403 if (!first_try) {
1404 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1405 persistent_health_path,
1406 errno,
1407 strerror(errno)));
1408 talloc_free(persistent_health_path);
1409 talloc_free(unhealthy_reason);
1410 return -1;
1412 first_try = false;
1414 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1415 persistent_health_path,
1416 "was cleared after a failure",
1417 "manual verification needed");
1418 if (unhealthy_reason == NULL) {
1419 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1420 talloc_free(persistent_health_path);
1421 return -1;
1424 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1425 persistent_health_path));
1426 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1427 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1428 O_CREAT | O_RDWR, 0600);
1429 if (tdb) {
1430 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1431 persistent_health_path,
1432 errno,
1433 strerror(errno)));
1434 talloc_free(persistent_health_path);
1435 talloc_free(unhealthy_reason);
1436 return -1;
1439 talloc_free(tdb);
1440 goto again;
1442 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1443 if (ret != 0) {
1444 struct tdb_wrap *tdb;
1446 talloc_free(ctdb->db_persistent_health);
1447 ctdb->db_persistent_health = NULL;
1449 if (!first_try) {
1450 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1451 persistent_health_path));
1452 talloc_free(persistent_health_path);
1453 talloc_free(unhealthy_reason);
1454 return -1;
1456 first_try = false;
1458 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1459 persistent_health_path,
1460 "was cleared after a failure",
1461 "manual verification needed");
1462 if (unhealthy_reason == NULL) {
1463 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1464 talloc_free(persistent_health_path);
1465 return -1;
1468 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1469 persistent_health_path));
1470 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1471 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1472 O_CREAT | O_RDWR, 0600);
1473 if (tdb) {
1474 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1475 persistent_health_path,
1476 errno,
1477 strerror(errno)));
1478 talloc_free(persistent_health_path);
1479 talloc_free(unhealthy_reason);
1480 return -1;
1483 talloc_free(tdb);
1484 goto again;
1486 talloc_free(persistent_health_path);
1488 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1489 talloc_free(unhealthy_reason);
1490 if (ret != 0) {
1491 return ret;
1494 return 0;
1498 called when a broadcast seqnum update comes in
1500 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1502 struct ctdb_db_context *ctdb_db;
1503 if (srcnode == ctdb->pnn) {
1504 /* don't update ourselves! */
1505 return 0;
1508 ctdb_db = find_ctdb_db(ctdb, db_id);
1509 if (!ctdb_db) {
1510 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1511 return -1;
1514 if (ctdb_db->unhealthy_reason) {
1515 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1516 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1517 return -1;
1520 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1521 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1522 return 0;
1526 timer to check for seqnum changes in a ltdb and propogate them
1528 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1529 struct tevent_timer *te,
1530 struct timeval t, void *p)
1532 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1533 struct ctdb_context *ctdb = ctdb_db->ctdb;
1534 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1535 if (new_seqnum != ctdb_db->seqnum) {
1536 /* something has changed - propogate it */
1537 TDB_DATA data;
1538 data.dptr = (uint8_t *)&ctdb_db->db_id;
1539 data.dsize = sizeof(uint32_t);
1540 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1541 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1542 data, NULL, NULL);
1544 ctdb_db->seqnum = new_seqnum;
1546 /* setup a new timer */
1547 ctdb_db->seqnum_update =
1548 tevent_add_timer(ctdb->ev, ctdb_db,
1549 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1550 (ctdb->tunable.seqnum_interval%1000)*1000),
1551 ctdb_ltdb_seqnum_check, ctdb_db);
1555 enable seqnum handling on this db
1557 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1559 struct ctdb_db_context *ctdb_db;
1560 ctdb_db = find_ctdb_db(ctdb, db_id);
1561 if (!ctdb_db) {
1562 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1563 return -1;
1566 if (ctdb_db->seqnum_update == NULL) {
1567 ctdb_db->seqnum_update = tevent_add_timer(
1568 ctdb->ev, ctdb_db,
1569 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1570 (ctdb->tunable.seqnum_interval%1000)*1000),
1571 ctdb_ltdb_seqnum_check, ctdb_db);
1574 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1575 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1576 return 0;
1579 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1581 if (ctdb_db->sticky) {
1582 return 0;
1585 if (ctdb_db->persistent) {
1586 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1587 return -1;
1590 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1592 ctdb_db->sticky = true;
1594 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1596 return 0;
1599 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1601 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1602 int i;
1604 for (i=0; i<MAX_HOT_KEYS; i++) {
1605 if (s->hot_keys[i].key.dsize > 0) {
1606 talloc_free(s->hot_keys[i].key.dptr);
1610 ZERO_STRUCT(ctdb_db->statistics);
1613 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1614 uint32_t db_id,
1615 TDB_DATA *outdata)
1617 struct ctdb_db_context *ctdb_db;
1618 struct ctdb_db_statistics_old *stats;
1619 int i;
1620 int len;
1621 char *ptr;
1623 ctdb_db = find_ctdb_db(ctdb, db_id);
1624 if (!ctdb_db) {
1625 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1626 return -1;
1629 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1630 for (i = 0; i < MAX_HOT_KEYS; i++) {
1631 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1634 stats = talloc_size(outdata, len);
1635 if (stats == NULL) {
1636 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1637 return -1;
1640 memcpy(stats, &ctdb_db->statistics,
1641 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1643 stats->num_hot_keys = MAX_HOT_KEYS;
1645 ptr = &stats->hot_keys_wire[0];
1646 for (i = 0; i < MAX_HOT_KEYS; i++) {
1647 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1648 ctdb_db->statistics.hot_keys[i].key.dsize);
1649 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1652 outdata->dptr = (uint8_t *)stats;
1653 outdata->dsize = len;
1655 return 0;