2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007-2009
5 Copyright (C) Michael Adam 2009
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "system/filesys.h"
23 #include "lib/util/tdb_wrap.h"
25 #include "dbwrap/dbwrap_ctdb.h"
26 #include "dbwrap/dbwrap_rbt.h"
27 #include "lib/param/param.h"
29 #ifdef CLUSTER_SUPPORT
32 * It is not possible to include ctdb.h and tdb_compat.h (included via
33 * some other include above) without warnings. This fixes those
41 #ifdef typesafe_cb_preargs
42 #undef typesafe_cb_preargs
45 #ifdef typesafe_cb_postargs
46 #undef typesafe_cb_postargs
50 #include "ctdb_private.h"
51 #include "ctdbd_conn.h"
52 #include "dbwrap/dbwrap.h"
53 #include "dbwrap/dbwrap_private.h"
54 #include "dbwrap/dbwrap_ctdb.h"
58 struct db_ctdb_transaction_handle
{
59 struct db_ctdb_ctx
*ctx
;
61 * we store the writes done under a transaction:
63 struct ctdb_marshall_buffer
*m_write
;
70 struct db_context
*db
;
71 struct tdb_wrap
*wtdb
;
73 struct db_ctdb_transaction_handle
*transaction
;
74 struct g_lock_ctx
*lock_ctx
;
78 struct db_ctdb_ctx
*ctdb_ctx
;
79 struct ctdb_ltdb_header header
;
80 struct timeval lock_time
;
83 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
85 enum TDB_ERROR tret
= tdb_error(tdb
);
87 return map_nt_error_from_tdb(tret
);
92 * fetch a record from the tdb, separating out the header
93 * information and returning the body of the record.
95 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
97 struct ctdb_ltdb_header
*header
,
104 rec
= tdb_fetch_compat(db
->wtdb
->tdb
, key
);
105 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
106 status
= NT_STATUS_NOT_FOUND
;
111 header
->dmaster
= (uint32_t)-1;
118 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
122 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
123 if (data
->dsize
== 0) {
126 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
128 + sizeof(struct ctdb_ltdb_header
),
130 if (data
->dptr
== NULL
) {
131 status
= NT_STATUS_NO_MEMORY
;
137 status
= NT_STATUS_OK
;
145 * Store a record together with the ctdb record header
146 * in the local copy of the database.
148 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
150 struct ctdb_ltdb_header
*header
,
153 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
157 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
158 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
160 if (rec
.dptr
== NULL
) {
161 talloc_free(tmp_ctx
);
162 return NT_STATUS_NO_MEMORY
;
165 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
166 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
168 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
170 talloc_free(tmp_ctx
);
172 return (ret
== 0) ? NT_STATUS_OK
173 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
178 form a ctdb_rec_data record from a key/data pair
180 note that header may be NULL. If not NULL then it is included in the data portion
183 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
185 struct ctdb_ltdb_header
*header
,
189 struct ctdb_rec_data
*d
;
191 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
192 data
.dsize
+ (header
?sizeof(*header
):0);
193 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
199 d
->keylen
= key
.dsize
;
200 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
202 d
->datalen
= data
.dsize
+ sizeof(*header
);
203 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
204 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
206 d
->datalen
= data
.dsize
;
207 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
213 /* helper function for marshalling multiple records */
214 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
215 struct ctdb_marshall_buffer
*m
,
219 struct ctdb_ltdb_header
*header
,
222 struct ctdb_rec_data
*r
;
223 size_t m_size
, r_size
;
224 struct ctdb_marshall_buffer
*m2
= NULL
;
226 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
233 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
234 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
241 m_size
= talloc_get_size(m
);
242 r_size
= talloc_get_size(r
);
244 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
245 mem_ctx
, m
, m_size
+ r_size
);
251 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
260 /* we've finished marshalling, return a data blob with the marshalled records */
261 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
264 data
.dptr
= (uint8_t *)m
;
265 data
.dsize
= talloc_get_size(m
);
270 loop over a marshalling buffer
272 - pass r==NULL to start
273 - loop the number of times indicated by m->count
275 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
277 struct ctdb_ltdb_header
*header
,
278 TDB_DATA
*key
, TDB_DATA
*data
)
281 r
= (struct ctdb_rec_data
*)&m
->data
[0];
283 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
291 key
->dptr
= &r
->data
[0];
292 key
->dsize
= r
->keylen
;
295 data
->dptr
= &r
->data
[r
->keylen
];
296 data
->dsize
= r
->datalen
;
297 if (header
!= NULL
) {
298 data
->dptr
+= sizeof(*header
);
299 data
->dsize
-= sizeof(*header
);
303 if (header
!= NULL
) {
304 if (r
->datalen
< sizeof(*header
)) {
307 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
314 * CTDB transaction destructor
316 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
320 status
= g_lock_unlock(h
->ctx
->lock_ctx
, h
->lock_name
);
321 if (!NT_STATUS_IS_OK(status
)) {
322 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h
->lock_name
,
330 * CTDB dbwrap API: transaction_start function
331 * starts a transaction on a persistent database
333 static int db_ctdb_transaction_start(struct db_context
*db
)
335 struct db_ctdb_transaction_handle
*h
;
337 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
340 if (!db
->persistent
) {
341 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
346 if (ctx
->transaction
) {
347 ctx
->transaction
->nesting
++;
348 DEBUG(5, (__location__
" transaction start on db 0x%08x: nesting %d -> %d\n",
349 ctx
->db_id
, ctx
->transaction
->nesting
- 1, ctx
->transaction
->nesting
));
353 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
355 DEBUG(0,(__location__
" oom for transaction handle\n"));
361 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
362 (unsigned int)ctx
->db_id
);
363 if (h
->lock_name
== NULL
) {
364 DEBUG(0, ("talloc_asprintf failed\n"));
370 * Wait a day, i.e. forever...
372 status
= g_lock_lock(ctx
->lock_ctx
, h
->lock_name
, G_LOCK_WRITE
,
373 timeval_set(86400, 0));
374 if (!NT_STATUS_IS_OK(status
)) {
375 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
380 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
382 ctx
->transaction
= h
;
384 DEBUG(5,(__location__
" transaction started on db 0x%08x\n", ctx
->db_id
));
389 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
391 struct ctdb_ltdb_header
*pheader
,
395 struct ctdb_rec_data
*rec
= NULL
;
396 struct ctdb_ltdb_header h
;
409 * Walk the list of records written during this
410 * transaction. If we want to read one we have already
411 * written, return the last written sample. Thus we do not do
412 * a "break;" for the first hit, this record might have been
416 for (i
=0; i
<buf
->count
; i
++) {
417 TDB_DATA tkey
, tdata
;
419 struct ctdb_ltdb_header hdr
;
423 rec
= db_ctdb_marshall_loop_next(buf
, rec
, &reqid
, &hdr
, &tkey
,
429 if (tdb_data_equal(key
, tkey
)) {
441 data
.dptr
= (uint8_t *)talloc_memdup(mem_ctx
, data
.dptr
,
443 if ((data
.dsize
!= 0) && (data
.dptr
== NULL
)) {
449 if (pheader
!= NULL
) {
457 fetch a record inside a transaction
459 static NTSTATUS
db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
461 TDB_DATA key
, TDB_DATA
*data
)
463 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
467 found
= pull_newest_from_marshall_buffer(h
->m_write
, key
, NULL
,
473 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
475 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
483 * Fetch a record from a persistent database
484 * without record locking and without an active transaction.
486 * This just fetches from the local database copy.
487 * Since the databases are kept in sync cluster-wide,
488 * there is no point in doing a ctdb call to fetch the
489 * record from the lmaster. It even does harm, since migration
490 * of records bumps their RSN and hence renders the persistent
491 * database inconsistent.
493 static NTSTATUS
db_ctdb_fetch_persistent(struct db_ctdb_ctx
*db
,
495 TDB_DATA key
, TDB_DATA
*data
)
499 status
= db_ctdb_ltdb_fetch(db
, key
, NULL
, mem_ctx
, data
);
501 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
508 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
509 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
511 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
515 struct db_record
*result
;
518 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
519 DEBUG(0, ("talloc failed\n"));
523 result
->private_data
= ctx
->transaction
;
525 result
->key
.dsize
= key
.dsize
;
526 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
527 if (result
->key
.dptr
== NULL
) {
528 DEBUG(0, ("talloc failed\n"));
533 result
->store
= db_ctdb_store_transaction
;
534 result
->delete_rec
= db_ctdb_delete_transaction
;
536 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
537 NULL
, result
, &result
->value
)) {
541 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
542 if (ctdb_data
.dptr
== NULL
) {
543 /* create the record */
544 result
->value
= tdb_null
;
548 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
549 result
->value
.dptr
= NULL
;
551 if ((result
->value
.dsize
!= 0)
552 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
553 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
554 result
->value
.dsize
))) {
555 DEBUG(0, ("talloc failed\n"));
559 SAFE_FREE(ctdb_data
.dptr
);
564 static int db_ctdb_record_destructor(struct db_record
**recp
)
566 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
567 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
568 rec
->private_data
, struct db_ctdb_transaction_handle
);
569 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
571 DEBUG(0,(__location__
" transaction_commit failed\n"));
577 auto-create a transaction for persistent databases
579 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
584 struct db_record
*rec
, **recp
;
586 res
= db_ctdb_transaction_start(ctx
->db
);
591 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
593 ctx
->db
->transaction_cancel(ctx
->db
);
597 /* destroy this transaction when we release the lock */
598 recp
= talloc(rec
, struct db_record
*);
600 ctx
->db
->transaction_cancel(ctx
->db
);
605 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
611 stores a record inside a transaction
613 static NTSTATUS
db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
614 TDB_DATA key
, TDB_DATA data
)
616 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
618 struct ctdb_ltdb_header header
;
622 /* we need the header so we can update the RSN */
624 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
627 rec
= tdb_fetch_compat(h
->ctx
->wtdb
->tdb
, key
);
629 if (rec
.dptr
!= NULL
) {
630 memcpy(&header
, rec
.dptr
,
631 sizeof(struct ctdb_ltdb_header
));
632 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
635 * a special case, we are writing the same
636 * data that is there now
638 if (data
.dsize
== rec
.dsize
&&
640 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
643 talloc_free(tmp_ctx
);
650 header
.dmaster
= get_my_vnn();
653 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
654 if (h
->m_write
== NULL
) {
655 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
656 talloc_free(tmp_ctx
);
657 return NT_STATUS_NO_MEMORY
;
660 talloc_free(tmp_ctx
);
666 a record store inside a transaction
668 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
670 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
671 rec
->private_data
, struct db_ctdb_transaction_handle
);
674 status
= db_ctdb_transaction_store(h
, rec
->key
, data
);
679 a record delete inside a transaction
681 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
683 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
684 rec
->private_data
, struct db_ctdb_transaction_handle
);
687 status
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
692 * Fetch the db sequence number of a persistent db directly from the db.
694 static NTSTATUS
db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx
*db
,
698 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
701 struct ctdb_ltdb_header header
;
702 TALLOC_CTX
*mem_ctx
= talloc_stackframe();
704 if (seqnum
== NULL
) {
705 return NT_STATUS_INVALID_PARAMETER
;
708 key
= string_term_tdb_data(keyname
);
710 status
= db_ctdb_ltdb_fetch(db
, key
, &header
, mem_ctx
, &data
);
711 if (!NT_STATUS_IS_OK(status
) &&
712 !NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
))
717 status
= NT_STATUS_OK
;
719 if (data
.dsize
!= sizeof(uint64_t)) {
724 *seqnum
= *(uint64_t *)data
.dptr
;
727 TALLOC_FREE(mem_ctx
);
732 * Store the database sequence number inside a transaction.
734 static NTSTATUS
db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle
*h
,
738 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
742 key
= string_term_tdb_data(keyname
);
744 data
.dptr
= (uint8_t *)&seqnum
;
745 data
.dsize
= sizeof(uint64_t);
747 status
= db_ctdb_transaction_store(h
, key
, data
);
755 static int db_ctdb_transaction_commit(struct db_context
*db
)
757 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
761 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
762 uint64_t old_seqnum
, new_seqnum
;
766 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
770 if (h
->nested_cancel
) {
771 db
->transaction_cancel(db
);
772 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
776 if (h
->nesting
!= 0) {
778 DEBUG(5, (__location__
" transaction commit on db 0x%08x: nesting %d -> %d\n",
779 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
783 if (h
->m_write
== NULL
) {
785 * No changes were made, so don't change the seqnum,
786 * don't push to other node, just exit with success.
792 DEBUG(5,(__location__
" transaction commit on db 0x%08x\n", ctx
->db_id
));
795 * As the last db action before committing, bump the database sequence
796 * number. Note that this undoes all changes to the seqnum records
797 * performed under the transaction. This record is not meant to be
798 * modified by user interaction. It is for internal use only...
800 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &old_seqnum
);
801 if (!NT_STATUS_IS_OK(rets
)) {
802 DEBUG(1, (__location__
" failed to fetch the db sequence number "
803 "in transaction commit on db 0x%08x\n", ctx
->db_id
));
808 new_seqnum
= old_seqnum
+ 1;
810 rets
= db_ctdb_store_db_seqnum(h
, new_seqnum
);
811 if (!NT_STATUS_IS_OK(rets
)) {
812 DEBUG(1, (__location__
"failed to store the db sequence number "
813 " in transaction commit on db 0x%08x\n", ctx
->db_id
));
819 /* tell ctdbd to commit to the other nodes */
820 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
821 CTDB_CONTROL_TRANS3_COMMIT
,
823 db_ctdb_marshall_finish(h
->m_write
),
824 NULL
, NULL
, &status
);
825 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
827 * The TRANS3_COMMIT control should only possibly fail when a
828 * recovery has been running concurrently. In any case, the db
829 * will be the same on all nodes, either the new copy or the
830 * old copy. This can be detected by comparing the old and new
831 * local sequence numbers.
833 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &new_seqnum
);
834 if (!NT_STATUS_IS_OK(rets
)) {
835 DEBUG(1, (__location__
" failed to refetch db sequence "
836 "number after failed TRANS3_COMMIT\n"));
841 if (new_seqnum
== old_seqnum
) {
842 /* Recovery prevented all our changes: retry. */
844 } else if (new_seqnum
!= (old_seqnum
+ 1)) {
845 DEBUG(0, (__location__
" ERROR: new_seqnum[%lu] != "
846 "old_seqnum[%lu] + (0 or 1) after failed "
847 "TRANS3_COMMIT - this should not happen!\n",
848 (unsigned long)new_seqnum
,
849 (unsigned long)old_seqnum
));
854 * Recovery propagated our changes to all nodes, completing
855 * our commit for us - succeed.
862 h
->ctx
->transaction
= NULL
;
871 static int db_ctdb_transaction_cancel(struct db_context
*db
)
873 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
875 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
878 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
882 if (h
->nesting
!= 0) {
884 h
->nested_cancel
= true;
885 DEBUG(5, (__location__
" transaction cancel on db 0x%08x: nesting %d -> %d\n",
886 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
890 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
892 ctx
->transaction
= NULL
;
898 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
900 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
901 rec
->private_data
, struct db_ctdb_rec
);
903 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
908 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
909 static NTSTATUS
db_ctdb_send_schedule_for_deletion(struct db_record
*rec
)
912 struct ctdb_control_schedule_for_deletion
*dd
;
915 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
916 rec
->private_data
, struct db_ctdb_rec
);
918 indata
.dsize
= offsetof(struct ctdb_control_schedule_for_deletion
, key
) + rec
->key
.dsize
;
919 indata
.dptr
= talloc_zero_array(crec
, uint8_t, indata
.dsize
);
920 if (indata
.dptr
== NULL
) {
921 DEBUG(0, (__location__
" talloc failed!\n"));
922 return NT_STATUS_NO_MEMORY
;
925 dd
= (struct ctdb_control_schedule_for_deletion
*)(void *)indata
.dptr
;
926 dd
->db_id
= crec
->ctdb_ctx
->db_id
;
927 dd
->hdr
= crec
->header
;
928 dd
->keylen
= rec
->key
.dsize
;
929 memcpy(dd
->key
, rec
->key
.dptr
, rec
->key
.dsize
);
931 status
= ctdbd_control_local(messaging_ctdbd_connection(),
932 CTDB_CONTROL_SCHEDULE_FOR_DELETION
,
933 crec
->ctdb_ctx
->db_id
,
934 CTDB_CTRL_FLAG_NOREPLY
, /* flags */
939 talloc_free(indata
.dptr
);
941 if (!NT_STATUS_IS_OK(status
) || cstatus
!= 0) {
942 DEBUG(1, (__location__
" Error sending local control "
943 "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
944 nt_errstr(status
), cstatus
));
945 if (NT_STATUS_IS_OK(status
)) {
946 status
= NT_STATUS_UNSUCCESSFUL
;
954 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
960 * We have to store the header with empty data. TODO: Fix the
966 status
= db_ctdb_store(rec
, data
, 0);
967 if (!NT_STATUS_IS_OK(status
)) {
971 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
972 status
= db_ctdb_send_schedule_for_deletion(rec
);
978 static int db_ctdb_record_destr(struct db_record
* data
)
980 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
981 data
->private_data
, struct db_ctdb_rec
);
984 DEBUG(10, (DEBUGLEVEL
> 10
985 ? "Unlocking db %u key %s\n"
986 : "Unlocking db %u key %.20s\n",
987 (int)crec
->ctdb_ctx
->db_id
,
988 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
991 tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
);
993 threshold
= lp_ctdb_locktime_warn_threshold();
994 if (threshold
!= 0) {
995 double timediff
= timeval_elapsed(&crec
->lock_time
);
996 if ((timediff
* 1000) > threshold
) {
997 DEBUG(0, ("Held tdb lock %f seconds\n", timediff
));
1004 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
1005 TALLOC_CTX
*mem_ctx
,
1008 struct db_record
*result
;
1009 struct db_ctdb_rec
*crec
;
1012 int migrate_attempts
= 0;
1014 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
1015 DEBUG(0, ("talloc failed\n"));
1019 if (!(crec
= talloc_zero(result
, struct db_ctdb_rec
))) {
1020 DEBUG(0, ("talloc failed\n"));
1021 TALLOC_FREE(result
);
1025 result
->private_data
= (void *)crec
;
1026 crec
->ctdb_ctx
= ctx
;
1028 result
->key
.dsize
= key
.dsize
;
1029 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
1030 if (result
->key
.dptr
== NULL
) {
1031 DEBUG(0, ("talloc failed\n"));
1032 TALLOC_FREE(result
);
1037 * Do a blocking lock on the record
1041 if (DEBUGLEVEL
>= 10) {
1042 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
1043 DEBUG(10, (DEBUGLEVEL
> 10
1044 ? "Locking db %u key %s\n"
1045 : "Locking db %u key %.20s\n",
1046 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1047 TALLOC_FREE(keystr
);
1050 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
1051 DEBUG(3, ("tdb_chainlock failed\n"));
1052 TALLOC_FREE(result
);
1056 result
->store
= db_ctdb_store
;
1057 result
->delete_rec
= db_ctdb_delete
;
1058 talloc_set_destructor(result
, db_ctdb_record_destr
);
1060 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
1063 * See if we have a valid record and we are the dmaster. If so, we can
1064 * take the shortcut and just return it.
1067 if ((ctdb_data
.dptr
== NULL
) ||
1068 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
1069 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
1071 || (random() % 2 != 0)
1074 SAFE_FREE(ctdb_data
.dptr
);
1075 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1076 talloc_set_destructor(result
, NULL
);
1078 migrate_attempts
+= 1;
1080 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1081 ctdb_data
.dptr
, ctdb_data
.dptr
?
1082 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1085 status
= ctdbd_migrate(messaging_ctdbd_connection(), ctx
->db_id
,
1087 if (!NT_STATUS_IS_OK(status
)) {
1088 DEBUG(5, ("ctdb_migrate failed: %s\n",
1089 nt_errstr(status
)));
1090 TALLOC_FREE(result
);
1093 /* now its migrated, try again */
1097 if (migrate_attempts
> 10) {
1098 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1102 GetTimeOfDay(&crec
->lock_time
);
1104 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1106 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1107 result
->value
.dptr
= NULL
;
1109 if ((result
->value
.dsize
!= 0)
1110 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
1111 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1112 result
->value
.dsize
))) {
1113 DEBUG(0, ("talloc failed\n"));
1114 TALLOC_FREE(result
);
1117 SAFE_FREE(ctdb_data
.dptr
);
1122 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1123 TALLOC_CTX
*mem_ctx
,
1126 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1127 struct db_ctdb_ctx
);
1129 if (ctx
->transaction
!= NULL
) {
1130 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1133 if (db
->persistent
) {
1134 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1137 return fetch_locked_internal(ctx
, mem_ctx
, key
);
1141 fetch (unlocked, no migration) operation on ctdb
1143 static NTSTATUS
db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1144 TDB_DATA key
, TDB_DATA
*data
)
1146 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1147 struct db_ctdb_ctx
);
1151 if (ctx
->transaction
) {
1152 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1155 if (db
->persistent
) {
1156 return db_ctdb_fetch_persistent(ctx
, mem_ctx
, key
, data
);
1159 /* try a direct fetch */
1160 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
1163 * See if we have a valid record and we are the dmaster. If so, we can
1164 * take the shortcut and just return it.
1165 * we bypass the dmaster check for persistent databases
1167 if ((ctdb_data
.dptr
!= NULL
) &&
1168 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1169 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())
1171 /* we are the dmaster - avoid the ctdb protocol op */
1173 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1175 data
->dptr
= (uint8
*)talloc_memdup(
1176 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1179 SAFE_FREE(ctdb_data
.dptr
);
1181 if (data
->dptr
== NULL
) {
1182 return NT_STATUS_NO_MEMORY
;
1184 return NT_STATUS_OK
;
1187 SAFE_FREE(ctdb_data
.dptr
);
1189 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1190 status
= ctdbd_fetch(messaging_ctdbd_connection(), ctx
->db_id
, key
,
1192 if (!NT_STATUS_IS_OK(status
)) {
1193 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
1199 static NTSTATUS
db_ctdb_parse_record(struct db_context
*db
, TDB_DATA key
,
1200 void (*parser
)(TDB_DATA key
,
1202 void *private_data
),
1208 status
= db_ctdb_fetch(db
, talloc_tos(), key
, &data
);
1209 if (!NT_STATUS_IS_OK(status
)) {
1212 parser(key
, data
, private_data
);
1213 TALLOC_FREE(data
.dptr
);
1214 return NT_STATUS_OK
;
1217 struct traverse_state
{
1218 struct db_context
*db
;
1219 int (*fn
)(struct db_record
*rec
, void *private_data
);
1223 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1225 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1226 struct db_record
*rec
;
1227 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1228 /* we have to give them a locked record to prevent races */
1229 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1230 if (rec
&& rec
->value
.dsize
> 0) {
1231 state
->fn(rec
, state
->private_data
);
1233 talloc_free(tmp_ctx
);
1236 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1239 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1240 struct db_record
*rec
;
1241 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1245 * Skip the __db_sequence_number__ key:
1246 * This is used for persistent transactions internally.
1248 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1249 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1254 /* we have to give them a locked record to prevent races */
1255 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1256 if (rec
&& rec
->value
.dsize
> 0) {
1257 ret
= state
->fn(rec
, state
->private_data
);
1261 talloc_free(tmp_ctx
);
1265 /* wrapper to use traverse_persistent_callback with dbwrap */
1266 static int traverse_persistent_callback_dbwrap(struct db_record
*rec
, void* data
)
1268 return traverse_persistent_callback(NULL
, rec
->key
, rec
->value
, data
);
1272 static int db_ctdb_traverse(struct db_context
*db
,
1273 int (*fn
)(struct db_record
*rec
,
1274 void *private_data
),
1277 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1278 struct db_ctdb_ctx
);
1279 struct traverse_state state
;
1283 state
.private_data
= private_data
;
1285 if (db
->persistent
) {
1286 struct tdb_context
*ltdb
= ctx
->wtdb
->tdb
;
1289 /* for persistent databases we don't need to do a ctdb traverse,
1290 we can do a faster local traverse */
1291 ret
= tdb_traverse(ltdb
, traverse_persistent_callback
, &state
);
1295 if (ctx
->transaction
&& ctx
->transaction
->m_write
) {
1297 * we now have to handle keys not yet
1298 * present at transaction start
1300 struct db_context
*newkeys
= db_open_rbt(talloc_tos());
1301 struct ctdb_marshall_buffer
*mbuf
= ctx
->transaction
->m_write
;
1302 struct ctdb_rec_data
*rec
=NULL
;
1307 if (newkeys
== NULL
) {
1311 for (i
=0; i
<mbuf
->count
; i
++) {
1313 rec
=db_ctdb_marshall_loop_next(mbuf
, rec
,
1316 SMB_ASSERT(rec
!= NULL
);
1318 if (!tdb_exists(ltdb
, key
)) {
1319 dbwrap_store(newkeys
, key
, tdb_null
, 0);
1322 status
= dbwrap_traverse(newkeys
,
1323 traverse_persistent_callback_dbwrap
,
1326 talloc_free(newkeys
);
1327 if (!NT_STATUS_IS_OK(status
)) {
1336 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1340 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1342 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1345 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1347 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1350 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1352 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1353 struct db_record rec
;
1356 rec
.store
= db_ctdb_store_deny
;
1357 rec
.delete_rec
= db_ctdb_delete_deny
;
1358 rec
.private_data
= state
->db
;
1359 state
->fn(&rec
, state
->private_data
);
1362 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1365 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1366 struct db_record rec
;
1369 * Skip the __db_sequence_number__ key:
1370 * This is used for persistent transactions internally.
1372 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1373 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1380 rec
.store
= db_ctdb_store_deny
;
1381 rec
.delete_rec
= db_ctdb_delete_deny
;
1382 rec
.private_data
= state
->db
;
1384 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1385 /* a deleted record */
1388 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1389 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1391 return state
->fn(&rec
, state
->private_data
);
1394 static int db_ctdb_traverse_read(struct db_context
*db
,
1395 int (*fn
)(struct db_record
*rec
,
1396 void *private_data
),
1399 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1400 struct db_ctdb_ctx
);
1401 struct traverse_state state
;
1405 state
.private_data
= private_data
;
1407 if (db
->persistent
) {
1408 /* for persistent databases we don't need to do a ctdb traverse,
1409 we can do a faster local traverse */
1410 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1413 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1417 static int db_ctdb_get_seqnum(struct db_context
*db
)
1419 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1420 struct db_ctdb_ctx
);
1421 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1424 static int db_ctdb_get_flags(struct db_context
*db
)
1426 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1427 struct db_ctdb_ctx
);
1428 return tdb_get_flags(ctx
->wtdb
->tdb
);
1431 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1433 int hash_size
, int tdb_flags
,
1434 int open_flags
, mode_t mode
,
1435 enum dbwrap_lock_order lock_order
)
1437 struct db_context
*result
;
1438 struct db_ctdb_ctx
*db_ctdb
;
1440 struct ctdbd_connection
*conn
;
1441 struct loadparm_context
*lp_ctx
;
1443 if (!lp_clustering()) {
1444 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1448 if (!(result
= talloc_zero(mem_ctx
, struct db_context
))) {
1449 DEBUG(0, ("talloc failed\n"));
1450 TALLOC_FREE(result
);
1454 if (!(db_ctdb
= talloc(result
, struct db_ctdb_ctx
))) {
1455 DEBUG(0, ("talloc failed\n"));
1456 TALLOC_FREE(result
);
1460 db_ctdb
->transaction
= NULL
;
1461 db_ctdb
->db
= result
;
1463 conn
= messaging_ctdbd_connection();
1465 DEBUG(1, ("Could not connect to ctdb\n"));
1466 TALLOC_FREE(result
);
1470 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn
, name
, &db_ctdb
->db_id
, tdb_flags
))) {
1471 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1472 TALLOC_FREE(result
);
1476 db_path
= ctdbd_dbpath(conn
, db_ctdb
, db_ctdb
->db_id
);
1478 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1480 /* only pass through specific flags */
1481 tdb_flags
&= TDB_SEQNUM
;
1483 /* honor permissions if user has specified O_CREAT */
1484 if (open_flags
& O_CREAT
) {
1485 chmod(db_path
, mode
);
1488 lp_ctx
= loadparm_init_s3(db_path
, loadparm_s3_context());
1490 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
,
1492 talloc_unlink(db_path
, lp_ctx
);
1493 if (db_ctdb
->wtdb
== NULL
) {
1494 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1495 TALLOC_FREE(result
);
1498 talloc_free(db_path
);
1500 if (result
->persistent
) {
1501 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
,
1502 ctdb_conn_msg_ctx(conn
));
1503 if (db_ctdb
->lock_ctx
== NULL
) {
1504 DEBUG(0, ("g_lock_ctx_init failed\n"));
1505 TALLOC_FREE(result
);
1510 result
->private_data
= (void *)db_ctdb
;
1511 result
->fetch_locked
= db_ctdb_fetch_locked
;
1512 result
->parse_record
= db_ctdb_parse_record
;
1513 result
->traverse
= db_ctdb_traverse
;
1514 result
->traverse_read
= db_ctdb_traverse_read
;
1515 result
->get_seqnum
= db_ctdb_get_seqnum
;
1516 result
->get_flags
= db_ctdb_get_flags
;
1517 result
->transaction_start
= db_ctdb_transaction_start
;
1518 result
->transaction_commit
= db_ctdb_transaction_commit
;
1519 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1521 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1522 name
, db_ctdb
->db_id
));
1527 #else /* CLUSTER_SUPPORT */
1529 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1531 int hash_size
, int tdb_flags
,
1532 int open_flags
, mode_t mode
,
1533 enum dbwrap_lock_order lock_order
)
1535 DEBUG(3, ("db_open_ctdb: no cluster support!\n"));