2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007-2009
5 Copyright (C) Michael Adam 2009
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
30 #ifdef CLUSTER_SUPPORT
33 * It is not possible to include ctdb.h and tdb_compat.h (included via
34 * some other include above) without warnings. This fixes those
42 #ifdef typesafe_cb_preargs
43 #undef typesafe_cb_preargs
46 #ifdef typesafe_cb_postargs
47 #undef typesafe_cb_postargs
51 #include "ctdb_private.h"
52 #include "ctdbd_conn.h"
53 #include "dbwrap/dbwrap.h"
54 #include "dbwrap/dbwrap_private.h"
55 #include "dbwrap/dbwrap_ctdb.h"
59 struct db_ctdb_transaction_handle
{
60 struct db_ctdb_ctx
*ctx
;
62 * we store the writes done under a transaction:
64 struct ctdb_marshall_buffer
*m_write
;
71 struct db_context
*db
;
72 struct tdb_wrap
*wtdb
;
74 struct db_ctdb_transaction_handle
*transaction
;
75 struct g_lock_ctx
*lock_ctx
;
79 struct db_ctdb_ctx
*ctdb_ctx
;
80 struct ctdb_ltdb_header header
;
81 struct timeval lock_time
;
84 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
86 enum TDB_ERROR tret
= tdb_error(tdb
);
88 return map_nt_error_from_tdb(tret
);
93 * fetch a record from the tdb, separating out the header
94 * information and returning the body of the record.
96 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
98 struct ctdb_ltdb_header
*header
,
105 rec
= tdb_fetch_compat(db
->wtdb
->tdb
, key
);
106 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
107 status
= NT_STATUS_NOT_FOUND
;
112 header
->dmaster
= (uint32_t)-1;
119 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
123 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
124 if (data
->dsize
== 0) {
127 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
129 + sizeof(struct ctdb_ltdb_header
),
131 if (data
->dptr
== NULL
) {
132 status
= NT_STATUS_NO_MEMORY
;
138 status
= NT_STATUS_OK
;
146 * Store a record together with the ctdb record header
147 * in the local copy of the database.
149 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
151 struct ctdb_ltdb_header
*header
,
154 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
158 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
159 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
161 if (rec
.dptr
== NULL
) {
162 talloc_free(tmp_ctx
);
163 return NT_STATUS_NO_MEMORY
;
166 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
167 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
169 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
171 talloc_free(tmp_ctx
);
173 return (ret
== 0) ? NT_STATUS_OK
174 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
179 form a ctdb_rec_data record from a key/data pair
181 note that header may be NULL. If not NULL then it is included in the data portion
184 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
186 struct ctdb_ltdb_header
*header
,
190 struct ctdb_rec_data
*d
;
192 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
193 data
.dsize
+ (header
?sizeof(*header
):0);
194 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
200 d
->keylen
= key
.dsize
;
201 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
203 d
->datalen
= data
.dsize
+ sizeof(*header
);
204 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
205 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
207 d
->datalen
= data
.dsize
;
208 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
214 /* helper function for marshalling multiple records */
215 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
216 struct ctdb_marshall_buffer
*m
,
220 struct ctdb_ltdb_header
*header
,
223 struct ctdb_rec_data
*r
;
224 size_t m_size
, r_size
;
225 struct ctdb_marshall_buffer
*m2
= NULL
;
227 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
234 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
235 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
242 m_size
= talloc_get_size(m
);
243 r_size
= talloc_get_size(r
);
245 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
246 mem_ctx
, m
, m_size
+ r_size
);
252 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
261 /* we've finished marshalling, return a data blob with the marshalled records */
262 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
265 data
.dptr
= (uint8_t *)m
;
266 data
.dsize
= talloc_get_size(m
);
271 loop over a marshalling buffer
273 - pass r==NULL to start
274 - loop the number of times indicated by m->count
276 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
278 struct ctdb_ltdb_header
*header
,
279 TDB_DATA
*key
, TDB_DATA
*data
)
282 r
= (struct ctdb_rec_data
*)&m
->data
[0];
284 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
292 key
->dptr
= &r
->data
[0];
293 key
->dsize
= r
->keylen
;
296 data
->dptr
= &r
->data
[r
->keylen
];
297 data
->dsize
= r
->datalen
;
298 if (header
!= NULL
) {
299 data
->dptr
+= sizeof(*header
);
300 data
->dsize
-= sizeof(*header
);
304 if (header
!= NULL
) {
305 if (r
->datalen
< sizeof(*header
)) {
308 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
315 * CTDB transaction destructor
317 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
321 status
= g_lock_unlock(h
->ctx
->lock_ctx
, h
->lock_name
);
322 if (!NT_STATUS_IS_OK(status
)) {
323 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h
->lock_name
,
331 * CTDB dbwrap API: transaction_start function
332 * starts a transaction on a persistent database
334 static int db_ctdb_transaction_start(struct db_context
*db
)
336 struct db_ctdb_transaction_handle
*h
;
338 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
341 if (!db
->persistent
) {
342 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
347 if (ctx
->transaction
) {
348 ctx
->transaction
->nesting
++;
349 DEBUG(5, (__location__
" transaction start on db 0x%08x: nesting %d -> %d\n",
350 ctx
->db_id
, ctx
->transaction
->nesting
- 1, ctx
->transaction
->nesting
));
354 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
356 DEBUG(0,(__location__
" oom for transaction handle\n"));
362 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
363 (unsigned int)ctx
->db_id
);
364 if (h
->lock_name
== NULL
) {
365 DEBUG(0, ("talloc_asprintf failed\n"));
371 * Wait a day, i.e. forever...
373 status
= g_lock_lock(ctx
->lock_ctx
, h
->lock_name
, G_LOCK_WRITE
,
374 timeval_set(86400, 0));
375 if (!NT_STATUS_IS_OK(status
)) {
376 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
381 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
383 ctx
->transaction
= h
;
385 DEBUG(5,(__location__
" transaction started on db 0x%08x\n", ctx
->db_id
));
390 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
392 struct ctdb_ltdb_header
*pheader
,
396 struct ctdb_rec_data
*rec
= NULL
;
397 struct ctdb_ltdb_header h
;
410 * Walk the list of records written during this
411 * transaction. If we want to read one we have already
412 * written, return the last written sample. Thus we do not do
413 * a "break;" for the first hit, this record might have been
417 for (i
=0; i
<buf
->count
; i
++) {
418 TDB_DATA tkey
, tdata
;
420 struct ctdb_ltdb_header hdr
;
424 rec
= db_ctdb_marshall_loop_next(buf
, rec
, &reqid
, &hdr
, &tkey
,
430 if (tdb_data_equal(key
, tkey
)) {
442 data
.dptr
= (uint8_t *)talloc_memdup(mem_ctx
, data
.dptr
,
444 if ((data
.dsize
!= 0) && (data
.dptr
== NULL
)) {
450 if (pheader
!= NULL
) {
458 fetch a record inside a transaction
460 static NTSTATUS
db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
462 TDB_DATA key
, TDB_DATA
*data
)
464 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
468 found
= pull_newest_from_marshall_buffer(h
->m_write
, key
, NULL
,
474 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
476 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
484 * Fetch a record from a persistent database
485 * without record locking and without an active transaction.
487 * This just fetches from the local database copy.
488 * Since the databases are kept in sync cluster-wide,
489 * there is no point in doing a ctdb call to fetch the
490 * record from the lmaster. It even does harm, since migration
491 * of records bumps their RSN and hence renders the persistent
492 * database inconsistent.
494 static NTSTATUS
db_ctdb_fetch_persistent(struct db_ctdb_ctx
*db
,
496 TDB_DATA key
, TDB_DATA
*data
)
500 status
= db_ctdb_ltdb_fetch(db
, key
, NULL
, mem_ctx
, data
);
502 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
509 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
510 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
512 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
516 struct db_record
*result
;
519 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
520 DEBUG(0, ("talloc failed\n"));
524 result
->private_data
= ctx
->transaction
;
526 result
->key
.dsize
= key
.dsize
;
527 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
529 if (result
->key
.dptr
== NULL
) {
530 DEBUG(0, ("talloc failed\n"));
535 result
->store
= db_ctdb_store_transaction
;
536 result
->delete_rec
= db_ctdb_delete_transaction
;
538 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
539 NULL
, result
, &result
->value
)) {
543 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
544 if (ctdb_data
.dptr
== NULL
) {
545 /* create the record */
546 result
->value
= tdb_null
;
550 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
551 result
->value
.dptr
= NULL
;
553 if ((result
->value
.dsize
!= 0)
554 && !(result
->value
.dptr
= (uint8_t *)talloc_memdup(
555 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
556 result
->value
.dsize
))) {
557 DEBUG(0, ("talloc failed\n"));
561 SAFE_FREE(ctdb_data
.dptr
);
566 static int db_ctdb_record_destructor(struct db_record
**recp
)
568 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
569 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
570 rec
->private_data
, struct db_ctdb_transaction_handle
);
571 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
573 DEBUG(0,(__location__
" transaction_commit failed\n"));
579 auto-create a transaction for persistent databases
581 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
586 struct db_record
*rec
, **recp
;
588 res
= db_ctdb_transaction_start(ctx
->db
);
593 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
595 ctx
->db
->transaction_cancel(ctx
->db
);
599 /* destroy this transaction when we release the lock */
600 recp
= talloc(rec
, struct db_record
*);
602 ctx
->db
->transaction_cancel(ctx
->db
);
607 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
613 stores a record inside a transaction
615 static NTSTATUS
db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
616 TDB_DATA key
, TDB_DATA data
)
618 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
620 struct ctdb_ltdb_header header
;
624 /* we need the header so we can update the RSN */
626 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
629 rec
= tdb_fetch_compat(h
->ctx
->wtdb
->tdb
, key
);
631 if (rec
.dptr
!= NULL
) {
632 memcpy(&header
, rec
.dptr
,
633 sizeof(struct ctdb_ltdb_header
));
634 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
637 * a special case, we are writing the same
638 * data that is there now
640 if (data
.dsize
== rec
.dsize
&&
642 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
645 talloc_free(tmp_ctx
);
652 header
.dmaster
= get_my_vnn();
655 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
656 if (h
->m_write
== NULL
) {
657 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
658 talloc_free(tmp_ctx
);
659 return NT_STATUS_NO_MEMORY
;
662 talloc_free(tmp_ctx
);
668 a record store inside a transaction
670 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
672 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
673 rec
->private_data
, struct db_ctdb_transaction_handle
);
676 status
= db_ctdb_transaction_store(h
, rec
->key
, data
);
681 a record delete inside a transaction
683 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
685 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
686 rec
->private_data
, struct db_ctdb_transaction_handle
);
689 status
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
694 * Fetch the db sequence number of a persistent db directly from the db.
696 static NTSTATUS
db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx
*db
,
700 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
703 struct ctdb_ltdb_header header
;
704 TALLOC_CTX
*mem_ctx
= talloc_stackframe();
706 if (seqnum
== NULL
) {
707 return NT_STATUS_INVALID_PARAMETER
;
710 key
= string_term_tdb_data(keyname
);
712 status
= db_ctdb_ltdb_fetch(db
, key
, &header
, mem_ctx
, &data
);
713 if (!NT_STATUS_IS_OK(status
) &&
714 !NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
))
719 status
= NT_STATUS_OK
;
721 if (data
.dsize
!= sizeof(uint64_t)) {
726 *seqnum
= *(uint64_t *)data
.dptr
;
729 TALLOC_FREE(mem_ctx
);
734 * Store the database sequence number inside a transaction.
736 static NTSTATUS
db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle
*h
,
740 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
744 key
= string_term_tdb_data(keyname
);
746 data
.dptr
= (uint8_t *)&seqnum
;
747 data
.dsize
= sizeof(uint64_t);
749 status
= db_ctdb_transaction_store(h
, key
, data
);
757 static int db_ctdb_transaction_commit(struct db_context
*db
)
759 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
763 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
764 uint64_t old_seqnum
, new_seqnum
;
768 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
772 if (h
->nested_cancel
) {
773 db
->transaction_cancel(db
);
774 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
778 if (h
->nesting
!= 0) {
780 DEBUG(5, (__location__
" transaction commit on db 0x%08x: nesting %d -> %d\n",
781 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
785 if (h
->m_write
== NULL
) {
787 * No changes were made, so don't change the seqnum,
788 * don't push to other node, just exit with success.
794 DEBUG(5,(__location__
" transaction commit on db 0x%08x\n", ctx
->db_id
));
797 * As the last db action before committing, bump the database sequence
798 * number. Note that this undoes all changes to the seqnum records
799 * performed under the transaction. This record is not meant to be
800 * modified by user interaction. It is for internal use only...
802 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &old_seqnum
);
803 if (!NT_STATUS_IS_OK(rets
)) {
804 DEBUG(1, (__location__
" failed to fetch the db sequence number "
805 "in transaction commit on db 0x%08x\n", ctx
->db_id
));
810 new_seqnum
= old_seqnum
+ 1;
812 rets
= db_ctdb_store_db_seqnum(h
, new_seqnum
);
813 if (!NT_STATUS_IS_OK(rets
)) {
814 DEBUG(1, (__location__
"failed to store the db sequence number "
815 " in transaction commit on db 0x%08x\n", ctx
->db_id
));
821 /* tell ctdbd to commit to the other nodes */
822 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
823 CTDB_CONTROL_TRANS3_COMMIT
,
825 db_ctdb_marshall_finish(h
->m_write
),
826 NULL
, NULL
, &status
);
827 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
829 * The TRANS3_COMMIT control should only possibly fail when a
830 * recovery has been running concurrently. In any case, the db
831 * will be the same on all nodes, either the new copy or the
832 * old copy. This can be detected by comparing the old and new
833 * local sequence numbers.
835 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &new_seqnum
);
836 if (!NT_STATUS_IS_OK(rets
)) {
837 DEBUG(1, (__location__
" failed to refetch db sequence "
838 "number after failed TRANS3_COMMIT\n"));
843 if (new_seqnum
== old_seqnum
) {
844 /* Recovery prevented all our changes: retry. */
846 } else if (new_seqnum
!= (old_seqnum
+ 1)) {
847 DEBUG(0, (__location__
" ERROR: new_seqnum[%lu] != "
848 "old_seqnum[%lu] + (0 or 1) after failed "
849 "TRANS3_COMMIT - this should not happen!\n",
850 (unsigned long)new_seqnum
,
851 (unsigned long)old_seqnum
));
856 * Recovery propagated our changes to all nodes, completing
857 * our commit for us - succeed.
864 h
->ctx
->transaction
= NULL
;
873 static int db_ctdb_transaction_cancel(struct db_context
*db
)
875 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
877 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
880 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
884 if (h
->nesting
!= 0) {
886 h
->nested_cancel
= true;
887 DEBUG(5, (__location__
" transaction cancel on db 0x%08x: nesting %d -> %d\n",
888 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
892 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
894 ctx
->transaction
= NULL
;
900 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
902 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
903 rec
->private_data
, struct db_ctdb_rec
);
905 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
910 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
911 static NTSTATUS
db_ctdb_send_schedule_for_deletion(struct db_record
*rec
)
914 struct ctdb_control_schedule_for_deletion
*dd
;
917 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
918 rec
->private_data
, struct db_ctdb_rec
);
920 indata
.dsize
= offsetof(struct ctdb_control_schedule_for_deletion
, key
) + rec
->key
.dsize
;
921 indata
.dptr
= talloc_zero_array(crec
, uint8_t, indata
.dsize
);
922 if (indata
.dptr
== NULL
) {
923 DEBUG(0, (__location__
" talloc failed!\n"));
924 return NT_STATUS_NO_MEMORY
;
927 dd
= (struct ctdb_control_schedule_for_deletion
*)(void *)indata
.dptr
;
928 dd
->db_id
= crec
->ctdb_ctx
->db_id
;
929 dd
->hdr
= crec
->header
;
930 dd
->keylen
= rec
->key
.dsize
;
931 memcpy(dd
->key
, rec
->key
.dptr
, rec
->key
.dsize
);
933 status
= ctdbd_control_local(messaging_ctdbd_connection(),
934 CTDB_CONTROL_SCHEDULE_FOR_DELETION
,
935 crec
->ctdb_ctx
->db_id
,
936 CTDB_CTRL_FLAG_NOREPLY
, /* flags */
941 talloc_free(indata
.dptr
);
943 if (!NT_STATUS_IS_OK(status
) || cstatus
!= 0) {
944 DEBUG(1, (__location__
" Error sending local control "
945 "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
946 nt_errstr(status
), cstatus
));
947 if (NT_STATUS_IS_OK(status
)) {
948 status
= NT_STATUS_UNSUCCESSFUL
;
956 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
962 * We have to store the header with empty data. TODO: Fix the
968 status
= db_ctdb_store(rec
, data
, 0);
969 if (!NT_STATUS_IS_OK(status
)) {
973 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
974 status
= db_ctdb_send_schedule_for_deletion(rec
);
980 static int db_ctdb_record_destr(struct db_record
* data
)
982 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
983 data
->private_data
, struct db_ctdb_rec
);
986 DEBUG(10, (DEBUGLEVEL
> 10
987 ? "Unlocking db %u key %s\n"
988 : "Unlocking db %u key %.20s\n",
989 (int)crec
->ctdb_ctx
->db_id
,
990 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
993 tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
);
995 threshold
= lp_ctdb_locktime_warn_threshold();
996 if (threshold
!= 0) {
997 double timediff
= timeval_elapsed(&crec
->lock_time
);
998 if ((timediff
* 1000) > threshold
) {
1001 key
= hex_encode_talloc(data
,
1002 (unsigned char *)data
->key
.dptr
,
1004 DEBUG(0, ("Held tdb lock on db %s, key %s %f seconds\n",
1005 tdb_name(crec
->ctdb_ctx
->wtdb
->tdb
), key
,
1014 * Check whether we have a valid local copy of the given record,
1015 * either for reading or for writing.
1017 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data
, bool read_only
)
1019 struct ctdb_ltdb_header
*hdr
;
1021 if (ctdb_data
.dptr
== NULL
)
1024 if (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
))
1027 hdr
= (struct ctdb_ltdb_header
*)ctdb_data
.dptr
;
1029 #ifdef HAVE_CTDB_WANT_READONLY_DECL
1030 if (hdr
->dmaster
!= get_my_vnn()) {
1031 /* If we're not dmaster, it must be r/o copy. */
1032 return read_only
&& (hdr
->flags
& CTDB_REC_RO_HAVE_READONLY
);
1036 * If we want write access, no one may have r/o copies.
1038 return read_only
|| !(hdr
->flags
& CTDB_REC_RO_HAVE_DELEGATIONS
);
1040 return (hdr
->dmaster
== get_my_vnn());
1044 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
1045 TALLOC_CTX
*mem_ctx
,
1049 struct db_record
*result
;
1050 struct db_ctdb_rec
*crec
;
1053 int migrate_attempts
= 0;
1056 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
1057 DEBUG(0, ("talloc failed\n"));
1061 if (!(crec
= talloc_zero(result
, struct db_ctdb_rec
))) {
1062 DEBUG(0, ("talloc failed\n"));
1063 TALLOC_FREE(result
);
1067 result
->db
= ctx
->db
;
1068 result
->private_data
= (void *)crec
;
1069 crec
->ctdb_ctx
= ctx
;
1071 result
->key
.dsize
= key
.dsize
;
1072 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
1074 if (result
->key
.dptr
== NULL
) {
1075 DEBUG(0, ("talloc failed\n"));
1076 TALLOC_FREE(result
);
1081 * Do a blocking lock on the record
1085 if (DEBUGLEVEL
>= 10) {
1086 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
1087 DEBUG(10, (DEBUGLEVEL
> 10
1088 ? "Locking db %u key %s\n"
1089 : "Locking db %u key %.20s\n",
1090 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1091 TALLOC_FREE(keystr
);
1095 ? tdb_chainlock_nonblock(ctx
->wtdb
->tdb
, key
)
1096 : tdb_chainlock(ctx
->wtdb
->tdb
, key
);
1098 DEBUG(3, ("tdb_chainlock failed\n"));
1099 TALLOC_FREE(result
);
1103 result
->store
= db_ctdb_store
;
1104 result
->delete_rec
= db_ctdb_delete
;
1105 talloc_set_destructor(result
, db_ctdb_record_destr
);
1107 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
1110 * See if we have a valid record and we are the dmaster. If so, we can
1111 * take the shortcut and just return it.
1114 if (!db_ctdb_can_use_local_copy(ctdb_data
, false)) {
1115 SAFE_FREE(ctdb_data
.dptr
);
1116 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1117 talloc_set_destructor(result
, NULL
);
1119 if (tryonly
&& (migrate_attempts
!= 0)) {
1120 DEBUG(5, ("record migrated away again\n"));
1121 TALLOC_FREE(result
);
1125 migrate_attempts
+= 1;
1127 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u) %u\n",
1128 ctdb_data
.dptr
, ctdb_data
.dptr
?
1129 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1132 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->flags
: 0));
1134 status
= ctdbd_migrate(messaging_ctdbd_connection(), ctx
->db_id
,
1136 if (!NT_STATUS_IS_OK(status
)) {
1137 DEBUG(5, ("ctdb_migrate failed: %s\n",
1138 nt_errstr(status
)));
1139 TALLOC_FREE(result
);
1142 /* now it's migrated, try again */
1146 if (migrate_attempts
> 10) {
1147 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s needed %d "
1148 "attempts\n", tdb_name(ctx
->wtdb
->tdb
),
1149 hex_encode_talloc(talloc_tos(),
1150 (unsigned char *)key
.dptr
,
1155 GetTimeOfDay(&crec
->lock_time
);
1157 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1159 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1160 result
->value
.dptr
= NULL
;
1162 if ((result
->value
.dsize
!= 0)
1163 && !(result
->value
.dptr
= (uint8_t *)talloc_memdup(
1164 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1165 result
->value
.dsize
))) {
1166 DEBUG(0, ("talloc failed\n"));
1167 TALLOC_FREE(result
);
1170 SAFE_FREE(ctdb_data
.dptr
);
1175 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1176 TALLOC_CTX
*mem_ctx
,
1179 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1180 struct db_ctdb_ctx
);
1182 if (ctx
->transaction
!= NULL
) {
1183 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1186 if (db
->persistent
) {
1187 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1190 return fetch_locked_internal(ctx
, mem_ctx
, key
, false);
1193 static struct db_record
*db_ctdb_try_fetch_locked(struct db_context
*db
,
1194 TALLOC_CTX
*mem_ctx
,
1197 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1198 struct db_ctdb_ctx
);
1200 if (ctx
->transaction
!= NULL
) {
1201 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1204 if (db
->persistent
) {
1205 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1208 return fetch_locked_internal(ctx
, mem_ctx
, key
, true);
1212 fetch (unlocked, no migration) operation on ctdb
1214 static NTSTATUS
db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1215 TDB_DATA key
, TDB_DATA
*data
)
1217 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1218 struct db_ctdb_ctx
);
1222 if (ctx
->transaction
) {
1223 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1226 if (db
->persistent
) {
1227 return db_ctdb_fetch_persistent(ctx
, mem_ctx
, key
, data
);
1230 /* try a direct fetch */
1231 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
1234 * See if we have a valid record and we are the dmaster. If so, we can
1235 * take the shortcut and just return it.
1236 * we bypass the dmaster check for persistent databases
1238 if (db_ctdb_can_use_local_copy(ctdb_data
, true)) {
1240 * We have a valid local copy - avoid the ctdb protocol op
1242 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1244 data
->dptr
= (uint8_t *)talloc_memdup(
1245 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1248 SAFE_FREE(ctdb_data
.dptr
);
1250 if (data
->dptr
== NULL
) {
1251 return NT_STATUS_NO_MEMORY
;
1253 return NT_STATUS_OK
;
1256 SAFE_FREE(ctdb_data
.dptr
);
1259 * We weren't able to get it locally - ask ctdb to fetch it for us.
1260 * If we already had *something*, it's probably worth making a local
1263 status
= ctdbd_fetch(messaging_ctdbd_connection(), ctx
->db_id
, key
,
1265 ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
));
1266 if (!NT_STATUS_IS_OK(status
)) {
1267 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
1273 static NTSTATUS
db_ctdb_parse_record(struct db_context
*db
, TDB_DATA key
,
1274 void (*parser
)(TDB_DATA key
,
1276 void *private_data
),
1282 status
= db_ctdb_fetch(db
, talloc_tos(), key
, &data
);
1283 if (!NT_STATUS_IS_OK(status
)) {
1286 parser(key
, data
, private_data
);
1287 TALLOC_FREE(data
.dptr
);
1288 return NT_STATUS_OK
;
1291 struct traverse_state
{
1292 struct db_context
*db
;
1293 int (*fn
)(struct db_record
*rec
, void *private_data
);
1298 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1300 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1301 struct db_record
*rec
;
1302 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1303 /* we have to give them a locked record to prevent races */
1304 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1305 if (rec
&& rec
->value
.dsize
> 0) {
1306 state
->fn(rec
, state
->private_data
);
1308 talloc_free(tmp_ctx
);
1311 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1314 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1315 struct db_record
*rec
;
1316 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1320 * Skip the __db_sequence_number__ key:
1321 * This is used for persistent transactions internally.
1323 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1324 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1329 /* we have to give them a locked record to prevent races */
1330 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1331 if (rec
&& rec
->value
.dsize
> 0) {
1332 ret
= state
->fn(rec
, state
->private_data
);
1336 talloc_free(tmp_ctx
);
1340 /* wrapper to use traverse_persistent_callback with dbwrap */
1341 static int traverse_persistent_callback_dbwrap(struct db_record
*rec
, void* data
)
1343 return traverse_persistent_callback(NULL
, rec
->key
, rec
->value
, data
);
1347 static int db_ctdb_traverse(struct db_context
*db
,
1348 int (*fn
)(struct db_record
*rec
,
1349 void *private_data
),
1353 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1354 struct db_ctdb_ctx
);
1355 struct traverse_state state
;
1359 state
.private_data
= private_data
;
1362 if (db
->persistent
) {
1363 struct tdb_context
*ltdb
= ctx
->wtdb
->tdb
;
1366 /* for persistent databases we don't need to do a ctdb traverse,
1367 we can do a faster local traverse */
1368 ret
= tdb_traverse(ltdb
, traverse_persistent_callback
, &state
);
1372 if (ctx
->transaction
&& ctx
->transaction
->m_write
) {
1374 * we now have to handle keys not yet
1375 * present at transaction start
1377 struct db_context
*newkeys
= db_open_rbt(talloc_tos());
1378 struct ctdb_marshall_buffer
*mbuf
= ctx
->transaction
->m_write
;
1379 struct ctdb_rec_data
*rec
=NULL
;
1383 if (newkeys
== NULL
) {
1387 for (i
=0; i
<mbuf
->count
; i
++) {
1389 rec
=db_ctdb_marshall_loop_next(mbuf
, rec
,
1392 SMB_ASSERT(rec
!= NULL
);
1394 if (!tdb_exists(ltdb
, key
)) {
1395 dbwrap_store(newkeys
, key
, tdb_null
, 0);
1398 status
= dbwrap_traverse(newkeys
,
1399 traverse_persistent_callback_dbwrap
,
1402 talloc_free(newkeys
);
1403 if (!NT_STATUS_IS_OK(status
)) {
1411 status
= ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1412 if (!NT_STATUS_IS_OK(status
)) {
1418 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1420 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1423 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1425 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1428 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1430 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1431 struct db_record rec
;
1434 rec
.store
= db_ctdb_store_deny
;
1435 rec
.delete_rec
= db_ctdb_delete_deny
;
1436 rec
.private_data
= state
->db
;
1437 state
->fn(&rec
, state
->private_data
);
/*
 * tdb_traverse_read() callback used by db_ctdb_traverse_read() for
 * persistent databases.
 *
 * The raw tdb record is wrapped in a stack-local struct db_record whose
 * store/delete hooks always refuse. The leading struct ctdb_ltdb_header is
 * stripped from the value before the user callback state->fn is called, and
 * the result of state->fn is returned to the tdb traverse.
 *
 * NOTE(review): the copy of kbuf/dbuf into rec.key/rec.value and the bodies
 * of the two skip branches are not shown in the visible lines; presumably
 * both branches return 0 so that the traverse continues -- confirm.
 */
1441 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1444 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1445 struct db_record rec
;
1448 * Skip the __db_sequence_number__ key:
1449 * This is used for persistent transactions internally.
/*
 * The length test (including the terminating NUL) comes first, so strcmp()
 * is only evaluated for keys of exactly the expected size.
 */
1451 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1452 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
/* Read-only view: store and delete through this record are rejected. */
1459 rec
.store
= db_ctdb_store_deny
;
1460 rec
.delete_rec
= db_ctdb_delete_deny
;
1461 rec
.private_data
= state
->db
;
/* A value no larger than the ltdb header carries no payload. */
1463 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1464 /* a deleted record */
/* Hide the ltdb header: the callback sees only the payload bytes. */
1467 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1468 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1471 return state
->fn(&rec
, state
->private_data
);
/*
 * Read-only traverse over a ctdb-backed database; installed as the
 * traverse_read method by db_open_ctdb().
 *
 * db            database context whose private_data is a struct db_ctdb_ctx
 * fn            user callback invoked for each record
 * private_data  opaque pointer stored in the traverse state and handed to fn
 *
 * Persistent databases are walked directly on the local tdb with
 * tdb_traverse_read() and that call's result is returned. Otherwise the
 * traverse is delegated to ctdbd_traverse().
 *
 * NOTE(review): the remaining traverse_state initialisation and the return
 * values of the ctdbd_traverse() path are not shown in the visible lines --
 * confirm against the full file.
 */
1474 static int db_ctdb_traverse_read(struct db_context
*db
,
1475 int (*fn
)(struct db_record
*rec
,
1476 void *private_data
),
1480 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1481 struct db_ctdb_ctx
);
1482 struct traverse_state state
;
1486 state
.private_data
= private_data
;
1489 if (db
->persistent
) {
1490 /* for persistent databases we don't need to do a ctdb traverse,
1491 we can do a faster local traverse */
1492 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
/* Non-persistent database: let ctdbd drive the traverse. */
1495 status
= ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1496 if (!NT_STATUS_IS_OK(status
)) {
1502 static int db_ctdb_get_seqnum(struct db_context
*db
)
1504 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1505 struct db_ctdb_ctx
);
1506 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
/*
 * id method for ctdb-backed databases: exposes the ctdb database id as a
 * byte string.
 *
 * On return *id points at the db_id member inside the struct db_ctdb_ctx (no
 * copy is made, so the bytes are only valid while that context exists) and
 * *idlen is sizeof(ctx->db_id).
 *
 * NOTE(review): the declaration of idlen is on a parameter line that is not
 * shown in the visible lines -- confirm its type against the full file.
 */
1509 static void db_ctdb_id(struct db_context
*db
, const uint8_t **id
,
1512 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1513 db
->private_data
, struct db_ctdb_ctx
);
1515 *id
= (uint8_t *)&ctx
->db_id
;
1516 *idlen
= sizeof(ctx
->db_id
);
/*
 * Open a database through ctdbd and return a struct db_context whose
 * methods are the db_ctdb_* functions of this file.
 *
 * mem_ctx     talloc parent of the returned context
 * hash_size   passed to tdb_wrap_open() for the local tdb
 * tdb_flags   passed unmodified to ctdbd_db_attach(); a database without
 *             TDB_CLEAR_IF_FIRST is treated as persistent; only TDB_SEQNUM
 *             is kept for the local tdb open
 * open_flags  if O_CREAT is set, mode is applied to the db file with chmod()
 * mode        permission bits for that chmod()
 * lock_order  stored in the result and sent to ctdbd as the db priority
 *
 * The database name is used for the attach and in debug messages.
 *
 * Every visible failure path logs and releases result with TALLOC_FREE();
 * db_ctdb is a talloc child of result and is released along with it.
 *
 * NOTE(review): the name parameter declaration, several local declarations,
 * the return statements and the trailing arguments of tdb_wrap_open() are
 * not shown in the visible lines -- presumably failures return NULL and
 * success returns result; confirm against the full file.
 */
1519 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1521 int hash_size
, int tdb_flags
,
1522 int open_flags
, mode_t mode
,
1523 enum dbwrap_lock_order lock_order
)
1525 struct db_context
*result
;
1526 struct db_ctdb_ctx
*db_ctdb
;
1528 struct ctdbd_connection
*conn
;
1529 struct loadparm_context
*lp_ctx
;
1530 struct ctdb_db_priority prio
;
/* Without clustering enabled there is no ctdbd to talk to. */
1534 if (!lp_clustering()) {
1535 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1539 if (!(result
= talloc_zero(mem_ctx
, struct db_context
))) {
1540 DEBUG(0, ("talloc failed\n"));
1541 TALLOC_FREE(result
);
/*
 * NOTE(review): db_ctdb is allocated with talloc(), not talloc_zero(). In
 * the visible lines lock_ctx is only assigned in the persistent branch
 * further down, so it looks uninitialised for non-persistent databases --
 * confirm that nothing reads it in that case.
 */
1545 if (!(db_ctdb
= talloc(result
, struct db_ctdb_ctx
))) {
1546 DEBUG(0, ("talloc failed\n"));
1547 TALLOC_FREE(result
);
1551 db_ctdb
->transaction
= NULL
;
1552 db_ctdb
->db
= result
;
1554 conn
= messaging_ctdbd_connection();
1556 DEBUG(1, ("Could not connect to ctdb\n"));
1557 TALLOC_FREE(result
);
/* Attach to the database in ctdbd; this fills in db_ctdb->db_id. */
1561 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn
, name
, &db_ctdb
->db_id
, tdb_flags
))) {
1562 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1563 TALLOC_FREE(result
);
/*
 * NOTE(review): no NULL check on db_path is visible before it is passed to
 * chmod(), loadparm_init_s3() and tdb_wrap_open() below -- confirm that
 * ctdbd_dbpath() cannot fail here.
 */
1567 db_path
= ctdbd_dbpath(conn
, db_ctdb
, db_ctdb
->db_id
);
/* Persistence is derived from the caller flags before they are masked. */
1569 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1570 result
->lock_order
= lock_order
;
1572 /* only pass through specific flags */
1573 tdb_flags
&= TDB_SEQNUM
;
1575 /* honor permissions if user has specified O_CREAT */
1576 if (open_flags
& O_CREAT
) {
/* NOTE(review): the chmod() result is ignored -- presumably best effort. */
1577 chmod(db_path
, mode
);
/* Tell ctdbd to use the dbwrap lock order as this database priority. */
1580 prio
.db_id
= db_ctdb
->db_id
;
1581 prio
.priority
= lock_order
;
1583 status
= ctdbd_control_local(
1584 conn
, CTDB_CONTROL_SET_DB_PRIORITY
, 0, 0,
1585 make_tdb_data((uint8_t *)&prio
, sizeof(prio
)),
1586 NULL
, NULL
, &cstatus
);
/* Both the transport status and the control status must be clean. */
1588 if (!NT_STATUS_IS_OK(status
) || (cstatus
!= 0)) {
1589 DEBUG(1, ("CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d\n",
1590 nt_errstr(status
), cstatus
));
1591 TALLOC_FREE(result
);
/*
 * lp_ctx is created with db_path as its talloc parent and is unlinked from
 * it again right after the local tdb has been opened.
 */
1595 lp_ctx
= loadparm_init_s3(db_path
, loadparm_s3_helpers());
1597 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
,
1599 talloc_unlink(db_path
, lp_ctx
);
1600 if (db_ctdb
->wtdb
== NULL
) {
/*
 * NOTE(review): errno is read after the intervening talloc_unlink() call,
 * so it may no longer be the value left by tdb_wrap_open() -- confirm.
 */
1601 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1602 TALLOC_FREE(result
);
1605 talloc_free(db_path
);
/* Only persistent databases get a g_lock context. */
1607 if (result
->persistent
) {
1608 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
,
1609 ctdb_conn_msg_ctx(conn
));
1610 if (db_ctdb
->lock_ctx
== NULL
) {
1611 DEBUG(0, ("g_lock_ctx_init failed\n"));
1612 TALLOC_FREE(result
);
/* Wire up the dbwrap method table with the ctdb implementations. */
1617 result
->private_data
= (void *)db_ctdb
;
1618 result
->fetch_locked
= db_ctdb_fetch_locked
;
1619 result
->try_fetch_locked
= db_ctdb_try_fetch_locked
;
1620 result
->parse_record
= db_ctdb_parse_record
;
1621 result
->traverse
= db_ctdb_traverse
;
1622 result
->traverse_read
= db_ctdb_traverse_read
;
1623 result
->get_seqnum
= db_ctdb_get_seqnum
;
1624 result
->transaction_start
= db_ctdb_transaction_start
;
1625 result
->transaction_commit
= db_ctdb_transaction_commit
;
1626 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1627 result
->id
= db_ctdb_id
;
1628 result
->stored_callback
= NULL
;
1630 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1631 name
, db_ctdb
->db_id
));
1636 #else /* CLUSTER_SUPPORT */
1638 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1640 int hash_size
, int tdb_flags
,
1641 int open_flags
, mode_t mode
,
1642 enum dbwrap_lock_order lock_order
)
1644 DEBUG(3, ("db_open_ctdb: no cluster support!\n"));