talloc: Fix CID 1373621 Unchecked return value
[Samba.git] / ctdb / server / ctdb_ltdb_server.c
blob8feaff11c98a3c4846c622e6cf2c47d7ad80a078
1 /*
2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
46 /**
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if apporpriate.
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55 TDB_DATA key,
56 struct ctdb_ltdb_header *header,
57 TDB_DATA data)
59 struct ctdb_context *ctdb = ctdb_db->ctdb;
60 TDB_DATA rec;
61 int ret;
62 bool seqnum_suppressed = false;
63 bool keep = false;
64 bool schedule_for_deletion = false;
65 bool remove_from_delete_queue = false;
66 uint32_t lmaster;
68 if (ctdb->flags & CTDB_FLAG_TORTURE) {
69 struct ctdb_ltdb_header *h2;
70 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
71 h2 = (struct ctdb_ltdb_header *)rec.dptr;
72 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
73 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
74 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
76 if (rec.dptr) free(rec.dptr);
79 if (ctdb->vnn_map == NULL) {
81 * Called from a client: always store the record
82 * Also don't call ctdb_lmaster since it uses the vnn_map!
84 keep = true;
85 goto store;
88 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
91 * If we migrate an empty record off to another node
92 * and the record has not been migrated with data,
93 * delete the record instead of storing the empty record.
95 if (data.dsize != 0) {
96 keep = true;
97 } else if (header->flags & CTDB_REC_RO_FLAGS) {
98 keep = true;
99 } else if (ctdb_db->persistent) {
100 keep = true;
101 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
103 * The record is not created by the client but
104 * automatically by the ctdb_ltdb_fetch logic that
105 * creates a record with an initial header in the
106 * ltdb before trying to migrate the record from
107 * the current lmaster. Keep it instead of trying
108 * to delete the non-existing record...
110 keep = true;
111 schedule_for_deletion = true;
112 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
113 keep = true;
114 } else if (ctdb_db->ctdb->pnn == lmaster) {
116 * If we are lmaster, then we usually keep the record.
117 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
118 * and the record is empty and has never been migrated
119 * with data, then we should delete it instead of storing it.
120 * This is part of the vacuuming process.
122 * The reason that we usually need to store even empty records
123 * on the lmaster is that a client operating directly on the
124 * lmaster (== dmaster) expects the local copy of the record to
125 * exist after successful ctdb migrate call. If the record does
126 * not exist, the client goes into a migrate loop and eventually
127 * fails. So storing the empty record makes sure that we do not
128 * need to change the client code.
130 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
131 keep = true;
132 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
133 keep = true;
135 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
136 keep = true;
139 if (keep) {
140 if (!ctdb_db->persistent &&
141 (ctdb_db->ctdb->pnn == header->dmaster) &&
142 !(header->flags & CTDB_REC_RO_FLAGS))
144 header->rsn++;
146 if (data.dsize == 0) {
147 schedule_for_deletion = true;
150 remove_from_delete_queue = !schedule_for_deletion;
153 store:
155 * The VACUUM_MIGRATED flag is only set temporarily for
156 * the above logic when the record was retrieved by a
157 * VACUUM_MIGRATE call and should not be stored in the
158 * database.
160 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
161 * and there are two cases in which the corresponding record
162 * is stored in the local database:
163 * 1. The record has been migrated with data in the past
164 * (the MIGRATED_WITH_DATA record flag is set).
165 * 2. The record has been filled with data again since it
166 * had been submitted in the VACUUM_FETCH message to the
167 * lmaster.
168 * For such records it is important to not store the
169 * VACUUM_MIGRATED flag in the database.
171 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
174 * Similarly, clear the AUTOMATIC flag which should not enter
175 * the local database copy since this would require client
176 * modifications to clear the flag when the client stores
177 * the record.
179 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
181 rec.dsize = sizeof(*header) + data.dsize;
182 rec.dptr = talloc_size(ctdb, rec.dsize);
183 CTDB_NO_MEMORY(ctdb, rec.dptr);
185 memcpy(rec.dptr, header, sizeof(*header));
186 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
188 /* Databases with seqnum updates enabled only get their seqnum
189 changes when/if we modify the data */
190 if (ctdb_db->seqnum_update != NULL) {
191 TDB_DATA old;
192 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
194 if ( (old.dsize == rec.dsize)
195 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
196 rec.dptr+sizeof(struct ctdb_ltdb_header),
197 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
198 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
199 seqnum_suppressed = true;
201 if (old.dptr) free(old.dptr);
204 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
205 ctdb_db->db_name,
206 keep?"storing":"deleting",
207 ctdb_hash(&key)));
209 if (keep) {
210 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
211 } else {
212 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
215 if (ret != 0) {
216 int lvl = DEBUG_ERR;
218 if (keep == false &&
219 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
221 lvl = DEBUG_DEBUG;
224 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
225 "%d - %s\n",
226 ctdb_db->db_name,
227 keep?"store":"delete", ret,
228 tdb_errorstr(ctdb_db->ltdb->tdb)));
230 schedule_for_deletion = false;
231 remove_from_delete_queue = false;
233 if (seqnum_suppressed) {
234 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
237 talloc_free(rec.dptr);
239 if (schedule_for_deletion) {
240 int ret2;
241 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
242 if (ret2 != 0) {
243 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
247 if (remove_from_delete_queue) {
248 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
251 return ret;
/*
  state kept while a request waits for a record chainlock; consumed by
  lock_fetch_callback()
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* handler the deferred packet is passed to, together with recv_context */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* ctdb_db->generation at the time the request was deferred */
	uint32_t generation;
	/* when true the packet is re-submitted even if the generation changed */
	bool ignore_generation;
};
265 called when we should retry the operation
267 static void lock_fetch_callback(void *p, bool locked)
269 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
270 if (!state->ignore_generation &&
271 state->generation != state->ctdb_db->generation) {
272 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
273 talloc_free(state->hdr);
274 return;
276 state->recv_pkt(state->recv_context, state->hdr);
277 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
282 do a non-blocking ltdb_lock, deferring this ctdb request until we
283 have the chainlock
285 It does the following:
287 1) tries to get the chainlock. If it succeeds, then it returns 0
289 2) if it fails to get a chainlock immediately then it sets up a
290 non-blocking chainlock via ctdb_lock_record, and when it gets the
291 chainlock it re-submits this ctdb request to the main packet
292 receive function.
294 This effectively queues all ctdb requests that cannot be
295 immediately satisfied until it can get the lock. This means that
296 the main ctdb daemon will not block waiting for a chainlock held by
297 a client
299 There are 3 possible return values:
301 0: means that it got the lock immediately.
302 -1: means that it failed to get the lock, and won't retry
303 -2: means that it failed to get the lock immediately, but will retry
305 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
306 TDB_DATA key, struct ctdb_req_header *hdr,
307 void (*recv_pkt)(void *, struct ctdb_req_header *),
308 void *recv_context, bool ignore_generation)
310 int ret;
311 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
312 struct lock_request *lreq;
313 struct lock_fetch_state *state;
315 ret = tdb_chainlock_nonblock(tdb, key);
317 if (ret != 0 &&
318 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
319 /* a hard failure - don't try again */
320 return -1;
323 /* when torturing, ensure we test the contended path */
324 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
325 random() % 5 == 0) {
326 ret = -1;
327 tdb_chainunlock(tdb, key);
330 /* first the non-contended path */
331 if (ret == 0) {
332 return 0;
335 state = talloc(hdr, struct lock_fetch_state);
336 state->ctdb = ctdb_db->ctdb;
337 state->ctdb_db = ctdb_db;
338 state->hdr = hdr;
339 state->recv_pkt = recv_pkt;
340 state->recv_context = recv_context;
341 state->generation = ctdb_db->generation;
342 state->ignore_generation = ignore_generation;
344 /* now the contended path */
345 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
346 if (lreq == NULL) {
347 return -1;
350 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
351 so it won't be freed yet */
352 talloc_steal(state, hdr);
354 /* now tell the caller than we will retry asynchronously */
355 return -2;
359 a varient of ctdb_ltdb_lock_requeue that also fetches the record
361 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
362 TDB_DATA key, struct ctdb_ltdb_header *header,
363 struct ctdb_req_header *hdr, TDB_DATA *data,
364 void (*recv_pkt)(void *, struct ctdb_req_header *),
365 void *recv_context, bool ignore_generation)
367 int ret;
369 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
370 recv_context, ignore_generation);
371 if (ret == 0) {
372 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
373 if (ret != 0) {
374 int uret;
375 uret = ctdb_ltdb_unlock(ctdb_db, key);
376 if (uret != 0) {
377 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
381 return ret;
386 paraoid check to see if the db is empty
388 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
390 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
391 int count = tdb_traverse_read(tdb, NULL, NULL);
392 if (count != 0) {
393 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
394 ctdb_db->db_path));
395 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
399 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
400 struct ctdb_db_context *ctdb_db)
402 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
403 char *old;
404 char *reason = NULL;
405 TDB_DATA key;
406 TDB_DATA val;
408 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
409 key.dsize = strlen(ctdb_db->db_name);
411 old = ctdb_db->unhealthy_reason;
412 ctdb_db->unhealthy_reason = NULL;
414 val = tdb_fetch(tdb, key);
415 if (val.dsize > 0) {
416 reason = talloc_strndup(ctdb_db,
417 (const char *)val.dptr,
418 val.dsize);
419 if (reason == NULL) {
420 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
421 (int)val.dsize));
422 ctdb_db->unhealthy_reason = old;
423 free(val.dptr);
424 return -1;
428 if (val.dptr) {
429 free(val.dptr);
432 talloc_free(old);
433 ctdb_db->unhealthy_reason = reason;
434 return 0;
437 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
438 struct ctdb_db_context *ctdb_db,
439 const char *given_reason,/* NULL means healthy */
440 int num_healthy_nodes)
442 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
443 int ret;
444 TDB_DATA key;
445 TDB_DATA val;
446 char *new_reason = NULL;
447 char *old_reason = NULL;
449 ret = tdb_transaction_start(tdb);
450 if (ret != 0) {
451 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
452 tdb_name(tdb), ret, tdb_errorstr(tdb)));
453 return -1;
456 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
457 if (ret != 0) {
458 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
459 ctdb_db->db_name, ret));
460 return -1;
462 old_reason = ctdb_db->unhealthy_reason;
464 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
465 key.dsize = strlen(ctdb_db->db_name);
467 if (given_reason) {
468 new_reason = talloc_strdup(ctdb_db, given_reason);
469 if (new_reason == NULL) {
470 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
471 given_reason));
472 return -1;
474 } else if (old_reason && num_healthy_nodes == 0) {
476 * If the reason indicates ok, but there where no healthy nodes
477 * available, that it means, we have not recovered valid content
478 * of the db. So if there's an old reason, prefix it with
479 * "NO-HEALTHY-NODES - "
481 const char *prefix;
483 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
484 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
485 if (ret != 0) {
486 prefix = _TMP_PREFIX;
487 } else {
488 prefix = "";
490 new_reason = talloc_asprintf(ctdb_db, "%s%s",
491 prefix, old_reason);
492 if (new_reason == NULL) {
493 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
494 prefix, old_reason));
495 return -1;
497 #undef _TMP_PREFIX
500 if (new_reason) {
501 val.dptr = discard_const_p(uint8_t, new_reason);
502 val.dsize = strlen(new_reason);
504 ret = tdb_store(tdb, key, val, TDB_REPLACE);
505 if (ret != 0) {
506 tdb_transaction_cancel(tdb);
507 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
508 tdb_name(tdb), ctdb_db->db_name, new_reason,
509 ret, tdb_errorstr(tdb)));
510 talloc_free(new_reason);
511 return -1;
513 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
514 ctdb_db->db_name, new_reason));
515 } else if (old_reason) {
516 ret = tdb_delete(tdb, key);
517 if (ret != 0) {
518 tdb_transaction_cancel(tdb);
519 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
520 tdb_name(tdb), ctdb_db->db_name,
521 ret, tdb_errorstr(tdb)));
522 talloc_free(new_reason);
523 return -1;
525 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
526 ctdb_db->db_name));
529 ret = tdb_transaction_commit(tdb);
530 if (ret != TDB_SUCCESS) {
531 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
532 tdb_name(tdb), ret, tdb_errorstr(tdb)));
533 talloc_free(new_reason);
534 return -1;
537 talloc_free(old_reason);
538 ctdb_db->unhealthy_reason = new_reason;
540 return 0;
543 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
544 struct ctdb_db_context *ctdb_db)
546 time_t now = time(NULL);
547 char *new_path;
548 char *new_reason;
549 int ret;
550 struct tm *tm;
552 tm = gmtime(&now);
554 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
555 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
556 "%04u%02u%02u%02u%02u%02u.0Z",
557 ctdb_db->db_path,
558 tm->tm_year+1900, tm->tm_mon+1,
559 tm->tm_mday, tm->tm_hour, tm->tm_min,
560 tm->tm_sec);
561 if (new_path == NULL) {
562 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
563 return -1;
566 new_reason = talloc_asprintf(ctdb_db,
567 "ERROR - Backup of corrupted TDB in '%s'",
568 new_path);
569 if (new_reason == NULL) {
570 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
571 return -1;
573 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
574 talloc_free(new_reason);
575 if (ret != 0) {
576 DEBUG(DEBUG_CRIT,(__location__
577 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
578 ctdb_db->db_path));
579 return -1;
582 ret = rename(ctdb_db->db_path, new_path);
583 if (ret != 0) {
584 DEBUG(DEBUG_CRIT,(__location__
585 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
586 ctdb_db->db_path, new_path,
587 errno, strerror(errno)));
588 talloc_free(new_path);
589 return -1;
592 DEBUG(DEBUG_CRIT,(__location__
593 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
594 ctdb_db->db_path, new_path));
595 talloc_free(new_path);
596 return 0;
599 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
601 struct ctdb_db_context *ctdb_db;
602 int ret;
603 int ok = 0;
604 int fail = 0;
606 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
607 if (!ctdb_db->persistent) {
608 continue;
611 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
612 if (ret != 0) {
613 DEBUG(DEBUG_ALERT,(__location__
614 " load persistent health for '%s' failed\n",
615 ctdb_db->db_path));
616 return -1;
619 if (ctdb_db->unhealthy_reason == NULL) {
620 ok++;
621 DEBUG(DEBUG_INFO,(__location__
622 " persistent db '%s' healthy\n",
623 ctdb_db->db_path));
624 continue;
627 fail++;
628 DEBUG(DEBUG_ALERT,(__location__
629 " persistent db '%s' unhealthy: %s\n",
630 ctdb_db->db_path,
631 ctdb_db->unhealthy_reason));
633 DEBUG(DEBUG_NOTICE,
634 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
635 ok, fail));
637 if (fail != 0) {
638 return -1;
641 return 0;
646 mark a database - as healthy
648 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
650 uint32_t db_id = *(uint32_t *)indata.dptr;
651 struct ctdb_db_context *ctdb_db;
652 int ret;
653 bool may_recover = false;
655 ctdb_db = find_ctdb_db(ctdb, db_id);
656 if (!ctdb_db) {
657 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
658 return -1;
661 if (ctdb_db->unhealthy_reason) {
662 may_recover = true;
665 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
666 if (ret != 0) {
667 DEBUG(DEBUG_ERR,(__location__
668 " ctdb_update_persistent_health(%s) failed\n",
669 ctdb_db->db_name));
670 return -1;
673 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
674 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
675 ctdb_db->db_name));
676 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
679 return 0;
682 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
683 TDB_DATA indata,
684 TDB_DATA *outdata)
686 uint32_t db_id = *(uint32_t *)indata.dptr;
687 struct ctdb_db_context *ctdb_db;
688 int ret;
690 ctdb_db = find_ctdb_db(ctdb, db_id);
691 if (!ctdb_db) {
692 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
693 return -1;
696 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
697 if (ret != 0) {
698 DEBUG(DEBUG_ERR,(__location__
699 " ctdb_load_persistent_health(%s) failed\n",
700 ctdb_db->db_name));
701 return -1;
704 *outdata = tdb_null;
705 if (ctdb_db->unhealthy_reason) {
706 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
707 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
710 return 0;
714 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
716 char *ropath;
718 if (ctdb_db->readonly) {
719 return 0;
722 if (ctdb_db->persistent) {
723 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
724 return -1;
727 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
728 if (ropath == NULL) {
729 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
730 return -1;
732 ctdb_db->rottdb = tdb_open(ropath,
733 ctdb->tunable.database_hash_size,
734 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
735 O_CREAT|O_RDWR, 0600);
736 if (ctdb_db->rottdb == NULL) {
737 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
738 talloc_free(ropath);
739 return -1;
742 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
744 ctdb_db->readonly = true;
746 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
748 talloc_free(ropath);
749 return 0;
753 attach to a database, handling both persistent and non-persistent databases
754 return 0 on success, -1 on failure
756 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
757 bool persistent, const char *unhealthy_reason,
758 bool jenkinshash, bool mutexes)
760 struct ctdb_db_context *ctdb_db, *tmp_db;
761 int ret;
762 struct TDB_DATA key;
763 unsigned tdb_flags;
764 int mode = 0600;
765 int remaining_tries = 0;
767 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
768 CTDB_NO_MEMORY(ctdb, ctdb_db);
770 ctdb_db->ctdb = ctdb;
771 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
772 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
774 key.dsize = strlen(db_name)+1;
775 key.dptr = discard_const(db_name);
776 ctdb_db->db_id = ctdb_hash(&key);
777 ctdb_db->persistent = persistent;
779 if (!ctdb_db->persistent) {
780 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
781 if (ctdb_db->delete_queue == NULL) {
782 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
785 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
788 /* check for hash collisions */
789 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
790 if (tmp_db->db_id == ctdb_db->db_id) {
791 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
792 tmp_db->db_id, db_name, tmp_db->db_name));
793 talloc_free(ctdb_db);
794 return -1;
798 if (persistent) {
799 if (unhealthy_reason) {
800 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
801 unhealthy_reason, 0);
802 if (ret != 0) {
803 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
804 ctdb_db->db_name, unhealthy_reason, ret));
805 talloc_free(ctdb_db);
806 return -1;
810 if (ctdb->max_persistent_check_errors > 0) {
811 remaining_tries = 1;
813 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
814 remaining_tries = 0;
817 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
818 if (ret != 0) {
819 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
820 ctdb_db->db_name, ret));
821 talloc_free(ctdb_db);
822 return -1;
826 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
827 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
828 ctdb_db->db_name, ctdb_db->unhealthy_reason));
829 talloc_free(ctdb_db);
830 return -1;
833 if (ctdb_db->unhealthy_reason) {
834 /* this is just a warning, but we want that in the log file! */
835 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
836 ctdb_db->db_name, ctdb_db->unhealthy_reason));
839 /* open the database */
840 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
841 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
842 db_name, ctdb->pnn);
844 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
845 if (ctdb->valgrinding) {
846 tdb_flags |= TDB_NOMMAP;
848 tdb_flags |= TDB_DISALLOW_NESTING;
849 if (jenkinshash) {
850 tdb_flags |= TDB_INCOMPATIBLE_HASH;
852 #ifdef TDB_MUTEX_LOCKING
853 if (ctdb->tunable.mutex_enabled && mutexes &&
854 tdb_runtime_check_for_robust_mutexes()) {
855 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
857 #endif
859 again:
860 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
861 ctdb->tunable.database_hash_size,
862 tdb_flags,
863 O_CREAT|O_RDWR, mode);
864 if (ctdb_db->ltdb == NULL) {
865 struct stat st;
866 int saved_errno = errno;
868 if (!persistent) {
869 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
870 ctdb_db->db_path,
871 saved_errno,
872 strerror(saved_errno)));
873 talloc_free(ctdb_db);
874 return -1;
877 if (remaining_tries == 0) {
878 DEBUG(DEBUG_CRIT,(__location__
879 "Failed to open persistent tdb '%s': %d - %s\n",
880 ctdb_db->db_path,
881 saved_errno,
882 strerror(saved_errno)));
883 talloc_free(ctdb_db);
884 return -1;
887 ret = stat(ctdb_db->db_path, &st);
888 if (ret != 0) {
889 DEBUG(DEBUG_CRIT,(__location__
890 "Failed to open persistent tdb '%s': %d - %s\n",
891 ctdb_db->db_path,
892 saved_errno,
893 strerror(saved_errno)));
894 talloc_free(ctdb_db);
895 return -1;
898 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
899 if (ret != 0) {
900 DEBUG(DEBUG_CRIT,(__location__
901 "Failed to open persistent tdb '%s': %d - %s\n",
902 ctdb_db->db_path,
903 saved_errno,
904 strerror(saved_errno)));
905 talloc_free(ctdb_db);
906 return -1;
909 remaining_tries--;
910 mode = st.st_mode;
911 goto again;
914 if (!persistent) {
915 ctdb_check_db_empty(ctdb_db);
916 } else {
917 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
918 if (ret != 0) {
919 int fd;
920 struct stat st;
922 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
923 ctdb_db->db_path, ret,
924 tdb_errorstr(ctdb_db->ltdb->tdb)));
925 if (remaining_tries == 0) {
926 talloc_free(ctdb_db);
927 return -1;
930 fd = tdb_fd(ctdb_db->ltdb->tdb);
931 ret = fstat(fd, &st);
932 if (ret != 0) {
933 DEBUG(DEBUG_CRIT,(__location__
934 "Failed to fstat() persistent tdb '%s': %d - %s\n",
935 ctdb_db->db_path,
936 errno,
937 strerror(errno)));
938 talloc_free(ctdb_db);
939 return -1;
942 /* close the TDB */
943 talloc_free(ctdb_db->ltdb);
944 ctdb_db->ltdb = NULL;
946 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
947 if (ret != 0) {
948 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
949 ctdb_db->db_path));
950 talloc_free(ctdb_db);
951 return -1;
954 remaining_tries--;
955 mode = st.st_mode;
956 goto again;
960 /* set up a rb tree we can use to track which records we have a
961 fetch-lock in-flight for so we can defer any additional calls
962 for the same record.
964 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
965 if (ctdb_db->deferred_fetch == NULL) {
966 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
967 talloc_free(ctdb_db);
968 return -1;
971 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
972 if (ctdb_db->defer_dmaster == NULL) {
973 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
974 ctdb_db->db_name));
975 talloc_free(ctdb_db);
976 return -1;
979 DLIST_ADD(ctdb->db_list, ctdb_db);
981 /* setting this can help some high churn databases */
982 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
985 all databases support the "null" function. we need this in
986 order to do forced migration of records
988 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
989 if (ret != 0) {
990 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
991 talloc_free(ctdb_db);
992 return -1;
996 all databases support the "fetch" function. we need this
997 for efficient Samba3 ctdb fetch
999 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1000 if (ret != 0) {
1001 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1002 talloc_free(ctdb_db);
1003 return -1;
1007 all databases support the "fetch_with_header" function. we need this
1008 for efficient readonly record fetches
1010 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1011 if (ret != 0) {
1012 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1013 talloc_free(ctdb_db);
1014 return -1;
1017 ret = ctdb_vacuum_init(ctdb_db);
1018 if (ret != 0) {
1019 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1020 "database '%s'\n", ctdb_db->db_name));
1021 talloc_free(ctdb_db);
1022 return -1;
1025 ctdb_db->generation = ctdb->vnn_map->generation;
1027 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1028 ctdb_db->db_path, tdb_flags));
1030 /* success */
1031 return 0;
/*
  one deferred attach request, kept on the ctdb->deferred_attach list
 */
struct ctdb_deferred_attach_context {
	/* list linkage for ctdb->deferred_attach */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	struct ctdb_context *ctdb;
	/* the deferred control packet */
	struct ctdb_req_control_old *c;
};
1042 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1044 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1046 return 0;
1049 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1050 struct tevent_timer *te,
1051 struct timeval t, void *private_data)
1053 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1054 struct ctdb_context *ctdb = da_ctx->ctdb;
1056 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1057 talloc_free(da_ctx);
1060 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1061 struct tevent_timer *te,
1062 struct timeval t, void *private_data)
1064 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1065 struct ctdb_context *ctdb = da_ctx->ctdb;
1067 /* This talloc-steals the packet ->c */
1068 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1069 talloc_free(da_ctx);
1072 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1074 struct ctdb_deferred_attach_context *da_ctx;
1076 /* call it from the main event loop as soon as the current event
1077 finishes.
1079 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1080 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1081 tevent_add_timer(ctdb->ev, da_ctx,
1082 timeval_current_ofs(1,0),
1083 ctdb_deferred_attach_callback, da_ctx);
1086 return 0;
1090 a client has asked to attach a new database
1092 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1093 TDB_DATA *outdata, uint64_t tdb_flags,
1094 bool persistent, uint32_t client_id,
1095 struct ctdb_req_control_old *c,
1096 bool *async_reply)
1098 const char *db_name = (const char *)indata.dptr;
1099 struct ctdb_db_context *db;
1100 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1101 struct ctdb_client *client = NULL;
1102 bool with_jenkinshash, with_mutexes;
1104 if (ctdb->tunable.allow_client_db_attach == 0) {
1105 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1106 "AllowClientDBAccess == 0\n", db_name));
1107 return -1;
1110 /* don't allow any local clients to attach while we are in recovery mode
1111 * except for the recovery daemon.
1112 * allow all attach from the network since these are always from remote
1113 * recovery daemons.
1115 if (client_id != 0) {
1116 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1118 if (client != NULL) {
1119 /* If the node is inactive it is not part of the cluster
1120 and we should not allow clients to attach to any
1121 databases
1123 if (node->flags & NODE_FLAGS_INACTIVE) {
1124 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1125 return -1;
1128 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1129 client->pid != ctdb->recoverd_pid &&
1130 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1131 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1133 if (da_ctx == NULL) {
1134 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1135 return -1;
1138 da_ctx->ctdb = ctdb;
1139 da_ctx->c = talloc_steal(da_ctx, c);
1140 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1141 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1143 tevent_add_timer(ctdb->ev, da_ctx,
1144 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1145 ctdb_deferred_attach_timeout, da_ctx);
1147 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1148 *async_reply = true;
1149 return 0;
1153 /* the client can optionally pass additional tdb flags, but we
1154 only allow a subset of those on the database in ctdb. Note
1155 that tdb_flags is passed in via the (otherwise unused)
1156 srvid to the attach control */
1157 #ifdef TDB_MUTEX_LOCKING
1158 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1159 #else
1160 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1161 #endif
1163 /* see if we already have this name */
1164 db = ctdb_db_handle(ctdb, db_name);
1165 if (db) {
1166 if (db->persistent != persistent) {
1167 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1168 "database %s\n", persistent ? "" : "non-",
1169 db-> persistent ? "" : "non-", db_name));
1170 return -1;
1172 outdata->dptr = (uint8_t *)&db->db_id;
1173 outdata->dsize = sizeof(db->db_id);
1174 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1175 return 0;
1178 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1179 #ifdef TDB_MUTEX_LOCKING
1180 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1181 #else
1182 with_mutexes = false;
1183 #endif
1185 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1186 with_jenkinshash, with_mutexes) != 0) {
1187 return -1;
1190 db = ctdb_db_handle(ctdb, db_name);
1191 if (!db) {
1192 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1193 return -1;
1196 /* remember the flags the client has specified */
1197 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1199 outdata->dptr = (uint8_t *)&db->db_id;
1200 outdata->dsize = sizeof(db->db_id);
1202 /* Try to ensure it's locked in mem */
1203 lockdown_memory(ctdb->valgrinding);
1205 /* tell all the other nodes about this database */
1206 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1207 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1208 CTDB_CONTROL_DB_ATTACH,
1209 0, CTDB_CTRL_FLAG_NOREPLY,
1210 indata, NULL, NULL);
1212 /* success */
1213 return 0;
1217 * a client has asked to detach from a database
1219 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1220 uint32_t client_id)
1222 uint32_t db_id;
1223 struct ctdb_db_context *ctdb_db;
1224 struct ctdb_client *client = NULL;
1226 db_id = *(uint32_t *)indata.dptr;
1227 ctdb_db = find_ctdb_db(ctdb, db_id);
1228 if (ctdb_db == NULL) {
1229 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1230 db_id));
1231 return -1;
1234 if (ctdb->tunable.allow_client_db_attach == 1) {
1235 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1236 "Clients are allowed access to databases "
1237 "(AllowClientDBAccess == 1)\n",
1238 ctdb_db->db_name));
1239 return -1;
1242 if (ctdb_db->persistent) {
1243 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1244 "denied\n", ctdb_db->db_name));
1245 return -1;
1248 /* Cannot detach from database when in recovery */
1249 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1250 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1251 return -1;
1254 /* If a control comes from a client, then broadcast it to all nodes.
1255 * Do the actual detach only if the control comes from other daemons.
1257 if (client_id != 0) {
1258 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1259 if (client != NULL) {
1260 /* forward the control to all the nodes */
1261 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1262 CTDB_CONTROL_DB_DETACH, 0,
1263 CTDB_CTRL_FLAG_NOREPLY,
1264 indata, NULL, NULL);
1265 return 0;
1267 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1268 "for database '%s'\n", ctdb_db->db_name));
1269 return -1;
1272 /* Detach database from recoverd */
1273 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1274 CTDB_SRVID_DETACH_DATABASE,
1275 indata) != 0) {
1276 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1277 return -1;
1280 /* Disable vacuuming and drop all vacuuming data */
1281 talloc_free(ctdb_db->vacuum_handle);
1282 talloc_free(ctdb_db->delete_queue);
1284 /* Terminate any deferred fetch */
1285 talloc_free(ctdb_db->deferred_fetch);
1287 /* Terminate any traverses */
1288 while (ctdb_db->traverse) {
1289 talloc_free(ctdb_db->traverse);
1292 /* Terminate any revokes */
1293 while (ctdb_db->revokechild_active) {
1294 talloc_free(ctdb_db->revokechild_active);
1297 /* Free readonly tracking database */
1298 if (ctdb_db->readonly) {
1299 talloc_free(ctdb_db->rottdb);
1302 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1304 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1305 ctdb_db->db_name));
1306 talloc_free(ctdb_db);
1308 return 0;
1312 attach to all existing persistent databases
1314 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1315 const char *unhealthy_reason)
1317 DIR *d;
1318 struct dirent *de;
1320 /* open the persistent db directory and scan it for files */
1321 d = opendir(ctdb->db_directory_persistent);
1322 if (d == NULL) {
1323 return 0;
1326 while ((de=readdir(d))) {
1327 char *p, *s, *q;
1328 size_t len = strlen(de->d_name);
1329 uint32_t node;
1330 int invalid_name = 0;
1332 s = talloc_strdup(ctdb, de->d_name);
1333 if (s == NULL) {
1334 closedir(d);
1335 CTDB_NO_MEMORY(ctdb, s);
1338 /* only accept names ending in .tdb */
1339 p = strstr(s, ".tdb.");
1340 if (len < 7 || p == NULL) {
1341 talloc_free(s);
1342 continue;
1345 /* only accept names ending with .tdb. and any number of digits */
1346 q = p+5;
1347 while (*q != 0 && invalid_name == 0) {
1348 if (!isdigit(*q++)) {
1349 invalid_name = 1;
1352 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1353 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1354 talloc_free(s);
1355 continue;
1357 p[4] = 0;
1359 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1360 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1361 closedir(d);
1362 talloc_free(s);
1363 return -1;
1366 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1368 talloc_free(s);
1370 closedir(d);
1371 return 0;
1374 int ctdb_attach_databases(struct ctdb_context *ctdb)
1376 int ret;
1377 char *persistent_health_path = NULL;
1378 char *unhealthy_reason = NULL;
1379 bool first_try = true;
1381 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1382 ctdb->db_directory_state,
1383 PERSISTENT_HEALTH_TDB,
1384 ctdb->pnn);
1385 if (persistent_health_path == NULL) {
1386 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1387 return -1;
1390 again:
1392 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1393 0, TDB_DISALLOW_NESTING,
1394 O_CREAT | O_RDWR, 0600);
1395 if (ctdb->db_persistent_health == NULL) {
1396 struct tdb_wrap *tdb;
1398 if (!first_try) {
1399 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1400 persistent_health_path,
1401 errno,
1402 strerror(errno)));
1403 talloc_free(persistent_health_path);
1404 talloc_free(unhealthy_reason);
1405 return -1;
1407 first_try = false;
1409 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1410 persistent_health_path,
1411 "was cleared after a failure",
1412 "manual verification needed");
1413 if (unhealthy_reason == NULL) {
1414 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1415 talloc_free(persistent_health_path);
1416 return -1;
1419 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1420 persistent_health_path));
1421 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1422 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1423 O_CREAT | O_RDWR, 0600);
1424 if (tdb) {
1425 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1426 persistent_health_path,
1427 errno,
1428 strerror(errno)));
1429 talloc_free(persistent_health_path);
1430 talloc_free(unhealthy_reason);
1431 return -1;
1434 talloc_free(tdb);
1435 goto again;
1437 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1438 if (ret != 0) {
1439 struct tdb_wrap *tdb;
1441 talloc_free(ctdb->db_persistent_health);
1442 ctdb->db_persistent_health = NULL;
1444 if (!first_try) {
1445 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1446 persistent_health_path));
1447 talloc_free(persistent_health_path);
1448 talloc_free(unhealthy_reason);
1449 return -1;
1451 first_try = false;
1453 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1454 persistent_health_path,
1455 "was cleared after a failure",
1456 "manual verification needed");
1457 if (unhealthy_reason == NULL) {
1458 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1459 talloc_free(persistent_health_path);
1460 return -1;
1463 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1464 persistent_health_path));
1465 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1466 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1467 O_CREAT | O_RDWR, 0600);
1468 if (tdb) {
1469 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1470 persistent_health_path,
1471 errno,
1472 strerror(errno)));
1473 talloc_free(persistent_health_path);
1474 talloc_free(unhealthy_reason);
1475 return -1;
1478 talloc_free(tdb);
1479 goto again;
1481 talloc_free(persistent_health_path);
1483 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1484 talloc_free(unhealthy_reason);
1485 if (ret != 0) {
1486 return ret;
1489 return 0;
1493 called when a broadcast seqnum update comes in
1495 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1497 struct ctdb_db_context *ctdb_db;
1498 if (srcnode == ctdb->pnn) {
1499 /* don't update ourselves! */
1500 return 0;
1503 ctdb_db = find_ctdb_db(ctdb, db_id);
1504 if (!ctdb_db) {
1505 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1506 return -1;
1509 if (ctdb_db->unhealthy_reason) {
1510 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1511 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1512 return -1;
1515 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1516 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1517 return 0;
1521 timer to check for seqnum changes in a ltdb and propogate them
1523 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1524 struct tevent_timer *te,
1525 struct timeval t, void *p)
1527 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1528 struct ctdb_context *ctdb = ctdb_db->ctdb;
1529 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1530 if (new_seqnum != ctdb_db->seqnum) {
1531 /* something has changed - propogate it */
1532 TDB_DATA data;
1533 data.dptr = (uint8_t *)&ctdb_db->db_id;
1534 data.dsize = sizeof(uint32_t);
1535 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1536 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1537 data, NULL, NULL);
1539 ctdb_db->seqnum = new_seqnum;
1541 /* setup a new timer */
1542 ctdb_db->seqnum_update =
1543 tevent_add_timer(ctdb->ev, ctdb_db,
1544 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1545 (ctdb->tunable.seqnum_interval%1000)*1000),
1546 ctdb_ltdb_seqnum_check, ctdb_db);
1550 enable seqnum handling on this db
1552 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1554 struct ctdb_db_context *ctdb_db;
1555 ctdb_db = find_ctdb_db(ctdb, db_id);
1556 if (!ctdb_db) {
1557 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1558 return -1;
1561 if (ctdb_db->seqnum_update == NULL) {
1562 ctdb_db->seqnum_update = tevent_add_timer(
1563 ctdb->ev, ctdb_db,
1564 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1565 (ctdb->tunable.seqnum_interval%1000)*1000),
1566 ctdb_ltdb_seqnum_check, ctdb_db);
1569 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1570 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1571 return 0;
1574 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1576 if (ctdb_db->sticky) {
1577 return 0;
1580 if (ctdb_db->persistent) {
1581 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1582 return -1;
1585 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1587 ctdb_db->sticky = true;
1589 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1591 return 0;
1594 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1596 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1597 int i;
1599 for (i=0; i<MAX_HOT_KEYS; i++) {
1600 if (s->hot_keys[i].key.dsize > 0) {
1601 talloc_free(s->hot_keys[i].key.dptr);
1605 ZERO_STRUCT(ctdb_db->statistics);
1608 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1609 uint32_t db_id,
1610 TDB_DATA *outdata)
1612 struct ctdb_db_context *ctdb_db;
1613 struct ctdb_db_statistics_old *stats;
1614 int i;
1615 int len;
1616 char *ptr;
1618 ctdb_db = find_ctdb_db(ctdb, db_id);
1619 if (!ctdb_db) {
1620 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1621 return -1;
1624 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1625 for (i = 0; i < MAX_HOT_KEYS; i++) {
1626 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1629 stats = talloc_size(outdata, len);
1630 if (stats == NULL) {
1631 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1632 return -1;
1635 memcpy(stats, &ctdb_db->statistics,
1636 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1638 stats->num_hot_keys = MAX_HOT_KEYS;
1640 ptr = &stats->hot_keys_wire[0];
1641 for (i = 0; i < MAX_HOT_KEYS; i++) {
1642 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1643 ctdb_db->statistics.hot_keys[i].key.dsize);
1644 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1647 outdata->dptr = (uint8_t *)stats;
1648 outdata->dsize = len;
1650 return 0;