2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #include "server/ctdb_config.h"
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
49 * write a record to a normal database
51 * This is the server-variant of the ctdb_ltdb_store function.
52 * It contains logic to determine whether a record should be
53 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54 * controls to the local ctdb daemon if appropriate.
56 static int ctdb_ltdb_store_server(struct ctdb_db_context
*ctdb_db
,
58 struct ctdb_ltdb_header
*header
,
61 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
63 uint32_t hsize
= sizeof(struct ctdb_ltdb_header
);
65 bool seqnum_suppressed
= false;
67 bool schedule_for_deletion
= false;
68 bool remove_from_delete_queue
= false;
71 if (ctdb
->flags
& CTDB_FLAG_TORTURE
) {
73 struct ctdb_ltdb_header
*h2
;
75 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
76 h2
= (struct ctdb_ltdb_header
*)old
.dptr
;
77 if (old
.dptr
!= NULL
&&
79 h2
->rsn
> header
->rsn
) {
81 ("RSN regression! %"PRIu64
" %"PRIu64
"\n",
82 h2
->rsn
, header
->rsn
));
89 if (ctdb
->vnn_map
== NULL
) {
91 * Called from a client: always store the record
92 * Also don't call ctdb_lmaster since it uses the vnn_map!
98 lmaster
= ctdb_lmaster(ctdb_db
->ctdb
, &key
);
101 * If we migrate an empty record off to another node
102 * and the record has not been migrated with data,
103 * delete the record instead of storing the empty record.
105 if (data
.dsize
!= 0) {
107 } else if (header
->flags
& CTDB_REC_RO_FLAGS
) {
109 } else if (header
->flags
& CTDB_REC_FLAG_AUTOMATIC
) {
111 * The record is not created by the client but
112 * automatically by the ctdb_ltdb_fetch logic that
113 * creates a record with an initial header in the
114 * ltdb before trying to migrate the record from
115 * the current lmaster. Keep it instead of trying
116 * to delete the non-existing record...
119 schedule_for_deletion
= true;
120 } else if (header
->flags
& CTDB_REC_FLAG_MIGRATED_WITH_DATA
) {
122 } else if (ctdb_db
->ctdb
->pnn
== lmaster
) {
124 * If we are lmaster, then we usually keep the record.
125 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126 * and the record is empty and has never been migrated
127 * with data, then we should delete it instead of storing it.
128 * This is part of the vacuuming process.
130 * The reason that we usually need to store even empty records
131 * on the lmaster is that a client operating directly on the
132 * lmaster (== dmaster) expects the local copy of the record to
133 * exist after successful ctdb migrate call. If the record does
134 * not exist, the client goes into a migrate loop and eventually
135 * fails. So storing the empty record makes sure that we do not
136 * need to change the client code.
138 if (!(header
->flags
& CTDB_REC_FLAG_VACUUM_MIGRATED
)) {
140 } else if (ctdb_db
->ctdb
->pnn
!= header
->dmaster
) {
143 } else if (ctdb_db
->ctdb
->pnn
== header
->dmaster
) {
148 if (ctdb_db_volatile(ctdb_db
) &&
149 (ctdb_db
->ctdb
->pnn
== header
->dmaster
) &&
150 !(header
->flags
& CTDB_REC_RO_FLAGS
))
154 if (data
.dsize
== 0) {
155 schedule_for_deletion
= true;
158 remove_from_delete_queue
= !schedule_for_deletion
;
163 * The VACUUM_MIGRATED flag is only set temporarily for
164 * the above logic when the record was retrieved by a
165 * VACUUM_MIGRATE call and should not be stored in the
168 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169 * and there are two cases in which the corresponding record
170 * is stored in the local database:
171 * 1. The record has been migrated with data in the past
172 * (the MIGRATED_WITH_DATA record flag is set).
173 * 2. The record has been filled with data again since it
174 * had been submitted in the VACUUM_FETCH message to the
176 * For such records it is important to not store the
177 * VACUUM_MIGRATED flag in the database.
179 header
->flags
&= ~CTDB_REC_FLAG_VACUUM_MIGRATED
;
182 * Similarly, clear the AUTOMATIC flag which should not enter
183 * the local database copy since this would require client
184 * modifications to clear the flag when the client stores
187 header
->flags
&= ~CTDB_REC_FLAG_AUTOMATIC
;
189 rec
[0].dsize
= hsize
;
190 rec
[0].dptr
= (uint8_t *)header
;
192 rec
[1].dsize
= data
.dsize
;
193 rec
[1].dptr
= data
.dptr
;
195 /* Databases with seqnum updates enabled only get their seqnum
196 changes when/if we modify the data */
197 if (ctdb_db
->seqnum_update
!= NULL
) {
199 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
201 if ((old
.dsize
== hsize
+ data
.dsize
) &&
202 memcmp(old
.dptr
+ hsize
, data
.dptr
, data
.dsize
) == 0) {
203 tdb_remove_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
204 seqnum_suppressed
= true;
206 if (old
.dptr
!= NULL
) {
211 DEBUG(DEBUG_DEBUG
, (__location__
" db[%s]: %s record: hash[0x%08x]\n",
213 keep
?"storing":"deleting",
217 ret
= tdb_storev(ctdb_db
->ltdb
->tdb
, key
, rec
, 2, TDB_REPLACE
);
219 ret
= tdb_delete(ctdb_db
->ltdb
->tdb
, key
);
226 tdb_error(ctdb_db
->ltdb
->tdb
) == TDB_ERR_NOEXIST
)
231 DEBUG(lvl
, (__location__
" db[%s]: Failed to %s record: "
234 keep
?"store":"delete", ret
,
235 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
237 schedule_for_deletion
= false;
238 remove_from_delete_queue
= false;
240 if (seqnum_suppressed
) {
241 tdb_add_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
244 if (schedule_for_deletion
) {
246 ret2
= ctdb_local_schedule_for_deletion(ctdb_db
, header
, key
);
248 DEBUG(DEBUG_ERR
, (__location__
" ctdb_local_schedule_for_deletion failed.\n"));
252 if (remove_from_delete_queue
) {
253 ctdb_local_remove_from_delete_queue(ctdb_db
, header
, key
);
259 struct lock_fetch_state
{
260 struct ctdb_context
*ctdb
;
261 struct ctdb_db_context
*ctdb_db
;
262 void (*recv_pkt
)(void *, struct ctdb_req_header
*);
264 struct ctdb_req_header
*hdr
;
266 bool ignore_generation
;
270 called when we should retry the operation
272 static void lock_fetch_callback(void *p
, bool locked
)
274 struct lock_fetch_state
*state
= talloc_get_type(p
, struct lock_fetch_state
);
275 if (!state
->ignore_generation
&&
276 state
->generation
!= state
->ctdb_db
->generation
) {
277 DEBUG(DEBUG_NOTICE
,("Discarding previous generation lockwait packet\n"));
278 talloc_free(state
->hdr
);
281 state
->recv_pkt(state
->recv_context
, state
->hdr
);
282 DEBUG(DEBUG_INFO
,(__location__
" PACKET REQUEUED\n"));
287 do a non-blocking ltdb_lock, deferring this ctdb request until we
290 It does the following:
292 1) tries to get the chainlock. If it succeeds, then it returns 0
294 2) if it fails to get a chainlock immediately then it sets up a
295 non-blocking chainlock via ctdb_lock_record, and when it gets the
296 chainlock it re-submits this ctdb request to the main packet
299 This effectively queues all ctdb requests that cannot be
300 immediately satisfied until it can get the lock. This means that
301 the main ctdb daemon will not block waiting for a chainlock held by
304 There are 3 possible return values:
306 0: means that it got the lock immediately.
307 -1: means that it failed to get the lock, and won't retry
308 -2: means that it failed to get the lock immediately, but will retry
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context
*ctdb_db
,
311 TDB_DATA key
, struct ctdb_req_header
*hdr
,
312 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
313 void *recv_context
, bool ignore_generation
)
316 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
317 struct lock_request
*lreq
;
318 struct lock_fetch_state
*state
;
320 ret
= tdb_chainlock_nonblock(tdb
, key
);
323 !(errno
== EACCES
|| errno
== EAGAIN
|| errno
== EDEADLK
)) {
324 /* a hard failure - don't try again */
328 /* when torturing, ensure we test the contended path */
329 if ((ctdb_db
->ctdb
->flags
& CTDB_FLAG_TORTURE
) &&
332 tdb_chainunlock(tdb
, key
);
335 /* first the non-contended path */
340 state
= talloc(hdr
, struct lock_fetch_state
);
341 state
->ctdb
= ctdb_db
->ctdb
;
342 state
->ctdb_db
= ctdb_db
;
344 state
->recv_pkt
= recv_pkt
;
345 state
->recv_context
= recv_context
;
346 state
->generation
= ctdb_db
->generation
;
347 state
->ignore_generation
= ignore_generation
;
349 /* now the contended path */
350 lreq
= ctdb_lock_record(state
, ctdb_db
, key
, true, lock_fetch_callback
, state
);
355 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356 so it won't be freed yet */
357 talloc_steal(state
, hdr
);
359 /* now tell the caller that we will retry asynchronously */
364 a variant of ctdb_ltdb_lock_requeue that also fetches the record
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context
*ctdb_db
,
367 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
368 struct ctdb_req_header
*hdr
, TDB_DATA
*data
,
369 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
370 void *recv_context
, bool ignore_generation
)
374 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
, recv_pkt
,
375 recv_context
, ignore_generation
);
377 ret
= ctdb_ltdb_fetch(ctdb_db
, key
, header
, hdr
, data
);
380 uret
= ctdb_ltdb_unlock(ctdb_db
, key
);
382 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", uret
));
391 paranoid check to see if the db is empty
393 static void ctdb_check_db_empty(struct ctdb_db_context
*ctdb_db
)
395 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
396 int count
= tdb_traverse_read(tdb
, NULL
, NULL
);
398 DEBUG(DEBUG_ALERT
,(__location__
" tdb '%s' not empty on attach! aborting\n",
400 ctdb_fatal(ctdb_db
->ctdb
, "database not empty on attach");
404 int ctdb_load_persistent_health(struct ctdb_context
*ctdb
,
405 struct ctdb_db_context
*ctdb_db
)
407 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
413 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
414 key
.dsize
= strlen(ctdb_db
->db_name
);
416 old
= ctdb_db
->unhealthy_reason
;
417 ctdb_db
->unhealthy_reason
= NULL
;
419 val
= tdb_fetch(tdb
, key
);
421 reason
= talloc_strndup(ctdb_db
,
422 (const char *)val
.dptr
,
424 if (reason
== NULL
) {
425 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strndup(%d) failed\n",
427 ctdb_db
->unhealthy_reason
= old
;
438 ctdb_db
->unhealthy_reason
= reason
;
442 int ctdb_update_persistent_health(struct ctdb_context
*ctdb
,
443 struct ctdb_db_context
*ctdb_db
,
444 const char *given_reason
,/* NULL means healthy */
445 int num_healthy_nodes
)
447 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
451 char *new_reason
= NULL
;
452 char *old_reason
= NULL
;
454 ret
= tdb_transaction_start(tdb
);
456 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_start('%s') failed: %d - %s\n",
457 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
461 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
463 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
464 ctdb_db
->db_name
, ret
));
467 old_reason
= ctdb_db
->unhealthy_reason
;
469 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
470 key
.dsize
= strlen(ctdb_db
->db_name
);
473 new_reason
= talloc_strdup(ctdb_db
, given_reason
);
474 if (new_reason
== NULL
) {
475 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strdup(%s) failed\n",
479 } else if (old_reason
&& num_healthy_nodes
== 0) {
481 * If the reason indicates ok, but there were no healthy nodes
482 * available, then it means we have not recovered valid content
483 * of the db. So if there's an old reason, prefix it with
484 * "NO-HEALTHY-NODES - "
488 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
489 ret
= strncmp(_TMP_PREFIX
, old_reason
, strlen(_TMP_PREFIX
));
491 prefix
= _TMP_PREFIX
;
495 new_reason
= talloc_asprintf(ctdb_db
, "%s%s",
497 if (new_reason
== NULL
) {
498 DEBUG(DEBUG_ALERT
,(__location__
" talloc_asprintf(%s%s) failed\n",
499 prefix
, old_reason
));
506 val
.dptr
= discard_const_p(uint8_t, new_reason
);
507 val
.dsize
= strlen(new_reason
);
509 ret
= tdb_store(tdb
, key
, val
, TDB_REPLACE
);
511 tdb_transaction_cancel(tdb
);
512 DEBUG(DEBUG_ALERT
,(__location__
" tdb_store('%s', %s, %s) failed: %d - %s\n",
513 tdb_name(tdb
), ctdb_db
->db_name
, new_reason
,
514 ret
, tdb_errorstr(tdb
)));
515 talloc_free(new_reason
);
518 DEBUG(DEBUG_ALERT
,("Updated db health for db(%s) to: %s\n",
519 ctdb_db
->db_name
, new_reason
));
520 } else if (old_reason
) {
521 ret
= tdb_delete(tdb
, key
);
523 tdb_transaction_cancel(tdb
);
524 DEBUG(DEBUG_ALERT
,(__location__
" tdb_delete('%s', %s) failed: %d - %s\n",
525 tdb_name(tdb
), ctdb_db
->db_name
,
526 ret
, tdb_errorstr(tdb
)));
527 talloc_free(new_reason
);
530 DEBUG(DEBUG_NOTICE
,("Updated db health for db(%s): OK\n",
534 ret
= tdb_transaction_commit(tdb
);
535 if (ret
!= TDB_SUCCESS
) {
536 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_commit('%s') failed: %d - %s\n",
537 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
538 talloc_free(new_reason
);
542 talloc_free(old_reason
);
543 ctdb_db
->unhealthy_reason
= new_reason
;
548 static int ctdb_backup_corrupted_tdb(struct ctdb_context
*ctdb
,
549 struct ctdb_db_context
*ctdb_db
)
551 time_t now
= time(NULL
);
559 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
560 new_path
= talloc_asprintf(ctdb_db
, "%s.corrupted."
561 "%04u%02u%02u%02u%02u%02u.0Z",
563 tm
->tm_year
+1900, tm
->tm_mon
+1,
564 tm
->tm_mday
, tm
->tm_hour
, tm
->tm_min
,
566 if (new_path
== NULL
) {
567 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
571 new_reason
= talloc_asprintf(ctdb_db
,
572 "ERROR - Backup of corrupted TDB in '%s'",
574 if (new_reason
== NULL
) {
575 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
578 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, new_reason
, 0);
579 talloc_free(new_reason
);
581 DEBUG(DEBUG_CRIT
,(__location__
582 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
587 ret
= rename(ctdb_db
->db_path
, new_path
);
589 DEBUG(DEBUG_CRIT
,(__location__
590 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
591 ctdb_db
->db_path
, new_path
,
592 errno
, strerror(errno
)));
593 talloc_free(new_path
);
597 DEBUG(DEBUG_CRIT
,(__location__
598 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
599 ctdb_db
->db_path
, new_path
));
600 talloc_free(new_path
);
604 int ctdb_recheck_persistent_health(struct ctdb_context
*ctdb
)
606 struct ctdb_db_context
*ctdb_db
;
611 for (ctdb_db
= ctdb
->db_list
; ctdb_db
; ctdb_db
= ctdb_db
->next
) {
612 if (!ctdb_db_persistent(ctdb_db
)) {
616 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
618 DEBUG(DEBUG_ALERT
,(__location__
619 " load persistent health for '%s' failed\n",
624 if (ctdb_db
->unhealthy_reason
== NULL
) {
626 DEBUG(DEBUG_INFO
,(__location__
627 " persistent db '%s' healthy\n",
633 DEBUG(DEBUG_ALERT
,(__location__
634 " persistent db '%s' unhealthy: %s\n",
636 ctdb_db
->unhealthy_reason
));
639 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
651 mark a database - as healthy
653 int32_t ctdb_control_db_set_healthy(struct ctdb_context
*ctdb
, TDB_DATA indata
)
655 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
656 struct ctdb_db_context
*ctdb_db
;
658 bool may_recover
= false;
660 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
662 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
666 if (ctdb_db
->unhealthy_reason
) {
670 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, NULL
, 1);
672 DEBUG(DEBUG_ERR
,(__location__
673 " ctdb_update_persistent_health(%s) failed\n",
678 if (may_recover
&& ctdb
->runstate
== CTDB_RUNSTATE_STARTUP
) {
679 DEBUG(DEBUG_ERR
, (__location__
" db %s become healthy - force recovery for startup\n",
681 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
687 int32_t ctdb_control_db_get_health(struct ctdb_context
*ctdb
,
691 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
692 struct ctdb_db_context
*ctdb_db
;
695 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
697 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
701 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
703 DEBUG(DEBUG_ERR
,(__location__
704 " ctdb_load_persistent_health(%s) failed\n",
710 if (ctdb_db
->unhealthy_reason
) {
711 outdata
->dptr
= (uint8_t *)ctdb_db
->unhealthy_reason
;
712 outdata
->dsize
= strlen(ctdb_db
->unhealthy_reason
)+1;
719 int ctdb_set_db_readonly(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
723 if (ctdb_db_readonly(ctdb_db
)) {
727 if (! ctdb_db_volatile(ctdb_db
)) {
729 ("Non-volatile databases do not support readonly flag\n"));
733 ropath
= talloc_asprintf(ctdb_db
, "%s.RO", ctdb_db
->db_path
);
734 if (ropath
== NULL
) {
735 DEBUG(DEBUG_CRIT
,("Failed to asprintf the tracking database\n"));
738 ctdb_db
->rottdb
= tdb_open(ropath
,
739 ctdb
->tunable
.database_hash_size
,
740 TDB_NOLOCK
|TDB_CLEAR_IF_FIRST
|TDB_NOSYNC
,
741 O_CREAT
|O_RDWR
, 0600);
742 if (ctdb_db
->rottdb
== NULL
) {
743 DEBUG(DEBUG_CRIT
,("Failed to open/create the tracking database '%s'\n", ropath
));
748 DEBUG(DEBUG_NOTICE
,("OPENED tracking database : '%s'\n", ropath
));
750 ctdb_db_set_readonly(ctdb_db
);
752 DEBUG(DEBUG_NOTICE
, ("Readonly property set on DB %s\n", ctdb_db
->db_name
));
759 attach to a database, handling both persistent and non-persistent databases
760 return 0 on success, -1 on failure
762 static int ctdb_local_attach(struct ctdb_context
*ctdb
, const char *db_name
,
763 uint8_t db_flags
, const char *unhealthy_reason
)
765 struct ctdb_db_context
*ctdb_db
, *tmp_db
;
770 int remaining_tries
= 0;
772 ctdb_db
= talloc_zero(ctdb
, struct ctdb_db_context
);
773 CTDB_NO_MEMORY(ctdb
, ctdb_db
);
775 ctdb_db
->ctdb
= ctdb
;
776 ctdb_db
->db_name
= talloc_strdup(ctdb_db
, db_name
);
777 CTDB_NO_MEMORY(ctdb
, ctdb_db
->db_name
);
779 key
.dsize
= strlen(db_name
)+1;
780 key
.dptr
= discard_const(db_name
);
781 ctdb_db
->db_id
= ctdb_hash(&key
);
782 ctdb_db
->db_flags
= db_flags
;
784 if (ctdb_db_volatile(ctdb_db
)) {
785 ctdb_db
->delete_queue
= trbt_create(ctdb_db
, 0);
786 if (ctdb_db
->delete_queue
== NULL
) {
787 CTDB_NO_MEMORY(ctdb
, ctdb_db
->delete_queue
);
790 ctdb_db
->ctdb_ltdb_store_fn
= ctdb_ltdb_store_server
;
793 /* check for hash collisions */
794 for (tmp_db
=ctdb
->db_list
;tmp_db
;tmp_db
=tmp_db
->next
) {
795 if (tmp_db
->db_id
== ctdb_db
->db_id
) {
796 DEBUG(DEBUG_CRIT
,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
797 tmp_db
->db_id
, db_name
, tmp_db
->db_name
));
798 talloc_free(ctdb_db
);
803 if (ctdb_db_persistent(ctdb_db
)) {
804 if (unhealthy_reason
) {
805 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
,
806 unhealthy_reason
, 0);
808 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_update_persistent_health('%s','%s') failed: %d\n",
809 ctdb_db
->db_name
, unhealthy_reason
, ret
));
810 talloc_free(ctdb_db
);
815 if (ctdb
->max_persistent_check_errors
> 0) {
818 if (ctdb
->runstate
== CTDB_RUNSTATE_RUNNING
) {
822 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
824 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
825 ctdb_db
->db_name
, ret
));
826 talloc_free(ctdb_db
);
831 if (ctdb_db
->unhealthy_reason
&& remaining_tries
== 0) {
832 DEBUG(DEBUG_ALERT
,(__location__
"ERROR: tdb %s is marked as unhealthy: %s\n",
833 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
834 talloc_free(ctdb_db
);
838 if (ctdb_db
->unhealthy_reason
) {
839 /* this is just a warning, but we want that in the log file! */
840 DEBUG(DEBUG_ALERT
,(__location__
"Warning: tdb %s is marked as unhealthy: %s\n",
841 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
844 /* open the database */
845 ctdb_db
->db_path
= talloc_asprintf(ctdb_db
, "%s/%s.%u",
846 ctdb_db_persistent(ctdb_db
) ?
847 ctdb
->db_directory_persistent
:
851 tdb_flags
= ctdb_db_tdb_flags(db_flags
,
853 ctdb_config
.tdb_mutexes
);
856 ctdb_db
->ltdb
= tdb_wrap_open(ctdb_db
, ctdb_db
->db_path
,
857 ctdb
->tunable
.database_hash_size
,
859 O_CREAT
|O_RDWR
, mode
);
860 if (ctdb_db
->ltdb
== NULL
) {
862 int saved_errno
= errno
;
864 if (! ctdb_db_persistent(ctdb_db
)) {
865 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
868 strerror(saved_errno
)));
869 talloc_free(ctdb_db
);
873 if (remaining_tries
== 0) {
874 DEBUG(DEBUG_CRIT
,(__location__
875 "Failed to open persistent tdb '%s': %d - %s\n",
878 strerror(saved_errno
)));
879 talloc_free(ctdb_db
);
883 ret
= stat(ctdb_db
->db_path
, &st
);
885 DEBUG(DEBUG_CRIT
,(__location__
886 "Failed to open persistent tdb '%s': %d - %s\n",
889 strerror(saved_errno
)));
890 talloc_free(ctdb_db
);
894 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
896 DEBUG(DEBUG_CRIT
,(__location__
897 "Failed to open persistent tdb '%s': %d - %s\n",
900 strerror(saved_errno
)));
901 talloc_free(ctdb_db
);
910 if (!ctdb_db_persistent(ctdb_db
)) {
911 ctdb_check_db_empty(ctdb_db
);
913 ret
= tdb_check(ctdb_db
->ltdb
->tdb
, NULL
, NULL
);
918 DEBUG(DEBUG_CRIT
,("tdb_check(%s) failed: %d - %s\n",
919 ctdb_db
->db_path
, ret
,
920 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
921 if (remaining_tries
== 0) {
922 talloc_free(ctdb_db
);
926 fd
= tdb_fd(ctdb_db
->ltdb
->tdb
);
927 ret
= fstat(fd
, &st
);
929 DEBUG(DEBUG_CRIT
,(__location__
930 "Failed to fstat() persistent tdb '%s': %d - %s\n",
934 talloc_free(ctdb_db
);
939 talloc_free(ctdb_db
->ltdb
);
940 ctdb_db
->ltdb
= NULL
;
942 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
944 DEBUG(DEBUG_CRIT
,("Failed to backup corrupted tdb '%s'\n",
946 talloc_free(ctdb_db
);
956 /* remember the flags the client has specified */
957 tdb_add_flags(ctdb_db
->ltdb
->tdb
, tdb_flags
);
960 /* set up a rb tree we can use to track which records we have a
961 fetch-lock in-flight for so we can defer any additional calls
964 ctdb_db
->deferred_fetch
= trbt_create(ctdb_db
, 0);
965 if (ctdb_db
->deferred_fetch
== NULL
) {
966 DEBUG(DEBUG_ERR
,("Failed to create deferred fetch rb tree for ctdb database\n"));
967 talloc_free(ctdb_db
);
971 ctdb_db
->defer_dmaster
= trbt_create(ctdb_db
, 0);
972 if (ctdb_db
->defer_dmaster
== NULL
) {
973 DEBUG(DEBUG_ERR
, ("Failed to create defer dmaster rb tree for %s\n",
975 talloc_free(ctdb_db
);
979 DLIST_ADD(ctdb
->db_list
, ctdb_db
);
981 /* setting this can help some high churn databases */
982 tdb_set_max_dead(ctdb_db
->ltdb
->tdb
, ctdb
->tunable
.database_max_dead
);
985 all databases support the "null" function. we need this in
986 order to do forced migration of records
988 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_null_func
, CTDB_NULL_FUNC
);
990 DEBUG(DEBUG_CRIT
,("Failed to setup null function for '%s'\n", ctdb_db
->db_name
));
991 talloc_free(ctdb_db
);
996 all databases support the "fetch" function. we need this
997 for efficient Samba3 ctdb fetch
999 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_func
, CTDB_FETCH_FUNC
);
1001 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
1002 talloc_free(ctdb_db
);
1007 all databases support the "fetch_with_header" function. we need this
1008 for efficient readonly record fetches
1010 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_with_header_func
, CTDB_FETCH_WITH_HEADER_FUNC
);
1012 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
1013 talloc_free(ctdb_db
);
1017 ret
= ctdb_vacuum_init(ctdb_db
);
1019 DEBUG(DEBUG_CRIT
,("Failed to setup vacuuming for "
1020 "database '%s'\n", ctdb_db
->db_name
));
1021 talloc_free(ctdb_db
);
1025 ret
= ctdb_migration_init(ctdb_db
);
1028 ("Failed to setup migration tracking for db '%s'\n",
1030 talloc_free(ctdb_db
);
1034 ret
= db_hash_init(ctdb_db
, "lock_log", 2048, DB_HASH_COMPLEX
,
1035 &ctdb_db
->lock_log
);
1038 ("Failed to setup lock logging for db '%s'\n",
1040 talloc_free(ctdb_db
);
1044 ctdb_db
->generation
= ctdb
->vnn_map
->generation
;
1046 DEBUG(DEBUG_NOTICE
,("Attached to database '%s' with flags 0x%x\n",
1047 ctdb_db
->db_path
, tdb_flags
));
1054 struct ctdb_deferred_attach_context
{
1055 struct ctdb_deferred_attach_context
*next
, *prev
;
1056 struct ctdb_context
*ctdb
;
1057 struct ctdb_req_control_old
*c
;
1061 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context
*da_ctx
)
1063 DLIST_REMOVE(da_ctx
->ctdb
->deferred_attach
, da_ctx
);
1068 static void ctdb_deferred_attach_timeout(struct tevent_context
*ev
,
1069 struct tevent_timer
*te
,
1070 struct timeval t
, void *private_data
)
1072 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1073 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
1075 ctdb_request_control_reply(ctdb
, da_ctx
->c
, NULL
, -1, NULL
);
1076 talloc_free(da_ctx
);
1079 static void ctdb_deferred_attach_callback(struct tevent_context
*ev
,
1080 struct tevent_timer
*te
,
1081 struct timeval t
, void *private_data
)
1083 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1084 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
1086 /* This talloc-steals the packet ->c */
1087 ctdb_input_pkt(ctdb
, (struct ctdb_req_header
*)da_ctx
->c
);
1088 talloc_free(da_ctx
);
1091 int ctdb_process_deferred_attach(struct ctdb_context
*ctdb
)
1093 struct ctdb_deferred_attach_context
*da_ctx
;
1095 /* call it from the main event loop as soon as the current event
1098 while ((da_ctx
= ctdb
->deferred_attach
) != NULL
) {
1099 DLIST_REMOVE(ctdb
->deferred_attach
, da_ctx
);
1100 tevent_add_timer(ctdb
->ev
, da_ctx
,
1101 timeval_current_ofs(1,0),
1102 ctdb_deferred_attach_callback
, da_ctx
);
1109 a client has asked to attach a new database
1111 int32_t ctdb_control_db_attach(struct ctdb_context
*ctdb
,
1117 struct ctdb_req_control_old
*c
,
1120 const char *db_name
= (const char *)indata
.dptr
;
1121 struct ctdb_db_context
*db
;
1122 struct ctdb_node
*node
= ctdb
->nodes
[ctdb
->pnn
];
1123 struct ctdb_client
*client
= NULL
;
1126 if (ctdb
->tunable
.allow_client_db_attach
== 0) {
1127 DEBUG(DEBUG_ERR
, ("DB Attach to database %s denied by tunable "
1128 "AllowClientDBAccess == 0\n", db_name
));
1132 /* don't allow any local clients to attach while we are in recovery mode
1133 * except for the recovery daemon.
1134 * allow all attach from the network since these are always from remote
1137 if (srcnode
== ctdb
->pnn
&& client_id
!= 0) {
1138 client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
1140 if (client
!= NULL
) {
1141 /* If the node is inactive it is not part of the cluster
1142 and we should not allow clients to attach to any
1145 if (node
->flags
& NODE_FLAGS_INACTIVE
) {
1146 DEBUG(DEBUG_ERR
,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name
, node
->flags
));
1150 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
&&
1151 client
->pid
!= ctdb
->recoverd_pid
&&
1152 ctdb
->runstate
< CTDB_RUNSTATE_RUNNING
) {
1153 struct ctdb_deferred_attach_context
*da_ctx
= talloc(client
, struct ctdb_deferred_attach_context
);
1155 if (da_ctx
== NULL
) {
1156 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name
, client
->pid
));
1160 da_ctx
->ctdb
= ctdb
;
1161 da_ctx
->c
= talloc_steal(da_ctx
, c
);
1162 talloc_set_destructor(da_ctx
, ctdb_deferred_attach_destructor
);
1163 DLIST_ADD(ctdb
->deferred_attach
, da_ctx
);
1165 tevent_add_timer(ctdb
->ev
, da_ctx
,
1166 timeval_current_ofs(ctdb
->tunable
.deferred_attach_timeout
, 0),
1167 ctdb_deferred_attach_timeout
, da_ctx
);
1169 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name
, client
->pid
));
1170 *async_reply
= true;
1175 /* see if we already have this name */
1176 db
= ctdb_db_handle(ctdb
, db_name
);
1178 if ((db
->db_flags
& db_flags
) != db_flags
) {
1180 ("Error: Failed to re-attach with 0x%x flags,"
1181 " database has 0x%x flags\n", db_flags
,
1185 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1186 outdata
->dsize
= sizeof(db
->db_id
);
1190 if (ctdb_local_attach(ctdb
, db_name
, db_flags
, NULL
) != 0) {
1194 db
= ctdb_db_handle(ctdb
, db_name
);
1196 DEBUG(DEBUG_ERR
,("Failed to find db handle for name '%s'\n", db_name
));
1200 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1201 outdata
->dsize
= sizeof(db
->db_id
);
1203 /* Try to ensure it's locked in mem */
1204 lockdown_memory(ctdb
->valgrinding
);
1206 if (ctdb_db_persistent(db
)) {
1207 opcode
= CTDB_CONTROL_DB_ATTACH_PERSISTENT
;
1208 } else if (ctdb_db_replicated(db
)) {
1209 opcode
= CTDB_CONTROL_DB_ATTACH_REPLICATED
;
1211 opcode
= CTDB_CONTROL_DB_ATTACH
;
1214 /* tell all the other nodes about this database */
1215 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_CONNECTED
, 0, opcode
,
1216 0, CTDB_CTRL_FLAG_NOREPLY
,
1217 indata
, NULL
, NULL
);
1224 * a client has asked to detach from a database
1226 int32_t ctdb_control_db_detach(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1230 struct ctdb_db_context
*ctdb_db
;
1231 struct ctdb_client
*client
= NULL
;
1233 db_id
= *(uint32_t *)indata
.dptr
;
1234 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1235 if (ctdb_db
== NULL
) {
1236 DEBUG(DEBUG_ERR
, ("Invalid dbid 0x%08x in DB detach\n",
1241 if (ctdb
->tunable
.allow_client_db_attach
== 1) {
1242 DEBUG(DEBUG_ERR
, ("DB detach from database %s denied. "
1243 "Clients are allowed access to databases "
1244 "(AllowClientDBAccess == 1)\n",
1249 if (! ctdb_db_volatile(ctdb_db
)) {
1251 ("Detaching non-volatile database %s denied\n",
1256 /* Cannot detach from database when in recovery */
1257 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
) {
1258 DEBUG(DEBUG_ERR
, ("DB detach denied while in recovery\n"));
1262 /* If a control comes from a client, then broadcast it to all nodes.
1263 * Do the actual detach only if the control comes from other daemons.
1265 if (client_id
!= 0) {
1266 client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
1267 if (client
!= NULL
) {
1268 /* forward the control to all the nodes */
1269 ctdb_daemon_send_control(ctdb
,
1270 CTDB_BROADCAST_CONNECTED
, 0,
1271 CTDB_CONTROL_DB_DETACH
, 0,
1272 CTDB_CTRL_FLAG_NOREPLY
,
1273 indata
, NULL
, NULL
);
1276 DEBUG(DEBUG_ERR
, ("Client has gone away. Failing DB detach "
1277 "for database '%s'\n", ctdb_db
->db_name
));
1281 /* Detach database from recoverd */
1282 if (ctdb_daemon_send_message(ctdb
, ctdb
->pnn
,
1283 CTDB_SRVID_DETACH_DATABASE
,
1285 DEBUG(DEBUG_ERR
, ("Unable to detach DB from recoverd\n"));
1289 /* Disable vacuuming and drop all vacuuming data */
1290 talloc_free(ctdb_db
->vacuum_handle
);
1291 talloc_free(ctdb_db
->delete_queue
);
1293 /* Terminate any deferred fetch */
1294 talloc_free(ctdb_db
->deferred_fetch
);
1296 /* Terminate any traverses */
1297 while (ctdb_db
->traverse
) {
1298 talloc_free(ctdb_db
->traverse
);
1301 /* Terminate any revokes */
1302 while (ctdb_db
->revokechild_active
) {
1303 talloc_free(ctdb_db
->revokechild_active
);
1306 /* Free readonly tracking database */
1307 if (ctdb_db_readonly(ctdb_db
)) {
1308 talloc_free(ctdb_db
->rottdb
);
1311 DLIST_REMOVE(ctdb
->db_list
, ctdb_db
);
1313 DEBUG(DEBUG_NOTICE
, ("Detached from database '%s'\n",
1315 talloc_free(ctdb_db
);
1321 attach to all existing persistent databases
1323 static int ctdb_attach_persistent(struct ctdb_context
*ctdb
,
1324 const char *unhealthy_reason
)
1329 /* open the persistent db directory and scan it for files */
1330 d
= opendir(ctdb
->db_directory_persistent
);
1335 while ((de
=readdir(d
))) {
1337 size_t len
= strlen(de
->d_name
);
1339 int invalid_name
= 0;
1341 s
= talloc_strdup(ctdb
, de
->d_name
);
1344 CTDB_NO_MEMORY(ctdb
, s
);
1347 /* only accept names ending in .tdb */
1348 p
= strstr(s
, ".tdb.");
1349 if (len
< 7 || p
== NULL
) {
1354 /* only accept names ending with .tdb. and any number of digits */
1356 while (*q
!= 0 && invalid_name
== 0) {
1357 if (!isdigit(*q
++)) {
1361 if (invalid_name
== 1 || sscanf(p
+5, "%u", &node
) != 1 || node
!= ctdb
->pnn
) {
1362 DEBUG(DEBUG_ERR
,("Ignoring persistent database '%s'\n", de
->d_name
));
1368 if (ctdb_local_attach(ctdb
, s
, CTDB_DB_FLAGS_PERSISTENT
, unhealthy_reason
) != 0) {
1369 DEBUG(DEBUG_ERR
,("Failed to attach to persistent database '%s'\n", de
->d_name
));
1375 DEBUG(DEBUG_INFO
,("Attached to persistent database %s\n", s
));
1383 int ctdb_attach_databases(struct ctdb_context
*ctdb
)
1386 char *persistent_health_path
= NULL
;
1387 char *unhealthy_reason
= NULL
;
1388 bool first_try
= true;
1390 persistent_health_path
= talloc_asprintf(ctdb
, "%s/%s.%u",
1391 ctdb
->db_directory_state
,
1392 PERSISTENT_HEALTH_TDB
,
1394 if (persistent_health_path
== NULL
) {
1395 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1401 ctdb
->db_persistent_health
= tdb_wrap_open(ctdb
, persistent_health_path
,
1402 0, TDB_DISALLOW_NESTING
,
1403 O_CREAT
| O_RDWR
, 0600);
1404 if (ctdb
->db_persistent_health
== NULL
) {
1405 struct tdb_wrap
*tdb
;
1408 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
1409 persistent_health_path
,
1412 talloc_free(persistent_health_path
);
1413 talloc_free(unhealthy_reason
);
1418 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1419 persistent_health_path
,
1420 "was cleared after a failure",
1421 "manual verification needed");
1422 if (unhealthy_reason
== NULL
) {
1423 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1424 talloc_free(persistent_health_path
);
1428 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1429 persistent_health_path
));
1430 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1431 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1432 O_CREAT
| O_RDWR
, 0600);
1434 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1435 persistent_health_path
,
1438 talloc_free(persistent_health_path
);
1439 talloc_free(unhealthy_reason
);
1446 ret
= tdb_check(ctdb
->db_persistent_health
->tdb
, NULL
, NULL
);
1448 struct tdb_wrap
*tdb
;
1450 talloc_free(ctdb
->db_persistent_health
);
1451 ctdb
->db_persistent_health
= NULL
;
1454 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed\n",
1455 persistent_health_path
));
1456 talloc_free(persistent_health_path
);
1457 talloc_free(unhealthy_reason
);
1462 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1463 persistent_health_path
,
1464 "was cleared after a failure",
1465 "manual verification needed");
1466 if (unhealthy_reason
== NULL
) {
1467 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1468 talloc_free(persistent_health_path
);
1472 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1473 persistent_health_path
));
1474 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1475 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1476 O_CREAT
| O_RDWR
, 0600);
1478 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1479 persistent_health_path
,
1482 talloc_free(persistent_health_path
);
1483 talloc_free(unhealthy_reason
);
1490 talloc_free(persistent_health_path
);
1492 ret
= ctdb_attach_persistent(ctdb
, unhealthy_reason
);
1493 talloc_free(unhealthy_reason
);
1502 called when a broadcast seqnum update comes in
1504 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
, uint32_t srcnode
)
1506 struct ctdb_db_context
*ctdb_db
;
1507 if (srcnode
== ctdb
->pnn
) {
1508 /* don't update ourselves! */
1512 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1514 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id
));
1518 if (ctdb_db
->unhealthy_reason
) {
1519 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1520 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
1524 tdb_increment_seqnum_nonblock(ctdb_db
->ltdb
->tdb
);
1525 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1530 timer to check for seqnum changes in a ltdb and propagate them
1532 static void ctdb_ltdb_seqnum_check(struct tevent_context
*ev
,
1533 struct tevent_timer
*te
,
1534 struct timeval t
, void *p
)
1536 struct ctdb_db_context
*ctdb_db
= talloc_get_type(p
, struct ctdb_db_context
);
1537 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1538 uint32_t new_seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1539 if (new_seqnum
!= ctdb_db
->seqnum
) {
1540 /* something has changed - propagate it */
1542 data
.dptr
= (uint8_t *)&ctdb_db
->db_id
;
1543 data
.dsize
= sizeof(uint32_t);
1544 ctdb_daemon_send_control(ctdb
,
1545 CTDB_BROADCAST_ACTIVE
,
1547 CTDB_CONTROL_UPDATE_SEQNUM
,
1549 CTDB_CTRL_FLAG_NOREPLY
,
1554 ctdb_db
->seqnum
= new_seqnum
;
1556 /* setup a new timer */
1557 ctdb_db
->seqnum_update
=
1558 tevent_add_timer(ctdb
->ev
, ctdb_db
,
1559 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000,
1560 (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1561 ctdb_ltdb_seqnum_check
, ctdb_db
);
1565 enable seqnum handling on this db
1567 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
)
1569 struct ctdb_db_context
*ctdb_db
;
1570 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1572 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id
));
1576 if (ctdb_db
->seqnum_update
== NULL
) {
1577 ctdb_db
->seqnum_update
= tevent_add_timer(
1579 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000,
1580 (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1581 ctdb_ltdb_seqnum_check
, ctdb_db
);
1584 tdb_enable_seqnum(ctdb_db
->ltdb
->tdb
);
1585 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1589 int ctdb_set_db_sticky(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
1591 if (ctdb_db_sticky(ctdb_db
)) {
1595 if (! ctdb_db_volatile(ctdb_db
)) {
1597 ("Non-volatile databases do not support sticky flag\n"));
1601 ctdb_db
->sticky_records
= trbt_create(ctdb_db
, 0);
1603 ctdb_db_set_sticky(ctdb_db
);
1605 DEBUG(DEBUG_NOTICE
,("set db sticky %s\n", ctdb_db
->db_name
));
1610 void ctdb_db_statistics_reset(struct ctdb_db_context
*ctdb_db
)
1612 struct ctdb_db_statistics_old
*s
= &ctdb_db
->statistics
;
1615 for (i
=0; i
<MAX_HOT_KEYS
; i
++) {
1616 if (s
->hot_keys
[i
].key
.dsize
> 0) {
1617 talloc_free(s
->hot_keys
[i
].key
.dptr
);
1621 ZERO_STRUCT(ctdb_db
->statistics
);
1624 int32_t ctdb_control_get_db_statistics(struct ctdb_context
*ctdb
,
1628 struct ctdb_db_context
*ctdb_db
;
1629 struct ctdb_db_statistics_old
*stats
;
1634 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1636 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in get_db_statistics\n", db_id
));
1640 len
= offsetof(struct ctdb_db_statistics_old
, hot_keys_wire
);
1641 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1642 len
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1645 stats
= talloc_size(outdata
, len
);
1646 if (stats
== NULL
) {
1647 DEBUG(DEBUG_ERR
,("Failed to allocate db statistics structure\n"));
1651 memcpy(stats
, &ctdb_db
->statistics
,
1652 offsetof(struct ctdb_db_statistics_old
, hot_keys_wire
));
1654 stats
->num_hot_keys
= MAX_HOT_KEYS
;
1656 ptr
= &stats
->hot_keys_wire
[0];
1657 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1658 memcpy(ptr
, ctdb_db
->statistics
.hot_keys
[i
].key
.dptr
,
1659 ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
);
1660 ptr
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1663 outdata
->dptr
= (uint8_t *)stats
;
1664 outdata
->dsize
= len
;