2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if appropriate.
54 static int ctdb_ltdb_store_server(struct ctdb_db_context
*ctdb_db
,
56 struct ctdb_ltdb_header
*header
,
59 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
62 bool seqnum_suppressed
= false;
64 bool schedule_for_deletion
= false;
65 bool remove_from_delete_queue
= false;
68 if (ctdb
->flags
& CTDB_FLAG_TORTURE
) {
69 struct ctdb_ltdb_header
*h2
;
70 rec
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
71 h2
= (struct ctdb_ltdb_header
*)rec
.dptr
;
72 if (rec
.dptr
&& rec
.dsize
>= sizeof(h2
) && h2
->rsn
> header
->rsn
) {
73 DEBUG(DEBUG_CRIT
,("RSN regression! %llu %llu\n",
74 (unsigned long long)h2
->rsn
, (unsigned long long)header
->rsn
));
76 if (rec
.dptr
) free(rec
.dptr
);
79 if (ctdb
->vnn_map
== NULL
) {
81 * Called from a client: always store the record
82 * Also don't call ctdb_lmaster since it uses the vnn_map!
88 lmaster
= ctdb_lmaster(ctdb_db
->ctdb
, &key
);
91 * If we migrate an empty record off to another node
92 * and the record has not been migrated with data,
93 * delete the record instead of storing the empty record.
95 if (data
.dsize
!= 0) {
97 } else if (header
->flags
& CTDB_REC_RO_FLAGS
) {
99 } else if (ctdb_db
->persistent
) {
101 } else if (header
->flags
& CTDB_REC_FLAG_AUTOMATIC
) {
103 * The record is not created by the client but
104 * automatically by the ctdb_ltdb_fetch logic that
105 * creates a record with an initial header in the
106 * ltdb before trying to migrate the record from
107 * the current lmaster. Keep it instead of trying
108 * to delete the non-existing record...
111 schedule_for_deletion
= true;
112 } else if (header
->flags
& CTDB_REC_FLAG_MIGRATED_WITH_DATA
) {
114 } else if (ctdb_db
->ctdb
->pnn
== lmaster
) {
116 * If we are lmaster, then we usually keep the record.
117 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
118 * and the record is empty and has never been migrated
119 * with data, then we should delete it instead of storing it.
120 * This is part of the vacuuming process.
122 * The reason that we usually need to store even empty records
123 * on the lmaster is that a client operating directly on the
124 * lmaster (== dmaster) expects the local copy of the record to
125 * exist after successful ctdb migrate call. If the record does
126 * not exist, the client goes into a migrate loop and eventually
127 * fails. So storing the empty record makes sure that we do not
128 * need to change the client code.
130 if (!(header
->flags
& CTDB_REC_FLAG_VACUUM_MIGRATED
)) {
132 } else if (ctdb_db
->ctdb
->pnn
!= header
->dmaster
) {
135 } else if (ctdb_db
->ctdb
->pnn
== header
->dmaster
) {
140 if (!ctdb_db
->persistent
&&
141 (ctdb_db
->ctdb
->pnn
== header
->dmaster
) &&
142 !(header
->flags
& CTDB_REC_RO_FLAGS
))
146 if (data
.dsize
== 0) {
147 schedule_for_deletion
= true;
150 remove_from_delete_queue
= !schedule_for_deletion
;
155 * The VACUUM_MIGRATED flag is only set temporarily for
156 * the above logic when the record was retrieved by a
157 * VACUUM_MIGRATE call and should not be stored in the
160 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
161 * and there are two cases in which the corresponding record
162 * is stored in the local database:
163 * 1. The record has been migrated with data in the past
164 * (the MIGRATED_WITH_DATA record flag is set).
165 * 2. The record has been filled with data again since it
166 * had been submitted in the VACUUM_FETCH message to the
168 * For such records it is important to not store the
169 * VACUUM_MIGRATED flag in the database.
171 header
->flags
&= ~CTDB_REC_FLAG_VACUUM_MIGRATED
;
174 * Similarly, clear the AUTOMATIC flag which should not enter
175 * the local database copy since this would require client
176 * modifications to clear the flag when the client stores
179 header
->flags
&= ~CTDB_REC_FLAG_AUTOMATIC
;
181 rec
.dsize
= sizeof(*header
) + data
.dsize
;
182 rec
.dptr
= talloc_size(ctdb
, rec
.dsize
);
183 CTDB_NO_MEMORY(ctdb
, rec
.dptr
);
185 memcpy(rec
.dptr
, header
, sizeof(*header
));
186 memcpy(rec
.dptr
+ sizeof(*header
), data
.dptr
, data
.dsize
);
188 /* Databases with seqnum updates enabled only get their seqnum
189 changes when/if we modify the data */
190 if (ctdb_db
->seqnum_update
!= NULL
) {
192 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
194 if ( (old
.dsize
== rec
.dsize
)
195 && !memcmp(old
.dptr
+sizeof(struct ctdb_ltdb_header
),
196 rec
.dptr
+sizeof(struct ctdb_ltdb_header
),
197 rec
.dsize
-sizeof(struct ctdb_ltdb_header
)) ) {
198 tdb_remove_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
199 seqnum_suppressed
= true;
201 if (old
.dptr
) free(old
.dptr
);
204 DEBUG(DEBUG_DEBUG
, (__location__
" db[%s]: %s record: hash[0x%08x]\n",
206 keep
?"storing":"deleting",
210 ret
= tdb_store(ctdb_db
->ltdb
->tdb
, key
, rec
, TDB_REPLACE
);
212 ret
= tdb_delete(ctdb_db
->ltdb
->tdb
, key
);
219 tdb_error(ctdb_db
->ltdb
->tdb
) == TDB_ERR_NOEXIST
)
224 DEBUG(lvl
, (__location__
" db[%s]: Failed to %s record: "
227 keep
?"store":"delete", ret
,
228 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
230 schedule_for_deletion
= false;
231 remove_from_delete_queue
= false;
233 if (seqnum_suppressed
) {
234 tdb_add_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
237 talloc_free(rec
.dptr
);
239 if (schedule_for_deletion
) {
241 ret2
= ctdb_local_schedule_for_deletion(ctdb_db
, header
, key
);
243 DEBUG(DEBUG_ERR
, (__location__
" ctdb_local_schedule_for_deletion failed.\n"));
247 if (remove_from_delete_queue
) {
248 ctdb_local_remove_from_delete_queue(ctdb_db
, header
, key
);
254 struct lock_fetch_state
{
255 struct ctdb_context
*ctdb
;
256 struct ctdb_db_context
*ctdb_db
;
257 void (*recv_pkt
)(void *, struct ctdb_req_header
*);
259 struct ctdb_req_header
*hdr
;
261 bool ignore_generation
;
265 called when we should retry the operation
267 static void lock_fetch_callback(void *p
, bool locked
)
269 struct lock_fetch_state
*state
= talloc_get_type(p
, struct lock_fetch_state
);
270 if (!state
->ignore_generation
&&
271 state
->generation
!= state
->ctdb_db
->generation
) {
272 DEBUG(DEBUG_NOTICE
,("Discarding previous generation lockwait packet\n"));
273 talloc_free(state
->hdr
);
276 state
->recv_pkt(state
->recv_context
, state
->hdr
);
277 DEBUG(DEBUG_INFO
,(__location__
" PACKET REQUEUED\n"));
282 do a non-blocking ltdb_lock, deferring this ctdb request until we
285 It does the following:
287 1) tries to get the chainlock. If it succeeds, then it returns 0
289 2) if it fails to get a chainlock immediately then it sets up a
290 non-blocking chainlock via ctdb_lock_record, and when it gets the
291 chainlock it re-submits this ctdb request to the main packet
294 This effectively queues all ctdb requests that cannot be
295 immediately satisfied until it can get the lock. This means that
296 the main ctdb daemon will not block waiting for a chainlock held by
299 There are 3 possible return values:
301 0: means that it got the lock immediately.
302 -1: means that it failed to get the lock, and won't retry
303 -2: means that it failed to get the lock immediately, but will retry
305 int ctdb_ltdb_lock_requeue(struct ctdb_db_context
*ctdb_db
,
306 TDB_DATA key
, struct ctdb_req_header
*hdr
,
307 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
308 void *recv_context
, bool ignore_generation
)
311 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
312 struct lock_request
*lreq
;
313 struct lock_fetch_state
*state
;
315 ret
= tdb_chainlock_nonblock(tdb
, key
);
318 !(errno
== EACCES
|| errno
== EAGAIN
|| errno
== EDEADLK
)) {
319 /* a hard failure - don't try again */
323 /* when torturing, ensure we test the contended path */
324 if ((ctdb_db
->ctdb
->flags
& CTDB_FLAG_TORTURE
) &&
327 tdb_chainunlock(tdb
, key
);
330 /* first the non-contended path */
335 state
= talloc(hdr
, struct lock_fetch_state
);
336 state
->ctdb
= ctdb_db
->ctdb
;
337 state
->ctdb_db
= ctdb_db
;
339 state
->recv_pkt
= recv_pkt
;
340 state
->recv_context
= recv_context
;
341 state
->generation
= ctdb_db
->generation
;
342 state
->ignore_generation
= ignore_generation
;
344 /* now the contended path */
345 lreq
= ctdb_lock_record(state
, ctdb_db
, key
, true, lock_fetch_callback
, state
);
350 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
351 so it won't be freed yet */
352 talloc_steal(state
, hdr
);
354 /* now tell the caller that we will retry asynchronously */
359 a variant of ctdb_ltdb_lock_requeue that also fetches the record
361 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context
*ctdb_db
,
362 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
363 struct ctdb_req_header
*hdr
, TDB_DATA
*data
,
364 void (*recv_pkt
)(void *, struct ctdb_req_header
*),
365 void *recv_context
, bool ignore_generation
)
369 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
, recv_pkt
,
370 recv_context
, ignore_generation
);
372 ret
= ctdb_ltdb_fetch(ctdb_db
, key
, header
, hdr
, data
);
375 uret
= ctdb_ltdb_unlock(ctdb_db
, key
);
377 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", uret
));
386 paranoid check to see if the db is empty
388 static void ctdb_check_db_empty(struct ctdb_db_context
*ctdb_db
)
390 struct tdb_context
*tdb
= ctdb_db
->ltdb
->tdb
;
391 int count
= tdb_traverse_read(tdb
, NULL
, NULL
);
393 DEBUG(DEBUG_ALERT
,(__location__
" tdb '%s' not empty on attach! aborting\n",
395 ctdb_fatal(ctdb_db
->ctdb
, "database not empty on attach");
399 int ctdb_load_persistent_health(struct ctdb_context
*ctdb
,
400 struct ctdb_db_context
*ctdb_db
)
402 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
408 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
409 key
.dsize
= strlen(ctdb_db
->db_name
);
411 old
= ctdb_db
->unhealthy_reason
;
412 ctdb_db
->unhealthy_reason
= NULL
;
414 val
= tdb_fetch(tdb
, key
);
416 reason
= talloc_strndup(ctdb_db
,
417 (const char *)val
.dptr
,
419 if (reason
== NULL
) {
420 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strndup(%d) failed\n",
422 ctdb_db
->unhealthy_reason
= old
;
433 ctdb_db
->unhealthy_reason
= reason
;
437 int ctdb_update_persistent_health(struct ctdb_context
*ctdb
,
438 struct ctdb_db_context
*ctdb_db
,
439 const char *given_reason
,/* NULL means healthy */
440 int num_healthy_nodes
)
442 struct tdb_context
*tdb
= ctdb
->db_persistent_health
->tdb
;
446 char *new_reason
= NULL
;
447 char *old_reason
= NULL
;
449 ret
= tdb_transaction_start(tdb
);
451 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_start('%s') failed: %d - %s\n",
452 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
456 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
458 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
459 ctdb_db
->db_name
, ret
));
462 old_reason
= ctdb_db
->unhealthy_reason
;
464 key
.dptr
= discard_const_p(uint8_t, ctdb_db
->db_name
);
465 key
.dsize
= strlen(ctdb_db
->db_name
);
468 new_reason
= talloc_strdup(ctdb_db
, given_reason
);
469 if (new_reason
== NULL
) {
470 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strdup(%s) failed\n",
474 } else if (old_reason
&& num_healthy_nodes
== 0) {
476 * If the reason indicates ok, but there were no healthy nodes
477 * available, it means that we have not recovered valid content
478 * of the db. So if there's an old reason, prefix it with
479 * "NO-HEALTHY-NODES - "
483 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
484 ret
= strncmp(_TMP_PREFIX
, old_reason
, strlen(_TMP_PREFIX
));
486 prefix
= _TMP_PREFIX
;
490 new_reason
= talloc_asprintf(ctdb_db
, "%s%s",
492 if (new_reason
== NULL
) {
493 DEBUG(DEBUG_ALERT
,(__location__
" talloc_asprintf(%s%s) failed\n",
494 prefix
, old_reason
));
501 val
.dptr
= discard_const_p(uint8_t, new_reason
);
502 val
.dsize
= strlen(new_reason
);
504 ret
= tdb_store(tdb
, key
, val
, TDB_REPLACE
);
506 tdb_transaction_cancel(tdb
);
507 DEBUG(DEBUG_ALERT
,(__location__
" tdb_store('%s', %s, %s) failed: %d - %s\n",
508 tdb_name(tdb
), ctdb_db
->db_name
, new_reason
,
509 ret
, tdb_errorstr(tdb
)));
510 talloc_free(new_reason
);
513 DEBUG(DEBUG_ALERT
,("Updated db health for db(%s) to: %s\n",
514 ctdb_db
->db_name
, new_reason
));
515 } else if (old_reason
) {
516 ret
= tdb_delete(tdb
, key
);
518 tdb_transaction_cancel(tdb
);
519 DEBUG(DEBUG_ALERT
,(__location__
" tdb_delete('%s', %s) failed: %d - %s\n",
520 tdb_name(tdb
), ctdb_db
->db_name
,
521 ret
, tdb_errorstr(tdb
)));
522 talloc_free(new_reason
);
525 DEBUG(DEBUG_NOTICE
,("Updated db health for db(%s): OK\n",
529 ret
= tdb_transaction_commit(tdb
);
530 if (ret
!= TDB_SUCCESS
) {
531 DEBUG(DEBUG_ALERT
,(__location__
" tdb_transaction_commit('%s') failed: %d - %s\n",
532 tdb_name(tdb
), ret
, tdb_errorstr(tdb
)));
533 talloc_free(new_reason
);
537 talloc_free(old_reason
);
538 ctdb_db
->unhealthy_reason
= new_reason
;
543 static int ctdb_backup_corrupted_tdb(struct ctdb_context
*ctdb
,
544 struct ctdb_db_context
*ctdb_db
)
546 time_t now
= time(NULL
);
554 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
555 new_path
= talloc_asprintf(ctdb_db
, "%s.corrupted."
556 "%04u%02u%02u%02u%02u%02u.0Z",
558 tm
->tm_year
+1900, tm
->tm_mon
+1,
559 tm
->tm_mday
, tm
->tm_hour
, tm
->tm_min
,
561 if (new_path
== NULL
) {
562 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
566 new_reason
= talloc_asprintf(ctdb_db
,
567 "ERROR - Backup of corrupted TDB in '%s'",
569 if (new_reason
== NULL
) {
570 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
573 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, new_reason
, 0);
574 talloc_free(new_reason
);
576 DEBUG(DEBUG_CRIT
,(__location__
577 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
582 ret
= rename(ctdb_db
->db_path
, new_path
);
584 DEBUG(DEBUG_CRIT
,(__location__
585 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
586 ctdb_db
->db_path
, new_path
,
587 errno
, strerror(errno
)));
588 talloc_free(new_path
);
592 DEBUG(DEBUG_CRIT
,(__location__
593 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
594 ctdb_db
->db_path
, new_path
));
595 talloc_free(new_path
);
599 int ctdb_recheck_persistent_health(struct ctdb_context
*ctdb
)
601 struct ctdb_db_context
*ctdb_db
;
606 for (ctdb_db
= ctdb
->db_list
; ctdb_db
; ctdb_db
= ctdb_db
->next
) {
607 if (!ctdb_db
->persistent
) {
611 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
613 DEBUG(DEBUG_ALERT
,(__location__
614 " load persistent health for '%s' failed\n",
619 if (ctdb_db
->unhealthy_reason
== NULL
) {
621 DEBUG(DEBUG_INFO
,(__location__
622 " persistent db '%s' healthy\n",
628 DEBUG(DEBUG_ALERT
,(__location__
629 " persistent db '%s' unhealthy: %s\n",
631 ctdb_db
->unhealthy_reason
));
633 DEBUG((fail
!=0)?DEBUG_ALERT
:DEBUG_NOTICE
,
634 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
646 mark a database as healthy
648 int32_t ctdb_control_db_set_healthy(struct ctdb_context
*ctdb
, TDB_DATA indata
)
650 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
651 struct ctdb_db_context
*ctdb_db
;
653 bool may_recover
= false;
655 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
657 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
661 if (ctdb_db
->unhealthy_reason
) {
665 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
, NULL
, 1);
667 DEBUG(DEBUG_ERR
,(__location__
668 " ctdb_update_persistent_health(%s) failed\n",
673 if (may_recover
&& ctdb
->runstate
== CTDB_RUNSTATE_STARTUP
) {
674 DEBUG(DEBUG_ERR
, (__location__
" db %s become healthy - force recovery for startup\n",
676 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
682 int32_t ctdb_control_db_get_health(struct ctdb_context
*ctdb
,
686 uint32_t db_id
= *(uint32_t *)indata
.dptr
;
687 struct ctdb_db_context
*ctdb_db
;
690 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
692 DEBUG(DEBUG_ERR
,(__location__
" Unknown db 0x%x\n", db_id
));
696 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
698 DEBUG(DEBUG_ERR
,(__location__
699 " ctdb_load_persistent_health(%s) failed\n",
705 if (ctdb_db
->unhealthy_reason
) {
706 outdata
->dptr
= (uint8_t *)ctdb_db
->unhealthy_reason
;
707 outdata
->dsize
= strlen(ctdb_db
->unhealthy_reason
)+1;
714 int ctdb_set_db_readonly(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
718 if (ctdb_db
->readonly
) {
722 if (ctdb_db
->persistent
) {
723 DEBUG(DEBUG_ERR
,("Persistent databases do not support readonly property\n"));
727 ropath
= talloc_asprintf(ctdb_db
, "%s.RO", ctdb_db
->db_path
);
728 if (ropath
== NULL
) {
729 DEBUG(DEBUG_CRIT
,("Failed to asprintf the tracking database\n"));
732 ctdb_db
->rottdb
= tdb_open(ropath
,
733 ctdb
->tunable
.database_hash_size
,
734 TDB_NOLOCK
|TDB_CLEAR_IF_FIRST
|TDB_NOSYNC
,
735 O_CREAT
|O_RDWR
, 0600);
736 if (ctdb_db
->rottdb
== NULL
) {
737 DEBUG(DEBUG_CRIT
,("Failed to open/create the tracking database '%s'\n", ropath
));
742 DEBUG(DEBUG_NOTICE
,("OPENED tracking database : '%s'\n", ropath
));
744 ctdb_db
->readonly
= true;
746 DEBUG(DEBUG_NOTICE
, ("Readonly property set on DB %s\n", ctdb_db
->db_name
));
753 attach to a database, handling both persistent and non-persistent databases
754 return 0 on success, -1 on failure
756 static int ctdb_local_attach(struct ctdb_context
*ctdb
, const char *db_name
,
757 bool persistent
, const char *unhealthy_reason
,
758 bool jenkinshash
, bool mutexes
)
760 struct ctdb_db_context
*ctdb_db
, *tmp_db
;
765 int remaining_tries
= 0;
767 ctdb_db
= talloc_zero(ctdb
, struct ctdb_db_context
);
768 CTDB_NO_MEMORY(ctdb
, ctdb_db
);
770 ctdb_db
->priority
= 1;
771 ctdb_db
->ctdb
= ctdb
;
772 ctdb_db
->db_name
= talloc_strdup(ctdb_db
, db_name
);
773 CTDB_NO_MEMORY(ctdb
, ctdb_db
->db_name
);
775 key
.dsize
= strlen(db_name
)+1;
776 key
.dptr
= discard_const(db_name
);
777 ctdb_db
->db_id
= ctdb_hash(&key
);
778 ctdb_db
->persistent
= persistent
;
780 if (!ctdb_db
->persistent
) {
781 ctdb_db
->delete_queue
= trbt_create(ctdb_db
, 0);
782 if (ctdb_db
->delete_queue
== NULL
) {
783 CTDB_NO_MEMORY(ctdb
, ctdb_db
->delete_queue
);
786 ctdb_db
->ctdb_ltdb_store_fn
= ctdb_ltdb_store_server
;
789 /* check for hash collisions */
790 for (tmp_db
=ctdb
->db_list
;tmp_db
;tmp_db
=tmp_db
->next
) {
791 if (tmp_db
->db_id
== ctdb_db
->db_id
) {
792 DEBUG(DEBUG_CRIT
,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
793 tmp_db
->db_id
, db_name
, tmp_db
->db_name
));
794 talloc_free(ctdb_db
);
800 if (unhealthy_reason
) {
801 ret
= ctdb_update_persistent_health(ctdb
, ctdb_db
,
802 unhealthy_reason
, 0);
804 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_update_persistent_health('%s','%s') failed: %d\n",
805 ctdb_db
->db_name
, unhealthy_reason
, ret
));
806 talloc_free(ctdb_db
);
811 if (ctdb
->max_persistent_check_errors
> 0) {
814 if (ctdb
->runstate
== CTDB_RUNSTATE_RUNNING
) {
818 ret
= ctdb_load_persistent_health(ctdb
, ctdb_db
);
820 DEBUG(DEBUG_ALERT
,(__location__
" ctdb_load_persistent_health('%s') failed: %d\n",
821 ctdb_db
->db_name
, ret
));
822 talloc_free(ctdb_db
);
827 if (ctdb_db
->unhealthy_reason
&& remaining_tries
== 0) {
828 DEBUG(DEBUG_ALERT
,(__location__
"ERROR: tdb %s is marked as unhealthy: %s\n",
829 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
830 talloc_free(ctdb_db
);
834 if (ctdb_db
->unhealthy_reason
) {
835 /* this is just a warning, but we want that in the log file! */
836 DEBUG(DEBUG_ALERT
,(__location__
"Warning: tdb %s is marked as unhealthy: %s\n",
837 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
840 /* open the database */
841 ctdb_db
->db_path
= talloc_asprintf(ctdb_db
, "%s/%s.%u",
842 persistent
?ctdb
->db_directory_persistent
:ctdb
->db_directory
,
845 tdb_flags
= persistent
? TDB_DEFAULT
: TDB_CLEAR_IF_FIRST
| TDB_NOSYNC
;
846 if (ctdb
->valgrinding
) {
847 tdb_flags
|= TDB_NOMMAP
;
849 tdb_flags
|= TDB_DISALLOW_NESTING
;
851 tdb_flags
|= TDB_INCOMPATIBLE_HASH
;
853 #ifdef TDB_MUTEX_LOCKING
854 if (ctdb
->tunable
.mutex_enabled
&& mutexes
&&
855 tdb_runtime_check_for_robust_mutexes()) {
856 tdb_flags
|= (TDB_MUTEX_LOCKING
| TDB_CLEAR_IF_FIRST
);
861 ctdb_db
->ltdb
= tdb_wrap_open(ctdb_db
, ctdb_db
->db_path
,
862 ctdb
->tunable
.database_hash_size
,
864 O_CREAT
|O_RDWR
, mode
);
865 if (ctdb_db
->ltdb
== NULL
) {
867 int saved_errno
= errno
;
870 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
873 strerror(saved_errno
)));
874 talloc_free(ctdb_db
);
878 if (remaining_tries
== 0) {
879 DEBUG(DEBUG_CRIT
,(__location__
880 "Failed to open persistent tdb '%s': %d - %s\n",
883 strerror(saved_errno
)));
884 talloc_free(ctdb_db
);
888 ret
= stat(ctdb_db
->db_path
, &st
);
890 DEBUG(DEBUG_CRIT
,(__location__
891 "Failed to open persistent tdb '%s': %d - %s\n",
894 strerror(saved_errno
)));
895 talloc_free(ctdb_db
);
899 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
901 DEBUG(DEBUG_CRIT
,(__location__
902 "Failed to open persistent tdb '%s': %d - %s\n",
905 strerror(saved_errno
)));
906 talloc_free(ctdb_db
);
916 ctdb_check_db_empty(ctdb_db
);
918 ret
= tdb_check(ctdb_db
->ltdb
->tdb
, NULL
, NULL
);
923 DEBUG(DEBUG_CRIT
,("tdb_check(%s) failed: %d - %s\n",
924 ctdb_db
->db_path
, ret
,
925 tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
926 if (remaining_tries
== 0) {
927 talloc_free(ctdb_db
);
931 fd
= tdb_fd(ctdb_db
->ltdb
->tdb
);
932 ret
= fstat(fd
, &st
);
934 DEBUG(DEBUG_CRIT
,(__location__
935 "Failed to fstat() persistent tdb '%s': %d - %s\n",
939 talloc_free(ctdb_db
);
944 talloc_free(ctdb_db
->ltdb
);
945 ctdb_db
->ltdb
= NULL
;
947 ret
= ctdb_backup_corrupted_tdb(ctdb
, ctdb_db
);
949 DEBUG(DEBUG_CRIT
,("Failed to backup corrupted tdb '%s'\n",
951 talloc_free(ctdb_db
);
961 /* set up a rb tree we can use to track which records we have a
962 fetch-lock in-flight for so we can defer any additional calls
965 ctdb_db
->deferred_fetch
= trbt_create(ctdb_db
, 0);
966 if (ctdb_db
->deferred_fetch
== NULL
) {
967 DEBUG(DEBUG_ERR
,("Failed to create deferred fetch rb tree for ctdb database\n"));
968 talloc_free(ctdb_db
);
972 ctdb_db
->defer_dmaster
= trbt_create(ctdb_db
, 0);
973 if (ctdb_db
->defer_dmaster
== NULL
) {
974 DEBUG(DEBUG_ERR
, ("Failed to create defer dmaster rb tree for %s\n",
976 talloc_free(ctdb_db
);
980 DLIST_ADD(ctdb
->db_list
, ctdb_db
);
982 /* setting this can help some high churn databases */
983 tdb_set_max_dead(ctdb_db
->ltdb
->tdb
, ctdb
->tunable
.database_max_dead
);
986 all databases support the "null" function. we need this in
987 order to do forced migration of records
989 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_null_func
, CTDB_NULL_FUNC
);
991 DEBUG(DEBUG_CRIT
,("Failed to setup null function for '%s'\n", ctdb_db
->db_name
));
992 talloc_free(ctdb_db
);
997 all databases support the "fetch" function. we need this
998 for efficient Samba3 ctdb fetch
1000 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_func
, CTDB_FETCH_FUNC
);
1002 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
1003 talloc_free(ctdb_db
);
1008 all databases support the "fetch_with_header" function. we need this
1009 for efficient readonly record fetches
1011 ret
= ctdb_daemon_set_call(ctdb
, ctdb_db
->db_id
, ctdb_fetch_with_header_func
, CTDB_FETCH_WITH_HEADER_FUNC
);
1013 DEBUG(DEBUG_CRIT
,("Failed to setup fetch function for '%s'\n", ctdb_db
->db_name
));
1014 talloc_free(ctdb_db
);
1018 ret
= ctdb_vacuum_init(ctdb_db
);
1020 DEBUG(DEBUG_CRIT
,("Failed to setup vacuuming for "
1021 "database '%s'\n", ctdb_db
->db_name
));
1022 talloc_free(ctdb_db
);
1026 ctdb_db
->generation
= ctdb
->vnn_map
->generation
;
1028 DEBUG(DEBUG_NOTICE
,("Attached to database '%s' with flags 0x%x\n",
1029 ctdb_db
->db_path
, tdb_flags
));
1036 struct ctdb_deferred_attach_context
{
1037 struct ctdb_deferred_attach_context
*next
, *prev
;
1038 struct ctdb_context
*ctdb
;
1039 struct ctdb_req_control_old
*c
;
1043 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context
*da_ctx
)
1045 DLIST_REMOVE(da_ctx
->ctdb
->deferred_attach
, da_ctx
);
1050 static void ctdb_deferred_attach_timeout(struct tevent_context
*ev
,
1051 struct tevent_timer
*te
,
1052 struct timeval t
, void *private_data
)
1054 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1055 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
1057 ctdb_request_control_reply(ctdb
, da_ctx
->c
, NULL
, -1, NULL
);
1058 talloc_free(da_ctx
);
1061 static void ctdb_deferred_attach_callback(struct tevent_context
*ev
,
1062 struct tevent_timer
*te
,
1063 struct timeval t
, void *private_data
)
1065 struct ctdb_deferred_attach_context
*da_ctx
= talloc_get_type(private_data
, struct ctdb_deferred_attach_context
);
1066 struct ctdb_context
*ctdb
= da_ctx
->ctdb
;
1068 /* This talloc-steals the packet ->c */
1069 ctdb_input_pkt(ctdb
, (struct ctdb_req_header
*)da_ctx
->c
);
1070 talloc_free(da_ctx
);
1073 int ctdb_process_deferred_attach(struct ctdb_context
*ctdb
)
1075 struct ctdb_deferred_attach_context
*da_ctx
;
1077 /* call it from the main event loop as soon as the current event
1080 while ((da_ctx
= ctdb
->deferred_attach
) != NULL
) {
1081 DLIST_REMOVE(ctdb
->deferred_attach
, da_ctx
);
1082 tevent_add_timer(ctdb
->ev
, da_ctx
,
1083 timeval_current_ofs(1,0),
1084 ctdb_deferred_attach_callback
, da_ctx
);
1091 a client has asked to attach a new database
1093 int32_t ctdb_control_db_attach(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1094 TDB_DATA
*outdata
, uint64_t tdb_flags
,
1095 bool persistent
, uint32_t client_id
,
1096 struct ctdb_req_control_old
*c
,
1099 const char *db_name
= (const char *)indata
.dptr
;
1100 struct ctdb_db_context
*db
;
1101 struct ctdb_node
*node
= ctdb
->nodes
[ctdb
->pnn
];
1102 struct ctdb_client
*client
= NULL
;
1103 bool with_jenkinshash
, with_mutexes
;
1105 if (ctdb
->tunable
.allow_client_db_attach
== 0) {
1106 DEBUG(DEBUG_ERR
, ("DB Attach to database %s denied by tunable "
1107 "AllowClientDBAccess == 0\n", db_name
));
1111 /* don't allow any local clients to attach while we are in recovery mode
1112 * except for the recovery daemon.
1113 * allow all attach from the network since these are always from remote
1116 if (client_id
!= 0) {
1117 client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
1119 if (client
!= NULL
) {
1120 /* If the node is inactive it is not part of the cluster
1121 and we should not allow clients to attach to any
1124 if (node
->flags
& NODE_FLAGS_INACTIVE
) {
1125 DEBUG(DEBUG_ERR
,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name
, node
->flags
));
1129 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
&&
1130 client
->pid
!= ctdb
->recoverd_pid
&&
1131 ctdb
->runstate
< CTDB_RUNSTATE_RUNNING
) {
1132 struct ctdb_deferred_attach_context
*da_ctx
= talloc(client
, struct ctdb_deferred_attach_context
);
1134 if (da_ctx
== NULL
) {
1135 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name
, client
->pid
));
1139 da_ctx
->ctdb
= ctdb
;
1140 da_ctx
->c
= talloc_steal(da_ctx
, c
);
1141 talloc_set_destructor(da_ctx
, ctdb_deferred_attach_destructor
);
1142 DLIST_ADD(ctdb
->deferred_attach
, da_ctx
);
1144 tevent_add_timer(ctdb
->ev
, da_ctx
,
1145 timeval_current_ofs(ctdb
->tunable
.deferred_attach_timeout
, 0),
1146 ctdb_deferred_attach_timeout
, da_ctx
);
1148 DEBUG(DEBUG_ERR
,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name
, client
->pid
));
1149 *async_reply
= true;
1154 /* the client can optionally pass additional tdb flags, but we
1155 only allow a subset of those on the database in ctdb. Note
1156 that tdb_flags is passed in via the (otherwise unused)
1157 srvid to the attach control */
1158 #ifdef TDB_MUTEX_LOCKING
1159 tdb_flags
&= (TDB_NOSYNC
|TDB_INCOMPATIBLE_HASH
|TDB_MUTEX_LOCKING
|TDB_CLEAR_IF_FIRST
);
1161 tdb_flags
&= (TDB_NOSYNC
|TDB_INCOMPATIBLE_HASH
);
1164 /* see if we already have this name */
1165 db
= ctdb_db_handle(ctdb
, db_name
);
1167 if (db
->persistent
!= persistent
) {
1168 DEBUG(DEBUG_ERR
, ("ERROR: DB Attach %spersistent to %spersistent "
1169 "database %s\n", persistent
? "" : "non-",
1170 db
-> persistent
? "" : "non-", db_name
));
1173 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1174 outdata
->dsize
= sizeof(db
->db_id
);
1175 tdb_add_flags(db
->ltdb
->tdb
, tdb_flags
);
1179 with_jenkinshash
= (tdb_flags
& TDB_INCOMPATIBLE_HASH
) ? true : false;
1180 #ifdef TDB_MUTEX_LOCKING
1181 with_mutexes
= (tdb_flags
& TDB_MUTEX_LOCKING
) ? true : false;
1183 with_mutexes
= false;
1186 if (ctdb_local_attach(ctdb
, db_name
, persistent
, NULL
,
1187 with_jenkinshash
, with_mutexes
) != 0) {
1191 db
= ctdb_db_handle(ctdb
, db_name
);
1193 DEBUG(DEBUG_ERR
,("Failed to find db handle for name '%s'\n", db_name
));
1197 /* remember the flags the client has specified */
1198 tdb_add_flags(db
->ltdb
->tdb
, tdb_flags
);
1200 outdata
->dptr
= (uint8_t *)&db
->db_id
;
1201 outdata
->dsize
= sizeof(db
->db_id
);
1203 /* Try to ensure it's locked in mem */
1204 lockdown_memory(ctdb
->valgrinding
);
1206 /* tell all the other nodes about this database */
1207 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
, tdb_flags
,
1208 persistent
?CTDB_CONTROL_DB_ATTACH_PERSISTENT
:
1209 CTDB_CONTROL_DB_ATTACH
,
1210 0, CTDB_CTRL_FLAG_NOREPLY
,
1211 indata
, NULL
, NULL
);
1218 * a client has asked to detach from a database
/*
 * Handle a DB_DETACH control.
 *
 * Reads a 32-bit database id from indata.dptr and looks it up with
 * find_ctdb_db().  The visible checks log an error when:
 *   - the id is unknown,
 *   - the allow_client_db_attach tunable is 1,
 *   - the database is persistent,
 *   - the node is in CTDB_RECOVERY_ACTIVE mode.
 *
 * When client_id is non-zero the request came from a client: if
 * reqid_find() still knows that client, the control is re-broadcast to
 * all nodes with CTDB_CTRL_FLAG_NOREPLY; otherwise "Client has gone
 * away" is logged.  Per the in-code comment, the real detach is only
 * done when the control comes from another daemon: recoverd is told via
 * CTDB_SRVID_DETACH_DATABASE, then the vacuum handle, delete queue,
 * deferred fetches, traverses, active revoke children and (for readonly
 * databases) rottdb are freed, the context is unlinked from
 * ctdb->db_list and finally freed.
 *
 * NOTE(review): this listing omits several original lines (part of the
 * parameter list, the db_id declaration, return statements and closing
 * braces), so the value returned on each path is not visible here --
 * confirm against the complete file.
 */
1220 int32_t ctdb_control_db_detach(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1224 struct ctdb_db_context
*ctdb_db
;
1225 struct ctdb_client
*client
= NULL
;
1227 db_id
= *(uint32_t *)indata
.dptr
;
1228 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1229 if (ctdb_db
== NULL
) {
1230 DEBUG(DEBUG_ERR
, ("Invalid dbid 0x%08x in DB detach\n",
1235 if (ctdb
->tunable
.allow_client_db_attach
== 1) {
1236 DEBUG(DEBUG_ERR
, ("DB detach from database %s denied. "
1237 "Clients are allowed access to databases "
1238 "(AllowClientDBAccess == 1)\n",
1243 if (ctdb_db
->persistent
) {
1244 DEBUG(DEBUG_ERR
, ("DB detach from persistent database %s "
1245 "denied\n", ctdb_db
->db_name
));
1249 /* Cannot detach from database when in recovery */
1250 if (ctdb
->recovery_mode
== CTDB_RECOVERY_ACTIVE
) {
1251 DEBUG(DEBUG_ERR
, ("DB detach denied while in recovery\n"));
1255 /* If a control comes from a client, then broadcast it to all nodes.
1256 * Do the actual detach only if the control comes from other daemons.
1258 if (client_id
!= 0) {
1259 client
= reqid_find(ctdb
->idr
, client_id
, struct ctdb_client
);
1260 if (client
!= NULL
) {
1261 /* forward the control to all the nodes */
1262 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
, 0,
1263 CTDB_CONTROL_DB_DETACH
, 0,
1264 CTDB_CTRL_FLAG_NOREPLY
,
1265 indata
, NULL
, NULL
);
1268 DEBUG(DEBUG_ERR
, ("Client has gone away. Failing DB detach "
1269 "for database '%s'\n", ctdb_db
->db_name
));
1273 /* Detach database from recoverd */
1274 if (ctdb_daemon_send_message(ctdb
, ctdb
->pnn
,
1275 CTDB_SRVID_DETACH_DATABASE
,
1277 DEBUG(DEBUG_ERR
, ("Unable to detach DB from recoverd\n"));
1281 /* Disable vacuuming and drop all vacuuming data */
1282 talloc_free(ctdb_db
->vacuum_handle
);
1283 talloc_free(ctdb_db
->delete_queue
);
1285 /* Terminate any deferred fetch */
1286 talloc_free(ctdb_db
->deferred_fetch
);
1288 /* Terminate any traverses */
1289 while (ctdb_db
->traverse
) {
1290 talloc_free(ctdb_db
->traverse
);
1293 /* Terminate any revokes */
1294 while (ctdb_db
->revokechild_active
) {
1295 talloc_free(ctdb_db
->revokechild_active
);
1298 /* Free readonly tracking database */
1299 if (ctdb_db
->readonly
) {
1300 talloc_free(ctdb_db
->rottdb
);
1303 DLIST_REMOVE(ctdb
->db_list
, ctdb_db
);
1305 DEBUG(DEBUG_NOTICE
, ("Detached from database '%s'\n",
1307 talloc_free(ctdb_db
);
1313 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach every persistent
 * database file that belongs to this node.
 *
 * For each directory entry the name is duplicated with talloc_strdup().
 * An entry is considered only if it is at least 7 characters long and
 * contains ".tdb."; the text after ".tdb." must consist of digits and,
 * parsed with sscanf("%u"), must equal ctdb->pnn.  Entries failing the
 * digit/node test are logged as "Ignoring persistent database".
 * Accepted names are passed to ctdb_local_attach() with the persistent
 * argument set to true and the caller's unhealthy_reason; success and
 * failure are both logged.
 *
 * NOTE(review): the declarations of d, de, p, q, s and node, and the
 * cleanup/return statements, are not visible in this listing.
 * NOTE(review): isdigit(*q++) -- if q is a plain char pointer, the
 * argument should be cast to unsigned char, since <ctype.h> functions
 * are undefined for negative values other than EOF.  Confirm q's type.
 */
1315 static int ctdb_attach_persistent(struct ctdb_context
*ctdb
,
1316 const char *unhealthy_reason
)
1321 /* open the persistent db directory and scan it for files */
1322 d
= opendir(ctdb
->db_directory_persistent
);
1327 while ((de
=readdir(d
))) {
1329 size_t len
= strlen(de
->d_name
);
1331 int invalid_name
= 0;
1333 s
= talloc_strdup(ctdb
, de
->d_name
);
1336 CTDB_NO_MEMORY(ctdb
, s
);
1339 /* only accept names ending in .tdb */
1340 p
= strstr(s
, ".tdb.");
1341 if (len
< 7 || p
== NULL
) {
1346 /* only accept names ending with .tdb. and any number of digits */
1348 while (*q
!= 0 && invalid_name
== 0) {
1349 if (!isdigit(*q
++)) {
1353 if (invalid_name
== 1 || sscanf(p
+5, "%u", &node
) != 1 || node
!= ctdb
->pnn
) {
1354 DEBUG(DEBUG_ERR
,("Ignoring persistent database '%s'\n", de
->d_name
));
1360 if (ctdb_local_attach(ctdb
, s
, true, unhealthy_reason
, false, false) != 0) {
1361 DEBUG(DEBUG_ERR
,("Failed to attach to persistent database '%s'\n", de
->d_name
));
1367 DEBUG(DEBUG_INFO
,("Attached to persistent database %s\n", s
));
/*
 * Open the persistent-health tracking tdb and then attach all
 * persistent databases.
 *
 * The health tdb path is built as "<db_directory_state>/<PERSISTENT_HEALTH_TDB>.<n>"
 * (the third format argument is not visible in this listing) and opened
 * with TDB_DISALLOW_NESTING, O_CREAT | O_RDWR, mode 0600.
 *
 * Two failure paths are visible, one for a failed tdb_wrap_open() and
 * one following tdb_check().  Both build an unhealthy_reason string
 * ("WARNING - '<path>' was cleared after a failure - manual
 * verification needed"), log at DEBUG_CRIT and re-open the file with
 * TDB_CLEAR_IF_FIRST added.  If that second open also fails, the
 * error is logged and the strings are freed.
 *
 * Finally ctdb_attach_persistent() is called with unhealthy_reason
 * (NULL unless one of the failure paths ran) and the string is freed.
 *
 * NOTE(review): first_try is declared here but its use, the retry
 * jump, and all return statements fall on lines missing from this
 * listing -- confirm the control flow against the complete file.
 */
1375 int ctdb_attach_databases(struct ctdb_context
*ctdb
)
1378 char *persistent_health_path
= NULL
;
1379 char *unhealthy_reason
= NULL
;
1380 bool first_try
= true;
1382 persistent_health_path
= talloc_asprintf(ctdb
, "%s/%s.%u",
1383 ctdb
->db_directory_state
,
1384 PERSISTENT_HEALTH_TDB
,
1386 if (persistent_health_path
== NULL
) {
1387 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1393 ctdb
->db_persistent_health
= tdb_wrap_open(ctdb
, persistent_health_path
,
1394 0, TDB_DISALLOW_NESTING
,
1395 O_CREAT
| O_RDWR
, 0600);
1396 if (ctdb
->db_persistent_health
== NULL
) {
1397 struct tdb_wrap
*tdb
;
1400 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s': %d - %s\n",
1401 persistent_health_path
,
1404 talloc_free(persistent_health_path
);
1405 talloc_free(unhealthy_reason
);
1410 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1411 persistent_health_path
,
1412 "was cleared after a failure",
1413 "manual verification needed");
1414 if (unhealthy_reason
== NULL
) {
1415 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1416 talloc_free(persistent_health_path
);
1420 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1421 persistent_health_path
));
1422 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1423 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1424 O_CREAT
| O_RDWR
, 0600);
1426 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1427 persistent_health_path
,
1430 talloc_free(persistent_health_path
);
1431 talloc_free(unhealthy_reason
);
1438 ret
= tdb_check(ctdb
->db_persistent_health
->tdb
, NULL
, NULL
);
1440 struct tdb_wrap
*tdb
;
1442 talloc_free(ctdb
->db_persistent_health
);
1443 ctdb
->db_persistent_health
= NULL
;
1446 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed\n",
1447 persistent_health_path
));
1448 talloc_free(persistent_health_path
);
1449 talloc_free(unhealthy_reason
);
1454 unhealthy_reason
= talloc_asprintf(ctdb
, "WARNING - '%s' %s - %s",
1455 persistent_health_path
,
1456 "was cleared after a failure",
1457 "manual verification needed");
1458 if (unhealthy_reason
== NULL
) {
1459 DEBUG(DEBUG_CRIT
,(__location__
" talloc_asprintf() failed\n"));
1460 talloc_free(persistent_health_path
);
1464 DEBUG(DEBUG_CRIT
,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1465 persistent_health_path
));
1466 tdb
= tdb_wrap_open(ctdb
, persistent_health_path
,
1467 0, TDB_CLEAR_IF_FIRST
| TDB_DISALLOW_NESTING
,
1468 O_CREAT
| O_RDWR
, 0600);
1470 DEBUG(DEBUG_CRIT
,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1471 persistent_health_path
,
1474 talloc_free(persistent_health_path
);
1475 talloc_free(unhealthy_reason
);
1482 talloc_free(persistent_health_path
);
1484 ret
= ctdb_attach_persistent(ctdb
, unhealthy_reason
);
1485 talloc_free(unhealthy_reason
);
1494 called when a broadcast seqnum update comes in
/*
 * Apply a sequence-number update that another node broadcast for the
 * database identified by db_id.
 *
 * Updates originating from this node (srcnode == ctdb->pnn) are not
 * applied.  An unknown db_id is logged, and a database whose
 * unhealthy_reason is set is logged as unhealthy.  Otherwise the local
 * tdb's sequence number is bumped with tdb_increment_seqnum_nonblock()
 * and the new value is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements are not visible in this listing,
 * so the value returned on each path cannot be stated here.
 */
1496 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
, uint32_t srcnode
)
1498 struct ctdb_db_context
*ctdb_db
;
1499 if (srcnode
== ctdb
->pnn
) {
1500 /* don't update ourselves! */
1504 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1506 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id
));
1510 if (ctdb_db
->unhealthy_reason
) {
1511 DEBUG(DEBUG_ERR
,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1512 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
1516 tdb_increment_seqnum_nonblock(ctdb_db
->ltdb
->tdb
);
1517 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1522 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timer callback: detect a local change of the tdb sequence number and
 * announce it to the other nodes.
 *
 * p is the struct ctdb_db_context the timer was registered for.  The
 * current tdb sequence number is compared with the cached
 * ctdb_db->seqnum; when they differ, a CTDB_CONTROL_UPDATE_SEQNUM
 * control carrying the 32-bit db_id is sent to CTDB_BROADCAST_VNNMAP
 * with CTDB_CTRL_FLAG_NOREPLY.  The cached value is then set to the new
 * one and the timer is re-armed on ctdb->ev, owned by ctdb_db.
 *
 * The delay is built from tunable.seqnum_interval as (interval / 1000,
 * (interval % 1000) * 1000), which suggests the tunable is expressed in
 * milliseconds -- presumably seconds and microseconds for
 * timeval_current_ofs(); confirm.
 */
1524 static void ctdb_ltdb_seqnum_check(struct tevent_context
*ev
,
1525 struct tevent_timer
*te
,
1526 struct timeval t
, void *p
)
1528 struct ctdb_db_context
*ctdb_db
= talloc_get_type(p
, struct ctdb_db_context
);
1529 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1530 uint32_t new_seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
1531 if (new_seqnum
!= ctdb_db
->seqnum
) {
1532 /* something has changed - propagate it */
1534 data
.dptr
= (uint8_t *)&ctdb_db
->db_id
;
1535 data
.dsize
= sizeof(uint32_t);
1536 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_VNNMAP
, 0,
1537 CTDB_CONTROL_UPDATE_SEQNUM
, 0, CTDB_CTRL_FLAG_NOREPLY
,
1540 ctdb_db
->seqnum
= new_seqnum
;
1542 /* setup a new timer */
1543 ctdb_db
->seqnum_update
=
1544 tevent_add_timer(ctdb
->ev
, ctdb_db
,
1545 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000,
1546 (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1547 ctdb_ltdb_seqnum_check
, ctdb_db
);
1551 enable seqnum handling on this db
/*
 * Turn on sequence-number tracking for the database identified by
 * db_id.
 *
 * An unknown db_id is logged.  If no seqnum_update timer exists yet,
 * one is created that fires ctdb_ltdb_seqnum_check() after
 * tunable.seqnum_interval.  tdb_enable_seqnum() is then called on the
 * local tdb and its current sequence number is cached in
 * ctdb_db->seqnum.
 *
 * NOTE(review): the first arguments of tevent_add_timer() and the
 * return statements are on lines missing from this listing.
 */
1553 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context
*ctdb
, uint32_t db_id
)
1555 struct ctdb_db_context
*ctdb_db
;
1556 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1558 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id
));
1562 if (ctdb_db
->seqnum_update
== NULL
) {
1563 ctdb_db
->seqnum_update
= tevent_add_timer(
1565 timeval_current_ofs(ctdb
->tunable
.seqnum_interval
/1000,
1566 (ctdb
->tunable
.seqnum_interval
%1000)*1000),
1567 ctdb_ltdb_seqnum_check
, ctdb_db
);
1570 tdb_enable_seqnum(ctdb_db
->ltdb
->tdb
);
1571 ctdb_db
->seqnum
= tdb_get_seqnum(ctdb_db
->ltdb
->tdb
);
/*
 * Handle a SET_DB_PRIORITY control.
 *
 * indata.dptr is interpreted as a struct ctdb_db_priority carrying a
 * db_id and a priority.  The database is looked up with find_ctdb_db();
 * the "Unknown db_id" message is emitted only when this node does not
 * have NODE_FLAGS_INACTIVE set.  A priority outside
 * 1..NUM_DB_PRIORITIES is logged as invalid.  Otherwise the priority is
 * stored in ctdb_db->priority and, when client_id is non-zero, the same
 * control is broadcast to all nodes with CTDB_CTRL_FLAG_NOREPLY.
 *
 * NOTE(review): the rest of the parameter list, the NULL check around
 * the "Unknown db_id" branch, and the return statements are not visible
 * in this listing -- confirm against the complete file.
 */
1575 int32_t ctdb_control_set_db_priority(struct ctdb_context
*ctdb
, TDB_DATA indata
,
1578 struct ctdb_db_priority
*db_prio
= (struct ctdb_db_priority
*)indata
.dptr
;
1579 struct ctdb_db_context
*ctdb_db
;
1581 ctdb_db
= find_ctdb_db(ctdb
, db_prio
->db_id
);
1583 if (!(ctdb
->nodes
[ctdb
->pnn
]->flags
& NODE_FLAGS_INACTIVE
)) {
1584 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1590 if ((db_prio
->priority
<1) || (db_prio
->priority
>NUM_DB_PRIORITIES
)) {
1591 DEBUG(DEBUG_ERR
,("Trying to set invalid priority : %u\n", db_prio
->priority
));
1595 ctdb_db
->priority
= db_prio
->priority
;
1596 DEBUG(DEBUG_INFO
,("Setting DB priority to %u for db 0x%08x\n", db_prio
->priority
, db_prio
->db_id
));
1598 if (client_id
!= 0) {
1599 /* Broadcast the update to the rest of the cluster */
1600 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
, 0,
1601 CTDB_CONTROL_SET_DB_PRIORITY
, 0,
1602 CTDB_CTRL_FLAG_NOREPLY
, indata
,
/*
 * Mark a database as "sticky".
 *
 * The function first tests whether ctdb_db->sticky is already set (the
 * body of that branch is not visible here; presumably an early return
 * -- confirm).  Persistent databases are refused with an error log.
 * Otherwise a tree for sticky records is created with
 * trbt_create(ctdb_db, 0), the sticky flag is set and a notice is
 * logged.
 */
1609 int ctdb_set_db_sticky(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
)
1611 if (ctdb_db
->sticky
) {
1615 if (ctdb_db
->persistent
) {
1616 DEBUG(DEBUG_ERR
,("Trying to set persistent database with sticky property\n"));
1620 ctdb_db
->sticky_records
= trbt_create(ctdb_db
, 0);
1622 ctdb_db
->sticky
= true;
1624 DEBUG(DEBUG_NOTICE
,("set db sticky %s\n", ctdb_db
->db_name
));
/*
 * Reset the per-database statistics.
 *
 * Every hot-key slot (MAX_HOT_KEYS of them) whose key has a non-zero
 * dsize gets its key buffer released with talloc_free(); afterwards the
 * whole ctdb_db->statistics structure is cleared with ZERO_STRUCT().
 */
1629 void ctdb_db_statistics_reset(struct ctdb_db_context
*ctdb_db
)
1631 struct ctdb_db_statistics_old
*s
= &ctdb_db
->statistics
;
1634 for (i
=0; i
<MAX_HOT_KEYS
; i
++) {
1635 if (s
->hot_keys
[i
].key
.dsize
> 0) {
1636 talloc_free(s
->hot_keys
[i
].key
.dptr
);
1640 ZERO_STRUCT(ctdb_db
->statistics
);
/*
 * Serialise the statistics of one database into outdata.
 *
 * The database is looked up by db_id (an unknown id is logged).  The
 * output length is the size of struct ctdb_db_statistics_old up to its
 * hot_keys_wire member plus the dsize of every hot key.  A buffer of
 * that size is allocated with talloc_size() as a child of outdata, the
 * fixed-size part of the statistics is copied in, num_hot_keys is set
 * to MAX_HOT_KEYS, and the raw bytes of each hot key are appended back
 * to back starting at hot_keys_wire.  outdata->dptr/dsize are then
 * pointed at the buffer.
 *
 * NOTE(review): the remaining parameters, the declarations of i, len
 * and ptr, and the return statements are not visible in this listing.
 */
1643 int32_t ctdb_control_get_db_statistics(struct ctdb_context
*ctdb
,
1647 struct ctdb_db_context
*ctdb_db
;
1648 struct ctdb_db_statistics_old
*stats
;
1653 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1655 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in get_db_statistics\n", db_id
));
1659 len
= offsetof(struct ctdb_db_statistics_old
, hot_keys_wire
);
1660 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1661 len
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1664 stats
= talloc_size(outdata
, len
);
1665 if (stats
== NULL
) {
1666 DEBUG(DEBUG_ERR
,("Failed to allocate db statistics structure\n"));
1670 memcpy(stats
, &ctdb_db
->statistics
,
1671 offsetof(struct ctdb_db_statistics_old
, hot_keys_wire
));
1673 stats
->num_hot_keys
= MAX_HOT_KEYS
;
1675 ptr
= &stats
->hot_keys_wire
[0];
1676 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
1677 memcpy(ptr
, ctdb_db
->statistics
.hot_keys
[i
].key
.dptr
,
1678 ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
);
1679 ptr
+= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
;
1682 outdata
->dptr
= (uint8_t *)stats
;
1683 outdata
->dsize
= len
;