samba_dnsupdate: Always fill out the nameservers of a dns object.
[Samba.git] / ctdb / server / ctdb_ltdb_server.c
blob9ac2217a34a8251d6997e758c64fbc3ae18898fd
1 /*
2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34 /**
35 * write a record to a normal database
37 * This is the server-variant of the ctdb_ltdb_store function.
38 * It contains logic to determine whether a record should be
39 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40 * controls to the local ctdb daemon if apporpriate.
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
43 TDB_DATA key,
44 struct ctdb_ltdb_header *header,
45 TDB_DATA data)
47 struct ctdb_context *ctdb = ctdb_db->ctdb;
48 TDB_DATA rec;
49 int ret;
50 bool seqnum_suppressed = false;
51 bool keep = false;
52 bool schedule_for_deletion = false;
53 bool remove_from_delete_queue = false;
54 uint32_t lmaster;
56 if (ctdb->flags & CTDB_FLAG_TORTURE) {
57 struct ctdb_ltdb_header *h2;
58 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64 if (rec.dptr) free(rec.dptr);
67 if (ctdb->vnn_map == NULL) {
69 * Called from a client: always store the record
70 * Also don't call ctdb_lmaster since it uses the vnn_map!
72 keep = true;
73 goto store;
76 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
79 * If we migrate an empty record off to another node
80 * and the record has not been migrated with data,
81 * delete the record instead of storing the empty record.
83 if (data.dsize != 0) {
84 keep = true;
85 } else if (header->flags & CTDB_REC_RO_FLAGS) {
86 keep = true;
87 } else if (ctdb_db->persistent) {
88 keep = true;
89 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
91 * The record is not created by the client but
92 * automatically by the ctdb_ltdb_fetch logic that
93 * creates a record with an initial header in the
94 * ltdb before trying to migrate the record from
95 * the current lmaster. Keep it instead of trying
96 * to delete the non-existing record...
98 keep = true;
99 schedule_for_deletion = true;
100 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101 keep = true;
102 } else if (ctdb_db->ctdb->pnn == lmaster) {
104 * If we are lmaster, then we usually keep the record.
105 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106 * and the record is empty and has never been migrated
107 * with data, then we should delete it instead of storing it.
108 * This is part of the vacuuming process.
110 * The reason that we usually need to store even empty records
111 * on the lmaster is that a client operating directly on the
112 * lmaster (== dmaster) expects the local copy of the record to
113 * exist after successful ctdb migrate call. If the record does
114 * not exist, the client goes into a migrate loop and eventually
115 * fails. So storing the empty record makes sure that we do not
116 * need to change the client code.
118 if ((header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) &&
119 (ctdb_db->ctdb->pnn == header->dmaster)) {
120 keep = true;
121 schedule_for_deletion = true;
123 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
124 keep = true;
125 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
126 keep = true;
128 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
129 keep = true;
132 if (keep) {
133 if (!ctdb_db->persistent &&
134 (ctdb_db->ctdb->pnn == header->dmaster) &&
135 !(header->flags & CTDB_REC_RO_FLAGS))
137 header->rsn++;
139 if (data.dsize == 0) {
140 schedule_for_deletion = true;
143 remove_from_delete_queue = !schedule_for_deletion;
146 store:
148 * The VACUUM_MIGRATED flag is only set temporarily for
149 * the above logic when the record was retrieved by a
150 * VACUUM_MIGRATE call and should not be stored in the
151 * database.
153 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
154 * and there are two cases in which the corresponding record
155 * is stored in the local database:
156 * 1. The record has been migrated with data in the past
157 * (the MIGRATED_WITH_DATA record flag is set).
158 * 2. The record has been filled with data again since it
159 * had been submitted in the VACUUM_FETCH message to the
160 * lmaster.
161 * For such records it is important to not store the
162 * VACUUM_MIGRATED flag in the database.
164 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
167 * Similarly, clear the AUTOMATIC flag which should not enter
168 * the local database copy since this would require client
169 * modifications to clear the flag when the client stores
170 * the record.
172 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
174 rec.dsize = sizeof(*header) + data.dsize;
175 rec.dptr = talloc_size(ctdb, rec.dsize);
176 CTDB_NO_MEMORY(ctdb, rec.dptr);
178 memcpy(rec.dptr, header, sizeof(*header));
179 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
181 /* Databases with seqnum updates enabled only get their seqnum
182 changes when/if we modify the data */
183 if (ctdb_db->seqnum_update != NULL) {
184 TDB_DATA old;
185 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
187 if ( (old.dsize == rec.dsize)
188 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
189 rec.dptr+sizeof(struct ctdb_ltdb_header),
190 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
191 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
192 seqnum_suppressed = true;
194 if (old.dptr) free(old.dptr);
197 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
198 ctdb_db->db_name,
199 keep?"storing":"deleting",
200 ctdb_hash(&key)));
202 if (keep) {
203 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
204 } else {
205 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
208 if (ret != 0) {
209 int lvl = DEBUG_ERR;
211 if (keep == false &&
212 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
214 lvl = DEBUG_DEBUG;
217 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
218 "%d - %s\n",
219 ctdb_db->db_name,
220 keep?"store":"delete", ret,
221 tdb_errorstr(ctdb_db->ltdb->tdb)));
223 schedule_for_deletion = false;
224 remove_from_delete_queue = false;
226 if (seqnum_suppressed) {
227 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
230 talloc_free(rec.dptr);
232 if (schedule_for_deletion) {
233 int ret2;
234 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
235 if (ret2 != 0) {
236 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
240 if (remove_from_delete_queue) {
241 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
244 return ret;
/*
  state kept for a ctdb request that is deferred until its chainlock
  can be obtained
 */
struct lock_fetch_state {
	/* daemon context, used to read the current vnn_map generation */
	struct ctdb_context *ctdb;
	/* function the deferred packet is handed back to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* vnn_map generation at the time the request was deferred */
	uint32_t generation;
	/* if true, requeue the packet even if the generation changed */
	bool ignore_generation;
};
257 called when we should retry the operation
259 static void lock_fetch_callback(void *p, bool locked)
261 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
262 if (!state->ignore_generation &&
263 state->generation != state->ctdb->vnn_map->generation) {
264 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
265 talloc_free(state->hdr);
266 return;
268 state->recv_pkt(state->recv_context, state->hdr);
269 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
274 do a non-blocking ltdb_lock, deferring this ctdb request until we
275 have the chainlock
277 It does the following:
279 1) tries to get the chainlock. If it succeeds, then it returns 0
281 2) if it fails to get a chainlock immediately then it sets up a
282 non-blocking chainlock via ctdb_lock_record, and when it gets the
283 chainlock it re-submits this ctdb request to the main packet
284 receive function.
286 This effectively queues all ctdb requests that cannot be
287 immediately satisfied until it can get the lock. This means that
288 the main ctdb daemon will not block waiting for a chainlock held by
289 a client
291 There are 3 possible return values:
293 0: means that it got the lock immediately.
294 -1: means that it failed to get the lock, and won't retry
295 -2: means that it failed to get the lock immediately, but will retry
297 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
298 TDB_DATA key, struct ctdb_req_header *hdr,
299 void (*recv_pkt)(void *, struct ctdb_req_header *),
300 void *recv_context, bool ignore_generation)
302 int ret;
303 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
304 struct lock_request *lreq;
305 struct lock_fetch_state *state;
307 ret = tdb_chainlock_nonblock(tdb, key);
309 if (ret != 0 &&
310 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
311 /* a hard failure - don't try again */
312 return -1;
315 /* when torturing, ensure we test the contended path */
316 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
317 random() % 5 == 0) {
318 ret = -1;
319 tdb_chainunlock(tdb, key);
322 /* first the non-contended path */
323 if (ret == 0) {
324 return 0;
327 state = talloc(hdr, struct lock_fetch_state);
328 state->ctdb = ctdb_db->ctdb;
329 state->hdr = hdr;
330 state->recv_pkt = recv_pkt;
331 state->recv_context = recv_context;
332 state->generation = ctdb_db->ctdb->vnn_map->generation;
333 state->ignore_generation = ignore_generation;
335 /* now the contended path */
336 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
337 if (lreq == NULL) {
338 return -1;
341 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
342 so it won't be freed yet */
343 talloc_steal(state, hdr);
345 /* now tell the caller than we will retry asynchronously */
346 return -2;
350 a varient of ctdb_ltdb_lock_requeue that also fetches the record
352 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
353 TDB_DATA key, struct ctdb_ltdb_header *header,
354 struct ctdb_req_header *hdr, TDB_DATA *data,
355 void (*recv_pkt)(void *, struct ctdb_req_header *),
356 void *recv_context, bool ignore_generation)
358 int ret;
360 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
361 recv_context, ignore_generation);
362 if (ret == 0) {
363 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
364 if (ret != 0) {
365 int uret;
366 uret = ctdb_ltdb_unlock(ctdb_db, key);
367 if (uret != 0) {
368 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
372 return ret;
377 paraoid check to see if the db is empty
379 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
381 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
382 int count = tdb_traverse_read(tdb, NULL, NULL);
383 if (count != 0) {
384 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
385 ctdb_db->db_path));
386 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
390 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
391 struct ctdb_db_context *ctdb_db)
393 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
394 char *old;
395 char *reason = NULL;
396 TDB_DATA key;
397 TDB_DATA val;
399 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
400 key.dsize = strlen(ctdb_db->db_name);
402 old = ctdb_db->unhealthy_reason;
403 ctdb_db->unhealthy_reason = NULL;
405 val = tdb_fetch(tdb, key);
406 if (val.dsize > 0) {
407 reason = talloc_strndup(ctdb_db,
408 (const char *)val.dptr,
409 val.dsize);
410 if (reason == NULL) {
411 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
412 (int)val.dsize));
413 ctdb_db->unhealthy_reason = old;
414 free(val.dptr);
415 return -1;
419 if (val.dptr) {
420 free(val.dptr);
423 talloc_free(old);
424 ctdb_db->unhealthy_reason = reason;
425 return 0;
428 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
429 struct ctdb_db_context *ctdb_db,
430 const char *given_reason,/* NULL means healthy */
431 int num_healthy_nodes)
433 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
434 int ret;
435 TDB_DATA key;
436 TDB_DATA val;
437 char *new_reason = NULL;
438 char *old_reason = NULL;
440 ret = tdb_transaction_start(tdb);
441 if (ret != 0) {
442 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
443 tdb_name(tdb), ret, tdb_errorstr(tdb)));
444 return -1;
447 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
448 if (ret != 0) {
449 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
450 ctdb_db->db_name, ret));
451 return -1;
453 old_reason = ctdb_db->unhealthy_reason;
455 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
456 key.dsize = strlen(ctdb_db->db_name);
458 if (given_reason) {
459 new_reason = talloc_strdup(ctdb_db, given_reason);
460 if (new_reason == NULL) {
461 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
462 given_reason));
463 return -1;
465 } else if (old_reason && num_healthy_nodes == 0) {
467 * If the reason indicates ok, but there where no healthy nodes
468 * available, that it means, we have not recovered valid content
469 * of the db. So if there's an old reason, prefix it with
470 * "NO-HEALTHY-NODES - "
472 const char *prefix;
474 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
475 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
476 if (ret != 0) {
477 prefix = _TMP_PREFIX;
478 } else {
479 prefix = "";
481 new_reason = talloc_asprintf(ctdb_db, "%s%s",
482 prefix, old_reason);
483 if (new_reason == NULL) {
484 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
485 prefix, old_reason));
486 return -1;
488 #undef _TMP_PREFIX
491 if (new_reason) {
492 val.dptr = discard_const_p(uint8_t, new_reason);
493 val.dsize = strlen(new_reason);
495 ret = tdb_store(tdb, key, val, TDB_REPLACE);
496 if (ret != 0) {
497 tdb_transaction_cancel(tdb);
498 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
499 tdb_name(tdb), ctdb_db->db_name, new_reason,
500 ret, tdb_errorstr(tdb)));
501 talloc_free(new_reason);
502 return -1;
504 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
505 ctdb_db->db_name, new_reason));
506 } else if (old_reason) {
507 ret = tdb_delete(tdb, key);
508 if (ret != 0) {
509 tdb_transaction_cancel(tdb);
510 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
511 tdb_name(tdb), ctdb_db->db_name,
512 ret, tdb_errorstr(tdb)));
513 talloc_free(new_reason);
514 return -1;
516 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
517 ctdb_db->db_name));
520 ret = tdb_transaction_commit(tdb);
521 if (ret != TDB_SUCCESS) {
522 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
523 tdb_name(tdb), ret, tdb_errorstr(tdb)));
524 talloc_free(new_reason);
525 return -1;
528 talloc_free(old_reason);
529 ctdb_db->unhealthy_reason = new_reason;
531 return 0;
534 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
535 struct ctdb_db_context *ctdb_db)
537 time_t now = time(NULL);
538 char *new_path;
539 char *new_reason;
540 int ret;
541 struct tm *tm;
543 tm = gmtime(&now);
545 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
546 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
547 "%04u%02u%02u%02u%02u%02u.0Z",
548 ctdb_db->db_path,
549 tm->tm_year+1900, tm->tm_mon+1,
550 tm->tm_mday, tm->tm_hour, tm->tm_min,
551 tm->tm_sec);
552 if (new_path == NULL) {
553 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
554 return -1;
557 new_reason = talloc_asprintf(ctdb_db,
558 "ERROR - Backup of corrupted TDB in '%s'",
559 new_path);
560 if (new_reason == NULL) {
561 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
562 return -1;
564 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
565 talloc_free(new_reason);
566 if (ret != 0) {
567 DEBUG(DEBUG_CRIT,(__location__
568 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
569 ctdb_db->db_path));
570 return -1;
573 ret = rename(ctdb_db->db_path, new_path);
574 if (ret != 0) {
575 DEBUG(DEBUG_CRIT,(__location__
576 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
577 ctdb_db->db_path, new_path,
578 errno, strerror(errno)));
579 talloc_free(new_path);
580 return -1;
583 DEBUG(DEBUG_CRIT,(__location__
584 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
585 ctdb_db->db_path, new_path));
586 talloc_free(new_path);
587 return 0;
590 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
592 struct ctdb_db_context *ctdb_db;
593 int ret;
594 int ok = 0;
595 int fail = 0;
597 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
598 if (!ctdb_db->persistent) {
599 continue;
602 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
603 if (ret != 0) {
604 DEBUG(DEBUG_ALERT,(__location__
605 " load persistent health for '%s' failed\n",
606 ctdb_db->db_path));
607 return -1;
610 if (ctdb_db->unhealthy_reason == NULL) {
611 ok++;
612 DEBUG(DEBUG_INFO,(__location__
613 " persistent db '%s' healthy\n",
614 ctdb_db->db_path));
615 continue;
618 fail++;
619 DEBUG(DEBUG_ALERT,(__location__
620 " persistent db '%s' unhealthy: %s\n",
621 ctdb_db->db_path,
622 ctdb_db->unhealthy_reason));
624 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
625 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
626 ok, fail));
628 if (fail != 0) {
629 return -1;
632 return 0;
637 mark a database - as healthy
639 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
641 uint32_t db_id = *(uint32_t *)indata.dptr;
642 struct ctdb_db_context *ctdb_db;
643 int ret;
644 bool may_recover = false;
646 ctdb_db = find_ctdb_db(ctdb, db_id);
647 if (!ctdb_db) {
648 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
649 return -1;
652 if (ctdb_db->unhealthy_reason) {
653 may_recover = true;
656 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
657 if (ret != 0) {
658 DEBUG(DEBUG_ERR,(__location__
659 " ctdb_update_persistent_health(%s) failed\n",
660 ctdb_db->db_name));
661 return -1;
664 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
665 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
666 ctdb_db->db_name));
667 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
670 return 0;
673 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
674 TDB_DATA indata,
675 TDB_DATA *outdata)
677 uint32_t db_id = *(uint32_t *)indata.dptr;
678 struct ctdb_db_context *ctdb_db;
679 int ret;
681 ctdb_db = find_ctdb_db(ctdb, db_id);
682 if (!ctdb_db) {
683 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
684 return -1;
687 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
688 if (ret != 0) {
689 DEBUG(DEBUG_ERR,(__location__
690 " ctdb_load_persistent_health(%s) failed\n",
691 ctdb_db->db_name));
692 return -1;
695 *outdata = tdb_null;
696 if (ctdb_db->unhealthy_reason) {
697 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
698 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
701 return 0;
705 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
707 char *ropath;
709 if (ctdb_db->readonly) {
710 return 0;
713 if (ctdb_db->persistent) {
714 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
715 return -1;
718 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
719 if (ropath == NULL) {
720 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
721 return -1;
723 ctdb_db->rottdb = tdb_open(ropath,
724 ctdb->tunable.database_hash_size,
725 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
726 O_CREAT|O_RDWR, 0);
727 if (ctdb_db->rottdb == NULL) {
728 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
729 talloc_free(ropath);
730 return -1;
733 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
735 ctdb_db->readonly = true;
737 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
739 talloc_free(ropath);
740 return 0;
744 attach to a database, handling both persistent and non-persistent databases
745 return 0 on success, -1 on failure
747 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
748 bool persistent, const char *unhealthy_reason,
749 bool jenkinshash, bool mutexes)
751 struct ctdb_db_context *ctdb_db, *tmp_db;
752 int ret;
753 struct TDB_DATA key;
754 unsigned tdb_flags;
755 int mode = 0600;
756 int remaining_tries = 0;
758 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
759 CTDB_NO_MEMORY(ctdb, ctdb_db);
761 ctdb_db->priority = 1;
762 ctdb_db->ctdb = ctdb;
763 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
764 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
766 key.dsize = strlen(db_name)+1;
767 key.dptr = discard_const(db_name);
768 ctdb_db->db_id = ctdb_hash(&key);
769 ctdb_db->persistent = persistent;
771 if (!ctdb_db->persistent) {
772 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
773 if (ctdb_db->delete_queue == NULL) {
774 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
777 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
780 /* check for hash collisions */
781 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
782 if (tmp_db->db_id == ctdb_db->db_id) {
783 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
784 tmp_db->db_id, db_name, tmp_db->db_name));
785 talloc_free(ctdb_db);
786 return -1;
790 if (persistent) {
791 if (unhealthy_reason) {
792 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
793 unhealthy_reason, 0);
794 if (ret != 0) {
795 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
796 ctdb_db->db_name, unhealthy_reason, ret));
797 talloc_free(ctdb_db);
798 return -1;
802 if (ctdb->max_persistent_check_errors > 0) {
803 remaining_tries = 1;
805 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
806 remaining_tries = 0;
809 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
810 if (ret != 0) {
811 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
812 ctdb_db->db_name, ret));
813 talloc_free(ctdb_db);
814 return -1;
818 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
819 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
820 ctdb_db->db_name, ctdb_db->unhealthy_reason));
821 talloc_free(ctdb_db);
822 return -1;
825 if (ctdb_db->unhealthy_reason) {
826 /* this is just a warning, but we want that in the log file! */
827 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
828 ctdb_db->db_name, ctdb_db->unhealthy_reason));
831 /* open the database */
832 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
833 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
834 db_name, ctdb->pnn);
836 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
837 if (ctdb->valgrinding) {
838 tdb_flags |= TDB_NOMMAP;
840 tdb_flags |= TDB_DISALLOW_NESTING;
841 if (jenkinshash) {
842 tdb_flags |= TDB_INCOMPATIBLE_HASH;
844 #ifdef TDB_MUTEX_LOCKING
845 if (ctdb->tunable.mutex_enabled && mutexes &&
846 tdb_runtime_check_for_robust_mutexes()) {
847 tdb_flags |= TDB_MUTEX_LOCKING;
849 #endif
851 again:
852 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
853 ctdb->tunable.database_hash_size,
854 tdb_flags,
855 O_CREAT|O_RDWR, mode);
856 if (ctdb_db->ltdb == NULL) {
857 struct stat st;
858 int saved_errno = errno;
860 if (!persistent) {
861 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
862 ctdb_db->db_path,
863 saved_errno,
864 strerror(saved_errno)));
865 talloc_free(ctdb_db);
866 return -1;
869 if (remaining_tries == 0) {
870 DEBUG(DEBUG_CRIT,(__location__
871 "Failed to open persistent tdb '%s': %d - %s\n",
872 ctdb_db->db_path,
873 saved_errno,
874 strerror(saved_errno)));
875 talloc_free(ctdb_db);
876 return -1;
879 ret = stat(ctdb_db->db_path, &st);
880 if (ret != 0) {
881 DEBUG(DEBUG_CRIT,(__location__
882 "Failed to open persistent tdb '%s': %d - %s\n",
883 ctdb_db->db_path,
884 saved_errno,
885 strerror(saved_errno)));
886 talloc_free(ctdb_db);
887 return -1;
890 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
891 if (ret != 0) {
892 DEBUG(DEBUG_CRIT,(__location__
893 "Failed to open persistent tdb '%s': %d - %s\n",
894 ctdb_db->db_path,
895 saved_errno,
896 strerror(saved_errno)));
897 talloc_free(ctdb_db);
898 return -1;
901 remaining_tries--;
902 mode = st.st_mode;
903 goto again;
906 if (!persistent) {
907 ctdb_check_db_empty(ctdb_db);
908 } else {
909 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
910 if (ret != 0) {
911 int fd;
912 struct stat st;
914 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
915 ctdb_db->db_path, ret,
916 tdb_errorstr(ctdb_db->ltdb->tdb)));
917 if (remaining_tries == 0) {
918 talloc_free(ctdb_db);
919 return -1;
922 fd = tdb_fd(ctdb_db->ltdb->tdb);
923 ret = fstat(fd, &st);
924 if (ret != 0) {
925 DEBUG(DEBUG_CRIT,(__location__
926 "Failed to fstat() persistent tdb '%s': %d - %s\n",
927 ctdb_db->db_path,
928 errno,
929 strerror(errno)));
930 talloc_free(ctdb_db);
931 return -1;
934 /* close the TDB */
935 talloc_free(ctdb_db->ltdb);
936 ctdb_db->ltdb = NULL;
938 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
939 if (ret != 0) {
940 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
941 ctdb_db->db_path));
942 talloc_free(ctdb_db);
943 return -1;
946 remaining_tries--;
947 mode = st.st_mode;
948 goto again;
952 /* set up a rb tree we can use to track which records we have a
953 fetch-lock in-flight for so we can defer any additional calls
954 for the same record.
956 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
957 if (ctdb_db->deferred_fetch == NULL) {
958 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
959 talloc_free(ctdb_db);
960 return -1;
963 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
964 if (ctdb_db->defer_dmaster == NULL) {
965 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
966 ctdb_db->db_name));
967 talloc_free(ctdb_db);
968 return -1;
971 DLIST_ADD(ctdb->db_list, ctdb_db);
973 /* setting this can help some high churn databases */
974 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
977 all databases support the "null" function. we need this in
978 order to do forced migration of records
980 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
981 if (ret != 0) {
982 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
983 talloc_free(ctdb_db);
984 return -1;
988 all databases support the "fetch" function. we need this
989 for efficient Samba3 ctdb fetch
991 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
992 if (ret != 0) {
993 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
994 talloc_free(ctdb_db);
995 return -1;
999 all databases support the "fetch_with_header" function. we need this
1000 for efficient readonly record fetches
1002 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1003 if (ret != 0) {
1004 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005 talloc_free(ctdb_db);
1006 return -1;
1009 ret = ctdb_vacuum_init(ctdb_db);
1010 if (ret != 0) {
1011 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1012 "database '%s'\n", ctdb_db->db_name));
1013 talloc_free(ctdb_db);
1014 return -1;
1018 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1019 ctdb_db->db_path, tdb_flags));
1021 /* success */
1022 return 0;
/*
  a database attach control from a client that has been deferred
 */
struct ctdb_deferred_attach_context {
	/* links for the ctdb->deferred_attach list */
	struct ctdb_deferred_attach_context *next, *prev;
	/* daemon context owning the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the deferred control request */
	struct ctdb_req_control *c;
};
1033 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1035 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1037 return 0;
1040 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1042 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1043 struct ctdb_context *ctdb = da_ctx->ctdb;
1045 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1046 talloc_free(da_ctx);
1049 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1051 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1052 struct ctdb_context *ctdb = da_ctx->ctdb;
1054 /* This talloc-steals the packet ->c */
1055 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1056 talloc_free(da_ctx);
1059 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1061 struct ctdb_deferred_attach_context *da_ctx;
1063 /* call it from the main event loop as soon as the current event
1064 finishes.
1066 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1067 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1068 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1071 return 0;
1075 a client has asked to attach a new database
1077 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1078 TDB_DATA *outdata, uint64_t tdb_flags,
1079 bool persistent, uint32_t client_id,
1080 struct ctdb_req_control *c,
1081 bool *async_reply)
1083 const char *db_name = (const char *)indata.dptr;
1084 struct ctdb_db_context *db;
1085 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1086 struct ctdb_client *client = NULL;
1087 bool with_jenkinshash, with_mutexes;
1089 if (ctdb->tunable.allow_client_db_attach == 0) {
1090 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1091 "AllowClientDBAccess == 0\n", db_name));
1092 return -1;
1095 /* dont allow any local clients to attach while we are in recovery mode
1096 * except for the recovery daemon.
1097 * allow all attach from the network since these are always from remote
1098 * recovery daemons.
1100 if (client_id != 0) {
1101 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1103 if (client != NULL) {
1104 /* If the node is inactive it is not part of the cluster
1105 and we should not allow clients to attach to any
1106 databases
1108 if (node->flags & NODE_FLAGS_INACTIVE) {
1109 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1110 return -1;
1113 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1114 client->pid != ctdb->recoverd_pid &&
1115 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1116 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1118 if (da_ctx == NULL) {
1119 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1120 return -1;
1123 da_ctx->ctdb = ctdb;
1124 da_ctx->c = talloc_steal(da_ctx, c);
1125 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1126 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1128 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1130 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1131 *async_reply = true;
1132 return 0;
1136 /* the client can optionally pass additional tdb flags, but we
1137 only allow a subset of those on the database in ctdb. Note
1138 that tdb_flags is passed in via the (otherwise unused)
1139 srvid to the attach control */
1140 #ifdef TDB_MUTEX_LOCKING
1141 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING);
1142 #else
1143 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1144 #endif
1146 /* see if we already have this name */
1147 db = ctdb_db_handle(ctdb, db_name);
1148 if (db) {
1149 if (db->persistent != persistent) {
1150 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1151 "database %s\n", persistent ? "" : "non-",
1152 db-> persistent ? "" : "non-", db_name));
1153 return -1;
1155 outdata->dptr = (uint8_t *)&db->db_id;
1156 outdata->dsize = sizeof(db->db_id);
1157 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1158 return 0;
1161 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1162 #ifdef TDB_MUTEX_LOCKING
1163 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1164 #else
1165 with_mutexes = false;
1166 #endif
1168 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1169 with_jenkinshash, with_mutexes) != 0) {
1170 return -1;
1173 db = ctdb_db_handle(ctdb, db_name);
1174 if (!db) {
1175 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1176 return -1;
1179 /* remember the flags the client has specified */
1180 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1182 outdata->dptr = (uint8_t *)&db->db_id;
1183 outdata->dsize = sizeof(db->db_id);
1185 /* Try to ensure it's locked in mem */
1186 lockdown_memory(ctdb->valgrinding);
1188 /* tell all the other nodes about this database */
1189 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1190 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1191 CTDB_CONTROL_DB_ATTACH,
1192 0, CTDB_CTRL_FLAG_NOREPLY,
1193 indata, NULL, NULL);
1195 /* success */
1196 return 0;
1200 * a client has asked to detach from a database
1202 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1203 uint32_t client_id)
1205 uint32_t db_id;
1206 struct ctdb_db_context *ctdb_db;
1207 struct ctdb_client *client = NULL;
1209 db_id = *(uint32_t *)indata.dptr;
1210 ctdb_db = find_ctdb_db(ctdb, db_id);
1211 if (ctdb_db == NULL) {
1212 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1213 db_id));
1214 return -1;
1217 if (ctdb->tunable.allow_client_db_attach == 1) {
1218 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1219 "Clients are allowed access to databases "
1220 "(AllowClientDBAccess == 1)\n",
1221 ctdb_db->db_name));
1222 return -1;
1225 if (ctdb_db->persistent) {
1226 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1227 "denied\n", ctdb_db->db_name));
1228 return -1;
1231 /* Cannot detach from database when in recovery */
1232 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1233 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1234 return -1;
1237 /* If a control comes from a client, then broadcast it to all nodes.
1238 * Do the actual detach only if the control comes from other daemons.
1240 if (client_id != 0) {
1241 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1242 if (client != NULL) {
1243 /* forward the control to all the nodes */
1244 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1245 CTDB_CONTROL_DB_DETACH, 0,
1246 CTDB_CTRL_FLAG_NOREPLY,
1247 indata, NULL, NULL);
1248 return 0;
1250 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1251 "for database '%s'\n", ctdb_db->db_name));
1252 return -1;
1255 /* Detach database from recoverd */
1256 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1257 CTDB_SRVID_DETACH_DATABASE,
1258 indata) != 0) {
1259 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1260 return -1;
1263 /* Disable vacuuming and drop all vacuuming data */
1264 talloc_free(ctdb_db->vacuum_handle);
1265 talloc_free(ctdb_db->delete_queue);
1267 /* Terminate any deferred fetch */
1268 talloc_free(ctdb_db->deferred_fetch);
1270 /* Terminate any traverses */
1271 while (ctdb_db->traverse) {
1272 talloc_free(ctdb_db->traverse);
1275 /* Terminate any revokes */
1276 while (ctdb_db->revokechild_active) {
1277 talloc_free(ctdb_db->revokechild_active);
1280 /* Free readonly tracking database */
1281 if (ctdb_db->readonly) {
1282 talloc_free(ctdb_db->rottdb);
1285 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1287 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1288 ctdb_db->db_name));
1289 talloc_free(ctdb_db);
1291 return 0;
1295 attach to all existing persistent databases
1297 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1298 const char *unhealthy_reason)
1300 DIR *d;
1301 struct dirent *de;
1303 /* open the persistent db directory and scan it for files */
1304 d = opendir(ctdb->db_directory_persistent);
1305 if (d == NULL) {
1306 return 0;
1309 while ((de=readdir(d))) {
1310 char *p, *s, *q;
1311 size_t len = strlen(de->d_name);
1312 uint32_t node;
1313 int invalid_name = 0;
1315 s = talloc_strdup(ctdb, de->d_name);
1316 if (s == NULL) {
1317 closedir(d);
1318 CTDB_NO_MEMORY(ctdb, s);
1321 /* only accept names ending in .tdb */
1322 p = strstr(s, ".tdb.");
1323 if (len < 7 || p == NULL) {
1324 talloc_free(s);
1325 continue;
1328 /* only accept names ending with .tdb. and any number of digits */
1329 q = p+5;
1330 while (*q != 0 && invalid_name == 0) {
1331 if (!isdigit(*q++)) {
1332 invalid_name = 1;
1335 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1336 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1337 talloc_free(s);
1338 continue;
1340 p[4] = 0;
1342 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1343 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1344 closedir(d);
1345 talloc_free(s);
1346 return -1;
1349 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1351 talloc_free(s);
1353 closedir(d);
1354 return 0;
1357 int ctdb_attach_databases(struct ctdb_context *ctdb)
1359 int ret;
1360 char *persistent_health_path = NULL;
1361 char *unhealthy_reason = NULL;
1362 bool first_try = true;
1364 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1365 ctdb->db_directory_state,
1366 PERSISTENT_HEALTH_TDB,
1367 ctdb->pnn);
1368 if (persistent_health_path == NULL) {
1369 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1370 return -1;
1373 again:
1375 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1376 0, TDB_DISALLOW_NESTING,
1377 O_CREAT | O_RDWR, 0600);
1378 if (ctdb->db_persistent_health == NULL) {
1379 struct tdb_wrap *tdb;
1381 if (!first_try) {
1382 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1383 persistent_health_path,
1384 errno,
1385 strerror(errno)));
1386 talloc_free(persistent_health_path);
1387 talloc_free(unhealthy_reason);
1388 return -1;
1390 first_try = false;
1392 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1393 persistent_health_path,
1394 "was cleared after a failure",
1395 "manual verification needed");
1396 if (unhealthy_reason == NULL) {
1397 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1398 talloc_free(persistent_health_path);
1399 return -1;
1402 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1403 persistent_health_path));
1404 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1405 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1406 O_CREAT | O_RDWR, 0600);
1407 if (tdb) {
1408 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1409 persistent_health_path,
1410 errno,
1411 strerror(errno)));
1412 talloc_free(persistent_health_path);
1413 talloc_free(unhealthy_reason);
1414 return -1;
1417 talloc_free(tdb);
1418 goto again;
1420 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1421 if (ret != 0) {
1422 struct tdb_wrap *tdb;
1424 talloc_free(ctdb->db_persistent_health);
1425 ctdb->db_persistent_health = NULL;
1427 if (!first_try) {
1428 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1429 persistent_health_path));
1430 talloc_free(persistent_health_path);
1431 talloc_free(unhealthy_reason);
1432 return -1;
1434 first_try = false;
1436 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1437 persistent_health_path,
1438 "was cleared after a failure",
1439 "manual verification needed");
1440 if (unhealthy_reason == NULL) {
1441 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1442 talloc_free(persistent_health_path);
1443 return -1;
1446 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1447 persistent_health_path));
1448 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1449 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1450 O_CREAT | O_RDWR, 0600);
1451 if (tdb) {
1452 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1453 persistent_health_path,
1454 errno,
1455 strerror(errno)));
1456 talloc_free(persistent_health_path);
1457 talloc_free(unhealthy_reason);
1458 return -1;
1461 talloc_free(tdb);
1462 goto again;
1464 talloc_free(persistent_health_path);
1466 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1467 talloc_free(unhealthy_reason);
1468 if (ret != 0) {
1469 return ret;
1472 return 0;
1476 called when a broadcast seqnum update comes in
1478 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1480 struct ctdb_db_context *ctdb_db;
1481 if (srcnode == ctdb->pnn) {
1482 /* don't update ourselves! */
1483 return 0;
1486 ctdb_db = find_ctdb_db(ctdb, db_id);
1487 if (!ctdb_db) {
1488 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1489 return -1;
1492 if (ctdb_db->unhealthy_reason) {
1493 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1494 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1495 return -1;
1498 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1499 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1500 return 0;
1504 timer to check for seqnum changes in a ltdb and propogate them
1506 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1507 struct timeval t, void *p)
1509 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1510 struct ctdb_context *ctdb = ctdb_db->ctdb;
1511 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1512 if (new_seqnum != ctdb_db->seqnum) {
1513 /* something has changed - propogate it */
1514 TDB_DATA data;
1515 data.dptr = (uint8_t *)&ctdb_db->db_id;
1516 data.dsize = sizeof(uint32_t);
1517 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1518 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1519 data, NULL, NULL);
1521 ctdb_db->seqnum = new_seqnum;
1523 /* setup a new timer */
1524 ctdb_db->seqnum_update =
1525 event_add_timed(ctdb->ev, ctdb_db,
1526 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1527 ctdb_ltdb_seqnum_check, ctdb_db);
1531 enable seqnum handling on this db
1533 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1535 struct ctdb_db_context *ctdb_db;
1536 ctdb_db = find_ctdb_db(ctdb, db_id);
1537 if (!ctdb_db) {
1538 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1539 return -1;
1542 if (ctdb_db->seqnum_update == NULL) {
1543 ctdb_db->seqnum_update =
1544 event_add_timed(ctdb->ev, ctdb_db,
1545 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1546 ctdb_ltdb_seqnum_check, ctdb_db);
1549 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1550 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1551 return 0;
1554 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1555 uint32_t client_id)
1557 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1558 struct ctdb_db_context *ctdb_db;
1560 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1561 if (!ctdb_db) {
1562 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1563 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1564 db_prio->db_id));
1566 return 0;
1569 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1570 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1571 return 0;
1574 ctdb_db->priority = db_prio->priority;
1575 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1577 if (client_id != 0) {
1578 /* Broadcast the update to the rest of the cluster */
1579 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1580 CTDB_CONTROL_SET_DB_PRIORITY, 0,
1581 CTDB_CTRL_FLAG_NOREPLY, indata,
1582 NULL, NULL);
1584 return 0;
1588 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1590 if (ctdb_db->sticky) {
1591 return 0;
1594 if (ctdb_db->persistent) {
1595 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1596 return -1;
1599 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1601 ctdb_db->sticky = true;
1603 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1605 return 0;
1608 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1609 uint32_t db_id,
1610 TDB_DATA *outdata)
1612 struct ctdb_db_context *ctdb_db;
1613 struct ctdb_db_statistics *stats;
1614 int i;
1615 int len;
1616 char *ptr;
1618 ctdb_db = find_ctdb_db(ctdb, db_id);
1619 if (!ctdb_db) {
1620 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1621 return -1;
1624 len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1625 for (i = 0; i < MAX_HOT_KEYS; i++) {
1626 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1629 stats = talloc_size(outdata, len);
1630 if (stats == NULL) {
1631 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1632 return -1;
1635 *stats = ctdb_db->statistics;
1637 stats->num_hot_keys = MAX_HOT_KEYS;
1639 ptr = &stats->hot_keys_wire[0];
1640 for (i = 0; i < MAX_HOT_KEYS; i++) {
1641 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1642 ctdb_db->statistics.hot_keys[i].key.dsize);
1643 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1646 outdata->dptr = (uint8_t *)stats;
1647 outdata->dsize = len;
1649 return 0;