2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007-2009
5 Copyright (C) Michael Adam 2009
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #ifdef CLUSTER_SUPPORT
24 #include "ctdb_private.h"
25 #include "ctdbd_conn.h"
28 struct db_ctdb_transaction_handle
{
29 struct db_ctdb_ctx
*ctx
;
31 * we store the reads and writes done under a transaction:
32 * - one list stores both reads and writes (m_all),
33 * - the other just writes (m_write)
35 struct ctdb_marshall_buffer
*m_all
;
36 struct ctdb_marshall_buffer
*m_write
;
43 struct db_context
*db
;
44 struct tdb_wrap
*wtdb
;
46 struct db_ctdb_transaction_handle
*transaction
;
47 struct g_lock_ctx
*lock_ctx
;
51 struct db_ctdb_ctx
*ctdb_ctx
;
52 struct ctdb_ltdb_header header
;
55 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
58 enum TDB_ERROR tret
= tdb_error(tdb
);
62 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
65 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
68 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
77 * fetch a record from the tdb, separating out the header
78 * information and returning the body of the record.
80 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
82 struct ctdb_ltdb_header
*header
,
89 rec
= tdb_fetch(db
->wtdb
->tdb
, key
);
90 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
91 status
= NT_STATUS_NOT_FOUND
;
96 header
->dmaster
= (uint32_t)-1;
103 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
107 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
108 if (data
->dsize
== 0) {
111 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
113 + sizeof(struct ctdb_ltdb_header
),
115 if (data
->dptr
== NULL
) {
116 status
= NT_STATUS_NO_MEMORY
;
122 status
= NT_STATUS_OK
;
130 * Store a record together with the ctdb record header
131 * in the local copy of the database.
133 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
135 struct ctdb_ltdb_header
*header
,
138 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
142 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
143 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
145 if (rec
.dptr
== NULL
) {
146 talloc_free(tmp_ctx
);
147 return NT_STATUS_NO_MEMORY
;
150 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
151 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
153 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
155 talloc_free(tmp_ctx
);
157 return (ret
== 0) ? NT_STATUS_OK
158 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
163 form a ctdb_rec_data record from a key/data pair
165 note that header may be NULL. If not NULL then it is included in the data portion
168 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
170 struct ctdb_ltdb_header
*header
,
174 struct ctdb_rec_data
*d
;
176 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
177 data
.dsize
+ (header
?sizeof(*header
):0);
178 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
184 d
->keylen
= key
.dsize
;
185 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
187 d
->datalen
= data
.dsize
+ sizeof(*header
);
188 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
189 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
191 d
->datalen
= data
.dsize
;
192 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
198 /* helper function for marshalling multiple records */
199 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
200 struct ctdb_marshall_buffer
*m
,
204 struct ctdb_ltdb_header
*header
,
207 struct ctdb_rec_data
*r
;
208 size_t m_size
, r_size
;
209 struct ctdb_marshall_buffer
*m2
= NULL
;
211 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
218 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
219 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
226 m_size
= talloc_get_size(m
);
227 r_size
= talloc_get_size(r
);
229 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
230 mem_ctx
, m
, m_size
+ r_size
);
236 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
245 /* we've finished marshalling, return a data blob with the marshalled records */
246 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
249 data
.dptr
= (uint8_t *)m
;
250 data
.dsize
= talloc_get_size(m
);
255 loop over a marshalling buffer
257 - pass r==NULL to start
258 - loop the number of times indicated by m->count
260 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
262 struct ctdb_ltdb_header
*header
,
263 TDB_DATA
*key
, TDB_DATA
*data
)
266 r
= (struct ctdb_rec_data
*)&m
->data
[0];
268 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
276 key
->dptr
= &r
->data
[0];
277 key
->dsize
= r
->keylen
;
280 data
->dptr
= &r
->data
[r
->keylen
];
281 data
->dsize
= r
->datalen
;
282 if (header
!= NULL
) {
283 data
->dptr
+= sizeof(*header
);
284 data
->dsize
-= sizeof(*header
);
288 if (header
!= NULL
) {
289 if (r
->datalen
< sizeof(*header
)) {
292 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
299 * CTDB transaction destructor
301 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
305 status
= g_lock_unlock(h
->ctx
->lock_ctx
, h
->lock_name
);
306 if (!NT_STATUS_IS_OK(status
)) {
307 DEBUG(0, ("g_lock_unlock failed: %s\n", nt_errstr(status
)));
314 * CTDB dbwrap API: transaction_start function
315 * starts a transaction on a persistent database
317 static int db_ctdb_transaction_start(struct db_context
*db
)
319 struct db_ctdb_transaction_handle
*h
;
321 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
324 if (!db
->persistent
) {
325 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
330 if (ctx
->transaction
) {
331 ctx
->transaction
->nesting
++;
335 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
337 DEBUG(0,(__location__
" oom for transaction handle\n"));
343 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
344 (unsigned int)ctx
->db_id
);
345 if (h
->lock_name
== NULL
) {
346 DEBUG(0, ("talloc_asprintf failed\n"));
352 * Wait a day, i.e. forever...
354 status
= g_lock_lock(ctx
->lock_ctx
, h
->lock_name
, G_LOCK_WRITE
,
355 timeval_set(86400, 0));
356 if (!NT_STATUS_IS_OK(status
)) {
357 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
362 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
364 ctx
->transaction
= h
;
366 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
371 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
373 struct ctdb_ltdb_header
*pheader
,
377 struct ctdb_rec_data
*rec
= NULL
;
378 struct ctdb_ltdb_header h
;
391 * Walk the list of records written during this
392 * transaction. If we want to read one we have already
393 * written, return the last written sample. Thus we do not do
394 * a "break;" for the first hit, this record might have been
398 for (i
=0; i
<buf
->count
; i
++) {
399 TDB_DATA tkey
, tdata
;
401 struct ctdb_ltdb_header hdr
;
405 rec
= db_ctdb_marshall_loop_next(buf
, rec
, &reqid
, &hdr
, &tkey
,
411 if (tdb_data_equal(key
, tkey
)) {
423 data
.dptr
= (uint8_t *)talloc_memdup(mem_ctx
, data
.dptr
,
425 if ((data
.dsize
!= 0) && (data
.dptr
== NULL
)) {
431 if (pheader
!= NULL
) {
439 fetch a record inside a transaction
441 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
443 TDB_DATA key
, TDB_DATA
*data
)
445 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
449 found
= pull_newest_from_marshall_buffer(h
->m_write
, key
, NULL
,
455 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
457 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
459 } else if (!NT_STATUS_IS_OK(status
)) {
463 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
,
465 if (h
->m_all
== NULL
) {
466 DEBUG(0,(__location__
" Failed to add to marshalling "
469 talloc_free(data
->dptr
);
477 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
478 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
480 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
484 struct db_record
*result
;
487 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
488 DEBUG(0, ("talloc failed\n"));
492 result
->private_data
= ctx
->transaction
;
494 result
->key
.dsize
= key
.dsize
;
495 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
496 if (result
->key
.dptr
== NULL
) {
497 DEBUG(0, ("talloc failed\n"));
502 result
->store
= db_ctdb_store_transaction
;
503 result
->delete_rec
= db_ctdb_delete_transaction
;
505 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
506 NULL
, result
, &result
->value
)) {
510 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
511 if (ctdb_data
.dptr
== NULL
) {
512 /* create the record */
513 result
->value
= tdb_null
;
517 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
518 result
->value
.dptr
= NULL
;
520 if ((result
->value
.dsize
!= 0)
521 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
522 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
523 result
->value
.dsize
))) {
524 DEBUG(0, ("talloc failed\n"));
528 SAFE_FREE(ctdb_data
.dptr
);
533 static int db_ctdb_record_destructor(struct db_record
**recp
)
535 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
536 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
537 rec
->private_data
, struct db_ctdb_transaction_handle
);
538 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
540 DEBUG(0,(__location__
" transaction_commit failed\n"));
546 auto-create a transaction for persistent databases
548 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
553 struct db_record
*rec
, **recp
;
555 res
= db_ctdb_transaction_start(ctx
->db
);
560 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
562 ctx
->db
->transaction_cancel(ctx
->db
);
566 /* destroy this transaction when we release the lock */
567 recp
= talloc(rec
, struct db_record
*);
569 ctx
->db
->transaction_cancel(ctx
->db
);
574 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
580 stores a record inside a transaction
582 static NTSTATUS
db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
583 TDB_DATA key
, TDB_DATA data
)
585 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
587 struct ctdb_ltdb_header header
;
591 /* we need the header so we can update the RSN */
593 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
596 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
598 if (rec
.dptr
!= NULL
) {
599 memcpy(&header
, rec
.dptr
,
600 sizeof(struct ctdb_ltdb_header
));
601 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
604 * a special case, we are writing the same
605 * data that is there now
607 if (data
.dsize
== rec
.dsize
&&
609 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
612 talloc_free(tmp_ctx
);
619 header
.dmaster
= get_my_vnn();
622 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
,
624 if (h
->m_all
== NULL
) {
625 DEBUG(0,(__location__
" Failed to add to marshalling "
627 talloc_free(tmp_ctx
);
628 return NT_STATUS_NO_MEMORY
;
631 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
632 if (h
->m_write
== NULL
) {
633 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
634 talloc_free(tmp_ctx
);
635 return NT_STATUS_NO_MEMORY
;
638 talloc_free(tmp_ctx
);
644 a record store inside a transaction
646 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
648 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
649 rec
->private_data
, struct db_ctdb_transaction_handle
);
652 status
= db_ctdb_transaction_store(h
, rec
->key
, data
);
657 a record delete inside a transaction
659 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
661 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
662 rec
->private_data
, struct db_ctdb_transaction_handle
);
665 status
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
670 * Fetch the db sequence number of a persistent db directly from the db.
672 static NTSTATUS
db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx
*db
,
676 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
679 struct ctdb_ltdb_header header
;
680 TALLOC_CTX
*mem_ctx
= talloc_stackframe();
682 if (seqnum
== NULL
) {
683 return NT_STATUS_INVALID_PARAMETER
;
686 key
= string_term_tdb_data(keyname
);
688 status
= db_ctdb_ltdb_fetch(db
, key
, &header
, mem_ctx
, &data
);
689 if (!NT_STATUS_IS_OK(status
) &&
690 !NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
))
695 status
= NT_STATUS_OK
;
697 if (data
.dsize
!= sizeof(uint64_t)) {
702 *seqnum
= *(uint64_t *)data
.dptr
;
705 TALLOC_FREE(mem_ctx
);
710 * Store the database sequence number inside a transaction.
712 static NTSTATUS
db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle
*h
,
716 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
720 key
= string_term_tdb_data(keyname
);
722 data
.dptr
= (uint8_t *)&seqnum
;
723 data
.dsize
= sizeof(uint64_t);
725 status
= db_ctdb_transaction_store(h
, key
, data
);
733 static int db_ctdb_transaction_commit(struct db_context
*db
)
735 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
739 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
740 uint64_t old_seqnum
, new_seqnum
;
744 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
748 if (h
->nested_cancel
) {
749 db
->transaction_cancel(db
);
750 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
754 if (h
->nesting
!= 0) {
759 if (h
->m_write
== NULL
) {
761 * No changes were made, so don't change the seqnum,
762 * don't push to other node, just exit with success.
768 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
771 * As the last db action before committing, bump the database sequence
772 * number. Note that this undoes all changes to the seqnum records
773 * performed under the transaction. This record is not meant to be
774 * modified by user interaction. It is for internal use only...
776 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &old_seqnum
);
777 if (!NT_STATUS_IS_OK(rets
)) {
778 DEBUG(1, (__location__
" failed to fetch the db sequence number "
779 "in transaction commit on db 0x%08x\n", ctx
->db_id
));
784 new_seqnum
= old_seqnum
+ 1;
786 rets
= db_ctdb_store_db_seqnum(h
, new_seqnum
);
787 if (!NT_STATUS_IS_OK(rets
)) {
788 DEBUG(1, (__location__
"failed to store the db sequence number "
789 " in transaction commit on db 0x%08x\n", ctx
->db_id
));
795 /* tell ctdbd to commit to the other nodes */
796 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
797 CTDB_CONTROL_TRANS3_COMMIT
,
799 db_ctdb_marshall_finish(h
->m_write
),
800 NULL
, NULL
, &status
);
801 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
803 * The TRANS3_COMMIT control should only possibly fail when a
804 * recovery has been running concurrently. In any case, the db
805 * will be the same on all nodes, either the new copy or the
806 * old copy. This can be detected by comparing the old and new
807 * local sequence numbers.
809 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &new_seqnum
);
810 if (!NT_STATUS_IS_OK(rets
)) {
811 DEBUG(1, (__location__
" failed to refetch db sequence "
812 "number after failed TRANS3_COMMIT\n"));
817 if (new_seqnum
== old_seqnum
) {
818 /* Recovery prevented all our changes: retry. */
820 } else if (new_seqnum
!= (old_seqnum
+ 1)) {
821 DEBUG(0, (__location__
" ERROR: new_seqnum[%lu] != "
822 "old_seqnum[%lu] + (0 or 1) after failed "
823 "TRANS3_COMMIT - this should not happen!\n",
824 (unsigned long)new_seqnum
,
825 (unsigned long)old_seqnum
));
830 * Recovery propagated our changes to all nodes, completing
831 * our commit for us - succeed.
838 h
->ctx
->transaction
= NULL
;
847 static int db_ctdb_transaction_cancel(struct db_context
*db
)
849 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
851 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
854 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
858 if (h
->nesting
!= 0) {
860 h
->nested_cancel
= true;
864 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
866 ctx
->transaction
= NULL
;
872 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
874 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
875 rec
->private_data
, struct db_ctdb_rec
);
877 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
882 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
887 * We have to store the header with empty data. TODO: Fix the
893 return db_ctdb_store(rec
, data
, 0);
897 static int db_ctdb_record_destr(struct db_record
* data
)
899 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
900 data
->private_data
, struct db_ctdb_rec
);
902 DEBUG(10, (DEBUGLEVEL
> 10
903 ? "Unlocking db %u key %s\n"
904 : "Unlocking db %u key %.20s\n",
905 (int)crec
->ctdb_ctx
->db_id
,
906 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
909 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
910 DEBUG(0, ("tdb_chainunlock failed\n"));
917 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
921 struct db_record
*result
;
922 struct db_ctdb_rec
*crec
;
925 int migrate_attempts
= 0;
927 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
928 DEBUG(0, ("talloc failed\n"));
932 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
933 DEBUG(0, ("talloc failed\n"));
938 result
->private_data
= (void *)crec
;
939 crec
->ctdb_ctx
= ctx
;
941 result
->key
.dsize
= key
.dsize
;
942 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
943 if (result
->key
.dptr
== NULL
) {
944 DEBUG(0, ("talloc failed\n"));
950 * Do a blocking lock on the record
954 if (DEBUGLEVEL
>= 10) {
955 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
956 DEBUG(10, (DEBUGLEVEL
> 10
957 ? "Locking db %u key %s\n"
958 : "Locking db %u key %.20s\n",
959 (int)crec
->ctdb_ctx
->db_id
, keystr
));
963 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
964 DEBUG(3, ("tdb_chainlock failed\n"));
969 result
->store
= db_ctdb_store
;
970 result
->delete_rec
= db_ctdb_delete
;
971 talloc_set_destructor(result
, db_ctdb_record_destr
);
973 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
976 * See if we have a valid record and we are the dmaster. If so, we can
977 * take the shortcut and just return it.
980 if ((ctdb_data
.dptr
== NULL
) ||
981 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
982 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
984 || (random() % 2 != 0)
987 SAFE_FREE(ctdb_data
.dptr
);
988 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
989 talloc_set_destructor(result
, NULL
);
991 migrate_attempts
+= 1;
993 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
994 ctdb_data
.dptr
, ctdb_data
.dptr
?
995 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
998 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
999 if (!NT_STATUS_IS_OK(status
)) {
1000 DEBUG(5, ("ctdb_migrate failed: %s\n",
1001 nt_errstr(status
)));
1002 TALLOC_FREE(result
);
1005 /* now its migrated, try again */
1009 if (migrate_attempts
> 10) {
1010 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1014 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1016 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1017 result
->value
.dptr
= NULL
;
1019 if ((result
->value
.dsize
!= 0)
1020 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
1021 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1022 result
->value
.dsize
))) {
1023 DEBUG(0, ("talloc failed\n"));
1024 TALLOC_FREE(result
);
1027 SAFE_FREE(ctdb_data
.dptr
);
1032 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1033 TALLOC_CTX
*mem_ctx
,
1036 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1037 struct db_ctdb_ctx
);
1039 if (ctx
->transaction
!= NULL
) {
1040 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1043 if (db
->persistent
) {
1044 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1047 return fetch_locked_internal(ctx
, mem_ctx
, key
);
1051 fetch (unlocked, no migration) operation on ctdb
1053 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1054 TDB_DATA key
, TDB_DATA
*data
)
1056 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1057 struct db_ctdb_ctx
);
1061 if (ctx
->transaction
) {
1062 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1065 /* try a direct fetch */
1066 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1069 * See if we have a valid record and we are the dmaster. If so, we can
1070 * take the shortcut and just return it.
1071 * we bypass the dmaster check for persistent databases
1073 if ((ctdb_data
.dptr
!= NULL
) &&
1074 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1076 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
1077 /* we are the dmaster - avoid the ctdb protocol op */
1079 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1080 if (data
->dsize
== 0) {
1081 SAFE_FREE(ctdb_data
.dptr
);
1086 data
->dptr
= (uint8
*)talloc_memdup(
1087 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1090 SAFE_FREE(ctdb_data
.dptr
);
1092 if (data
->dptr
== NULL
) {
1098 SAFE_FREE(ctdb_data
.dptr
);
1100 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1101 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1102 if (!NT_STATUS_IS_OK(status
)) {
1103 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
1110 struct traverse_state
{
1111 struct db_context
*db
;
1112 int (*fn
)(struct db_record
*rec
, void *private_data
);
1116 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1118 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1119 struct db_record
*rec
;
1120 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1121 /* we have to give them a locked record to prevent races */
1122 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1123 if (rec
&& rec
->value
.dsize
> 0) {
1124 state
->fn(rec
, state
->private_data
);
1126 talloc_free(tmp_ctx
);
1129 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1132 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1133 struct db_record
*rec
;
1134 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1136 /* we have to give them a locked record to prevent races */
1137 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1138 if (rec
&& rec
->value
.dsize
> 0) {
1139 ret
= state
->fn(rec
, state
->private_data
);
1141 talloc_free(tmp_ctx
);
1145 static int db_ctdb_traverse(struct db_context
*db
,
1146 int (*fn
)(struct db_record
*rec
,
1147 void *private_data
),
1150 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1151 struct db_ctdb_ctx
);
1152 struct traverse_state state
;
1156 state
.private_data
= private_data
;
1158 if (db
->persistent
) {
1159 /* for persistent databases we don't need to do a ctdb traverse,
1160 we can do a faster local traverse */
1161 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1165 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1169 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1171 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1174 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1176 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1179 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1181 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1182 struct db_record rec
;
1185 rec
.store
= db_ctdb_store_deny
;
1186 rec
.delete_rec
= db_ctdb_delete_deny
;
1187 rec
.private_data
= state
->db
;
1188 state
->fn(&rec
, state
->private_data
);
1191 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1194 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1195 struct db_record rec
;
1198 rec
.store
= db_ctdb_store_deny
;
1199 rec
.delete_rec
= db_ctdb_delete_deny
;
1200 rec
.private_data
= state
->db
;
1202 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1203 /* a deleted record */
1206 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1207 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1209 return state
->fn(&rec
, state
->private_data
);
1212 static int db_ctdb_traverse_read(struct db_context
*db
,
1213 int (*fn
)(struct db_record
*rec
,
1214 void *private_data
),
1217 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1218 struct db_ctdb_ctx
);
1219 struct traverse_state state
;
1223 state
.private_data
= private_data
;
1225 if (db
->persistent
) {
1226 /* for persistent databases we don't need to do a ctdb traverse,
1227 we can do a faster local traverse */
1228 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1231 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1235 static int db_ctdb_get_seqnum(struct db_context
*db
)
1237 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1238 struct db_ctdb_ctx
);
1239 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1242 static int db_ctdb_get_flags(struct db_context
*db
)
1244 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1245 struct db_ctdb_ctx
);
1246 return tdb_get_flags(ctx
->wtdb
->tdb
);
1249 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1251 int hash_size
, int tdb_flags
,
1252 int open_flags
, mode_t mode
)
1254 struct db_context
*result
;
1255 struct db_ctdb_ctx
*db_ctdb
;
1257 struct ctdbd_connection
*conn
;
1259 if (!lp_clustering()) {
1260 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1264 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1265 DEBUG(0, ("talloc failed\n"));
1266 TALLOC_FREE(result
);
1270 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1271 DEBUG(0, ("talloc failed\n"));
1272 TALLOC_FREE(result
);
1276 db_ctdb
->transaction
= NULL
;
1277 db_ctdb
->db
= result
;
1279 conn
= messaging_ctdbd_connection();
1281 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn
, name
, &db_ctdb
->db_id
, tdb_flags
))) {
1282 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1283 TALLOC_FREE(result
);
1287 db_path
= ctdbd_dbpath(conn
, db_ctdb
, db_ctdb
->db_id
);
1289 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1291 /* only pass through specific flags */
1292 tdb_flags
&= TDB_SEQNUM
;
1294 /* honor permissions if user has specified O_CREAT */
1295 if (open_flags
& O_CREAT
) {
1296 chmod(db_path
, mode
);
1299 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1300 if (db_ctdb
->wtdb
== NULL
) {
1301 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1302 TALLOC_FREE(result
);
1305 talloc_free(db_path
);
1307 if (result
->persistent
) {
1308 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
,
1309 ctdb_conn_msg_ctx(conn
));
1310 if (db_ctdb
->lock_ctx
== NULL
) {
1311 DEBUG(0, ("g_lock_ctx_init failed\n"));
1312 TALLOC_FREE(result
);
1317 result
->private_data
= (void *)db_ctdb
;
1318 result
->fetch_locked
= db_ctdb_fetch_locked
;
1319 result
->fetch
= db_ctdb_fetch
;
1320 result
->traverse
= db_ctdb_traverse
;
1321 result
->traverse_read
= db_ctdb_traverse_read
;
1322 result
->get_seqnum
= db_ctdb_get_seqnum
;
1323 result
->get_flags
= db_ctdb_get_flags
;
1324 result
->transaction_start
= db_ctdb_transaction_start
;
1325 result
->transaction_commit
= db_ctdb_transaction_commit
;
1326 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1328 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1329 name
, db_ctdb
->db_id
));