2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007-2009
5 Copyright (C) Michael Adam 2009
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "system/filesys.h"
23 #ifdef CLUSTER_SUPPORT
25 #include "ctdb_private.h"
26 #include "ctdbd_conn.h"
30 struct db_ctdb_transaction_handle
{
31 struct db_ctdb_ctx
*ctx
;
33 * we store the reads and writes done under a transaction:
34 * - one list stores both reads and writes (m_all),
35 * - the other just writes (m_write)
37 struct ctdb_marshall_buffer
*m_all
;
38 struct ctdb_marshall_buffer
*m_write
;
45 struct db_context
*db
;
46 struct tdb_wrap
*wtdb
;
48 struct db_ctdb_transaction_handle
*transaction
;
49 struct g_lock_ctx
*lock_ctx
;
53 struct db_ctdb_ctx
*ctdb_ctx
;
54 struct ctdb_ltdb_header header
;
55 struct timeval lock_time
;
58 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
61 enum TDB_ERROR tret
= tdb_error(tdb
);
65 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
68 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
71 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
80 * fetch a record from the tdb, separating out the header
81 * information and returning the body of the record.
83 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
85 struct ctdb_ltdb_header
*header
,
92 rec
= tdb_fetch(db
->wtdb
->tdb
, key
);
93 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
94 status
= NT_STATUS_NOT_FOUND
;
99 header
->dmaster
= (uint32_t)-1;
106 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
110 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
111 if (data
->dsize
== 0) {
114 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
116 + sizeof(struct ctdb_ltdb_header
),
118 if (data
->dptr
== NULL
) {
119 status
= NT_STATUS_NO_MEMORY
;
125 status
= NT_STATUS_OK
;
133 * Store a record together with the ctdb record header
134 * in the local copy of the database.
136 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
138 struct ctdb_ltdb_header
*header
,
141 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
145 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
146 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
148 if (rec
.dptr
== NULL
) {
149 talloc_free(tmp_ctx
);
150 return NT_STATUS_NO_MEMORY
;
153 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
154 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
156 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
158 talloc_free(tmp_ctx
);
160 return (ret
== 0) ? NT_STATUS_OK
161 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
166 form a ctdb_rec_data record from a key/data pair
168 note that header may be NULL. If not NULL then it is included in the data portion
171 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
173 struct ctdb_ltdb_header
*header
,
177 struct ctdb_rec_data
*d
;
179 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
180 data
.dsize
+ (header
?sizeof(*header
):0);
181 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
187 d
->keylen
= key
.dsize
;
188 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
190 d
->datalen
= data
.dsize
+ sizeof(*header
);
191 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
192 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
194 d
->datalen
= data
.dsize
;
195 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
201 /* helper function for marshalling multiple records */
202 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
203 struct ctdb_marshall_buffer
*m
,
207 struct ctdb_ltdb_header
*header
,
210 struct ctdb_rec_data
*r
;
211 size_t m_size
, r_size
;
212 struct ctdb_marshall_buffer
*m2
= NULL
;
214 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
221 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
222 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
229 m_size
= talloc_get_size(m
);
230 r_size
= talloc_get_size(r
);
232 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
233 mem_ctx
, m
, m_size
+ r_size
);
239 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
248 /* we've finished marshalling, return a data blob with the marshalled records */
249 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
252 data
.dptr
= (uint8_t *)m
;
253 data
.dsize
= talloc_get_size(m
);
258 loop over a marshalling buffer
260 - pass r==NULL to start
261 - loop the number of times indicated by m->count
263 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
265 struct ctdb_ltdb_header
*header
,
266 TDB_DATA
*key
, TDB_DATA
*data
)
269 r
= (struct ctdb_rec_data
*)&m
->data
[0];
271 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
279 key
->dptr
= &r
->data
[0];
280 key
->dsize
= r
->keylen
;
283 data
->dptr
= &r
->data
[r
->keylen
];
284 data
->dsize
= r
->datalen
;
285 if (header
!= NULL
) {
286 data
->dptr
+= sizeof(*header
);
287 data
->dsize
-= sizeof(*header
);
291 if (header
!= NULL
) {
292 if (r
->datalen
< sizeof(*header
)) {
295 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
302 * CTDB transaction destructor
304 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
308 status
= g_lock_unlock(h
->ctx
->lock_ctx
, h
->lock_name
);
309 if (!NT_STATUS_IS_OK(status
)) {
310 DEBUG(0, ("g_lock_unlock failed: %s\n", nt_errstr(status
)));
317 * CTDB dbwrap API: transaction_start function
318 * starts a transaction on a persistent database
320 static int db_ctdb_transaction_start(struct db_context
*db
)
322 struct db_ctdb_transaction_handle
*h
;
324 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
327 if (!db
->persistent
) {
328 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
333 if (ctx
->transaction
) {
334 ctx
->transaction
->nesting
++;
338 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
340 DEBUG(0,(__location__
" oom for transaction handle\n"));
346 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
347 (unsigned int)ctx
->db_id
);
348 if (h
->lock_name
== NULL
) {
349 DEBUG(0, ("talloc_asprintf failed\n"));
355 * Wait a day, i.e. forever...
357 status
= g_lock_lock(ctx
->lock_ctx
, h
->lock_name
, G_LOCK_WRITE
,
358 timeval_set(86400, 0));
359 if (!NT_STATUS_IS_OK(status
)) {
360 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
365 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
367 ctx
->transaction
= h
;
369 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
374 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
376 struct ctdb_ltdb_header
*pheader
,
380 struct ctdb_rec_data
*rec
= NULL
;
381 struct ctdb_ltdb_header h
;
394 * Walk the list of records written during this
395 * transaction. If we want to read one we have already
396 * written, return the last written sample. Thus we do not do
397 * a "break;" for the first hit, this record might have been
401 for (i
=0; i
<buf
->count
; i
++) {
402 TDB_DATA tkey
, tdata
;
404 struct ctdb_ltdb_header hdr
;
408 rec
= db_ctdb_marshall_loop_next(buf
, rec
, &reqid
, &hdr
, &tkey
,
414 if (tdb_data_equal(key
, tkey
)) {
426 data
.dptr
= (uint8_t *)talloc_memdup(mem_ctx
, data
.dptr
,
428 if ((data
.dsize
!= 0) && (data
.dptr
== NULL
)) {
434 if (pheader
!= NULL
) {
442 fetch a record inside a transaction
444 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
446 TDB_DATA key
, TDB_DATA
*data
)
448 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
452 found
= pull_newest_from_marshall_buffer(h
->m_write
, key
, NULL
,
458 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
460 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
462 } else if (!NT_STATUS_IS_OK(status
)) {
466 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
,
468 if (h
->m_all
== NULL
) {
469 DEBUG(0,(__location__
" Failed to add to marshalling "
472 talloc_free(data
->dptr
);
480 * Fetch a record from a persistent database
481 * without record locking and without an active transaction.
483 * This just fetches from the local database copy.
484 * Since the databases are kept in syc cluster-wide,
485 * there is no point in doing a ctdb call to fetch the
486 * record from the lmaster. It does even harm since migration
487 * of records bump their RSN and hence render the persistent
488 * database inconsistent.
490 static int db_ctdb_fetch_persistent(struct db_ctdb_ctx
*db
,
492 TDB_DATA key
, TDB_DATA
*data
)
496 status
= db_ctdb_ltdb_fetch(db
, key
, NULL
, mem_ctx
, data
);
498 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
500 } else if (!NT_STATUS_IS_OK(status
)) {
507 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
508 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
510 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
514 struct db_record
*result
;
517 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
518 DEBUG(0, ("talloc failed\n"));
522 result
->private_data
= ctx
->transaction
;
524 result
->key
.dsize
= key
.dsize
;
525 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
526 if (result
->key
.dptr
== NULL
) {
527 DEBUG(0, ("talloc failed\n"));
532 result
->store
= db_ctdb_store_transaction
;
533 result
->delete_rec
= db_ctdb_delete_transaction
;
535 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
536 NULL
, result
, &result
->value
)) {
540 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
541 if (ctdb_data
.dptr
== NULL
) {
542 /* create the record */
543 result
->value
= tdb_null
;
547 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
548 result
->value
.dptr
= NULL
;
550 if ((result
->value
.dsize
!= 0)
551 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
552 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
553 result
->value
.dsize
))) {
554 DEBUG(0, ("talloc failed\n"));
558 SAFE_FREE(ctdb_data
.dptr
);
563 static int db_ctdb_record_destructor(struct db_record
**recp
)
565 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
566 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
567 rec
->private_data
, struct db_ctdb_transaction_handle
);
568 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
570 DEBUG(0,(__location__
" transaction_commit failed\n"));
576 auto-create a transaction for persistent databases
578 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
583 struct db_record
*rec
, **recp
;
585 res
= db_ctdb_transaction_start(ctx
->db
);
590 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
592 ctx
->db
->transaction_cancel(ctx
->db
);
596 /* destroy this transaction when we release the lock */
597 recp
= talloc(rec
, struct db_record
*);
599 ctx
->db
->transaction_cancel(ctx
->db
);
604 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
610 stores a record inside a transaction
612 static NTSTATUS
db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
613 TDB_DATA key
, TDB_DATA data
)
615 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
617 struct ctdb_ltdb_header header
;
621 /* we need the header so we can update the RSN */
623 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
626 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
628 if (rec
.dptr
!= NULL
) {
629 memcpy(&header
, rec
.dptr
,
630 sizeof(struct ctdb_ltdb_header
));
631 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
634 * a special case, we are writing the same
635 * data that is there now
637 if (data
.dsize
== rec
.dsize
&&
639 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
642 talloc_free(tmp_ctx
);
649 header
.dmaster
= get_my_vnn();
652 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
,
654 if (h
->m_all
== NULL
) {
655 DEBUG(0,(__location__
" Failed to add to marshalling "
657 talloc_free(tmp_ctx
);
658 return NT_STATUS_NO_MEMORY
;
661 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
662 if (h
->m_write
== NULL
) {
663 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
664 talloc_free(tmp_ctx
);
665 return NT_STATUS_NO_MEMORY
;
668 talloc_free(tmp_ctx
);
674 a record store inside a transaction
676 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
678 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
679 rec
->private_data
, struct db_ctdb_transaction_handle
);
682 status
= db_ctdb_transaction_store(h
, rec
->key
, data
);
687 a record delete inside a transaction
689 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
691 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
692 rec
->private_data
, struct db_ctdb_transaction_handle
);
695 status
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
700 * Fetch the db sequence number of a persistent db directly from the db.
702 static NTSTATUS
db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx
*db
,
706 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
709 struct ctdb_ltdb_header header
;
710 TALLOC_CTX
*mem_ctx
= talloc_stackframe();
712 if (seqnum
== NULL
) {
713 return NT_STATUS_INVALID_PARAMETER
;
716 key
= string_term_tdb_data(keyname
);
718 status
= db_ctdb_ltdb_fetch(db
, key
, &header
, mem_ctx
, &data
);
719 if (!NT_STATUS_IS_OK(status
) &&
720 !NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
))
725 status
= NT_STATUS_OK
;
727 if (data
.dsize
!= sizeof(uint64_t)) {
732 *seqnum
= *(uint64_t *)data
.dptr
;
735 TALLOC_FREE(mem_ctx
);
740 * Store the database sequence number inside a transaction.
742 static NTSTATUS
db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle
*h
,
746 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
750 key
= string_term_tdb_data(keyname
);
752 data
.dptr
= (uint8_t *)&seqnum
;
753 data
.dsize
= sizeof(uint64_t);
755 status
= db_ctdb_transaction_store(h
, key
, data
);
763 static int db_ctdb_transaction_commit(struct db_context
*db
)
765 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
769 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
770 uint64_t old_seqnum
, new_seqnum
;
774 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
778 if (h
->nested_cancel
) {
779 db
->transaction_cancel(db
);
780 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
784 if (h
->nesting
!= 0) {
789 if (h
->m_write
== NULL
) {
791 * No changes were made, so don't change the seqnum,
792 * don't push to other node, just exit with success.
798 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
801 * As the last db action before committing, bump the database sequence
802 * number. Note that this undoes all changes to the seqnum records
803 * performed under the transaction. This record is not meant to be
804 * modified by user interaction. It is for internal use only...
806 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &old_seqnum
);
807 if (!NT_STATUS_IS_OK(rets
)) {
808 DEBUG(1, (__location__
" failed to fetch the db sequence number "
809 "in transaction commit on db 0x%08x\n", ctx
->db_id
));
814 new_seqnum
= old_seqnum
+ 1;
816 rets
= db_ctdb_store_db_seqnum(h
, new_seqnum
);
817 if (!NT_STATUS_IS_OK(rets
)) {
818 DEBUG(1, (__location__
"failed to store the db sequence number "
819 " in transaction commit on db 0x%08x\n", ctx
->db_id
));
825 /* tell ctdbd to commit to the other nodes */
826 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
827 CTDB_CONTROL_TRANS3_COMMIT
,
829 db_ctdb_marshall_finish(h
->m_write
),
830 NULL
, NULL
, &status
);
831 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
833 * The TRANS3_COMMIT control should only possibly fail when a
834 * recovery has been running concurrently. In any case, the db
835 * will be the same on all nodes, either the new copy or the
836 * old copy. This can be detected by comparing the old and new
837 * local sequence numbers.
839 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &new_seqnum
);
840 if (!NT_STATUS_IS_OK(rets
)) {
841 DEBUG(1, (__location__
" failed to refetch db sequence "
842 "number after failed TRANS3_COMMIT\n"));
847 if (new_seqnum
== old_seqnum
) {
848 /* Recovery prevented all our changes: retry. */
850 } else if (new_seqnum
!= (old_seqnum
+ 1)) {
851 DEBUG(0, (__location__
" ERROR: new_seqnum[%lu] != "
852 "old_seqnum[%lu] + (0 or 1) after failed "
853 "TRANS3_COMMIT - this should not happen!\n",
854 (unsigned long)new_seqnum
,
855 (unsigned long)old_seqnum
));
860 * Recovery propagated our changes to all nodes, completing
861 * our commit for us - succeed.
868 h
->ctx
->transaction
= NULL
;
877 static int db_ctdb_transaction_cancel(struct db_context
*db
)
879 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
881 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
884 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
888 if (h
->nesting
!= 0) {
890 h
->nested_cancel
= true;
894 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
896 ctx
->transaction
= NULL
;
902 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
904 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
905 rec
->private_data
, struct db_ctdb_rec
);
907 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
912 #ifdef CTDB_CONTROL_SCHEDULE_FOR_DELETION
913 static NTSTATUS
db_ctdb_send_schedule_for_deletion(struct db_record
*rec
)
916 struct ctdb_control_schedule_for_deletion
*dd
;
919 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
920 rec
->private_data
, struct db_ctdb_rec
);
922 indata
.dsize
= offsetof(struct ctdb_control_schedule_for_deletion
, key
) + rec
->key
.dsize
;
923 indata
.dptr
= talloc_zero_array(crec
, uint8_t, indata
.dsize
);
924 if (indata
.dptr
== NULL
) {
925 DEBUG(0, (__location__
" talloc failed!\n"));
926 return NT_STATUS_NO_MEMORY
;
929 dd
= (struct ctdb_control_schedule_for_deletion
*)(void *)indata
.dptr
;
930 dd
->db_id
= crec
->ctdb_ctx
->db_id
;
931 dd
->hdr
= crec
->header
;
932 dd
->keylen
= rec
->key
.dsize
;
933 memcpy(dd
->key
, rec
->key
.dptr
, rec
->key
.dsize
);
935 status
= ctdbd_control_local(messaging_ctdbd_connection(),
936 CTDB_CONTROL_SCHEDULE_FOR_DELETION
,
937 crec
->ctdb_ctx
->db_id
,
938 CTDB_CTRL_FLAG_NOREPLY
, /* flags */
943 talloc_free(indata
.dptr
);
945 if (!NT_STATUS_IS_OK(status
) || cstatus
!= 0) {
946 DEBUG(1, (__location__
" Error sending local control "
947 "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
948 nt_errstr(status
), cstatus
));
949 if (NT_STATUS_IS_OK(status
)) {
950 status
= NT_STATUS_UNSUCCESSFUL
;
958 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
964 * We have to store the header with empty data. TODO: Fix the
970 status
= db_ctdb_store(rec
, data
, 0);
971 if (!NT_STATUS_IS_OK(status
)) {
975 #ifdef CTDB_CONTROL_SCHEDULE_FOR_DELETION
976 status
= db_ctdb_send_schedule_for_deletion(rec
);
982 static int db_ctdb_record_destr(struct db_record
* data
)
984 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
985 data
->private_data
, struct db_ctdb_rec
);
988 DEBUG(10, (DEBUGLEVEL
> 10
989 ? "Unlocking db %u key %s\n"
990 : "Unlocking db %u key %.20s\n",
991 (int)crec
->ctdb_ctx
->db_id
,
992 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
995 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
996 DEBUG(0, ("tdb_chainunlock failed\n"));
1000 threshold
= lp_ctdb_locktime_warn_threshold();
1001 if (threshold
!= 0) {
1002 double timediff
= timeval_elapsed(&crec
->lock_time
);
1003 if ((timediff
* 1000) > threshold
) {
1004 DEBUG(0, ("Held tdb lock %f seconds\n", timediff
));
1011 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
1012 TALLOC_CTX
*mem_ctx
,
1015 struct db_record
*result
;
1016 struct db_ctdb_rec
*crec
;
1019 int migrate_attempts
= 0;
1021 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
1022 DEBUG(0, ("talloc failed\n"));
1026 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
1027 DEBUG(0, ("talloc failed\n"));
1028 TALLOC_FREE(result
);
1032 result
->private_data
= (void *)crec
;
1033 crec
->ctdb_ctx
= ctx
;
1035 result
->key
.dsize
= key
.dsize
;
1036 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
1037 if (result
->key
.dptr
== NULL
) {
1038 DEBUG(0, ("talloc failed\n"));
1039 TALLOC_FREE(result
);
1044 * Do a blocking lock on the record
1048 if (DEBUGLEVEL
>= 10) {
1049 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
1050 DEBUG(10, (DEBUGLEVEL
> 10
1051 ? "Locking db %u key %s\n"
1052 : "Locking db %u key %.20s\n",
1053 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1054 TALLOC_FREE(keystr
);
1057 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
1058 DEBUG(3, ("tdb_chainlock failed\n"));
1059 TALLOC_FREE(result
);
1063 result
->store
= db_ctdb_store
;
1064 result
->delete_rec
= db_ctdb_delete
;
1065 talloc_set_destructor(result
, db_ctdb_record_destr
);
1067 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1070 * See if we have a valid record and we are the dmaster. If so, we can
1071 * take the shortcut and just return it.
1074 if ((ctdb_data
.dptr
== NULL
) ||
1075 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
1076 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
1078 || (random() % 2 != 0)
1081 SAFE_FREE(ctdb_data
.dptr
);
1082 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1083 talloc_set_destructor(result
, NULL
);
1085 migrate_attempts
+= 1;
1087 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1088 ctdb_data
.dptr
, ctdb_data
.dptr
?
1089 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1092 status
= ctdbd_migrate(messaging_ctdbd_connection(), ctx
->db_id
,
1094 if (!NT_STATUS_IS_OK(status
)) {
1095 DEBUG(5, ("ctdb_migrate failed: %s\n",
1096 nt_errstr(status
)));
1097 TALLOC_FREE(result
);
1100 /* now its migrated, try again */
1104 if (migrate_attempts
> 10) {
1105 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1109 GetTimeOfDay(&crec
->lock_time
);
1111 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1113 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1114 result
->value
.dptr
= NULL
;
1116 if ((result
->value
.dsize
!= 0)
1117 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
1118 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1119 result
->value
.dsize
))) {
1120 DEBUG(0, ("talloc failed\n"));
1121 TALLOC_FREE(result
);
1124 SAFE_FREE(ctdb_data
.dptr
);
1129 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1130 TALLOC_CTX
*mem_ctx
,
1133 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1134 struct db_ctdb_ctx
);
1136 if (ctx
->transaction
!= NULL
) {
1137 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1140 if (db
->persistent
) {
1141 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1144 return fetch_locked_internal(ctx
, mem_ctx
, key
);
1148 fetch (unlocked, no migration) operation on ctdb
1150 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1151 TDB_DATA key
, TDB_DATA
*data
)
1153 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1154 struct db_ctdb_ctx
);
1158 if (ctx
->transaction
) {
1159 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1162 if (db
->persistent
) {
1163 return db_ctdb_fetch_persistent(ctx
, mem_ctx
, key
, data
);
1166 /* try a direct fetch */
1167 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1170 * See if we have a valid record and we are the dmaster. If so, we can
1171 * take the shortcut and just return it.
1172 * we bypass the dmaster check for persistent databases
1174 if ((ctdb_data
.dptr
!= NULL
) &&
1175 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1176 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())
1178 /* we are the dmaster - avoid the ctdb protocol op */
1180 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1181 if (data
->dsize
== 0) {
1182 SAFE_FREE(ctdb_data
.dptr
);
1187 data
->dptr
= (uint8
*)talloc_memdup(
1188 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1191 SAFE_FREE(ctdb_data
.dptr
);
1193 if (data
->dptr
== NULL
) {
1199 SAFE_FREE(ctdb_data
.dptr
);
1201 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1202 status
= ctdbd_fetch(messaging_ctdbd_connection(), ctx
->db_id
, key
,
1204 if (!NT_STATUS_IS_OK(status
)) {
1205 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
1212 struct traverse_state
{
1213 struct db_context
*db
;
1214 int (*fn
)(struct db_record
*rec
, void *private_data
);
1218 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1220 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1221 struct db_record
*rec
;
1222 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1223 /* we have to give them a locked record to prevent races */
1224 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1225 if (rec
&& rec
->value
.dsize
> 0) {
1226 state
->fn(rec
, state
->private_data
);
1228 talloc_free(tmp_ctx
);
1231 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1234 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1235 struct db_record
*rec
;
1236 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1238 /* we have to give them a locked record to prevent races */
1239 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1240 if (rec
&& rec
->value
.dsize
> 0) {
1241 ret
= state
->fn(rec
, state
->private_data
);
1243 talloc_free(tmp_ctx
);
1247 static int db_ctdb_traverse(struct db_context
*db
,
1248 int (*fn
)(struct db_record
*rec
,
1249 void *private_data
),
1252 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1253 struct db_ctdb_ctx
);
1254 struct traverse_state state
;
1258 state
.private_data
= private_data
;
1260 if (db
->persistent
) {
1261 /* for persistent databases we don't need to do a ctdb traverse,
1262 we can do a faster local traverse */
1263 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1267 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1271 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1273 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1276 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1278 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1281 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1283 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1284 struct db_record rec
;
1287 rec
.store
= db_ctdb_store_deny
;
1288 rec
.delete_rec
= db_ctdb_delete_deny
;
1289 rec
.private_data
= state
->db
;
1290 state
->fn(&rec
, state
->private_data
);
1293 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1296 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1297 struct db_record rec
;
1300 rec
.store
= db_ctdb_store_deny
;
1301 rec
.delete_rec
= db_ctdb_delete_deny
;
1302 rec
.private_data
= state
->db
;
1304 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1305 /* a deleted record */
1308 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1309 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1311 return state
->fn(&rec
, state
->private_data
);
1314 static int db_ctdb_traverse_read(struct db_context
*db
,
1315 int (*fn
)(struct db_record
*rec
,
1316 void *private_data
),
1319 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1320 struct db_ctdb_ctx
);
1321 struct traverse_state state
;
1325 state
.private_data
= private_data
;
1327 if (db
->persistent
) {
1328 /* for persistent databases we don't need to do a ctdb traverse,
1329 we can do a faster local traverse */
1330 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1333 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1337 static int db_ctdb_get_seqnum(struct db_context
*db
)
1339 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1340 struct db_ctdb_ctx
);
1341 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1344 static int db_ctdb_get_flags(struct db_context
*db
)
1346 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1347 struct db_ctdb_ctx
);
1348 return tdb_get_flags(ctx
->wtdb
->tdb
);
1351 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1353 int hash_size
, int tdb_flags
,
1354 int open_flags
, mode_t mode
)
1356 struct db_context
*result
;
1357 struct db_ctdb_ctx
*db_ctdb
;
1359 struct ctdbd_connection
*conn
;
1361 if (!lp_clustering()) {
1362 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1366 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1367 DEBUG(0, ("talloc failed\n"));
1368 TALLOC_FREE(result
);
1372 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1373 DEBUG(0, ("talloc failed\n"));
1374 TALLOC_FREE(result
);
1378 db_ctdb
->transaction
= NULL
;
1379 db_ctdb
->db
= result
;
1381 conn
= messaging_ctdbd_connection();
1383 DEBUG(1, ("Could not connect to ctdb\n"));
1384 TALLOC_FREE(result
);
1388 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn
, name
, &db_ctdb
->db_id
, tdb_flags
))) {
1389 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1390 TALLOC_FREE(result
);
1394 db_path
= ctdbd_dbpath(conn
, db_ctdb
, db_ctdb
->db_id
);
1396 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1398 /* only pass through specific flags */
1399 tdb_flags
&= TDB_SEQNUM
;
1401 /* honor permissions if user has specified O_CREAT */
1402 if (open_flags
& O_CREAT
) {
1403 chmod(db_path
, mode
);
1406 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1407 if (db_ctdb
->wtdb
== NULL
) {
1408 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1409 TALLOC_FREE(result
);
1412 talloc_free(db_path
);
1414 if (result
->persistent
) {
1415 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
,
1416 ctdb_conn_msg_ctx(conn
));
1417 if (db_ctdb
->lock_ctx
== NULL
) {
1418 DEBUG(0, ("g_lock_ctx_init failed\n"));
1419 TALLOC_FREE(result
);
1424 result
->private_data
= (void *)db_ctdb
;
1425 result
->fetch_locked
= db_ctdb_fetch_locked
;
1426 result
->fetch
= db_ctdb_fetch
;
1427 result
->traverse
= db_ctdb_traverse
;
1428 result
->traverse_read
= db_ctdb_traverse_read
;
1429 result
->get_seqnum
= db_ctdb_get_seqnum
;
1430 result
->get_flags
= db_ctdb_get_flags
;
1431 result
->transaction_start
= db_ctdb_transaction_start
;
1432 result
->transaction_commit
= db_ctdb_transaction_commit
;
1433 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1435 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1436 name
, db_ctdb
->db_id
));