2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007-2009
5 Copyright (C) Michael Adam 2009
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
/*
 * It is not possible to include ctdb.h and tdb_compat.h (included via
 * some other include above) without warnings. This fixes those
 * warnings.
 */
40 #ifdef typesafe_cb_preargs
41 #undef typesafe_cb_preargs
44 #ifdef typesafe_cb_postargs
45 #undef typesafe_cb_postargs
49 #include "ctdb_private.h"
50 #include "ctdbd_conn.h"
51 #include "dbwrap/dbwrap.h"
52 #include "dbwrap/dbwrap_private.h"
53 #include "dbwrap/dbwrap_ctdb.h"
57 struct db_ctdb_transaction_handle
{
58 struct db_ctdb_ctx
*ctx
;
60 * we store the writes done under a transaction:
62 struct ctdb_marshall_buffer
*m_write
;
69 struct db_context
*db
;
70 struct tdb_wrap
*wtdb
;
72 struct db_ctdb_transaction_handle
*transaction
;
73 struct g_lock_ctx
*lock_ctx
;
75 /* thresholds for warning messages */
76 int warn_unlock_msecs
;
77 int warn_migrate_msecs
;
78 int warn_migrate_attempts
;
79 int warn_locktime_msecs
;
83 struct db_ctdb_ctx
*ctdb_ctx
;
84 struct ctdb_ltdb_header header
;
85 struct timeval lock_time
;
88 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
90 enum TDB_ERROR tret
= tdb_error(tdb
);
92 return map_nt_error_from_tdb(tret
);
95 struct db_ctdb_ltdb_parse_state
{
96 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
97 TDB_DATA data
, void *private_data
);
101 static int db_ctdb_ltdb_parser(TDB_DATA key
, TDB_DATA data
,
104 struct db_ctdb_ltdb_parse_state
*state
=
105 (struct db_ctdb_ltdb_parse_state
*)private_data
;
107 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
112 key
, (struct ctdb_ltdb_header
*)data
.dptr
,
113 make_tdb_data(data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
114 data
.dsize
- sizeof(struct ctdb_ltdb_header
)),
115 state
->private_data
);
119 static NTSTATUS
db_ctdb_ltdb_parse(
120 struct db_ctdb_ctx
*db
, TDB_DATA key
,
121 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
122 TDB_DATA data
, void *private_data
),
125 struct db_ctdb_ltdb_parse_state state
;
128 state
.parser
= parser
;
129 state
.private_data
= private_data
;
131 ret
= tdb_parse_record(db
->wtdb
->tdb
, key
, db_ctdb_ltdb_parser
,
134 return NT_STATUS_NOT_FOUND
;
140 * Store a record together with the ctdb record header
141 * in the local copy of the database.
143 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
145 struct ctdb_ltdb_header
*header
,
151 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
152 rec
.dptr
= (uint8_t *)talloc_size(talloc_tos(), rec
.dsize
);
154 if (rec
.dptr
== NULL
) {
155 return NT_STATUS_NO_MEMORY
;
158 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
159 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
161 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
163 talloc_free(rec
.dptr
);
165 return (ret
== 0) ? NT_STATUS_OK
166 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
171 form a ctdb_rec_data record from a key/data pair
173 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
175 struct ctdb_ltdb_header
*header
,
179 struct ctdb_rec_data
*d
;
181 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
182 data
.dsize
+ sizeof(*header
);
183 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
189 d
->keylen
= key
.dsize
;
190 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
192 d
->datalen
= data
.dsize
+ sizeof(*header
);
193 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
194 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
199 /* helper function for marshalling multiple records */
200 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
201 struct ctdb_marshall_buffer
*m
,
205 struct ctdb_ltdb_header
*header
,
208 struct ctdb_rec_data
*r
;
209 size_t m_size
, r_size
;
210 struct ctdb_marshall_buffer
*m2
= NULL
;
212 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
219 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
220 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
227 m_size
= talloc_get_size(m
);
228 r_size
= talloc_get_size(r
);
230 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
231 mem_ctx
, m
, m_size
+ r_size
);
237 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
246 /* we've finished marshalling, return a data blob with the marshalled records */
247 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
250 data
.dptr
= (uint8_t *)m
;
251 data
.dsize
= talloc_get_size(m
);
256 loop over a marshalling buffer
258 - pass r==NULL to start
259 - loop the number of times indicated by m->count
261 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next_key(
262 struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
, TDB_DATA
*key
)
265 r
= (struct ctdb_rec_data
*)&m
->data
[0];
267 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
270 key
->dptr
= &r
->data
[0];
271 key
->dsize
= r
->keylen
;
275 static bool db_ctdb_marshall_buf_parse(
276 struct ctdb_rec_data
*r
, uint32_t *reqid
,
277 struct ctdb_ltdb_header
**header
, TDB_DATA
*data
)
279 if (r
->datalen
< sizeof(struct ctdb_ltdb_header
)) {
285 data
->dptr
= &r
->data
[r
->keylen
] + sizeof(struct ctdb_ltdb_header
);
286 data
->dsize
= r
->datalen
- sizeof(struct ctdb_ltdb_header
);
288 *header
= (struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
294 * CTDB transaction destructor
296 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
300 status
= g_lock_unlock(h
->ctx
->lock_ctx
, h
->lock_name
);
301 if (!NT_STATUS_IS_OK(status
)) {
302 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h
->lock_name
,
310 * CTDB dbwrap API: transaction_start function
311 * starts a transaction on a persistent database
313 static int db_ctdb_transaction_start(struct db_context
*db
)
315 struct db_ctdb_transaction_handle
*h
;
317 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
320 if (!db
->persistent
) {
321 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
326 if (ctx
->transaction
) {
327 ctx
->transaction
->nesting
++;
328 DEBUG(5, (__location__
" transaction start on db 0x%08x: nesting %d -> %d\n",
329 ctx
->db_id
, ctx
->transaction
->nesting
- 1, ctx
->transaction
->nesting
));
333 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
335 DEBUG(0,(__location__
" oom for transaction handle\n"));
341 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
342 (unsigned int)ctx
->db_id
);
343 if (h
->lock_name
== NULL
) {
344 DEBUG(0, ("talloc_asprintf failed\n"));
350 * Wait a day, i.e. forever...
352 status
= g_lock_lock(ctx
->lock_ctx
, h
->lock_name
, G_LOCK_WRITE
,
353 timeval_set(86400, 0));
354 if (!NT_STATUS_IS_OK(status
)) {
355 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
360 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
362 ctx
->transaction
= h
;
364 DEBUG(5,(__location__
" transaction started on db 0x%08x\n", ctx
->db_id
));
369 static bool parse_newest_in_marshall_buffer(
370 struct ctdb_marshall_buffer
*buf
, TDB_DATA key
,
371 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
372 TDB_DATA data
, void *private_data
),
375 struct ctdb_rec_data
*rec
= NULL
;
376 struct ctdb_ltdb_header
*h
= NULL
;
385 * Walk the list of records written during this
386 * transaction. If we want to read one we have already
387 * written, return the last written sample. Thus we do not do
388 * a "break;" for the first hit, this record might have been
392 for (i
=0; i
<buf
->count
; i
++) {
396 rec
= db_ctdb_marshall_loop_next_key(buf
, rec
, &tkey
);
401 if (!tdb_data_equal(key
, tkey
)) {
405 if (!db_ctdb_marshall_buf_parse(rec
, &reqid
, &h
, &data
)) {
414 parser(key
, h
, data
, private_data
);
419 struct pull_newest_from_marshall_buffer_state
{
420 struct ctdb_ltdb_header
*pheader
;
425 static void pull_newest_from_marshall_buffer_parser(
426 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
427 TDB_DATA data
, void *private_data
)
429 struct pull_newest_from_marshall_buffer_state
*state
=
430 (struct pull_newest_from_marshall_buffer_state
*)private_data
;
432 if (state
->pheader
!= NULL
) {
433 memcpy(state
->pheader
, header
, sizeof(*state
->pheader
));
435 if (state
->pdata
!= NULL
) {
436 state
->pdata
->dsize
= data
.dsize
;
437 state
->pdata
->dptr
= (uint8_t *)talloc_memdup(
438 state
->mem_ctx
, data
.dptr
, data
.dsize
);
442 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
444 struct ctdb_ltdb_header
*pheader
,
448 struct pull_newest_from_marshall_buffer_state state
;
450 state
.pheader
= pheader
;
451 state
.mem_ctx
= mem_ctx
;
454 if (!parse_newest_in_marshall_buffer(
455 buf
, key
, pull_newest_from_marshall_buffer_parser
,
459 if ((pdata
!= NULL
) && (pdata
->dsize
!= 0) && (pdata
->dptr
== NULL
)) {
466 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
467 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
469 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
473 struct db_record
*result
;
476 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
477 DEBUG(0, ("talloc failed\n"));
481 result
->db
= ctx
->db
;
482 result
->private_data
= ctx
->transaction
;
484 result
->key
.dsize
= key
.dsize
;
485 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
487 if (result
->key
.dptr
== NULL
) {
488 DEBUG(0, ("talloc failed\n"));
493 result
->store
= db_ctdb_store_transaction
;
494 result
->delete_rec
= db_ctdb_delete_transaction
;
496 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
497 NULL
, result
, &result
->value
)) {
501 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
502 if (ctdb_data
.dptr
== NULL
) {
503 /* create the record */
504 result
->value
= tdb_null
;
508 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
509 result
->value
.dptr
= NULL
;
511 if ((result
->value
.dsize
!= 0)
512 && !(result
->value
.dptr
= (uint8_t *)talloc_memdup(
513 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
514 result
->value
.dsize
))) {
515 DEBUG(0, ("talloc failed\n"));
519 SAFE_FREE(ctdb_data
.dptr
);
524 static int db_ctdb_record_destructor(struct db_record
**recp
)
526 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
527 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
528 rec
->private_data
, struct db_ctdb_transaction_handle
);
529 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
531 DEBUG(0,(__location__
" transaction_commit failed\n"));
537 auto-create a transaction for persistent databases
539 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
544 struct db_record
*rec
, **recp
;
546 res
= db_ctdb_transaction_start(ctx
->db
);
551 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
553 ctx
->db
->transaction_cancel(ctx
->db
);
557 /* destroy this transaction when we release the lock */
558 recp
= talloc(rec
, struct db_record
*);
560 ctx
->db
->transaction_cancel(ctx
->db
);
565 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
571 stores a record inside a transaction
573 static NTSTATUS
db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
574 TDB_DATA key
, TDB_DATA data
)
576 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
578 struct ctdb_ltdb_header header
;
582 /* we need the header so we can update the RSN */
584 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
587 rec
= tdb_fetch_compat(h
->ctx
->wtdb
->tdb
, key
);
589 if (rec
.dptr
!= NULL
) {
590 memcpy(&header
, rec
.dptr
,
591 sizeof(struct ctdb_ltdb_header
));
592 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
595 * a special case, we are writing the same
596 * data that is there now
598 if (data
.dsize
== rec
.dsize
&&
600 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
603 talloc_free(tmp_ctx
);
610 header
.dmaster
= get_my_vnn();
613 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
614 if (h
->m_write
== NULL
) {
615 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
616 talloc_free(tmp_ctx
);
617 return NT_STATUS_NO_MEMORY
;
620 talloc_free(tmp_ctx
);
626 a record store inside a transaction
628 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
630 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
631 rec
->private_data
, struct db_ctdb_transaction_handle
);
634 status
= db_ctdb_transaction_store(h
, rec
->key
, data
);
639 a record delete inside a transaction
641 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
643 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
644 rec
->private_data
, struct db_ctdb_transaction_handle
);
647 status
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
651 static void db_ctdb_fetch_db_seqnum_parser(
652 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
653 TDB_DATA data
, void *private_data
)
655 uint64_t *seqnum
= (uint64_t *)private_data
;
657 if (data
.dsize
!= sizeof(uint64_t)) {
661 memcpy(seqnum
, data
.dptr
, sizeof(*seqnum
));
665 * Fetch the db sequence number of a persistent db directly from the db.
667 static NTSTATUS
db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx
*db
,
673 if (seqnum
== NULL
) {
674 return NT_STATUS_INVALID_PARAMETER
;
677 key
= string_term_tdb_data(CTDB_DB_SEQNUM_KEY
);
679 status
= db_ctdb_ltdb_parse(
680 db
, key
, db_ctdb_fetch_db_seqnum_parser
, seqnum
);
682 if (NT_STATUS_IS_OK(status
)) {
685 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
693 * Store the database sequence number inside a transaction.
695 static NTSTATUS
db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle
*h
,
699 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
703 key
= string_term_tdb_data(keyname
);
705 data
.dptr
= (uint8_t *)&seqnum
;
706 data
.dsize
= sizeof(uint64_t);
708 status
= db_ctdb_transaction_store(h
, key
, data
);
716 static int db_ctdb_transaction_commit(struct db_context
*db
)
718 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
722 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
723 uint64_t old_seqnum
, new_seqnum
;
727 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
731 if (h
->nested_cancel
) {
732 db
->transaction_cancel(db
);
733 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
737 if (h
->nesting
!= 0) {
739 DEBUG(5, (__location__
" transaction commit on db 0x%08x: nesting %d -> %d\n",
740 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
744 if (h
->m_write
== NULL
) {
746 * No changes were made, so don't change the seqnum,
747 * don't push to other node, just exit with success.
753 DEBUG(5,(__location__
" transaction commit on db 0x%08x\n", ctx
->db_id
));
756 * As the last db action before committing, bump the database sequence
757 * number. Note that this undoes all changes to the seqnum records
758 * performed under the transaction. This record is not meant to be
759 * modified by user interaction. It is for internal use only...
761 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &old_seqnum
);
762 if (!NT_STATUS_IS_OK(rets
)) {
763 DEBUG(1, (__location__
" failed to fetch the db sequence number "
764 "in transaction commit on db 0x%08x\n", ctx
->db_id
));
769 new_seqnum
= old_seqnum
+ 1;
771 rets
= db_ctdb_store_db_seqnum(h
, new_seqnum
);
772 if (!NT_STATUS_IS_OK(rets
)) {
773 DEBUG(1, (__location__
"failed to store the db sequence number "
774 " in transaction commit on db 0x%08x\n", ctx
->db_id
));
780 /* tell ctdbd to commit to the other nodes */
781 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
782 CTDB_CONTROL_TRANS3_COMMIT
,
784 db_ctdb_marshall_finish(h
->m_write
),
785 NULL
, NULL
, &status
);
786 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
788 * The TRANS3_COMMIT control should only possibly fail when a
789 * recovery has been running concurrently. In any case, the db
790 * will be the same on all nodes, either the new copy or the
791 * old copy. This can be detected by comparing the old and new
792 * local sequence numbers.
794 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &new_seqnum
);
795 if (!NT_STATUS_IS_OK(rets
)) {
796 DEBUG(1, (__location__
" failed to refetch db sequence "
797 "number after failed TRANS3_COMMIT\n"));
802 if (new_seqnum
== old_seqnum
) {
803 /* Recovery prevented all our changes: retry. */
806 if (new_seqnum
!= (old_seqnum
+ 1)) {
807 DEBUG(0, (__location__
" ERROR: new_seqnum[%lu] != "
808 "old_seqnum[%lu] + (0 or 1) after failed "
809 "TRANS3_COMMIT - this should not happen!\n",
810 (unsigned long)new_seqnum
,
811 (unsigned long)old_seqnum
));
816 * Recovery propagated our changes to all nodes, completing
817 * our commit for us - succeed.
824 h
->ctx
->transaction
= NULL
;
833 static int db_ctdb_transaction_cancel(struct db_context
*db
)
835 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
837 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
840 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
844 if (h
->nesting
!= 0) {
846 h
->nested_cancel
= true;
847 DEBUG(5, (__location__
" transaction cancel on db 0x%08x: nesting %d -> %d\n",
848 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
852 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
854 ctx
->transaction
= NULL
;
860 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
862 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
863 rec
->private_data
, struct db_ctdb_rec
);
865 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
870 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
871 static NTSTATUS
db_ctdb_send_schedule_for_deletion(struct db_record
*rec
)
874 struct ctdb_control_schedule_for_deletion
*dd
;
877 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
878 rec
->private_data
, struct db_ctdb_rec
);
880 indata
.dsize
= offsetof(struct ctdb_control_schedule_for_deletion
, key
) + rec
->key
.dsize
;
881 indata
.dptr
= talloc_zero_array(crec
, uint8_t, indata
.dsize
);
882 if (indata
.dptr
== NULL
) {
883 DEBUG(0, (__location__
" talloc failed!\n"));
884 return NT_STATUS_NO_MEMORY
;
887 dd
= (struct ctdb_control_schedule_for_deletion
*)(void *)indata
.dptr
;
888 dd
->db_id
= crec
->ctdb_ctx
->db_id
;
889 dd
->hdr
= crec
->header
;
890 dd
->keylen
= rec
->key
.dsize
;
891 memcpy(dd
->key
, rec
->key
.dptr
, rec
->key
.dsize
);
893 status
= ctdbd_control_local(messaging_ctdbd_connection(),
894 CTDB_CONTROL_SCHEDULE_FOR_DELETION
,
895 crec
->ctdb_ctx
->db_id
,
896 CTDB_CTRL_FLAG_NOREPLY
, /* flags */
901 talloc_free(indata
.dptr
);
903 if (!NT_STATUS_IS_OK(status
) || cstatus
!= 0) {
904 DEBUG(1, (__location__
" Error sending local control "
905 "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
906 nt_errstr(status
), cstatus
));
907 if (NT_STATUS_IS_OK(status
)) {
908 status
= NT_STATUS_UNSUCCESSFUL
;
916 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
921 * We have to store the header with empty data. TODO: Fix the
925 status
= db_ctdb_store(rec
, tdb_null
, 0);
926 if (!NT_STATUS_IS_OK(status
)) {
930 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
931 status
= db_ctdb_send_schedule_for_deletion(rec
);
937 static int db_ctdb_record_destr(struct db_record
* data
)
939 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
940 data
->private_data
, struct db_ctdb_rec
);
943 struct timeval before
;
946 DEBUG(10, (DEBUGLEVEL
> 10
947 ? "Unlocking db %u key %s\n"
948 : "Unlocking db %u key %.20s\n",
949 (int)crec
->ctdb_ctx
->db_id
,
950 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
953 before
= timeval_current();
955 ret
= tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
);
957 timediff
= timeval_elapsed(&before
);
958 timediff
*= 1000; /* get us milliseconds */
960 if (timediff
> crec
->ctdb_ctx
->warn_unlock_msecs
) {
962 key
= hex_encode_talloc(talloc_tos(),
963 (unsigned char *)data
->key
.dptr
,
965 DEBUG(0, ("tdb_chainunlock on db %s, key %s took %f milliseconds\n",
966 tdb_name(crec
->ctdb_ctx
->wtdb
->tdb
), key
,
972 DEBUG(0, ("tdb_chainunlock failed\n"));
976 threshold
= crec
->ctdb_ctx
->warn_locktime_msecs
;
977 if (threshold
!= 0) {
978 timediff
= timeval_elapsed(&crec
->lock_time
) * 1000;
979 if (timediff
> threshold
) {
982 key
= hex_encode_talloc(data
,
983 (unsigned char *)data
->key
.dptr
,
985 DEBUG(0, ("Held tdb lock on db %s, key %s "
987 tdb_name(crec
->ctdb_ctx
->wtdb
->tdb
),
996 * Check whether we have a valid local copy of the given record,
997 * either for reading or for writing.
999 static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header
*hdr
,
1002 #ifdef HAVE_CTDB_WANT_READONLY_DECL
1003 if (hdr
->dmaster
!= get_my_vnn()) {
1004 /* If we're not dmaster, it must be r/o copy. */
1005 return read_only
&& (hdr
->flags
& CTDB_REC_RO_HAVE_READONLY
);
1009 * If we want write access, no one may have r/o copies.
1011 return read_only
|| !(hdr
->flags
& CTDB_REC_RO_HAVE_DELEGATIONS
);
1013 return (hdr
->dmaster
== get_my_vnn());
1017 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data
, bool read_only
)
1019 if (ctdb_data
.dptr
== NULL
) {
1023 if (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1027 return db_ctdb_can_use_local_hdr(
1028 (struct ctdb_ltdb_header
*)ctdb_data
.dptr
, read_only
);
1031 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
1032 TALLOC_CTX
*mem_ctx
,
1036 struct db_record
*result
;
1037 struct db_ctdb_rec
*crec
;
1040 int migrate_attempts
;
1041 struct timeval migrate_start
;
1042 struct timeval chainlock_start
;
1043 struct timeval ctdb_start_time
;
1044 double chainlock_time
= 0;
1045 double ctdb_time
= 0;
1049 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
1050 DEBUG(0, ("talloc failed\n"));
1054 if (!(crec
= talloc_zero(result
, struct db_ctdb_rec
))) {
1055 DEBUG(0, ("talloc failed\n"));
1056 TALLOC_FREE(result
);
1060 result
->db
= ctx
->db
;
1061 result
->private_data
= (void *)crec
;
1062 crec
->ctdb_ctx
= ctx
;
1064 result
->key
.dsize
= key
.dsize
;
1065 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
1067 if (result
->key
.dptr
== NULL
) {
1068 DEBUG(0, ("talloc failed\n"));
1069 TALLOC_FREE(result
);
1073 migrate_attempts
= 0;
1074 GetTimeOfDay(&migrate_start
);
1077 * Do a blocking lock on the record
1081 if (DEBUGLEVEL
>= 10) {
1082 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
1083 DEBUG(10, (DEBUGLEVEL
> 10
1084 ? "Locking db %u key %s\n"
1085 : "Locking db %u key %.20s\n",
1086 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1087 TALLOC_FREE(keystr
);
1090 GetTimeOfDay(&chainlock_start
);
1092 ? tdb_chainlock_nonblock(ctx
->wtdb
->tdb
, key
)
1093 : tdb_chainlock(ctx
->wtdb
->tdb
, key
);
1094 chainlock_time
+= timeval_elapsed(&chainlock_start
);
1097 DEBUG(3, ("tdb_chainlock failed\n"));
1098 TALLOC_FREE(result
);
1102 result
->store
= db_ctdb_store
;
1103 result
->delete_rec
= db_ctdb_delete
;
1104 talloc_set_destructor(result
, db_ctdb_record_destr
);
1106 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
1109 * See if we have a valid record and we are the dmaster. If so, we can
1110 * take the shortcut and just return it.
1113 if (!db_ctdb_can_use_local_copy(ctdb_data
, false)) {
1114 SAFE_FREE(ctdb_data
.dptr
);
1115 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1116 talloc_set_destructor(result
, NULL
);
1118 if (tryonly
&& (migrate_attempts
!= 0)) {
1119 DEBUG(5, ("record migrated away again\n"));
1120 TALLOC_FREE(result
);
1124 migrate_attempts
+= 1;
1126 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u) %u\n",
1127 ctdb_data
.dptr
, ctdb_data
.dptr
?
1128 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1131 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->flags
: 0));
1133 GetTimeOfDay(&ctdb_start_time
);
1134 status
= ctdbd_migrate(messaging_ctdbd_connection(), ctx
->db_id
,
1136 ctdb_time
+= timeval_elapsed(&ctdb_start_time
);
1138 if (!NT_STATUS_IS_OK(status
)) {
1139 DEBUG(5, ("ctdb_migrate failed: %s\n",
1140 nt_errstr(status
)));
1141 TALLOC_FREE(result
);
1144 /* now its migrated, try again */
1150 duration
= timeval_elapsed(&migrate_start
);
1153 * Convert the duration to milliseconds to avoid a
1154 * floating-point division of
1155 * lp_parm_int("migrate_duration") by 1000.
1157 duration_msecs
= duration
* 1000;
1160 if ((migrate_attempts
> ctx
->warn_migrate_attempts
) ||
1161 (duration_msecs
> ctx
->warn_migrate_msecs
)) {
1164 if (tdb_get_flags(ctx
->wtdb
->tdb
) & TDB_INCOMPATIBLE_HASH
) {
1165 chain
= tdb_jenkins_hash(&key
) %
1166 tdb_hash_size(ctx
->wtdb
->tdb
);
1169 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
1170 "needed %d attempts, %d milliseconds, "
1171 "chainlock: %f ms, CTDB %f ms\n",
1172 tdb_name(ctx
->wtdb
->tdb
),
1173 hex_encode_talloc(talloc_tos(),
1174 (unsigned char *)key
.dptr
,
1177 migrate_attempts
, duration_msecs
,
1178 chainlock_time
* 1000.0,
1179 ctdb_time
* 1000.0));
1182 GetTimeOfDay(&crec
->lock_time
);
1184 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1186 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1187 result
->value
.dptr
= NULL
;
1189 if ((result
->value
.dsize
!= 0)
1190 && !(result
->value
.dptr
= (uint8_t *)talloc_memdup(
1191 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1192 result
->value
.dsize
))) {
1193 DEBUG(0, ("talloc failed\n"));
1194 TALLOC_FREE(result
);
1197 SAFE_FREE(ctdb_data
.dptr
);
1202 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1203 TALLOC_CTX
*mem_ctx
,
1206 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1207 struct db_ctdb_ctx
);
1209 if (ctx
->transaction
!= NULL
) {
1210 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1213 if (db
->persistent
) {
1214 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1217 return fetch_locked_internal(ctx
, mem_ctx
, key
, false);
1220 static struct db_record
*db_ctdb_try_fetch_locked(struct db_context
*db
,
1221 TALLOC_CTX
*mem_ctx
,
1224 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1225 struct db_ctdb_ctx
);
1227 if (ctx
->transaction
!= NULL
) {
1228 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1231 if (db
->persistent
) {
1232 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1235 return fetch_locked_internal(ctx
, mem_ctx
, key
, true);
1238 struct db_ctdb_parse_record_state
{
1239 void (*parser
)(TDB_DATA key
, TDB_DATA data
, void *private_data
);
1241 bool ask_for_readonly_copy
;
1245 static void db_ctdb_parse_record_parser(
1246 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
1247 TDB_DATA data
, void *private_data
)
1249 struct db_ctdb_parse_record_state
*state
=
1250 (struct db_ctdb_parse_record_state
*)private_data
;
1251 state
->parser(key
, data
, state
->private_data
);
1254 static void db_ctdb_parse_record_parser_nonpersistent(
1255 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
1256 TDB_DATA data
, void *private_data
)
1258 struct db_ctdb_parse_record_state
*state
=
1259 (struct db_ctdb_parse_record_state
*)private_data
;
1261 if (db_ctdb_can_use_local_hdr(header
, true)) {
1262 state
->parser(key
, data
, state
->private_data
);
1266 * We found something in the db, so it seems that this record,
1267 * while not usable locally right now, is popular. Ask for a
1270 state
->ask_for_readonly_copy
= true;
1274 static NTSTATUS
db_ctdb_parse_record(struct db_context
*db
, TDB_DATA key
,
1275 void (*parser
)(TDB_DATA key
,
1277 void *private_data
),
1280 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1281 db
->private_data
, struct db_ctdb_ctx
);
1282 struct db_ctdb_parse_record_state state
;
1285 state
.parser
= parser
;
1286 state
.private_data
= private_data
;
1288 if (ctx
->transaction
!= NULL
) {
1289 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
1293 * Transactions only happen for persistent db's.
1296 found
= parse_newest_in_marshall_buffer(
1297 h
->m_write
, key
, db_ctdb_parse_record_parser
, &state
);
1300 return NT_STATUS_OK
;
1304 if (db
->persistent
) {
1306 * Persistent db, but not found in the transaction buffer
1308 return db_ctdb_ltdb_parse(
1309 ctx
, key
, db_ctdb_parse_record_parser
, &state
);
1313 state
.ask_for_readonly_copy
= false;
1315 status
= db_ctdb_ltdb_parse(
1316 ctx
, key
, db_ctdb_parse_record_parser_nonpersistent
, &state
);
1317 if (NT_STATUS_IS_OK(status
) && state
.done
) {
1318 return NT_STATUS_OK
;
1321 return ctdbd_parse(messaging_ctdbd_connection(), ctx
->db_id
, key
,
1322 state
.ask_for_readonly_copy
, parser
, private_data
);
1325 struct traverse_state
{
1326 struct db_context
*db
;
1327 int (*fn
)(struct db_record
*rec
, void *private_data
);
1332 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1334 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1335 struct db_record
*rec
;
1336 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1337 /* we have to give them a locked record to prevent races */
1338 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1339 if (rec
&& rec
->value
.dsize
> 0) {
1340 state
->fn(rec
, state
->private_data
);
1342 talloc_free(tmp_ctx
);
1345 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1348 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1349 struct db_record
*rec
;
1350 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1354 * Skip the __db_sequence_number__ key:
1355 * This is used for persistent transactions internally.
1357 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1358 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1363 /* we have to give them a locked record to prevent races */
1364 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1365 if (rec
&& rec
->value
.dsize
> 0) {
1366 ret
= state
->fn(rec
, state
->private_data
);
1370 talloc_free(tmp_ctx
);
1374 /* wrapper to use traverse_persistent_callback with dbwrap */
1375 static int traverse_persistent_callback_dbwrap(struct db_record
*rec
, void* data
)
1377 return traverse_persistent_callback(NULL
, rec
->key
, rec
->value
, data
);
/*
 * Traverse method for a ctdb-backed database.
 *
 * Calls fn with private_data for the records of db.  The callbacks used
 * (traverse_persistent_callback / traverse_callback) fetch each record
 * locked before invoking fn.
 *
 * Persistent databases are walked directly in the local tdb with
 * tdb_traverse().  If a transaction with pending writes is open, keys
 * written in that transaction that do not yet exist in the local tdb
 * are collected in a temporary rbt database and traversed as well.
 * All other databases are traversed through ctdbd_traverse().
 *
 * NOTE(review): the return statements of the individual paths and part
 * of the state initialisation are not visible in this excerpt; confirm
 * against the full function.
 */
1381 static int db_ctdb_traverse(struct db_context
*db
,
1382 int (*fn
)(struct db_record
*rec
,
1383 void *private_data
),
1387 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1388 struct db_ctdb_ctx
);
1389 struct traverse_state state
;
1393 state
.private_data
= private_data
;
1396 if (db
->persistent
) {
1397 struct tdb_context
*ltdb
= ctx
->wtdb
->tdb
;
1400 /* for persistent databases we don't need to do a ctdb traverse,
1401 we can do a faster local traverse */
1402 ret
= tdb_traverse(ltdb
, traverse_persistent_callback
, &state
);
1406 if (ctx
->transaction
&& ctx
->transaction
->m_write
) {
1408 * we now have to handle keys not yet
1409 * present at transaction start
1411 struct db_context
*newkeys
= db_open_rbt(talloc_tos());
1412 struct ctdb_marshall_buffer
*mbuf
= ctx
->transaction
->m_write
;
1413 struct ctdb_rec_data
*rec
=NULL
;
/* db_open_rbt() could not create the temporary key set */
1417 if (newkeys
== NULL
) {
/* walk every entry of the transaction's marshalled write buffer */
1421 for (i
=0; i
<mbuf
->count
; i
++) {
1423 rec
= db_ctdb_marshall_loop_next_key(
1425 SMB_ASSERT(rec
!= NULL
);
/*
 * Remember only keys missing from the local tdb.  The stored value is
 * tdb_null because traverse_persistent_callback re-fetches by key.
 */
1427 if (!tdb_exists(ltdb
, key
)) {
/* NOTE(review): the result of dbwrap_store() is not checked here */
1428 dbwrap_store(newkeys
, key
, tdb_null
, 0);
/* run the same per-record callback over the keys new in this transaction */
1431 status
= dbwrap_traverse(newkeys
,
1432 traverse_persistent_callback_dbwrap
,
/* the temporary key set is no longer needed, whatever the status */
1435 talloc_free(newkeys
);
1436 if (!NT_STATUS_IS_OK(status
)) {
/* all other databases: let ctdbd drive the traverse */
1444 status
= ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1445 if (!NT_STATUS_IS_OK(status
)) {
1451 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1453 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1456 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1458 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
/*
 * ctdbd_traverse() callback used by the read-only traverse.
 *
 * Builds a db_record on the stack whose store and delete_rec methods
 * refuse modification (db_ctdb_store_deny / db_ctdb_delete_deny) and
 * passes it to state->fn.  The return value of fn is ignored.
 *
 * NOTE(review): the statements filling in the remaining fields of rec
 * (presumably key and value from the arguments) are not visible in this
 * excerpt; confirm against the full function.
 */
1461 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1463 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1464 struct db_record rec
;
/* read-only record: any attempt to store or delete is rejected */
1470 rec
.store
= db_ctdb_store_deny
;
1471 rec
.delete_rec
= db_ctdb_delete_deny
;
1472 rec
.private_data
= NULL
;
1473 state
->fn(&rec
, state
->private_data
);
/*
 * tdb_traverse_read() callback for persistent databases (read-only).
 *
 * Treats the internal CTDB_DB_SEQNUM_KEY entry specially (per the
 * comment below it is skipped), wraps the record in a stack db_record
 * with write-denying store/delete methods, and strips the leading
 * struct ctdb_ltdb_header from the value before handing the record to
 * state->fn.  A value no larger than the header marks a deleted record.
 * For records that are passed on, the result of state->fn is returned.
 *
 * NOTE(review): the body of the skip branch, the statements that fill
 * in the remaining fields of rec, and the handling of deleted records
 * are not visible in this excerpt; confirm against the full function.
 */
1477 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1480 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1481 struct db_record rec
;
1484 * Skip the __db_sequence_number__ key:
1485 * This is used for persistent transactions internally.
1487 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1488 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
/* read-only record: any attempt to store or delete is rejected */
1497 rec
.store
= db_ctdb_store_deny
;
1498 rec
.delete_rec
= db_ctdb_delete_deny
;
1499 rec
.private_data
= NULL
;
1501 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1502 /* a deleted record */
/* step over the ctdb header so that fn sees only the payload */
1505 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1506 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1509 return state
->fn(&rec
, state
->private_data
);
/*
 * Read-only traverse method for a ctdb-backed database.
 *
 * The callbacks used here give fn records whose store/delete methods
 * deny modification.  Persistent databases are walked locally with
 * tdb_traverse_read() and its result is returned as is; all other
 * databases are traversed through ctdbd_traverse().
 *
 * NOTE(review): part of the state initialisation and the return
 * statements following ctdbd_traverse() are not visible in this
 * excerpt; confirm against the full function.
 */
1512 static int db_ctdb_traverse_read(struct db_context
*db
,
1513 int (*fn
)(struct db_record
*rec
,
1514 void *private_data
),
1518 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1519 struct db_ctdb_ctx
);
1520 struct traverse_state state
;
1524 state
.private_data
= private_data
;
1527 if (db
->persistent
) {
1528 /* for persistent databases we don't need to do a ctdb traverse,
1529 we can do a faster local traverse */
1530 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
/* all other databases: let ctdbd drive the traverse */
1533 status
= ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1534 if (!NT_STATUS_IS_OK(status
)) {
1540 static int db_ctdb_get_seqnum(struct db_context
*db
)
1542 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1543 struct db_ctdb_ctx
);
1544 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1547 static void db_ctdb_id(struct db_context
*db
, const uint8_t **id
,
1550 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1551 db
->private_data
, struct db_ctdb_ctx
);
1553 *id
= (uint8_t *)&ctx
->db_id
;
1554 *idlen
= sizeof(ctx
->db_id
);
/*
 * Open a ctdb-backed database and return it as a dbwrap db_context.
 *
 * Steps visible here: give up when clustering is disabled, allocate the
 * db_context and its db_ctdb_ctx, attach to the database through ctdbd,
 * look up the path of the local tdb, set the database priority (and,
 * where compiled in and requested via dbwrap_flags, the read-only
 * optimisation for non-persistent databases), open the local tdb, and
 * finally install the db_ctdb_* method table.
 *
 * A database counts as persistent when TDB_CLEAR_IF_FIRST is absent
 * from tdb_flags.  db_ctdb is a talloc child of result, and every
 * failure path shown releases result with TALLOC_FREE().
 *
 * NOTE(review): the parameter supplying `name`, several local
 * declarations and the return statements are not visible in this
 * excerpt; confirm against the full function.
 */
1557 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1559 int hash_size
, int tdb_flags
,
1560 int open_flags
, mode_t mode
,
1561 enum dbwrap_lock_order lock_order
,
1562 uint64_t dbwrap_flags
)
1564 struct db_context
*result
;
1565 struct db_ctdb_ctx
*db_ctdb
;
1567 struct ctdbd_connection
*conn
;
1568 struct loadparm_context
*lp_ctx
;
1569 struct ctdb_db_priority prio
;
1573 if (!lp_clustering()) {
1574 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1578 if (!(result
= talloc_zero(mem_ctx
, struct db_context
))) {
1579 DEBUG(0, ("talloc failed\n"));
1580 TALLOC_FREE(result
);
/*
 * NOTE(review): plain talloc() does not zero the new struct, and
 * lock_ctx is only assigned further down for persistent databases;
 * confirm nothing reads it for non-persistent ones.
 */
1584 if (!(db_ctdb
= talloc(result
, struct db_ctdb_ctx
))) {
1585 DEBUG(0, ("talloc failed\n"));
1586 TALLOC_FREE(result
);
1590 result
->name
= talloc_strdup(result
, name
);
1591 if (result
->name
== NULL
) {
1592 DEBUG(0, ("talloc failed\n"));
1593 TALLOC_FREE(result
);
1597 db_ctdb
->transaction
= NULL
;
1598 db_ctdb
->db
= result
;
1600 conn
= messaging_ctdbd_connection();
1602 DEBUG(1, ("Could not connect to ctdb\n"));
1603 TALLOC_FREE(result
);
/* attach to the database; ctdbd hands back its id in db_ctdb->db_id */
1607 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn
, name
, &db_ctdb
->db_id
, tdb_flags
))) {
1608 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1609 TALLOC_FREE(result
);
/*
 * NOTE(review): db_path is used below (chmod, loadparm_init_s3,
 * tdb_wrap_open) without a visible NULL check; confirm whether
 * ctdbd_dbpath() can fail.
 */
1613 db_path
= ctdbd_dbpath(conn
, db_ctdb
, db_ctdb
->db_id
);
/* persistence is derived from the caller's flags before they are masked */
1615 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1616 result
->lock_order
= lock_order
;
1618 /* only pass through specific flags */
1619 tdb_flags
&= TDB_SEQNUM
|TDB_VOLATILE
;
1621 /* honor permissions if user has specified O_CREAT */
1622 if (open_flags
& O_CREAT
) {
/* NOTE(review): the result of chmod() is ignored */
1623 chmod(db_path
, mode
);
/* tell ctdbd to use the dbwrap lock order as this database's priority */
1626 prio
.db_id
= db_ctdb
->db_id
;
1627 prio
.priority
= lock_order
;
1629 status
= ctdbd_control_local(
1630 conn
, CTDB_CONTROL_SET_DB_PRIORITY
, 0, 0,
1631 make_tdb_data((uint8_t *)&prio
, sizeof(prio
)),
1632 NULL
, NULL
, &cstatus
);
1634 if (!NT_STATUS_IS_OK(status
) || (cstatus
!= 0)) {
1635 DEBUG(1, ("CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d\n",
1636 nt_errstr(status
), cstatus
));
1637 TALLOC_FREE(result
);
1641 #ifdef HAVE_CTDB_WANT_READONLY_DECL
/* read-only optimisation: non-persistent databases only, and only on request */
1642 if (!result
->persistent
&&
1643 (dbwrap_flags
& DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS
))
1647 indata
= make_tdb_data((uint8_t *)&db_ctdb
->db_id
,
1648 sizeof(db_ctdb
->db_id
));
1650 status
= ctdbd_control_local(
1651 conn
, CTDB_CONTROL_SET_DB_READONLY
, 0, 0, indata
,
1652 NULL
, NULL
, &cstatus
);
1653 if (!NT_STATUS_IS_OK(status
) || (cstatus
!= 0)) {
1654 DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
1655 "%s, %d\n", nt_errstr(status
), cstatus
));
1656 TALLOC_FREE(result
);
/*
 * Temporary loadparm context, used only to resolve the hash size and
 * the tdb flags; it is released with talloc_unlink() right after
 * tdb_wrap_open().
 */
1662 lp_ctx
= loadparm_init_s3(db_path
, loadparm_s3_helpers());
/* a hash_size of 0 means: take the configured size for this path */
1664 if (hash_size
== 0) {
1665 hash_size
= lpcfg_tdb_hash_size(lp_ctx
, db_path
);
1668 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
,
1669 lpcfg_tdb_flags(lp_ctx
, tdb_flags
),
1671 talloc_unlink(db_path
, lp_ctx
);
1672 if (db_ctdb
->wtdb
== NULL
) {
1673 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1674 TALLOC_FREE(result
);
1677 talloc_free(db_path
);
/* only persistent databases get a g_lock context */
1679 if (result
->persistent
) {
1680 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
,
1681 ctdb_conn_msg_ctx(conn
));
1682 if (db_ctdb
->lock_ctx
== NULL
) {
1683 DEBUG(0, ("g_lock_ctx_init failed\n"));
1684 TALLOC_FREE(result
);
/*
 * Thresholds for warning messages, read from configuration.
 * NOTE(review): the last lp_parm_int() argument is presumably the
 * default used when the option is unset; confirm.
 */
1689 db_ctdb
->warn_unlock_msecs
= lp_parm_int(-1, "ctdb",
1690 "unlock_warn_threshold", 5);
1691 db_ctdb
->warn_migrate_attempts
= lp_parm_int(-1, "ctdb",
1692 "migrate_attempts", 10);
1693 db_ctdb
->warn_migrate_msecs
= lp_parm_int(-1, "ctdb",
1694 "migrate_duration", 5000);
1695 db_ctdb
->warn_locktime_msecs
= lp_ctdb_locktime_warn_threshold();
/* install the ctdb implementations of the dbwrap methods */
1697 result
->private_data
= (void *)db_ctdb
;
1698 result
->fetch_locked
= db_ctdb_fetch_locked
;
1699 result
->try_fetch_locked
= db_ctdb_try_fetch_locked
;
1700 result
->parse_record
= db_ctdb_parse_record
;
1701 result
->traverse
= db_ctdb_traverse
;
1702 result
->traverse_read
= db_ctdb_traverse_read
;
1703 result
->get_seqnum
= db_ctdb_get_seqnum
;
1704 result
->transaction_start
= db_ctdb_transaction_start
;
1705 result
->transaction_commit
= db_ctdb_transaction_commit
;
1706 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1707 result
->id
= db_ctdb_id
;
1708 result
->stored_callback
= NULL
;
1710 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1711 name
, db_ctdb
->db_id
));