2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 * write a record to a normal database
37 * This is the server-variant of the ctdb_ltdb_store function.
38 * It contains logic to determine whether a record should be
39 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40 * controls to the local ctdb daemon if appropriate.
42 static int ctdb_ltdb_store_server(struct ctdb_db_context
*ctdb_db
,
44 struct ctdb_ltdb_header
*header
,
47 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
50 bool seqnum_suppressed
= false;
52 bool schedule_for_deletion
= false;
53 bool remove_from_delete_queue
= false;
56 if (ctdb
->flags
& CTDB_FLAG_TORTURE
) {
57 struct ctdb_ltdb_header
*h2
;
58 rec
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
59 h2
= (struct ctdb_ltdb_header
*)rec
.dptr
;
60 if (rec
.dptr
&& rec
.dsize
>= sizeof(h2
) && h2
->rsn
> header
->rsn
) {
61 DEBUG(DEBUG_CRIT
,("RSN regression! %llu %llu\n",
62 (unsigned long long)h2
->rsn
, (unsigned long long)header
->rsn
));
64 if (rec
.dptr
) free(rec
.dptr
);
67 if (ctdb
->vnn_map
== NULL
) {
69 * Called from a client: always store the record
70 * Also don't call ctdb_lmaster since it uses the vnn_map!
76 lmaster
= ctdb_lmaster(ctdb_db
->ctdb
, &key
);
79 * If we migrate an empty record off to another node
80 * and the record has not been migrated with data,
81 * delete the record instead of storing the empty record.
83 if (data
.dsize
!= 0) {
85 } else if (header
->flags
& CTDB_REC_RO_FLAGS
) {
87 } else if (ctdb_db
->persistent
) {
89 } else if (header
->flags
& CTDB_REC_FLAG_AUTOMATIC
) {
91 * The record is not created by the client but
92 * automatically by the ctdb_ltdb_fetch logic that
93 * creates a record with an initial header in the
94 * ltdb before trying to migrate the record from
95 * the current lmaster. Keep it instead of trying
96 * to delete the non-existing record...
99 schedule_for_deletion
= true;
100 } else if (header
->flags
& CTDB_REC_FLAG_MIGRATED_WITH_DATA
) {
102 } else if (ctdb_db
->ctdb
->pnn
== lmaster
) {
104 * If we are lmaster, then we usually keep the record.
105 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106 * and the record is empty and has never been migrated
107 * with data, then we should delete it instead of storing it.
108 * This is part of the vacuuming process.
110 * The reason that we usually need to store even empty records
111 * on the lmaster is that a client operating directly on the
112 * lmaster (== dmaster) expects the local copy of the record to
113 * exist after successful ctdb migrate call. If the record does
114 * not exist, the client goes into a migrate loop and eventually
115 * fails. So storing the empty record makes sure that we do not
116 * need to change the client code.
118 if (!(header
->flags
& CTDB_REC_FLAG_VACUUM_MIGRATED
)) {
120 } else if (ctdb_db
->ctdb
->pnn
!= header
->dmaster
) {
123 } else if (ctdb_db
->ctdb
->pnn
== header
->dmaster
) {
128 if (!ctdb_db
->persistent
&&
129 (ctdb_db
->ctdb
->pnn
== header
->dmaster
) &&
130 !(header
->flags
& CTDB_REC_RO_FLAGS
))
134 if (data
.dsize
== 0) {
135 schedule_for_deletion
= true;
138 remove_from_delete_queue
= !schedule_for_deletion
;
143 * The VACUUM_MIGRATED flag is only set temporarily for
144 * the above logic when the record was retrieved by a
145 * VACUUM_MIGRATE call and should not be stored in the
148 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
149 * and there are two cases in which the corresponding record
150 * is stored in the local database:
151 * 1. The record has been migrated with data in the past
152 * (the MIGRATED_WITH_DATA record flag is set).
153 * 2. The record has been filled with data again since it
154 * had been submitted in the VACUUM_FETCH message to the
156 * For such records it is important to not store the
157 * VACUUM_MIGRATED flag in the database.
159 header
->flags
&= ~CTDB_REC_FLAG_VACUUM_MIGRATED
;
162 * Similarly, clear the AUTOMATIC flag which should not enter
163 * the local database copy since this would require client
164 * modifications to clear the flag when the client stores
167 header
->flags
&= ~CTDB_REC_FLAG_AUTOMATIC
;
169 rec
.dsize
= sizeof(*header
) + data
.dsize
;
170 rec
.dptr
= talloc_size(ctdb
, rec
.dsize
);
171 CTDB_NO_MEMORY(ctdb
, rec
.dptr
);
173 memcpy(rec
.dptr
, header
, sizeof(*header
));
174 memcpy(rec
.dptr
+ sizeof(*header
), data
.dptr
, data
.dsize
);
176 /* Databases with seqnum updates enabled only get their seqnum
177 changes when/if we modify the data */
178 if (ctdb_db
->seqnum_update
!= NULL
) {
180 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
182 if ( (old
.dsize
== rec
.dsize
)
183 && !memcmp(old
.dptr
+sizeof(struct ctdb_ltdb_header
),
184 rec
.dptr
+sizeof(struct ctdb_ltdb_header
),
185 rec
.dsize
-sizeof(struct ctdb_ltdb_header
)) ) {
186 tdb_remove_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
187 seqnum_suppressed
= true;
189 if (old
.dptr
) free(old
.dptr
);
192 DEBUG(DEBUG_DEBUG
, (__location__
" db[%s]: %s record: hash[0x%08x]\n",
194 keep
?"storing":"deleting",
198 ret
= tdb_store(ctdb_db
->ltdb
->tdb
, key
, rec
, TDB_REPLACE
);
200 ret
= tdb_delete(ctdb_db
->ltdb
->tdb
, key
);
207 tdb_error(ctdb_db
->ltdb
->tdb
) == TDB_ERR_NOEXIST
)
212 DEBUG(lvl
, (__location__
" db[%s]: Failed to %s record: "
215 keep
?"store":"delete", ret
,
216 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
218 schedule_for_deletion
= false;
219 remove_from_delete_queue
= false;
221 if (seqnum_suppressed
) {
222 tdb_add_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
225 talloc_free(rec
.dptr
);
227 if (schedule_for_deletion
) {
229 ret2
= ctdb_local_schedule_for_deletion(ctdb_db
, header
, key
);
231 DEBUG(DEBUG_ERR
, (__location__
" ctdb_local_schedule_for_deletion failed.\n"));
235 if (remove_from_delete_queue
) {
236 ctdb_local_remove_from_delete_queue(ctdb_db
, header
, key
);
242 struct lock_fetch_state
{
243 struct ctdb_context
*ctdb
;
244 void (*recv_pkt
)(void *, struct ctdb_req_header
*);
246 struct ctdb_req_header
*hdr
;
248 bool ignore_generation
;
252 called when we should retry the operation
254 static void lock_fetch_callback(void *p
, bool locked
)
256 struct lock_fetch_state
*state
= talloc_get_type(p
, struct lock_fetch_state
);
257 if (!state
->ignore_generation
&&
258 state
->generation
!= state
->ctdb
->vnn_map
->generation
) {
259 DEBUG(DEBUG_NOTICE
,("Discarding previous generation lockwait packet\n"));
260 talloc_free(state
->hdr
);
263 state
->recv_pkt(state
->recv_context
, state
->hdr
);
264 DEBUG(DEBUG_INFO
,(__location__
" PACKET REQUEUED\n"));
269 do a non-blocking ltdb_lock, deferring this ctdb request until we
272 It does the following:
274 1) tries to get the chainlock. If it succeeds, then it returns 0
276 2) if it fails to get a chainlock immediately then it sets up a
277 non-blocking chainlock via ctdb_lock_record, and when it gets the
278 chainlock it re-submits this ctdb request to the main packet
281 This effectively queues all ctdb requests that cannot be
282 immediately satisfied until it can get the lock. This means that
283 the main ctdb daemon will not block waiting for a chainlock held by
286 There are 3 possible return values:
288 0: means that it got the lock immediately.
289 -1: means that it failed to get the lock, and won't retry
290 -2: means that it failed to get the lock immediately, but will retry
292 int ctdb_ltdb_lock_requeue(struct ctdb_db_context
*ctdb_db
,
293 TDB_DATA key
, struct ctdb_req_header
*hdr
,
294 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
295 void *recv_context
, bool ignore_generation
)
298 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
299 struct lock_request
*lreq
;
300 struct lock_fetch_state
*state
;
302 ret
= tdb_chainlock_nonblock(tdb
, key
);
305 !(errno
== EACCES
|| errno
== EAGAIN
|| errno
== EDEADLK
)) {
306 /* a hard failure - don't try again */
310 /* when torturing, ensure we test the contended path */
311 if ((ctdb_db
->ctdb
->flags
& CTDB_FLAG_TORTURE
) &&
314 tdb_chainunlock(tdb
, key
);
317 /* first the non-contended path */
322 state
= talloc(hdr
, struct lock_fetch_state
);
323 state
->ctdb
= ctdb_db
->ctdb
;
325 state
->recv_pkt
= recv_pkt
;
326 state
->recv_context
= recv_context
;
327 state
->generation
= ctdb_db
->ctdb
->vnn_map
->generation
;
328 state
->ignore_generation
= ignore_generation
;
330 /* now the contended path */
331 lreq
= ctdb_lock_record(ctdb_db
, key
, true, lock_fetch_callback
, state
);
336 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
337 so it won't be freed yet */
338 talloc_steal(state
, hdr
);
340 /* now tell the caller that we will retry asynchronously */
345 a variant of ctdb_ltdb_lock_requeue that also fetches the record
347 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context
*ctdb_db
,
348 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
349 struct ctdb_req_header
*hdr
, TDB_DATA
*data
,
350 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
351 void *recv_context
, bool ignore_generation
)
355 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
, recv_pkt
,
356 recv_context
, ignore_generation
);
358 ret
= ctdb_ltdb_fetch(ctdb_db
, key
, header
, hdr
, data
);
361 uret
= ctdb_ltdb_unlock(ctdb_db
, key
);
363 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", uret
));
372 paranoid check to see if the db is empty
374 static void ctdb_check_db_empty(struct ctdb_db_context
*ctdb_db
)
376 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
377 int count
= tdb_traverse_read(tdb
, NULL
, NULL
);
379 DEBUG(DEBUG_ALERT
,(__location__
" tdb '%s' not empty on attach! aborting\n",
381 ctdb_fatal(ctdb_db
->ctdb
, "database not empty on attach");
385 int ctdb_load_persistent_health(struct ctdb_context
*ctdb
,
386 struct ctdb_db_context
*ctdb_db
)
388 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
394 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
395 key
.dsize
= strlen(ctdb_db
->db_name
);
397 old
= ctdb_db
->unhealthy_reason
;
398 ctdb_db
->unhealthy_reason
= NULL
;
400 val
= tdb_fetch(tdb
, key
);
402 reason
= talloc_strndup(ctdb_db
,
403 (const char *)val
.dptr
,
405 if (reason
== NULL
) {
406 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strndup(%d) failed\n",
408 ctdb_db
->unhealthy_reason
= old
;
419 ctdb_db
->unhealthy_reason
= reason
;
423 int ctdb_update_persistent_health(struct ctdb_context
*ctdb
,
424 struct ctdb_db_context
*ctdb_db
,
425 const char *given_reason
,/* NULL means healthy */
426 int num_healthy_nodes
)
428 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
432 char *new_reason
= NULL
;
433 char *old_reason
= NULL
;
435 ret
= tdb_transaction_start(tdb
);
437 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_start('%s') failed: %d - %s\n",
438 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
442 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
444 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
445 ctdb_db
->db_name
, ret
));
448 old_reason
= ctdb_db
->unhealthy_reason
;
450 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
451 key
.dsize
= strlen(ctdb_db
->db_name
);
454 new_reason
= talloc_strdup(ctdb_db
, given_reason
);
455 if (new_reason
== NULL
) {
456 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strdup(%s) failed\n",
460 } else if (old_reason
&& num_healthy_nodes
== 0) {
462 * If the reason indicates ok, but there were no healthy nodes
463 * available, it means that we have not recovered valid content
464 * of the db. So if there's an old reason, prefix it with
465 * "NO-HEALTHY-NODES - "
469 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
470 ret
= strncmp(_TMP_PREFIX
, old_reason
, strlen(_TMP_PREFIX
));
472 prefix
= _TMP_PREFIX
;
476 new_reason
= talloc_asprintf(ctdb_db
, "%s%s",
478 if (new_reason
== NULL
) {
479 DEBUG(DEBUG_ALERT
,(__location__
" talloc_asprintf(%s%s) failed\n",
480 prefix
, old_reason
));
487 val
.dptr
= discard_const_p(uint8_t, new_reason
);
488 val
.dsize
= strlen(new_reason
);
490 ret
= tdb_store(tdb
, key
, val
, TDB_REPLACE
);
492 tdb_transaction_cancel(tdb
);
493 DEBUG(DEBUG_ALERT
,(__location__
" tdb_store('%s', %s, %s) failed: %d - %s\n",
494 tdb_name(tdb
), ctdb_db
->db_name
, new_reason
,
495 ret
, tdb_errorstr(tdb
)));
496 talloc_free(new_reason
);
499 DEBUG(DEBUG_ALERT
,("Updated db health for db(%s) to: %s\n",
500 ctdb_db
->db_name
, new_reason
));
501 } else if (old_reason
) {
502 ret
= tdb_delete(tdb
, key
);
504 tdb_transaction_cancel(tdb
);
505 DEBUG(DEBUG_ALERT
,(__location__
" tdb_delete('%s', %s) failed: %d - %s\n",
506 tdb_name(tdb
), ctdb_db
->db_name
,
507 ret
, tdb_errorstr(tdb
)));
508 talloc_free(new_reason
);
511 DEBUG(DEBUG_NOTICE
,("Updated db health for db(%s): OK\n",
515 ret
= tdb_transaction_commit(tdb
);
516 if (ret
!= TDB_SUCCESS
) {
517 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_commit('%s') failed: %d - %s\n",
518 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
519 talloc_free(new_reason
);
523 talloc_free(old_reason
);
524 ctdb_db
->unhealthy_reason
= new_reason
;
529 static int ctdb_backup_corrupted_tdb(struct ctdb_context
*ctdb
,
530 struct ctdb_db_context
*ctdb_db
)
532 time_t now
= time(NULL
);
540 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
541 new_path
= talloc_asprintf(ctdb_db
, "%s.corrupted."
542 "%04u%02u%02u%02u%02u%02u.0Z",
544 tm
->tm_year
+1900, tm
->tm_mon
+1,
545 tm
->tm_mday
, tm
->tm_hour
, tm
->tm_min
,
547 if (new_path
== NULL
) {
548 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
552 new_reason
= talloc_asprintf(ctdb_db
,
553 "ERROR - Backup of corrupted TDB in '%s'",
555 if (new_reason
== NULL
) {
556 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
559 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, new_reason
, 0);
560 talloc_free(new_reason
);
562 DEBUG(DEBUG_CRIT
,(__location__
563 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
568 ret
= rename(ctdb_db
->db_path
, new_path
);
570 DEBUG(DEBUG_CRIT
,(__location__
571 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
572 ctdb_db
->db_path
, new_path
,
573 errno
, strerror(errno
)));
574 talloc_free(new_path
);
578 DEBUG(DEBUG_CRIT
,(__location__
579 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
580 ctdb_db
->db_path
, new_path
));
581 talloc_free(new_path
);
585 int ctdb_recheck_persistent_health(struct ctdb_context
*ctdb
)
587 struct ctdb_db_context
*ctdb_db
;
592 for (ctdb_db
= ctdb
->db_list
; ctdb_db
; ctdb_db
= ctdb_db
->next
) {
593 if (!ctdb_db
->persistent
) {
597 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
599 DEBUG(DEBUG_ALERT
,(__location__
600 " load persistent health for '%s' failed\n",
605 if (ctdb_db
->unhealthy_reason
== NULL
) {
607 DEBUG(DEBUG_INFO
,(__location__
608 " persistent db '%s' healthy\n",
614 DEBUG(DEBUG_ALERT
,(__location__
615 " persistent db '%s' unhealthy: %s\n",
617 ctdb_db
->unhealthy_reason
));
619 DEBUG((fail
!=0)?DEBUG_ALERT
:DEBUG_NOTICE
,
620 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
632 mark a database - as healthy
634 int32_t ctdb_control_db_set_healthy(struct ctdb_context
*ctdb
, TDB_DATA indata
)
636 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
637 struct ctdb_db_context
*ctdb_db
;
639 bool may_recover
= false;
641 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
643 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
647 if (ctdb_db
->unhealthy_reason
) {
651 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, NULL
, 1);
653 DEBUG(DEBUG_ERR
,(__location__
654 " ctdb_update_persistent_health(%s) failed\n",
659 if (may_recover
&& ctdb
->runstate
== CTDB_RUNSTATE_STARTUP
) {
660 DEBUG(DEBUG_ERR
, (__location__
" db %s become healthy - force recovery for startup\n",
662 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
668 int32_t ctdb_control_db_get_health(struct ctdb_context
*ctdb
,
672 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
673 struct ctdb_db_context
*ctdb_db
;
676 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
678 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
682 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
684 DEBUG(DEBUG_ERR
,(__location__
685 " ctdb_load_persistent_health(%s) failed\n",
691 if (ctdb_db
->unhealthy_reason
) {
692 outdata
->dptr
= (uint8_t *)ctdb_db
->unhealthy_reason
;
693 outdata
->dsize
= strlen(ctdb_db
->unhealthy_reason
)+1;
700 int ctdb_set_db_readonly(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
704 if (ctdb_db
->readonly
) {
708 if (ctdb_db
->persistent
) {
709 DEBUG(DEBUG_ERR
,("Persistent databases do not support readonly property\n"));
713 ropath
= talloc_asprintf(ctdb_db
, "%s.RO", ctdb_db
->db_path
);
714 if (ropath
== NULL
) {
715 DEBUG(DEBUG_CRIT
,("Failed to asprintf the tracking database\n"));
718 ctdb_db
->rottdb
= tdb_open(ropath
,
719 ctdb
->tunable
.database_hash_size
,
720 TDB_NOLOCK
|TDB_CLEAR_IF_FIRST
|TDB_NOSYNC
,
722 if (ctdb_db
->rottdb
== NULL
) {
723 DEBUG(DEBUG_CRIT
,("Failed to open/create the tracking database '%s'\n", ropath
));
728 DEBUG(DEBUG_NOTICE
,("OPENED tracking database : '%s'\n", ropath
));
730 ctdb_db
->readonly
= true;
732 DEBUG(DEBUG_NOTICE
, ("Readonly property set on DB %s\n", ctdb_db
->db_name
));
739 attach to a database, handling both persistent and non-persistent databases
740 return 0 on success, -1 on failure
742 static int ctdb_local_attach(struct ctdb_context
*ctdb
, const char *db_name
,
743 bool persistent
, const char *unhealthy_reason
,
744 bool jenkinshash
, bool mutexes
)
746 struct ctdb_db_context
*ctdb_db
, *tmp_db
;
751 int remaining_tries
= 0;
753 ctdb_db
= talloc_zero(ctdb
, struct ctdb_db_context
);
754 CTDB_NO_MEMORY(ctdb
, ctdb_db
);
756 ctdb_db
->priority
= 1;
757 ctdb_db
->ctdb
= ctdb
;
758 ctdb_db
->db_name
= talloc_strdup(ctdb_db
, db_name
);
759 CTDB_NO_MEMORY(ctdb
, ctdb_db
->db_name
);
761 key
.dsize
= strlen(db_name
)+1;
762 key
.dptr
= discard_const(db_name
);
763 ctdb_db
->db_id
= ctdb_hash(&key
);
764 ctdb_db
->persistent
= persistent
;
766 if (!ctdb_db
->persistent
) {
767 ctdb_db
->delete_queue
= trbt_create(ctdb_db
, 0);
768 if (ctdb_db
->delete_queue
== NULL
) {
769 CTDB_NO_MEMORY(ctdb
, ctdb_db
->delete_queue
);
772 ctdb_db
->ctdb_ltdb_store_fn
= ctdb_ltdb_store_server
;
775 /* check for hash collisions */
776 for (tmp_db
=ctdb
->db_list
;tmp_db
;tmp_db
=tmp_db
->next
) {
777 if (tmp_db
->db_id
== ctdb_db
->db_id
) {
778 DEBUG(DEBUG_CRIT
,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
779 tmp_db
->db_id
, db_name
, tmp_db
->db_name
));
780 talloc_free(ctdb_db
);
786 if (unhealthy_reason
) {
787 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
,
788 unhealthy_reason
, 0);
790 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_update_persistent_health('%s','%s') failed: %d\n",
791 ctdb_db
->db_name
, unhealthy_reason
, ret
));
792 talloc_free(ctdb_db
);
797 if (ctdb
->max_persistent_check_errors
> 0) {
800 if (ctdb
->runstate
== CTDB_RUNSTATE_RUNNING
) {
804 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
806 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
807 ctdb_db
->db_name
, ret
));
808 talloc_free(ctdb_db
);
813 if (ctdb_db
->unhealthy_reason
&& remaining_tries
== 0) {
814 DEBUG(DEBUG_ALERT
,(__location__
"ERROR: tdb %s is marked as unhealthy: %s\n",
815 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
816 talloc_free(ctdb_db
);
820 if (ctdb_db
->unhealthy_reason
) {
821 /* this is just a warning, but we want that in the log file! */
822 DEBUG(DEBUG_ALERT
,(__location__
"Warning: tdb %s is marked as unhealthy: %s\n",
823 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
826 /* open the database */
827 ctdb_db
->db_path
= talloc_asprintf(ctdb_db
, "%s/%s.%u",
828 persistent
?ctdb
->db_directory_persistent
:ctdb
->db_directory
,
831 tdb_flags
= persistent
? TDB_DEFAULT
: TDB_CLEAR_IF_FIRST
| TDB_NOSYNC
;
832 if (ctdb
->valgrinding
) {
833 tdb_flags
|= TDB_NOMMAP
;
835 tdb_flags
|= TDB_DISALLOW_NESTING
;
837 tdb_flags
|= TDB_INCOMPATIBLE_HASH
;
839 #ifdef TDB_MUTEX_LOCKING
840 if (ctdb
->tunable
.mutex_enabled
&& mutexes
&&
841 tdb_runtime_check_for_robust_mutexes()) {
842 tdb_flags
|= TDB_MUTEX_LOCKING
;
847 ctdb_db
->ltdb
= tdb_wrap_open(ctdb_db
, ctdb_db
->db_path
,
848 ctdb
->tunable
.database_hash_size
,
850 O_CREAT
|O_RDWR
, mode
);
851 if (ctdb_db
->ltdb
== NULL
) {
853 int saved_errno
= errno
;
856 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
859 strerror(saved_errno
)));
860 talloc_free(ctdb_db
);
864 if (remaining_tries
== 0) {
865 DEBUG(DEBUG_CRIT
,(__location__
866 "Failed to open persistent tdb '%s': %d - %s\n",
869 strerror(saved_errno
)));
870 talloc_free(ctdb_db
);
874 ret
= stat(ctdb_db
->db_path
, &st
);
876 DEBUG(DEBUG_CRIT
,(__location__
877 "Failed to open persistent tdb '%s': %d - %s\n",
880 strerror(saved_errno
)));
881 talloc_free(ctdb_db
);
885 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
887 DEBUG(DEBUG_CRIT
,(__location__
888 "Failed to open persistent tdb '%s': %d - %s\n",
891 strerror(saved_errno
)));
892 talloc_free(ctdb_db
);
902 ctdb_check_db_empty(ctdb_db
);
904 ret
= tdb_check(ctdb_db
->ltdb
->tdb
, NULL
, NULL
);
909 DEBUG(DEBUG_CRIT
,("tdb_check(%s) failed: %d - %s\n",
910 ctdb_db
->db_path
, ret
,
911 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
912 if (remaining_tries
== 0) {
913 talloc_free(ctdb_db
);
917 fd
= tdb_fd(ctdb_db
->ltdb
->tdb
);
918 ret
= fstat(fd
, &st
);
920 DEBUG(DEBUG_CRIT
,(__location__
921 "Failed to fstat() persistent tdb '%s': %d - %s\n",
925 talloc_free(ctdb_db
);
930 talloc_free(ctdb_db
->ltdb
);
931 ctdb_db
->ltdb
= NULL
;
933 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
935 DEBUG(DEBUG_CRIT
,("Failed to backup corrupted tdb '%s'\n",
937 talloc_free(ctdb_db
);
947 /* set up a rb tree we can use to track which records we have a
948 fetch-lock in-flight for so we can defer any additional calls
951 ctdb_db
->deferred_fetch
= trbt_create(ctdb_db
, 0);
952 if (ctdb_db
->deferred_fetch
== NULL
) {
953 DEBUG(DEBUG_ERR
,("Failed to create deferred fetch rb tree for ctdb database\n"));
954 talloc_free(ctdb_db
);
958 DLIST_ADD(ctdb
->db_list
, ctdb_db
);
960 /* setting this can help some high churn databases */
961 tdb_set_max_dead(ctdb_db
->ltdb
->tdb
, ctdb
->tunable
.database_max_dead
);
964 all databases support the "null" function. we need this in
965 order to do forced migration of records
967 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_null_func
, CTDB_NULL_FUNC
);
969 DEBUG(DEBUG_CRIT
,("Failed to setup null function for '%s'\n", ctdb_db
->db_name
));
970 talloc_free(ctdb_db
);
975 all databases support the "fetch" function. we need this
976 for efficient Samba3 ctdb fetch
978 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_func
, CTDB_FETCH_FUNC
);
980 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
981 talloc_free(ctdb_db
);
986 all databases support the "fetch_with_header" function. we need this
987 for efficient readonly record fetches
989 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_with_header_func
, CTDB_FETCH_WITH_HEADER_FUNC
);
991 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
992 talloc_free(ctdb_db
);
996 ret
= ctdb_vacuum_init(ctdb_db
);
998 DEBUG(DEBUG_CRIT
,("Failed to setup vacuuming for "
999 "database '%s'\n", ctdb_db
->db_name
));
1000 talloc_free(ctdb_db
);
1005 DEBUG(DEBUG_NOTICE
,("Attached to database '%s' with flags 0x%x\n",
1006 ctdb_db
->db_path
, tdb_flags
));
/*
  Bookkeeping for one deferred DB attach control.

  Contexts are linked into ctdb->deferred_attach with the DLIST macros,
  which use the next/prev members.
*/
struct ctdb_deferred_attach_context {
	/* doubly linked list linkage (DLIST_ADD / DLIST_REMOVE) */
	struct ctdb_deferred_attach_context *next, *prev;
	/* daemon context whose deferred_attach list this entry is on */
	struct ctdb_context *ctdb;
	/* the deferred attach control packet */
	struct ctdb_req_control *c;
};
1020 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context
*da_ctx
)
1022 DLIST_REMOVE(da_ctx
->ctdb
->deferred_attach
, da_ctx
);
1027 static void ctdb_deferred_attach_timeout(struct event_context
*ev
, struct timed_event
*te
, struct timeval t
, void *private_data
)
1029 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1030 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
1032 ctdb_request_control_reply(ctdb
, da_ctx
->c
, NULL
, -1, NULL
);
1033 talloc_free(da_ctx
);
1036 static void ctdb_deferred_attach_callback(struct event_context
*ev
, struct timed_event
*te
, struct timeval t
, void *private_data
)
1038 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1039 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
1041 /* This talloc-steals the packet ->c */
1042 ctdb_input_pkt(ctdb
, (struct ctdb_req_header
*)da_ctx
->c
);
1043 talloc_free(da_ctx
);
1046 int ctdb_process_deferred_attach(struct ctdb_context
*ctdb
)
1048 struct ctdb_deferred_attach_context
*da_ctx
;
1050 /* call it from the main event loop as soon as the current event
1053 while ((da_ctx
= ctdb
->deferred_attach
) != NULL
) {
1054 DLIST_REMOVE(ctdb
->deferred_attach
, da_ctx
);
1055 event_add_timed(ctdb
->ev
, da_ctx
, timeval_current_ofs(1,0), ctdb_deferred_attach_callback
, da_ctx
);
1062 a client has asked to attach a new database
1064 int32_t ctdb_control_db_attach(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1065 TDB_DATA
*outdata
, uint64_t tdb_flags
,
1066 bool persistent
, uint32_t client_id
,
1067 struct ctdb_req_control
*c
,
1070 const char *db_name
= (const char *)indata
.dptr
;
1071 struct ctdb_db_context
*db
;
1072 struct ctdb_node
*node
= ctdb
->nodes
[ctdb
->pnn
];
1073 struct ctdb_client
*client
= NULL
;
1074 bool with_jenkinshash
, with_mutexes
;
1076 if (ctdb
->tunable
.allow_client_db_attach
== 0) {
1077 DEBUG(DEBUG_ERR
, ("DB Attach to database %s denied by tunable "
1078 "AllowClientDBAccess == 0\n", db_name
));
1082 /* dont allow any local clients to attach while we are in recovery mode
1083 * except for the recovery daemon.
1084 * allow all attach from the network since these are always from remote
1087 if (client_id
!= 0) {
1088 client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
1090 if (client
!= NULL
) {
1091 /* If the node is inactive it is not part of the cluster
1092 and we should not allow clients to attach to any
1095 if (node
->flags
& NODE_FLAGS_INACTIVE
) {
1096 DEBUG(DEBUG_ERR
,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name
, node
->flags
));
1100 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
&&
1101 client
->pid
!= ctdb
->recoverd_pid
&&
1102 ctdb
->runstate
< CTDB_RUNSTATE_RUNNING
) {
1103 struct ctdb_deferred_attach_context
*da_ctx
= talloc(client
, struct ctdb_deferred_attach_context
);
1105 if (da_ctx
== NULL
) {
1106 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name
, client
->pid
));
1110 da_ctx
->ctdb
= ctdb
;
1111 da_ctx
->c
= talloc_steal(da_ctx
, c
);
1112 talloc_set_destructor(da_ctx
, ctdb_deferred_attach_destructor
);
1113 DLIST_ADD(ctdb
->deferred_attach
, da_ctx
);
1115 event_add_timed(ctdb
->ev
, da_ctx
, timeval_current_ofs(ctdb
->tunable
.deferred_attach_timeout
, 0), ctdb_deferred_attach_timeout
, da_ctx
);
1117 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name
, client
->pid
));
1118 *async_reply
= true;
1123 /* the client can optionally pass additional tdb flags, but we
1124 only allow a subset of those on the database in ctdb. Note
1125 that tdb_flags is passed in via the (otherwise unused)
1126 srvid to the attach control */
1127 #ifdef TDB_MUTEX_LOCKING
1128 tdb_flags
&= (TDB_NOSYNC
|TDB_INCOMPATIBLE_HASH
|TDB_MUTEX_LOCKING
);
1130 tdb_flags
&= (TDB_NOSYNC
|TDB_INCOMPATIBLE_HASH
);
1133 /* see if we already have this name */
1134 db
= ctdb_db_handle(ctdb
, db_name
);
1136 if (db
->persistent
!= persistent
) {
1137 DEBUG(DEBUG_ERR
, ("ERROR: DB Attach %spersistent to %spersistent "
1138 "database %s\n", persistent
? "" : "non-",
1139 db
-> persistent
? "" : "non-", db_name
));
1142 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1143 outdata
->dsize
= sizeof(db
->db_id
);
1144 tdb_add_flags(db
->ltdb
->tdb
, tdb_flags
);
1148 with_jenkinshash
= (tdb_flags
& TDB_INCOMPATIBLE_HASH
) ? true : false;
1149 #ifdef TDB_MUTEX_LOCKING
1150 with_mutexes
= (tdb_flags
& TDB_MUTEX_LOCKING
) ? true : false;
1152 with_mutexes
= false;
1155 if (ctdb_local_attach(ctdb
, db_name
, persistent
, NULL
,
1156 with_jenkinshash
, with_mutexes
) != 0) {
1160 db
= ctdb_db_handle(ctdb
, db_name
);
1162 DEBUG(DEBUG_ERR
,("Failed to find db handle for name '%s'\n", db_name
));
1166 /* remember the flags the client has specified */
1167 tdb_add_flags(db
->ltdb
->tdb
, tdb_flags
);
1169 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1170 outdata
->dsize
= sizeof(db
->db_id
);
1172 /* Try to ensure it's locked in mem */
1173 lockdown_memory(ctdb
->valgrinding
);
1175 /* tell all the other nodes about this database */
1176 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
, tdb_flags
,
1177 persistent
?CTDB_CONTROL_DB_ATTACH_PERSISTENT
:
1178 CTDB_CONTROL_DB_ATTACH
,
1179 0, CTDB_CTRL_FLAG_NOREPLY
,
1180 indata
, NULL
, NULL
);
1187 * a client has asked to detach from a database
/*
 * Control handler that detaches this daemon from a database.
 *
 * The database id is read as a uint32_t from the start of indata.dptr and
 * looked up with find_ctdb_db().  A DEBUG_ERR message is logged when the id
 * is unknown, when the allow_client_db_attach tunable is 1, when the
 * database is persistent, or while recovery_mode is CTDB_RECOVERY_ACTIVE.
 *
 * A non-zero client_id is resolved with ctdb_reqid_find(); when the client
 * exists the control is re-sent to CTDB_BROADCAST_ALL with
 * CTDB_CTRL_FLAG_NOREPLY.  The detach proper sends a
 * CTDB_SRVID_DETACH_DATABASE message to the local pnn, frees the vacuuming,
 * deferred-fetch, traverse, revoke and read-only tracking state, unlinks
 * the context from ctdb->db_list and finally frees it.
 *
 * NOTE(review): the client_id parameter declaration and the return
 * statements are not visible in this listing; confirm the exact return
 * values against the full file.
 */
1189 int32_t ctdb_control_db_detach(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1193 struct ctdb_db_context
*ctdb_db
;
1194 struct ctdb_client
*client
= NULL
;
/* NOTE(review): indata.dptr is dereferenced with no size check visible
 * here - presumably the control dispatcher validates dsize; confirm. */
1196 db_id
= *(uint32_t *)indata
.dptr
;
1197 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1198 if (ctdb_db
== NULL
) {
1199 DEBUG(DEBUG_ERR
, ("Invalid dbid 0x%08x in DB detach\n",
1204 if (ctdb
->tunable
.allow_client_db_attach
== 1) {
1205 DEBUG(DEBUG_ERR
, ("DB detach from database %s denied. "
1206 "Clients are allowed access to databases "
1207 "(AllowClientDBAccess == 1)\n",
1212 if (ctdb_db
->persistent
) {
1213 DEBUG(DEBUG_ERR
, ("DB detach from persistent database %s "
1214 "denied\n", ctdb_db
->db_name
));
1218 /* Cannot detach from database when in recovery */
1219 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
) {
1220 DEBUG(DEBUG_ERR
, ("DB detach denied while in recovery\n"));
/* NOTE(review): the block comment that follows has no closing delimiter of
 * its own in this listing, so a C lexer would swallow the client lookup up
 * to the end of the "forward the control" comment - confirm against the
 * pristine file. */
1224 /* If a control comes from a client, then broadcast it to all nodes.
1225 * Do the actual detach only if the control comes from other daemons.
1227 if (client_id
!= 0) {
1228 client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
1229 if (client
!= NULL
) {
1230 /* forward the control to all the nodes */
1231 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
, 0,
1232 CTDB_CONTROL_DB_DETACH
, 0,
1233 CTDB_CTRL_FLAG_NOREPLY
,
1234 indata
, NULL
, NULL
);
1237 DEBUG(DEBUG_ERR
, ("Client has gone away. Failing DB detach "
1238 "for database '%s'\n", ctdb_db
->db_name
));
1242 /* Detach database from recoverd */
1243 if (ctdb_daemon_send_message(ctdb
, ctdb
->pnn
,
1244 CTDB_SRVID_DETACH_DATABASE
,
1246 DEBUG(DEBUG_ERR
, ("Unable to detach DB from recoverd\n"));
1250 /* Disable vacuuming and drop all vacuuming data */
1251 talloc_free(ctdb_db
->vacuum_handle
);
1252 talloc_free(ctdb_db
->delete_queue
);
1254 /* Terminate any deferred fetch */
1255 talloc_free(ctdb_db
->deferred_fetch
);
/* The two loops below only terminate if freeing the head entry changes the
 * list head - presumably a talloc destructor unlinks each entry; confirm. */
1257 /* Terminate any traverses */
1258 while (ctdb_db
->traverse
) {
1259 talloc_free(ctdb_db
->traverse
);
1262 /* Terminate any revokes */
1263 while (ctdb_db
->revokechild_active
) {
1264 talloc_free(ctdb_db
->revokechild_active
);
1267 /* Free readonly tracking database */
1268 if (ctdb_db
->readonly
) {
1269 talloc_free(ctdb_db
->rottdb
);
/* Unlink from the daemon's database list before the context is freed. */
1272 DLIST_REMOVE(ctdb
->db_list
, ctdb_db
);
1274 DEBUG(DEBUG_NOTICE
, ("Detached from database '%s'\n",
1276 talloc_free(ctdb_db
);
1282 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach to this node's persistent
 * databases.
 *
 * Every directory entry returned by readdir() is duplicated with
 * talloc_strdup() and filtered: it must contain ".tdb.", be at least 7
 * characters long, be followed only by digits, and the number parsed from
 * the suffix must equal ctdb->pnn.  Rejected names are logged as ignored.
 * Accepted names are handed to ctdb_local_attach() together with
 * unhealthy_reason; an attach failure is logged at DEBUG_ERR.
 *
 * NOTE(review): the cleanup on each path (closedir, talloc_free), the
 * truncation of the node suffix and the return values are not visible in
 * this listing; confirm against the full file.
 */
1284 static int ctdb_attach_persistent(struct ctdb_context
*ctdb
,
1285 const char *unhealthy_reason
)
1290 /* open the persistent db directory and scan it for files */
1291 d
= opendir(ctdb
->db_directory_persistent
);
1296 while ((de
=readdir(d
))) {
1298 size_t len
= strlen(de
->d_name
);
1300 int invalid_name
= 0;
1302 s
= talloc_strdup(ctdb
, de
->d_name
);
1305 CTDB_NO_MEMORY(ctdb
, s
);
/* strstr() finds the first ".tdb." in the name, which is not necessarily
 * the final one. */
1308 /* only accept names ending in .tdb */
1309 p
= strstr(s
, ".tdb.");
1310 if (len
< 7 || p
== NULL
) {
1315 /* only accept names ending with .tdb. and any number of digits */
1317 while (*q
!= 0 && invalid_name
== 0) {
/* NOTE(review): if q points at plain char, isdigit() can receive a negative
 * value for bytes >= 0x80; <ctype.h> requires an unsigned char value or
 * EOF, so an (unsigned char) cast would be needed - confirm q's type. */
1318 if (!isdigit(*q
++)) {
/* The numeric suffix after ".tdb." must parse and match this node's pnn. */
1322 if (invalid_name
== 1 || sscanf(p
+5, "%u", &node
) != 1 || node
!= ctdb
->pnn
) {
1323 DEBUG(DEBUG_ERR
,("Ignoring persistent database '%s'\n", de
->d_name
));
1329 if (ctdb_local_attach(ctdb
, s
, true, unhealthy_reason
, false, false) != 0) {
1330 DEBUG(DEBUG_ERR
,("Failed to attach to persistent database '%s'\n", de
->d_name
));
1336 DEBUG(DEBUG_INFO
,("Attached to persistent database %s\n", s
));
/*
 * Open the persistent-health tdb and attach to the persistent databases.
 *
 * The health tdb path is formatted as "%s/%s.%u" from
 * ctdb->db_directory_state and PERSISTENT_HEALTH_TDB, and is opened with
 * TDB_DISALLOW_NESTING and O_CREAT | O_RDWR.  Two recovery paths are
 * visible: one when the open yields NULL and one around tdb_check() on the
 * opened handle.  Both log at DEBUG_CRIT, build the "was cleared after a
 * failure - manual verification needed" unhealthy_reason string and reopen
 * the file with TDB_CLEAR_IF_FIRST added to the flags.  Finally
 * ctdb_attach_persistent() is called with unhealthy_reason and the
 * temporary strings are released with talloc_free().
 *
 * NOTE(review): the third format argument of the path, the conditions that
 * guard the retry paths (including how first_try is consumed) and the
 * return statements are not visible in this listing; confirm against the
 * full file.
 */
1344 int ctdb_attach_databases(struct ctdb_context
*ctdb
)
1347 char *persistent_health_path
= NULL
;
1348 char *unhealthy_reason
= NULL
;
1349 bool first_try
= true;
1351 persistent_health_path
= talloc_asprintf(ctdb
, "%s/%s.%u",
1352 ctdb
->db_directory_state
,
1353 PERSISTENT_HEALTH_TDB
,
1355 if (persistent_health_path
== NULL
) {
1356 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
/* First attempt: open (creating if necessary) without clearing contents. */
1362 ctdb
->db_persistent_health
= tdb_wrap_open(ctdb
, persistent_health_path
,
1363 0, TDB_DISALLOW_NESTING
,
1364 O_CREAT
| O_RDWR
, 0600);
1365 if (ctdb
->db_persistent_health
== NULL
) {
1366 struct tdb_wrap
*tdb
;
1369 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
1370 persistent_health_path
,
1373 talloc_free(persistent_health_path
);
1374 talloc_free(unhealthy_reason
);
/* Record why the health data is unreliable; the string is later passed to
 * ctdb_attach_persistent(). */
1379 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1380 persistent_health_path
,
1381 "was cleared after a failure",
1382 "manual verification needed");
1383 if (unhealthy_reason
== NULL
) {
1384 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1385 talloc_free(persistent_health_path
);
1389 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1390 persistent_health_path
));
1391 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1392 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1393 O_CREAT
| O_RDWR
, 0600);
1395 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1396 persistent_health_path
,
1399 talloc_free(persistent_health_path
);
1400 talloc_free(unhealthy_reason
);
/* Integrity check of the opened health tdb; the failure path below mirrors
 * the open-failure path above. */
1407 ret
= tdb_check(ctdb
->db_persistent_health
->tdb
, NULL
, NULL
);
1409 struct tdb_wrap
*tdb
;
1411 talloc_free(ctdb
->db_persistent_health
);
1412 ctdb
->db_persistent_health
= NULL
;
1415 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed\n",
1416 persistent_health_path
));
1417 talloc_free(persistent_health_path
);
1418 talloc_free(unhealthy_reason
);
1423 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1424 persistent_health_path
,
1425 "was cleared after a failure",
1426 "manual verification needed");
1427 if (unhealthy_reason
== NULL
) {
1428 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1429 talloc_free(persistent_health_path
);
1433 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1434 persistent_health_path
));
1435 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1436 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1437 O_CREAT
| O_RDWR
, 0600);
1439 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1440 persistent_health_path
,
1443 talloc_free(persistent_health_path
);
1444 talloc_free(unhealthy_reason
);
/* The path string is no longer needed once the health tdb is open. */
1451 talloc_free(persistent_health_path
);
1453 ret
= ctdb_attach_persistent(ctdb
, unhealthy_reason
);
1454 talloc_free(unhealthy_reason
);
1463 called when a broadcast seqnum update comes in
/*
 * Apply a sequence-number update for database db_id that was sent by
 * srcnode.
 *
 * Updates whose srcnode equals ctdb->pnn are recognised as our own.  The
 * database is looked up with find_ctdb_db(); an unknown id, or a database
 * whose unhealthy_reason is set, is logged at DEBUG_ERR.  The local tdb's
 * sequence number is bumped with tdb_increment_seqnum_nonblock() and the
 * resulting value is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the early returns and their values are not visible in this
 * listing; confirm against the full file.
 */
1465 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
, uint32_t srcnode
)
1467 struct ctdb_db_context
*ctdb_db
;
1468 if (srcnode
== ctdb
->pnn
) {
1469 /* don't update ourselves! */
1473 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1475 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id
));
1479 if (ctdb_db
->unhealthy_reason
) {
1480 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1481 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
/* Bump the tdb's own counter, then re-read it into the cached copy. */
1485 tdb_increment_seqnum_nonblock(ctdb_db
->ltdb
->tdb
);
1486 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1491 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timed-event callback that watches one database's tdb sequence number.
 *
 * p is the struct ctdb_db_context the timer was registered with.  The
 * current tdb_get_seqnum() value is compared with the cached
 * ctdb_db->seqnum; on a change the db_id is sent as the payload of a
 * CTDB_CONTROL_UPDATE_SEQNUM control to CTDB_BROADCAST_VNNMAP with
 * CTDB_CTRL_FLAG_NOREPLY, and the cached value is refreshed.  The callback
 * then re-registers itself with event_add_timed().
 *
 * ev, te and t are not used in the visible code.
 */
1493 static void ctdb_ltdb_seqnum_check(struct event_context
*ev
, struct timed_event
*te
,
1494 struct timeval t
, void *p
)
1496 struct ctdb_db_context
*ctdb_db
= talloc_get_type(p
, struct ctdb_db_context
);
1497 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1498 uint32_t new_seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1499 if (new_seqnum
!= ctdb_db
->seqnum
) {
1500 /* something has changed - propagate it */
/* The control payload is just the 32-bit database id. */
1502 data
.dptr
= (uint8_t *)&ctdb_db
->db_id
;
1503 data
.dsize
= sizeof(uint32_t);
1504 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_VNNMAP
, 0,
1505 CTDB_CONTROL_UPDATE_SEQNUM
, 0, CTDB_CTRL_FLAG_NOREPLY
,
1508 ctdb_db
->seqnum
= new_seqnum
;
/* seqnum_interval is split into whole thousands and the remainder times
 * 1000 - presumably milliseconds converted to seconds + microseconds;
 * confirm against timeval_current_ofs(). */
1510 /* setup a new timer */
1511 ctdb_db
->seqnum_update
=
1512 event_add_timed(ctdb
->ev
, ctdb_db
,
1513 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000, (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1514 ctdb_ltdb_seqnum_check
, ctdb_db
);
1518 enable seqnum handling on this db
/*
 * Turn on sequence-number tracking for database db_id.
 *
 * The database is looked up with find_ctdb_db(); an unknown id is logged at
 * DEBUG_ERR.  If no seqnum timer is registered yet (seqnum_update == NULL)
 * one is added that fires ctdb_ltdb_seqnum_check.  tdb_enable_seqnum() is
 * then called on the local tdb and its current sequence number is cached
 * in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * confirm the return values against the full file.
 */
1520 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
)
1522 struct ctdb_db_context
*ctdb_db
;
1523 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1525 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id
));
/* Only arm the periodic check once per database. */
1529 if (ctdb_db
->seqnum_update
== NULL
) {
1530 ctdb_db
->seqnum_update
=
1531 event_add_timed(ctdb
->ev
, ctdb_db
,
1532 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000, (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1533 ctdb_ltdb_seqnum_check
, ctdb_db
);
1536 tdb_enable_seqnum(ctdb_db
->ltdb
->tdb
);
1537 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
/*
 * Control handler that sets the priority of a database.
 *
 * indata.dptr is interpreted as a struct ctdb_db_priority carrying db_id
 * and priority.  The "Unknown db_id" message is only logged when this
 * node's flags do not include NODE_FLAGS_INACTIVE.  A priority outside
 * 1..NUM_DB_PRIORITIES is logged as invalid.  Otherwise the value is stored
 * in ctdb_db->priority and, when client_id is non-zero, the same control is
 * re-sent to CTDB_BROADCAST_ALL with CTDB_CTRL_FLAG_NOREPLY.
 *
 * NOTE(review): the client_id parameter declaration, the test that guards
 * the unknown-database path and the return statements are not visible in
 * this listing; confirm against the full file.
 */
1541 int32_t ctdb_control_set_db_priority(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1544 struct ctdb_db_priority
*db_prio
= (struct ctdb_db_priority
*)indata
.dptr
;
1545 struct ctdb_db_context
*ctdb_db
;
1547 ctdb_db
= find_ctdb_db(ctdb
, db_prio
->db_id
);
/* Log the lookup failure only while this node is not flagged inactive. */
1549 if (!(ctdb
->nodes
[ctdb
->pnn
]->flags
& NODE_FLAGS_INACTIVE
)) {
1550 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1556 if ((db_prio
->priority
<1) || (db_prio
->priority
>NUM_DB_PRIORITIES
)) {
1557 DEBUG(DEBUG_ERR
,("Trying to set invalid priority : %u\n", db_prio
->priority
));
1561 ctdb_db
->priority
= db_prio
->priority
;
1562 DEBUG(DEBUG_INFO
,("Setting DB priority to %u for db 0x%08x\n", db_prio
->priority
, db_prio
->db_id
));
1564 if (client_id
!= 0) {
1565 /* Broadcast the update to the rest of the cluster */
1566 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
, 0,
1567 CTDB_CONTROL_SET_DB_PRIORITY
, 0,
1568 CTDB_CTRL_FLAG_NOREPLY
, indata
,
/*
 * Mark a database as sticky.
 *
 * A database that is already sticky is detected first.  A persistent
 * database is refused with a DEBUG_ERR message.  Otherwise a tree for the
 * sticky records is created with trbt_create() as a talloc child of
 * ctdb_db, the sticky flag is set and the change is logged at DEBUG_NOTICE.
 *
 * The ctdb parameter is not used in the visible code.
 *
 * NOTE(review): the bodies of the early-exit branches and the return values
 * are not visible in this listing; confirm against the full file.
 */
1575 int ctdb_set_db_sticky(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
1577 if (ctdb_db
->sticky
) {
1581 if (ctdb_db
->persistent
) {
1582 DEBUG(DEBUG_ERR
,("Trying to set persistent database with sticky property\n"));
1586 ctdb_db
->sticky_records
= trbt_create(ctdb_db
, 0);
1588 ctdb_db
->sticky
= true;
1590 DEBUG(DEBUG_NOTICE
,("set db sticky %s\n", ctdb_db
->db_name
));
/*
 * Control handler that returns the statistics of one database.
 *
 * The reply is a struct ctdb_db_statistics whose hot_keys_wire area holds
 * the key bytes of all MAX_HOT_KEYS hot-key slots packed back to back.  The
 * buffer length is offsetof(struct ctdb_db_statistics, hot_keys_wire) plus
 * the sum of the key sizes; it is allocated with talloc_size() on outdata
 * and handed back through outdata->dptr / outdata->dsize.
 *
 * NOTE(review): the remaining parameters (db_id, outdata), the local
 * declarations and the return statements are not visible in this listing;
 * confirm against the full file.
 */
1595 int32_t ctdb_control_get_db_statistics(struct ctdb_context
*ctdb
,
1599 struct ctdb_db_context
*ctdb_db
;
1600 struct ctdb_db_statistics
*stats
;
1605 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1607 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in get_db_statistics\n", db_id
));
/* Fixed header up to hot_keys_wire, plus the bytes of every hot key. */
1611 len
= offsetof(struct ctdb_db_statistics
, hot_keys_wire
);
1612 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1613 len
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1616 stats
= talloc_size(outdata
, len
);
1617 if (stats
== NULL
) {
1618 DEBUG(DEBUG_ERR
,("Failed to allocate db statistics structure\n"));
/* NOTE(review): this struct assignment writes sizeof(struct
 * ctdb_db_statistics) bytes, while the buffer holds only
 * offsetof(hot_keys_wire) plus the key bytes.  If hot_keys_wire has non-zero
 * size and the keys are short or empty, the copy could run past the
 * allocation - confirm the struct layout. */
1622 *stats
= ctdb_db
->statistics
;
1624 stats
->num_hot_keys
= MAX_HOT_KEYS
;
/* Pack the key bytes one after another into the wire area.
 * NOTE(review): for an unused slot memcpy() may be called with a NULL
 * source and size 0 - confirm that this is acceptable here. */
1626 ptr
= &stats
->hot_keys_wire
[0];
1627 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1628 memcpy(ptr
, ctdb_db
->statistics
.hot_keys
[i
].key
.dptr
,
1629 ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
);
1630 ptr
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1633 outdata
->dptr
= (uint8_t *)stats
;
1634 outdata
->dsize
= len
;