2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if appropriate.
54 static int ctdb_ltdb_store_server(struct ctdb_db_context
*ctdb_db
,
56 struct ctdb_ltdb_header
*header
,
59 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
61 uint32_t hsize
= sizeof(struct ctdb_ltdb_header
);
63 bool seqnum_suppressed
= false;
65 bool schedule_for_deletion
= false;
66 bool remove_from_delete_queue
= false;
69 if (ctdb
->flags
& CTDB_FLAG_TORTURE
) {
71 struct ctdb_ltdb_header
*h2
;
73 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
74 h2
= (struct ctdb_ltdb_header
*)old
.dptr
;
75 if (old
.dptr
!= NULL
&&
77 h2
->rsn
> header
->rsn
) {
79 ("RSN regression! %"PRIu64
" %"PRIu64
"\n",
80 h2
->rsn
, header
->rsn
));
87 if (ctdb
->vnn_map
== NULL
) {
89 * Called from a client: always store the record
90 * Also don't call ctdb_lmaster since it uses the vnn_map!
96 lmaster
= ctdb_lmaster(ctdb_db
->ctdb
, &key
);
99 * If we migrate an empty record off to another node
100 * and the record has not been migrated with data,
101 * delete the record instead of storing the empty record.
103 if (data
.dsize
!= 0) {
105 } else if (header
->flags
& CTDB_REC_RO_FLAGS
) {
107 } else if (header
->flags
& CTDB_REC_FLAG_AUTOMATIC
) {
109 * The record is not created by the client but
110 * automatically by the ctdb_ltdb_fetch logic that
111 * creates a record with an initial header in the
112 * ltdb before trying to migrate the record from
113 * the current lmaster. Keep it instead of trying
114 * to delete the non-existing record...
117 schedule_for_deletion
= true;
118 } else if (header
->flags
& CTDB_REC_FLAG_MIGRATED_WITH_DATA
) {
120 } else if (ctdb_db
->ctdb
->pnn
== lmaster
) {
122 * If we are lmaster, then we usually keep the record.
123 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124 * and the record is empty and has never been migrated
125 * with data, then we should delete it instead of storing it.
126 * This is part of the vacuuming process.
128 * The reason that we usually need to store even empty records
129 * on the lmaster is that a client operating directly on the
130 * lmaster (== dmaster) expects the local copy of the record to
131 * exist after successful ctdb migrate call. If the record does
132 * not exist, the client goes into a migrate loop and eventually
133 * fails. So storing the empty record makes sure that we do not
134 * need to change the client code.
136 if (!(header
->flags
& CTDB_REC_FLAG_VACUUM_MIGRATED
)) {
138 } else if (ctdb_db
->ctdb
->pnn
!= header
->dmaster
) {
141 } else if (ctdb_db
->ctdb
->pnn
== header
->dmaster
) {
146 if (ctdb_db_volatile(ctdb_db
) &&
147 (ctdb_db
->ctdb
->pnn
== header
->dmaster
) &&
148 !(header
->flags
& CTDB_REC_RO_FLAGS
))
152 if (data
.dsize
== 0) {
153 schedule_for_deletion
= true;
156 remove_from_delete_queue
= !schedule_for_deletion
;
161 * The VACUUM_MIGRATED flag is only set temporarily for
162 * the above logic when the record was retrieved by a
163 * VACUUM_MIGRATE call and should not be stored in the
166 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167 * and there are two cases in which the corresponding record
168 * is stored in the local database:
169 * 1. The record has been migrated with data in the past
170 * (the MIGRATED_WITH_DATA record flag is set).
171 * 2. The record has been filled with data again since it
172 * had been submitted in the VACUUM_FETCH message to the
174 * For such records it is important to not store the
175 * VACUUM_MIGRATED flag in the database.
177 header
->flags
&= ~CTDB_REC_FLAG_VACUUM_MIGRATED
;
180 * Similarly, clear the AUTOMATIC flag which should not enter
181 * the local database copy since this would require client
182 * modifications to clear the flag when the client stores
185 header
->flags
&= ~CTDB_REC_FLAG_AUTOMATIC
;
187 rec
[0].dsize
= hsize
;
188 rec
[0].dptr
= (uint8_t *)header
;
190 rec
[1].dsize
= data
.dsize
;
191 rec
[1].dptr
= data
.dptr
;
193 /* Databases with seqnum updates enabled only get their seqnum
194 changes when/if we modify the data */
195 if (ctdb_db
->seqnum_update
!= NULL
) {
197 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
199 if ((old
.dsize
== hsize
+ data
.dsize
) &&
200 memcmp(old
.dptr
+ hsize
, data
.dptr
, data
.dsize
) == 0) {
201 tdb_remove_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
202 seqnum_suppressed
= true;
204 if (old
.dptr
!= NULL
) {
209 DEBUG(DEBUG_DEBUG
, (__location__
" db[%s]: %s record: hash[0x%08x]\n",
211 keep
?"storing":"deleting",
215 ret
= tdb_storev(ctdb_db
->ltdb
->tdb
, key
, rec
, 2, TDB_REPLACE
);
217 ret
= tdb_delete(ctdb_db
->ltdb
->tdb
, key
);
224 tdb_error(ctdb_db
->ltdb
->tdb
) == TDB_ERR_NOEXIST
)
229 DEBUG(lvl
, (__location__
" db[%s]: Failed to %s record: "
232 keep
?"store":"delete", ret
,
233 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
235 schedule_for_deletion
= false;
236 remove_from_delete_queue
= false;
238 if (seqnum_suppressed
) {
239 tdb_add_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
242 if (schedule_for_deletion
) {
244 ret2
= ctdb_local_schedule_for_deletion(ctdb_db
, header
, key
);
246 DEBUG(DEBUG_ERR
, (__location__
" ctdb_local_schedule_for_deletion failed.\n"));
250 if (remove_from_delete_queue
) {
251 ctdb_local_remove_from_delete_queue(ctdb_db
, header
, key
);
/*
 * Per-request state for a ctdb request that is waiting for a record
 * lock.  It is filled in by ctdb_ltdb_lock_requeue() and read back in
 * lock_fetch_callback().
 *
 * NOTE(review): the code in this file also assigns state->recv_context
 * and state->generation, but the declarations of those two members are
 * not visible in this listing - confirm against the full source.
 */
257 struct lock_fetch_state
{
258 struct ctdb_context
*ctdb
;
259 struct ctdb_db_context
*ctdb_db
;
/* handler the deferred packet is passed back to by lock_fetch_callback() */
260 void (*recv_pkt
)(void *, struct ctdb_req_header
*);
/* the deferred request packet itself */
262 struct ctdb_req_header
*hdr
;
/* when true, lock_fetch_callback() skips its generation comparison */
264 bool ignore_generation
;
268 called when we should retry the operation
/*
 * Callback handed to ctdb_lock_record() by ctdb_ltdb_lock_requeue().
 *
 * p      - the struct lock_fetch_state created when the request was queued
 * locked - not referenced in the lines visible here
 *
 * Unless ignore_generation is set, the generation saved in the state is
 * compared with the database's current generation; on mismatch the
 * packet is logged as belonging to a previous generation and freed.
 * The visible code also hands the packet back to state->recv_pkt and
 * logs "PACKET REQUEUED".
 *
 * NOTE(review): the lines between the talloc_free() and the recv_pkt
 * call are missing from this listing - presumably an early return that
 * closes the mismatch branch; confirm against the full source.
 */
270 static void lock_fetch_callback(void *p
, bool locked
)
272 struct lock_fetch_state
*state
= talloc_get_type(p
, struct lock_fetch_state
);
/* stale-request check: generation at queue time vs. current generation */
273 if (!state
->ignore_generation
&&
274 state
->generation
!= state
->ctdb_db
->generation
) {
275 DEBUG(DEBUG_NOTICE
,("Discarding previous generation lockwait packet\n"));
276 talloc_free(state
->hdr
);
/* re-submit the deferred packet to the handler recorded at queue time */
279 state
->recv_pkt(state
->recv_context
, state
->hdr
);
280 DEBUG(DEBUG_INFO
,(__location__
" PACKET REQUEUED\n"));
285 do a non-blocking ltdb_lock, deferring this ctdb request until we
288 It does the following:
290 1) tries to get the chainlock. If it succeeds, then it returns 0
292 2) if it fails to get a chainlock immediately then it sets up a
293 non-blocking chainlock via ctdb_lock_record, and when it gets the
294 chainlock it re-submits this ctdb request to the main packet
297 This effectively queues all ctdb requests that cannot be
298 immediately satisfied until it can get the lock. This means that
299 the main ctdb daemon will not block waiting for a chainlock held by
302 There are 3 possible return values:
304 0: means that it got the lock immediately.
305 -1: means that it failed to get the lock, and won't retry
306 -2: means that it failed to get the lock immediately, but will retry
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context
*ctdb_db
,
309 TDB_DATA key
, struct ctdb_req_header
*hdr
,
310 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
311 void *recv_context
, bool ignore_generation
)
314 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
315 struct lock_request
*lreq
;
316 struct lock_fetch_state
*state
;
318 ret
= tdb_chainlock_nonblock(tdb
, key
);
321 !(errno
== EACCES
|| errno
== EAGAIN
|| errno
== EDEADLK
)) {
322 /* a hard failure - don't try again */
326 /* when torturing, ensure we test the contended path */
327 if ((ctdb_db
->ctdb
->flags
& CTDB_FLAG_TORTURE
) &&
330 tdb_chainunlock(tdb
, key
);
333 /* first the non-contended path */
338 state
= talloc(hdr
, struct lock_fetch_state
);
339 state
->ctdb
= ctdb_db
->ctdb
;
340 state
->ctdb_db
= ctdb_db
;
342 state
->recv_pkt
= recv_pkt
;
343 state
->recv_context
= recv_context
;
344 state
->generation
= ctdb_db
->generation
;
345 state
->ignore_generation
= ignore_generation
;
347 /* now the contended path */
348 lreq
= ctdb_lock_record(state
, ctdb_db
, key
, true, lock_fetch_callback
, state
);
353 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354 so it won't be freed yet */
355 talloc_steal(state
, hdr
);
357 /* now tell the caller that we will retry asynchronously */
362 a variant of ctdb_ltdb_lock_requeue that also fetches the record
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context
*ctdb_db
,
365 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
366 struct ctdb_req_header
*hdr
, TDB_DATA
*data
,
367 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
368 void *recv_context
, bool ignore_generation
)
372 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
, recv_pkt
,
373 recv_context
, ignore_generation
);
375 ret
= ctdb_ltdb_fetch(ctdb_db
, key
, header
, hdr
, data
);
378 uret
= ctdb_ltdb_unlock(ctdb_db
, key
);
380 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", uret
));
389 paranoid check to see if the db is empty
/*
 * Sanity check on a freshly attached database (ctdb_local_attach() calls
 * it for databases that are not persistent).
 *
 * The local tdb is traversed read-only with no callback and the result
 * is kept in `count`.  The visible code logs an alert naming the tdb and
 * then calls ctdb_fatal() with "database not empty on attach".
 *
 * NOTE(review): the test on `count` that guards the alert/fatal path is
 * not visible in this listing - presumably "count > 0"; confirm.
 */
391 static void ctdb_check_db_empty(struct ctdb_db_context
*ctdb_db
)
393 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
/* NULL callback and NULL private data: only the traversal result is used */
394 int count
= tdb_traverse_read(tdb
, NULL
, NULL
);
396 DEBUG(DEBUG_ALERT
,(__location__
" tdb '%s' not empty on attach! aborting\n",
398 ctdb_fatal(ctdb_db
->ctdb
, "database not empty on attach");
402 int ctdb_load_persistent_health(struct ctdb_context
*ctdb
,
403 struct ctdb_db_context
*ctdb_db
)
405 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
411 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
412 key
.dsize
= strlen(ctdb_db
->db_name
);
414 old
= ctdb_db
->unhealthy_reason
;
415 ctdb_db
->unhealthy_reason
= NULL
;
417 val
= tdb_fetch(tdb
, key
);
419 reason
= talloc_strndup(ctdb_db
,
420 (const char *)val
.dptr
,
422 if (reason
== NULL
) {
423 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strndup(%d) failed\n",
425 ctdb_db
->unhealthy_reason
= old
;
436 ctdb_db
->unhealthy_reason
= reason
;
440 int ctdb_update_persistent_health(struct ctdb_context
*ctdb
,
441 struct ctdb_db_context
*ctdb_db
,
442 const char *given_reason
,/* NULL means healthy */
443 int num_healthy_nodes
)
445 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
449 char *new_reason
= NULL
;
450 char *old_reason
= NULL
;
452 ret
= tdb_transaction_start(tdb
);
454 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_start('%s') failed: %d - %s\n",
455 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
459 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
461 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
462 ctdb_db
->db_name
, ret
));
465 old_reason
= ctdb_db
->unhealthy_reason
;
467 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
468 key
.dsize
= strlen(ctdb_db
->db_name
);
471 new_reason
= talloc_strdup(ctdb_db
, given_reason
);
472 if (new_reason
== NULL
) {
473 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strdup(%s) failed\n",
477 } else if (old_reason
&& num_healthy_nodes
== 0) {
479 * If the reason indicates ok, but there were no healthy nodes
480 * available, it means that we have not recovered valid content
481 * of the db. So if there's an old reason, prefix it with
482 * "NO-HEALTHY-NODES - "
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487 ret
= strncmp(_TMP_PREFIX
, old_reason
, strlen(_TMP_PREFIX
));
489 prefix
= _TMP_PREFIX
;
493 new_reason
= talloc_asprintf(ctdb_db
, "%s%s",
495 if (new_reason
== NULL
) {
496 DEBUG(DEBUG_ALERT
,(__location__
" talloc_asprintf(%s%s) failed\n",
497 prefix
, old_reason
));
504 val
.dptr
= discard_const_p(uint8_t, new_reason
);
505 val
.dsize
= strlen(new_reason
);
507 ret
= tdb_store(tdb
, key
, val
, TDB_REPLACE
);
509 tdb_transaction_cancel(tdb
);
510 DEBUG(DEBUG_ALERT
,(__location__
" tdb_store('%s', %s, %s) failed: %d - %s\n",
511 tdb_name(tdb
), ctdb_db
->db_name
, new_reason
,
512 ret
, tdb_errorstr(tdb
)));
513 talloc_free(new_reason
);
516 DEBUG(DEBUG_ALERT
,("Updated db health for db(%s) to: %s\n",
517 ctdb_db
->db_name
, new_reason
));
518 } else if (old_reason
) {
519 ret
= tdb_delete(tdb
, key
);
521 tdb_transaction_cancel(tdb
);
522 DEBUG(DEBUG_ALERT
,(__location__
" tdb_delete('%s', %s) failed: %d - %s\n",
523 tdb_name(tdb
), ctdb_db
->db_name
,
524 ret
, tdb_errorstr(tdb
)));
525 talloc_free(new_reason
);
528 DEBUG(DEBUG_NOTICE
,("Updated db health for db(%s): OK\n",
532 ret
= tdb_transaction_commit(tdb
);
533 if (ret
!= TDB_SUCCESS
) {
534 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_commit('%s') failed: %d - %s\n",
535 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
536 talloc_free(new_reason
);
540 talloc_free(old_reason
);
541 ctdb_db
->unhealthy_reason
= new_reason
;
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context
*ctdb
,
547 struct ctdb_db_context
*ctdb_db
)
549 time_t now
= time(NULL
);
557 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558 new_path
= talloc_asprintf(ctdb_db
, "%s.corrupted."
559 "%04u%02u%02u%02u%02u%02u.0Z",
561 tm
->tm_year
+1900, tm
->tm_mon
+1,
562 tm
->tm_mday
, tm
->tm_hour
, tm
->tm_min
,
564 if (new_path
== NULL
) {
565 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
569 new_reason
= talloc_asprintf(ctdb_db
,
570 "ERROR - Backup of corrupted TDB in '%s'",
572 if (new_reason
== NULL
) {
573 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
576 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, new_reason
, 0);
577 talloc_free(new_reason
);
579 DEBUG(DEBUG_CRIT
,(__location__
580 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
585 ret
= rename(ctdb_db
->db_path
, new_path
);
587 DEBUG(DEBUG_CRIT
,(__location__
588 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589 ctdb_db
->db_path
, new_path
,
590 errno
, strerror(errno
)));
591 talloc_free(new_path
);
595 DEBUG(DEBUG_CRIT
,(__location__
596 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597 ctdb_db
->db_path
, new_path
));
598 talloc_free(new_path
);
602 int ctdb_recheck_persistent_health(struct ctdb_context
*ctdb
)
604 struct ctdb_db_context
*ctdb_db
;
609 for (ctdb_db
= ctdb
->db_list
; ctdb_db
; ctdb_db
= ctdb_db
->next
) {
610 if (!ctdb_db_persistent(ctdb_db
)) {
614 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
616 DEBUG(DEBUG_ALERT
,(__location__
617 " load persistent health for '%s' failed\n",
622 if (ctdb_db
->unhealthy_reason
== NULL
) {
624 DEBUG(DEBUG_INFO
,(__location__
625 " persistent db '%s' healthy\n",
631 DEBUG(DEBUG_ALERT
,(__location__
632 " persistent db '%s' unhealthy: %s\n",
634 ctdb_db
->unhealthy_reason
));
637 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
649 mark a database - as healthy
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context
*ctdb
, TDB_DATA indata
)
653 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
654 struct ctdb_db_context
*ctdb_db
;
656 bool may_recover
= false;
658 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
660 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
664 if (ctdb_db
->unhealthy_reason
) {
668 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, NULL
, 1);
670 DEBUG(DEBUG_ERR
,(__location__
671 " ctdb_update_persistent_health(%s) failed\n",
676 if (may_recover
&& ctdb
->runstate
== CTDB_RUNSTATE_STARTUP
) {
677 DEBUG(DEBUG_ERR
, (__location__
" db %s become healthy - force recovery for startup\n",
679 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
685 int32_t ctdb_control_db_get_health(struct ctdb_context
*ctdb
,
689 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
690 struct ctdb_db_context
*ctdb_db
;
693 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
695 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
699 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
701 DEBUG(DEBUG_ERR
,(__location__
702 " ctdb_load_persistent_health(%s) failed\n",
708 if (ctdb_db
->unhealthy_reason
) {
709 outdata
->dptr
= (uint8_t *)ctdb_db
->unhealthy_reason
;
710 outdata
->dsize
= strlen(ctdb_db
->unhealthy_reason
)+1;
717 int ctdb_set_db_readonly(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
721 if (ctdb_db_readonly(ctdb_db
)) {
725 if (! ctdb_db_volatile(ctdb_db
)) {
727 ("Non-volatile databases do not support readonly flag\n"));
731 ropath
= talloc_asprintf(ctdb_db
, "%s.RO", ctdb_db
->db_path
);
732 if (ropath
== NULL
) {
733 DEBUG(DEBUG_CRIT
,("Failed to asprintf the tracking database\n"));
736 ctdb_db
->rottdb
= tdb_open(ropath
,
737 ctdb
->tunable
.database_hash_size
,
738 TDB_NOLOCK
|TDB_CLEAR_IF_FIRST
|TDB_NOSYNC
,
739 O_CREAT
|O_RDWR
, 0600);
740 if (ctdb_db
->rottdb
== NULL
) {
741 DEBUG(DEBUG_CRIT
,("Failed to open/create the tracking database '%s'\n", ropath
));
746 DEBUG(DEBUG_NOTICE
,("OPENED tracking database : '%s'\n", ropath
));
748 ctdb_db_set_readonly(ctdb_db
);
750 DEBUG(DEBUG_NOTICE
, ("Readonly property set on DB %s\n", ctdb_db
->db_name
));
757 attach to a database, handling both persistent and non-persistent databases
758 return 0 on success, -1 on failure
760 static int ctdb_local_attach(struct ctdb_context
*ctdb
, const char *db_name
,
761 uint8_t db_flags
, const char *unhealthy_reason
)
763 struct ctdb_db_context
*ctdb_db
, *tmp_db
;
768 int remaining_tries
= 0;
770 ctdb_db
= talloc_zero(ctdb
, struct ctdb_db_context
);
771 CTDB_NO_MEMORY(ctdb
, ctdb_db
);
773 ctdb_db
->ctdb
= ctdb
;
774 ctdb_db
->db_name
= talloc_strdup(ctdb_db
, db_name
);
775 CTDB_NO_MEMORY(ctdb
, ctdb_db
->db_name
);
777 key
.dsize
= strlen(db_name
)+1;
778 key
.dptr
= discard_const(db_name
);
779 ctdb_db
->db_id
= ctdb_hash(&key
);
780 ctdb_db
->db_flags
= db_flags
;
782 if (ctdb_db_volatile(ctdb_db
)) {
783 ctdb_db
->delete_queue
= trbt_create(ctdb_db
, 0);
784 if (ctdb_db
->delete_queue
== NULL
) {
785 CTDB_NO_MEMORY(ctdb
, ctdb_db
->delete_queue
);
788 ctdb_db
->ctdb_ltdb_store_fn
= ctdb_ltdb_store_server
;
791 /* check for hash collisions */
792 for (tmp_db
=ctdb
->db_list
;tmp_db
;tmp_db
=tmp_db
->next
) {
793 if (tmp_db
->db_id
== ctdb_db
->db_id
) {
794 DEBUG(DEBUG_CRIT
,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795 tmp_db
->db_id
, db_name
, tmp_db
->db_name
));
796 talloc_free(ctdb_db
);
801 if (ctdb_db_persistent(ctdb_db
)) {
802 if (unhealthy_reason
) {
803 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
,
804 unhealthy_reason
, 0);
806 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_update_persistent_health('%s','%s') failed: %d\n",
807 ctdb_db
->db_name
, unhealthy_reason
, ret
));
808 talloc_free(ctdb_db
);
813 if (ctdb
->max_persistent_check_errors
> 0) {
816 if (ctdb
->runstate
== CTDB_RUNSTATE_RUNNING
) {
820 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
822 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
823 ctdb_db
->db_name
, ret
));
824 talloc_free(ctdb_db
);
829 if (ctdb_db
->unhealthy_reason
&& remaining_tries
== 0) {
830 DEBUG(DEBUG_ALERT
,(__location__
"ERROR: tdb %s is marked as unhealthy: %s\n",
831 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
832 talloc_free(ctdb_db
);
836 if (ctdb_db
->unhealthy_reason
) {
837 /* this is just a warning, but we want that in the log file! */
838 DEBUG(DEBUG_ALERT
,(__location__
"Warning: tdb %s is marked as unhealthy: %s\n",
839 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
842 /* open the database */
843 ctdb_db
->db_path
= talloc_asprintf(ctdb_db
, "%s/%s.%u",
844 ctdb_db_persistent(ctdb_db
) ?
845 ctdb
->db_directory_persistent
:
849 tdb_flags
= ctdb_db_tdb_flags(db_flags
, ctdb
->valgrinding
,
850 ctdb
->tunable
.mutex_enabled
);
853 ctdb_db
->ltdb
= tdb_wrap_open(ctdb_db
, ctdb_db
->db_path
,
854 ctdb
->tunable
.database_hash_size
,
856 O_CREAT
|O_RDWR
, mode
);
857 if (ctdb_db
->ltdb
== NULL
) {
859 int saved_errno
= errno
;
861 if (! ctdb_db_persistent(ctdb_db
)) {
862 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
865 strerror(saved_errno
)));
866 talloc_free(ctdb_db
);
870 if (remaining_tries
== 0) {
871 DEBUG(DEBUG_CRIT
,(__location__
872 "Failed to open persistent tdb '%s': %d - %s\n",
875 strerror(saved_errno
)));
876 talloc_free(ctdb_db
);
880 ret
= stat(ctdb_db
->db_path
, &st
);
882 DEBUG(DEBUG_CRIT
,(__location__
883 "Failed to open persistent tdb '%s': %d - %s\n",
886 strerror(saved_errno
)));
887 talloc_free(ctdb_db
);
891 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
893 DEBUG(DEBUG_CRIT
,(__location__
894 "Failed to open persistent tdb '%s': %d - %s\n",
897 strerror(saved_errno
)));
898 talloc_free(ctdb_db
);
907 if (!ctdb_db_persistent(ctdb_db
)) {
908 ctdb_check_db_empty(ctdb_db
);
910 ret
= tdb_check(ctdb_db
->ltdb
->tdb
, NULL
, NULL
);
915 DEBUG(DEBUG_CRIT
,("tdb_check(%s) failed: %d - %s\n",
916 ctdb_db
->db_path
, ret
,
917 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
918 if (remaining_tries
== 0) {
919 talloc_free(ctdb_db
);
923 fd
= tdb_fd(ctdb_db
->ltdb
->tdb
);
924 ret
= fstat(fd
, &st
);
926 DEBUG(DEBUG_CRIT
,(__location__
927 "Failed to fstat() persistent tdb '%s': %d - %s\n",
931 talloc_free(ctdb_db
);
936 talloc_free(ctdb_db
->ltdb
);
937 ctdb_db
->ltdb
= NULL
;
939 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
941 DEBUG(DEBUG_CRIT
,("Failed to backup corrupted tdb '%s'\n",
943 talloc_free(ctdb_db
);
953 /* remember the flags the client has specified */
954 tdb_add_flags(ctdb_db
->ltdb
->tdb
, tdb_flags
);
957 /* set up a rb tree we can use to track which records we have a
958 fetch-lock in-flight for so we can defer any additional calls
961 ctdb_db
->deferred_fetch
= trbt_create(ctdb_db
, 0);
962 if (ctdb_db
->deferred_fetch
== NULL
) {
963 DEBUG(DEBUG_ERR
,("Failed to create deferred fetch rb tree for ctdb database\n"));
964 talloc_free(ctdb_db
);
968 ctdb_db
->defer_dmaster
= trbt_create(ctdb_db
, 0);
969 if (ctdb_db
->defer_dmaster
== NULL
) {
970 DEBUG(DEBUG_ERR
, ("Failed to create defer dmaster rb tree for %s\n",
972 talloc_free(ctdb_db
);
976 DLIST_ADD(ctdb
->db_list
, ctdb_db
);
978 /* setting this can help some high churn databases */
979 tdb_set_max_dead(ctdb_db
->ltdb
->tdb
, ctdb
->tunable
.database_max_dead
);
982 all databases support the "null" function. we need this in
983 order to do forced migration of records
985 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_null_func
, CTDB_NULL_FUNC
);
987 DEBUG(DEBUG_CRIT
,("Failed to setup null function for '%s'\n", ctdb_db
->db_name
));
988 talloc_free(ctdb_db
);
993 all databases support the "fetch" function. we need this
994 for efficient Samba3 ctdb fetch
996 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_func
, CTDB_FETCH_FUNC
);
998 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
999 talloc_free(ctdb_db
);
1004 all databases support the "fetch_with_header" function. we need this
1005 for efficient readonly record fetches
1007 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_with_header_func
, CTDB_FETCH_WITH_HEADER_FUNC
);
1009 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
1010 talloc_free(ctdb_db
);
1014 ret
= ctdb_vacuum_init(ctdb_db
);
1016 DEBUG(DEBUG_CRIT
,("Failed to setup vacuuming for "
1017 "database '%s'\n", ctdb_db
->db_name
));
1018 talloc_free(ctdb_db
);
1022 ret
= ctdb_migration_init(ctdb_db
);
1025 ("Failed to setup migration tracking for db '%s'\n",
1027 talloc_free(ctdb_db
);
1031 ret
= db_hash_init(ctdb_db
, "lock_log", 2048, DB_HASH_COMPLEX
,
1032 &ctdb_db
->lock_log
);
1035 ("Failed to setup lock logging for db '%s'\n",
1037 talloc_free(ctdb_db
);
1041 ctdb_db
->generation
= ctdb
->vnn_map
->generation
;
1043 DEBUG(DEBUG_NOTICE
,("Attached to database '%s' with flags 0x%x\n",
1044 ctdb_db
->db_path
, tdb_flags
));
/*
 * One deferred DB-attach control.  Instances are created in
 * ctdb_control_db_attach() while an attach has to wait, and are kept on
 * the ctdb->deferred_attach list.
 */
1051 struct ctdb_deferred_attach_context
{
/* list linkage used by DLIST_ADD / DLIST_REMOVE on ctdb->deferred_attach */
1052 struct ctdb_deferred_attach_context
*next
, *prev
;
1053 struct ctdb_context
*ctdb
;
/* the deferred control packet; talloc_steal()ed onto this context */
1054 struct ctdb_req_control_old
*c
;
/*
 * talloc destructor for a deferred attach context (installed with
 * talloc_set_destructor() in ctdb_control_db_attach()).  Unlinks the
 * context from ctdb->deferred_attach so the list does not keep a pointer
 * to memory that is being freed.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
1058 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context
*da_ctx
)
1060 DLIST_REMOVE(da_ctx
->ctdb
->deferred_attach
, da_ctx
);
/*
 * Timer handler armed in ctdb_control_db_attach() using the
 * deferred_attach_timeout tunable.  When it fires, the deferred control
 * is answered with -1 (presumably the status argument of
 * ctdb_request_control_reply() - confirm) and the deferred context is
 * freed.
 *
 * private_data is the struct ctdb_deferred_attach_context; ev, te and t
 * are not referenced in the visible lines.
 */
1065 static void ctdb_deferred_attach_timeout(struct tevent_context
*ev
,
1066 struct tevent_timer
*te
,
1067 struct timeval t
, void *private_data
)
1069 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1070 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
/* fail the deferred request, passing no reply data and no error message */
1072 ctdb_request_control_reply(ctdb
, da_ctx
->c
, NULL
, -1, NULL
);
1073 talloc_free(da_ctx
);
/*
 * Timer handler scheduled by ctdb_process_deferred_attach().  Feeds the
 * saved control packet back through ctdb_input_pkt() so it is processed
 * again, then frees the deferred context.
 *
 * private_data is the struct ctdb_deferred_attach_context; ev, te and t
 * are not referenced in the visible lines.
 */
1076 static void ctdb_deferred_attach_callback(struct tevent_context
*ev
,
1077 struct tevent_timer
*te
,
1078 struct timeval t
, void *private_data
)
1080 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1081 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
1083 /* This talloc-steals the packet ->c */
1084 ctdb_input_pkt(ctdb
, (struct ctdb_req_header
*)da_ctx
->c
);
/* the packet was stolen by the call above, so only the context goes away */
1085 talloc_free(da_ctx
);
/*
 * Re-schedule every deferred DB attach.
 *
 * Each entry is taken off ctdb->deferred_attach and a timer is added on
 * ctdb->ev with ctdb_deferred_attach_callback as handler and the entry
 * as both second argument and private data.  The timer time is
 * timeval_current_ofs(1,0).  The loop ends when the list is empty.
 *
 * NOTE(review): the return statement and the end of the in-body comment
 * are not visible in this listing.
 */
1088 int ctdb_process_deferred_attach(struct ctdb_context
*ctdb
)
1090 struct ctdb_deferred_attach_context
*da_ctx
;
1092 /* call it from the main event loop as soon as the current event
1095 while ((da_ctx
= ctdb
->deferred_attach
) != NULL
) {
1096 DLIST_REMOVE(ctdb
->deferred_attach
, da_ctx
);
1097 tevent_add_timer(ctdb
->ev
, da_ctx
,
1098 timeval_current_ofs(1,0),
1099 ctdb_deferred_attach_callback
, da_ctx
);
1106 a client has asked to attach a new database
1108 int32_t ctdb_control_db_attach(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1110 uint8_t db_flags
, uint32_t client_id
,
1111 struct ctdb_req_control_old
*c
,
1114 const char *db_name
= (const char *)indata
.dptr
;
1115 struct ctdb_db_context
*db
;
1116 struct ctdb_node
*node
= ctdb
->nodes
[ctdb
->pnn
];
1117 struct ctdb_client
*client
= NULL
;
1120 if (ctdb
->tunable
.allow_client_db_attach
== 0) {
1121 DEBUG(DEBUG_ERR
, ("DB Attach to database %s denied by tunable "
1122 "AllowClientDBAccess == 0\n", db_name
));
1126 /* don't allow any local clients to attach while we are in recovery mode
1127 * except for the recovery daemon.
1128 * allow all attach from the network since these are always from remote
1131 if (client_id
!= 0) {
1132 client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
1134 if (client
!= NULL
) {
1135 /* If the node is inactive it is not part of the cluster
1136 and we should not allow clients to attach to any
1139 if (node
->flags
& NODE_FLAGS_INACTIVE
) {
1140 DEBUG(DEBUG_ERR
,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name
, node
->flags
));
1144 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
&&
1145 client
->pid
!= ctdb
->recoverd_pid
&&
1146 ctdb
->runstate
< CTDB_RUNSTATE_RUNNING
) {
1147 struct ctdb_deferred_attach_context
*da_ctx
= talloc(client
, struct ctdb_deferred_attach_context
);
1149 if (da_ctx
== NULL
) {
1150 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name
, client
->pid
));
1154 da_ctx
->ctdb
= ctdb
;
1155 da_ctx
->c
= talloc_steal(da_ctx
, c
);
1156 talloc_set_destructor(da_ctx
, ctdb_deferred_attach_destructor
);
1157 DLIST_ADD(ctdb
->deferred_attach
, da_ctx
);
1159 tevent_add_timer(ctdb
->ev
, da_ctx
,
1160 timeval_current_ofs(ctdb
->tunable
.deferred_attach_timeout
, 0),
1161 ctdb_deferred_attach_timeout
, da_ctx
);
1163 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name
, client
->pid
));
1164 *async_reply
= true;
1169 /* see if we already have this name */
1170 db
= ctdb_db_handle(ctdb
, db_name
);
1172 if ((db
->db_flags
& db_flags
) != db_flags
) {
1174 ("Error: Failed to re-attach with 0x%x flags,"
1175 " database has 0x%x flags\n", db_flags
,
1179 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1180 outdata
->dsize
= sizeof(db
->db_id
);
1184 if (ctdb_local_attach(ctdb
, db_name
, db_flags
, NULL
) != 0) {
1188 db
= ctdb_db_handle(ctdb
, db_name
);
1190 DEBUG(DEBUG_ERR
,("Failed to find db handle for name '%s'\n", db_name
));
1194 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1195 outdata
->dsize
= sizeof(db
->db_id
);
1197 /* Try to ensure it's locked in mem */
1198 lockdown_memory(ctdb
->valgrinding
);
1200 if (ctdb_db_persistent(db
)) {
1201 opcode
= CTDB_CONTROL_DB_ATTACH_PERSISTENT
;
1202 } else if (ctdb_db_replicated(db
)) {
1203 opcode
= CTDB_CONTROL_DB_ATTACH_REPLICATED
;
1205 opcode
= CTDB_CONTROL_DB_ATTACH
;
1208 /* tell all the other nodes about this database */
1209 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_CONNECTED
, 0, opcode
,
1210 0, CTDB_CTRL_FLAG_NOREPLY
,
1211 indata
, NULL
, NULL
);
1218 * a client has asked to detach from a database
1220 int32_t ctdb_control_db_detach(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1224 struct ctdb_db_context
*ctdb_db
;
1225 struct ctdb_client
*client
= NULL
;
1227 db_id
= *(uint32_t *)indata
.dptr
;
1228 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1229 if (ctdb_db
== NULL
) {
1230 DEBUG(DEBUG_ERR
, ("Invalid dbid 0x%08x in DB detach\n",
1235 if (ctdb
->tunable
.allow_client_db_attach
== 1) {
1236 DEBUG(DEBUG_ERR
, ("DB detach from database %s denied. "
1237 "Clients are allowed access to databases "
1238 "(AllowClientDBAccess == 1)\n",
1243 if (! ctdb_db_volatile(ctdb_db
)) {
1245 ("Detaching non-volatile database %s denied\n",
1250 /* Cannot detach from database when in recovery */
1251 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
) {
1252 DEBUG(DEBUG_ERR
, ("DB detach denied while in recovery\n"));
1256 /* If a control comes from a client, then broadcast it to all nodes.
1257 * Do the actual detach only if the control comes from other daemons.
1259 if (client_id
!= 0) {
1260 client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
1261 if (client
!= NULL
) {
1262 /* forward the control to all the nodes */
1263 ctdb_daemon_send_control(ctdb
,
1264 CTDB_BROADCAST_CONNECTED
, 0,
1265 CTDB_CONTROL_DB_DETACH
, 0,
1266 CTDB_CTRL_FLAG_NOREPLY
,
1267 indata
, NULL
, NULL
);
1270 DEBUG(DEBUG_ERR
, ("Client has gone away. Failing DB detach "
1271 "for database '%s'\n", ctdb_db
->db_name
));
1275 /* Detach database from recoverd */
1276 if (ctdb_daemon_send_message(ctdb
, ctdb
->pnn
,
1277 CTDB_SRVID_DETACH_DATABASE
,
1279 DEBUG(DEBUG_ERR
, ("Unable to detach DB from recoverd\n"));
1283 /* Disable vacuuming and drop all vacuuming data */
1284 talloc_free(ctdb_db
->vacuum_handle
);
1285 talloc_free(ctdb_db
->delete_queue
);
1287 /* Terminate any deferred fetch */
1288 talloc_free(ctdb_db
->deferred_fetch
);
1290 /* Terminate any traverses */
1291 while (ctdb_db
->traverse
) {
1292 talloc_free(ctdb_db
->traverse
);
1295 /* Terminate any revokes */
1296 while (ctdb_db
->revokechild_active
) {
1297 talloc_free(ctdb_db
->revokechild_active
);
1300 /* Free readonly tracking database */
1301 if (ctdb_db_readonly(ctdb_db
)) {
1302 talloc_free(ctdb_db
->rottdb
);
1305 DLIST_REMOVE(ctdb
->db_list
, ctdb_db
);
1307 DEBUG(DEBUG_NOTICE
, ("Detached from database '%s'\n",
1309 talloc_free(ctdb_db
);
1315 attach to all existing persistent databases
1317 static int ctdb_attach_persistent(struct ctdb_context
*ctdb
,
1318 const char *unhealthy_reason
)
1323 /* open the persistent db directory and scan it for files */
1324 d
= opendir(ctdb
->db_directory_persistent
);
1329 while ((de
=readdir(d
))) {
1331 size_t len
= strlen(de
->d_name
);
1333 int invalid_name
= 0;
1335 s
= talloc_strdup(ctdb
, de
->d_name
);
1338 CTDB_NO_MEMORY(ctdb
, s
);
1341 /* only accept names ending in .tdb */
1342 p
= strstr(s
, ".tdb.");
1343 if (len
< 7 || p
== NULL
) {
1348 /* only accept names ending with .tdb. and any number of digits */
1350 while (*q
!= 0 && invalid_name
== 0) {
1351 if (!isdigit(*q
++)) {
1355 if (invalid_name
== 1 || sscanf(p
+5, "%u", &node
) != 1 || node
!= ctdb
->pnn
) {
1356 DEBUG(DEBUG_ERR
,("Ignoring persistent database '%s'\n", de
->d_name
));
1362 if (ctdb_local_attach(ctdb
, s
, CTDB_DB_FLAGS_PERSISTENT
, unhealthy_reason
) != 0) {
1363 DEBUG(DEBUG_ERR
,("Failed to attach to persistent database '%s'\n", de
->d_name
));
1369 DEBUG(DEBUG_INFO
,("Attached to persistent database %s\n", s
));
1377 int ctdb_attach_databases(struct ctdb_context
*ctdb
)
1380 char *persistent_health_path
= NULL
;
1381 char *unhealthy_reason
= NULL
;
1382 bool first_try
= true;
1384 persistent_health_path
= talloc_asprintf(ctdb
, "%s/%s.%u",
1385 ctdb
->db_directory_state
,
1386 PERSISTENT_HEALTH_TDB
,
1388 if (persistent_health_path
== NULL
) {
1389 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1395 ctdb
->db_persistent_health
= tdb_wrap_open(ctdb
, persistent_health_path
,
1396 0, TDB_DISALLOW_NESTING
,
1397 O_CREAT
| O_RDWR
, 0600);
1398 if (ctdb
->db_persistent_health
== NULL
) {
1399 struct tdb_wrap
*tdb
;
1402 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
1403 persistent_health_path
,
1406 talloc_free(persistent_health_path
);
1407 talloc_free(unhealthy_reason
);
1412 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1413 persistent_health_path
,
1414 "was cleared after a failure",
1415 "manual verification needed");
1416 if (unhealthy_reason
== NULL
) {
1417 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1418 talloc_free(persistent_health_path
);
1422 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1423 persistent_health_path
));
1424 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1425 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1426 O_CREAT
| O_RDWR
, 0600);
1428 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1429 persistent_health_path
,
1432 talloc_free(persistent_health_path
);
1433 talloc_free(unhealthy_reason
);
1440 ret
= tdb_check(ctdb
->db_persistent_health
->tdb
, NULL
, NULL
);
1442 struct tdb_wrap
*tdb
;
1444 talloc_free(ctdb
->db_persistent_health
);
1445 ctdb
->db_persistent_health
= NULL
;
1448 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed\n",
1449 persistent_health_path
));
1450 talloc_free(persistent_health_path
);
1451 talloc_free(unhealthy_reason
);
1456 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1457 persistent_health_path
,
1458 "was cleared after a failure",
1459 "manual verification needed");
1460 if (unhealthy_reason
== NULL
) {
1461 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1462 talloc_free(persistent_health_path
);
1466 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1467 persistent_health_path
));
1468 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1469 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1470 O_CREAT
| O_RDWR
, 0600);
1472 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1473 persistent_health_path
,
1476 talloc_free(persistent_health_path
);
1477 talloc_free(unhealthy_reason
);
1484 talloc_free(persistent_health_path
);
1486 ret
= ctdb_attach_persistent(ctdb
, unhealthy_reason
);
1487 talloc_free(unhealthy_reason
);
1496 called when a broadcast seqnum update comes in
1498 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
, uint32_t srcnode
)
1500 struct ctdb_db_context
*ctdb_db
;
1501 if (srcnode
== ctdb
->pnn
) {
1502 /* don't update ourselves! */
1506 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1508 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id
));
1512 if (ctdb_db
->unhealthy_reason
) {
1513 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1514 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
1518 tdb_increment_seqnum_nonblock(ctdb_db
->ltdb
->tdb
);
1519 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1524 timer to check for seqnum changes in a ltdb and propogate them
1526 static void ctdb_ltdb_seqnum_check(struct tevent_context
*ev
,
1527 struct tevent_timer
*te
,
1528 struct timeval t
, void *p
)
1530 struct ctdb_db_context
*ctdb_db
= talloc_get_type(p
, struct ctdb_db_context
);
1531 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1532 uint32_t new_seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1533 if (new_seqnum
!= ctdb_db
->seqnum
) {
1534 /* something has changed - propogate it */
1536 data
.dptr
= (uint8_t *)&ctdb_db
->db_id
;
1537 data
.dsize
= sizeof(uint32_t);
1538 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_VNNMAP
, 0,
1539 CTDB_CONTROL_UPDATE_SEQNUM
, 0, CTDB_CTRL_FLAG_NOREPLY
,
1542 ctdb_db
->seqnum
= new_seqnum
;
1544 /* setup a new timer */
1545 ctdb_db
->seqnum_update
=
1546 tevent_add_timer(ctdb
->ev
, ctdb_db
,
1547 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000,
1548 (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1549 ctdb_ltdb_seqnum_check
, ctdb_db
);
1553 enable seqnum handling on this db
1555 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
)
1557 struct ctdb_db_context
*ctdb_db
;
1558 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1560 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id
));
1564 if (ctdb_db
->seqnum_update
== NULL
) {
1565 ctdb_db
->seqnum_update
= tevent_add_timer(
1567 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000,
1568 (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1569 ctdb_ltdb_seqnum_check
, ctdb_db
);
1572 tdb_enable_seqnum(ctdb_db
->ltdb
->tdb
);
1573 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1577 int ctdb_set_db_sticky(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
1579 if (ctdb_db_sticky(ctdb_db
)) {
1583 if (! ctdb_db_volatile(ctdb_db
)) {
1585 ("Non-volatile databases do not support sticky flag\n"));
1589 ctdb_db
->sticky_records
= trbt_create(ctdb_db
, 0);
1591 ctdb_db_set_sticky(ctdb_db
);
1593 DEBUG(DEBUG_NOTICE
,("set db sticky %s\n", ctdb_db
->db_name
));
1598 void ctdb_db_statistics_reset(struct ctdb_db_context
*ctdb_db
)
1600 struct ctdb_db_statistics_old
*s
= &ctdb_db
->statistics
;
1603 for (i
=0; i
<MAX_HOT_KEYS
; i
++) {
1604 if (s
->hot_keys
[i
].key
.dsize
> 0) {
1605 talloc_free(s
->hot_keys
[i
].key
.dptr
);
1609 ZERO_STRUCT(ctdb_db
->statistics
);
1612 int32_t ctdb_control_get_db_statistics(struct ctdb_context
*ctdb
,
1616 struct ctdb_db_context
*ctdb_db
;
1617 struct ctdb_db_statistics_old
*stats
;
1622 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1624 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in get_db_statistics\n", db_id
));
1628 len
= offsetof(struct ctdb_db_statistics_old
, hot_keys_wire
);
1629 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1630 len
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1633 stats
= talloc_size(outdata
, len
);
1634 if (stats
== NULL
) {
1635 DEBUG(DEBUG_ERR
,("Failed to allocate db statistics structure\n"));
1639 memcpy(stats
, &ctdb_db
->statistics
,
1640 offsetof(struct ctdb_db_statistics_old
, hot_keys_wire
));
1642 stats
->num_hot_keys
= MAX_HOT_KEYS
;
1644 ptr
= &stats
->hot_keys_wire
[0];
1645 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1646 memcpy(ptr
, ctdb_db
->statistics
.hot_keys
[i
].key
.dptr
,
1647 ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
);
1648 ptr
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1651 outdata
->dptr
= (uint8_t *)stats
;
1652 outdata
->dsize
= len
;