2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
30 * we store the reads and writes done under a transaction:
31 * - one list stores both reads and writes (m_all),
32 * - the other just writes (m_write)
34 struct ctdb_marshall_buffer
*m_all
;
35 struct ctdb_marshall_buffer
*m_write
;
41 struct db_context
*db
;
42 struct tdb_wrap
*wtdb
;
44 struct db_ctdb_transaction_handle
*transaction
;
48 struct db_ctdb_ctx
*ctdb_ctx
;
49 struct ctdb_ltdb_header header
;
52 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
57 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
60 enum TDB_ERROR tret
= tdb_error(tdb
);
64 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
67 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
70 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
84 struct ctdb_ltdb_header
*header
,
91 rec
= tdb_fetch(db
->wtdb
->tdb
, key
);
92 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
93 status
= NT_STATUS_NOT_FOUND
;
98 header
->dmaster
= (uint32_t)-1;
105 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
109 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
110 if (data
->dsize
== 0) {
113 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
115 + sizeof(struct ctdb_ltdb_header
),
117 if (data
->dptr
== NULL
) {
118 status
= NT_STATUS_NO_MEMORY
;
124 status
= NT_STATUS_OK
;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
137 struct ctdb_ltdb_header
*header
,
140 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
144 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
145 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
147 if (rec
.dptr
== NULL
) {
148 talloc_free(tmp_ctx
);
149 return NT_STATUS_NO_MEMORY
;
152 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
153 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
155 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
157 talloc_free(tmp_ctx
);
159 return (ret
== 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
170 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
172 struct ctdb_ltdb_header
*header
,
176 struct ctdb_rec_data
*d
;
178 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
179 data
.dsize
+ (header
?sizeof(*header
):0);
180 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
186 d
->keylen
= key
.dsize
;
187 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
189 d
->datalen
= data
.dsize
+ sizeof(*header
);
190 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
191 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
193 d
->datalen
= data
.dsize
;
194 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
202 struct ctdb_marshall_buffer
*m
,
206 struct ctdb_ltdb_header
*header
,
209 struct ctdb_rec_data
*r
;
210 size_t m_size
, r_size
;
211 struct ctdb_marshall_buffer
*m2
= NULL
;
213 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
220 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
221 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
228 m_size
= talloc_get_size(m
);
229 r_size
= talloc_get_size(r
);
231 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
232 mem_ctx
, m
, m_size
+ r_size
);
238 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
251 data
.dptr
= (uint8_t *)m
;
252 data
.dsize
= talloc_get_size(m
);
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
264 struct ctdb_ltdb_header
*header
,
265 TDB_DATA
*key
, TDB_DATA
*data
)
268 r
= (struct ctdb_rec_data
*)&m
->data
[0];
270 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
278 key
->dptr
= &r
->data
[0];
279 key
->dsize
= r
->keylen
;
282 data
->dptr
= &r
->data
[r
->keylen
];
283 data
->dsize
= r
->datalen
;
284 if (header
!= NULL
) {
285 data
->dptr
+= sizeof(*header
);
286 data
->dsize
-= sizeof(*header
);
290 if (header
!= NULL
) {
291 if (r
->datalen
< sizeof(*header
)) {
294 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
301 static int32_t db_ctdb_transaction_active(uint32_t db_id
)
307 indata
.dptr
= (uint8_t *)&db_id
;
308 indata
.dsize
= sizeof(db_id
);
310 ret
= ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE
, 0, 0,
312 indata
, NULL
, NULL
, &status
);
314 if (!NT_STATUS_IS_OK(ret
)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
324 * CTDB transaction destructor
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
328 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
339 struct db_record
*rh
;
340 struct db_ctdb_rec
*crec
;
343 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
345 struct db_ctdb_ctx
*ctx
= h
->ctx
;
349 struct ctdb_ltdb_header header
;
350 int32_t transaction_status
;
352 key
.dptr
= (uint8_t *)discard_const(keyname
);
353 key
.dsize
= strlen(keyname
);
356 tmp_ctx
= talloc_new(h
);
358 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
360 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
361 talloc_free(tmp_ctx
);
364 crec
= talloc_get_type_abort(rh
->private_data
, struct db_ctdb_rec
);
366 transaction_status
= db_ctdb_transaction_active(ctx
->db_id
);
367 if (transaction_status
== 1) {
368 unsigned long int usec
= (1000 + random()) % 100000;
369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370 "Re-trying after %lu microseconds...",
372 talloc_free(tmp_ctx
);
378 * store the pid in the database:
379 * it is not enough that the node is dmaster...
382 data
.dptr
= (unsigned char *)&pid
;
383 data
.dsize
= sizeof(pid_t
);
384 status
= db_ctdb_ltdb_store(ctx
, key
, &(crec
->header
), data
);
385 if (!NT_STATUS_IS_OK(status
)) {
386 DEBUG(0, (__location__
" Failed to store pid in transaction "
387 "record: %s\n", nt_errstr(status
)));
388 talloc_free(tmp_ctx
);
394 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
396 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
397 talloc_free(tmp_ctx
);
401 status
= db_ctdb_ltdb_fetch(ctx
, key
, &header
, tmp_ctx
, &data
);
402 if (!NT_STATUS_IS_OK(status
) || header
.dmaster
!= get_my_vnn()) {
403 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
404 talloc_free(tmp_ctx
);
408 if ((data
.dsize
!= sizeof(pid_t
)) || (*(pid_t
*)(data
.dptr
) != pid
)) {
409 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
410 talloc_free(tmp_ctx
);
414 talloc_free(tmp_ctx
);
421 * CTDB dbwrap API: transaction_start function
422 * starts a transaction on a persistent database
424 static int db_ctdb_transaction_start(struct db_context
*db
)
426 struct db_ctdb_transaction_handle
*h
;
428 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
431 if (!db
->persistent
) {
432 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
437 if (ctx
->transaction
) {
438 ctx
->transaction
->nesting
++;
442 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
444 DEBUG(0,(__location__
" oom for transaction handle\n"));
450 ret
= db_ctdb_transaction_fetch_start(h
);
456 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
458 ctx
->transaction
= h
;
460 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
468 fetch a record inside a transaction
470 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
472 TDB_DATA key
, TDB_DATA
*data
)
474 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
477 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
479 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
481 } else if (!NT_STATUS_IS_OK(status
)) {
486 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
487 if (h
->m_all
== NULL
) {
488 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
490 talloc_free(data
->dptr
);
499 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
500 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
502 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
506 struct db_record
*result
;
509 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
510 DEBUG(0, ("talloc failed\n"));
514 result
->private_data
= ctx
->transaction
;
516 result
->key
.dsize
= key
.dsize
;
517 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
518 if (result
->key
.dptr
== NULL
) {
519 DEBUG(0, ("talloc failed\n"));
524 result
->store
= db_ctdb_store_transaction
;
525 result
->delete_rec
= db_ctdb_delete_transaction
;
527 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
528 if (ctdb_data
.dptr
== NULL
) {
529 /* create the record */
530 result
->value
= tdb_null
;
534 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
535 result
->value
.dptr
= NULL
;
537 if ((result
->value
.dsize
!= 0)
538 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
539 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
540 result
->value
.dsize
))) {
541 DEBUG(0, ("talloc failed\n"));
545 SAFE_FREE(ctdb_data
.dptr
);
550 static int db_ctdb_record_destructor(struct db_record
**recp
)
552 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
553 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
554 rec
->private_data
, struct db_ctdb_transaction_handle
);
555 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
557 DEBUG(0,(__location__
" transaction_commit failed\n"));
563 auto-create a transaction for persistent databases
565 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
570 struct db_record
*rec
, **recp
;
572 res
= db_ctdb_transaction_start(ctx
->db
);
577 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
579 ctx
->db
->transaction_cancel(ctx
->db
);
583 /* destroy this transaction when we release the lock */
584 recp
= talloc(rec
, struct db_record
*);
586 ctx
->db
->transaction_cancel(ctx
->db
);
591 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
597 stores a record inside a transaction
599 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
600 TDB_DATA key
, TDB_DATA data
)
602 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
605 struct ctdb_ltdb_header header
;
608 /* we need the header so we can update the RSN */
609 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
610 if (rec
.dptr
== NULL
) {
611 /* the record doesn't exist - create one with us as dmaster.
612 This is only safe because we are in a transaction and this
613 is a persistent database */
616 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
617 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
618 /* a special case, we are writing the same data that is there now */
619 if (data
.dsize
== rec
.dsize
&&
620 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
622 talloc_free(tmp_ctx
);
628 header
.dmaster
= get_my_vnn();
632 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
633 if (h
->m_all
== NULL
) {
634 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
635 talloc_free(tmp_ctx
);
640 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
641 if (h
->m_write
== NULL
) {
642 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
643 talloc_free(tmp_ctx
);
647 status
= db_ctdb_ltdb_store(h
->ctx
, key
, &header
, data
);
648 if (NT_STATUS_IS_OK(status
)) {
654 talloc_free(tmp_ctx
);
661 a record store inside a transaction
663 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
665 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
666 rec
->private_data
, struct db_ctdb_transaction_handle
);
669 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
671 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
677 a record delete inside a transaction
679 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
681 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
682 rec
->private_data
, struct db_ctdb_transaction_handle
);
685 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
687 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
696 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
699 struct ctdb_rec_data
*rec
= NULL
;
702 talloc_free(h
->m_write
);
705 ret
= db_ctdb_transaction_fetch_start(h
);
710 for (i
=0;i
<h
->m_all
->count
;i
++) {
713 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
715 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
719 if (rec
->reqid
== 0) {
721 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
726 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
728 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
729 talloc_free(tmp_ctx
);
732 if (data2
.dsize
!= data
.dsize
||
733 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
734 /* the record has changed on us - we have to give up */
735 talloc_free(tmp_ctx
);
738 talloc_free(tmp_ctx
);
745 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
753 static int db_ctdb_transaction_commit(struct db_context
*db
)
755 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
761 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
762 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
765 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
769 if (h
->nested_cancel
) {
770 db
->transaction_cancel(db
);
771 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
775 if (h
->nesting
!= 0) {
780 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
782 talloc_set_destructor(h
, NULL
);
784 /* our commit strategy is quite complex.
786 - we first try to commit the changes to all other nodes
788 - if that works, then we commit locally and we are done
790 - if a commit on another node fails, then we need to cancel
791 the transaction, then restart the transaction (thus
792 opening a window of time for a pending recovery to
793 complete), then replay the transaction, checking all the
794 reads and writes (checking that reads give the same data,
795 and writes succeed). Then we retry the transaction to the
800 if (h
->m_write
== NULL
) {
801 /* no changes were made, potentially after a retry */
802 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
804 ctx
->transaction
= NULL
;
808 /* tell ctdbd to commit to the other nodes */
809 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
810 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
812 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
813 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
814 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
817 if (!NT_STATUS_IS_OK(rets
)) {
818 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
820 /* work out what error code we will give if we
821 have to fail the operation */
822 switch ((enum ctdb_trans2_commit_error
)status
) {
823 case CTDB_TRANS2_COMMIT_SUCCESS
:
824 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
825 case CTDB_TRANS2_COMMIT_TIMEOUT
:
826 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
828 case CTDB_TRANS2_COMMIT_ALLFAIL
:
829 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
834 if (++retries
== 5) {
835 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
836 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
837 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
838 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
839 tdb_null
, NULL
, NULL
, NULL
);
840 h
->ctx
->transaction
= NULL
;
842 ctx
->transaction
= NULL
;
846 if (ctdb_replay_transaction(h
) != 0) {
847 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
848 (unsigned)failure_control
));
849 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
850 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
851 tdb_null
, NULL
, NULL
, NULL
);
852 h
->ctx
->transaction
= NULL
;
854 ctx
->transaction
= NULL
;
859 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
862 /* do the real commit locally */
863 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
865 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
866 (unsigned)failure_control
));
867 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
868 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
869 h
->ctx
->transaction
= NULL
;
874 /* tell ctdbd that we are finished with our local commit */
875 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
876 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
877 tdb_null
, NULL
, NULL
, NULL
);
878 h
->ctx
->transaction
= NULL
;
887 static int db_ctdb_transaction_cancel(struct db_context
*db
)
889 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
891 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
894 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
898 if (h
->nesting
!= 0) {
900 h
->nested_cancel
= true;
904 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
906 ctx
->transaction
= NULL
;
912 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
914 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
915 rec
->private_data
, struct db_ctdb_rec
);
917 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
922 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
927 * We have to store the header with empty data. TODO: Fix the
933 return db_ctdb_store(rec
, data
, 0);
937 static int db_ctdb_record_destr(struct db_record
* data
)
939 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
940 data
->private_data
, struct db_ctdb_rec
);
942 DEBUG(10, (DEBUGLEVEL
> 10
943 ? "Unlocking db %u key %s\n"
944 : "Unlocking db %u key %.20s\n",
945 (int)crec
->ctdb_ctx
->db_id
,
946 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
949 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
950 DEBUG(0, ("tdb_chainunlock failed\n"));
957 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
962 struct db_record
*result
;
963 struct db_ctdb_rec
*crec
;
966 int migrate_attempts
= 0;
968 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
969 DEBUG(0, ("talloc failed\n"));
973 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
974 DEBUG(0, ("talloc failed\n"));
979 result
->private_data
= (void *)crec
;
980 crec
->ctdb_ctx
= ctx
;
982 result
->key
.dsize
= key
.dsize
;
983 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
984 if (result
->key
.dptr
== NULL
) {
985 DEBUG(0, ("talloc failed\n"));
991 * Do a blocking lock on the record
995 if (DEBUGLEVEL
>= 10) {
996 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
997 DEBUG(10, (DEBUGLEVEL
> 10
998 ? "Locking db %u key %s\n"
999 : "Locking db %u key %.20s\n",
1000 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1001 TALLOC_FREE(keystr
);
1004 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
1005 DEBUG(3, ("tdb_chainlock failed\n"));
1006 TALLOC_FREE(result
);
1010 result
->store
= db_ctdb_store
;
1011 result
->delete_rec
= db_ctdb_delete
;
1012 talloc_set_destructor(result
, db_ctdb_record_destr
);
1014 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1017 * See if we have a valid record and we are the dmaster. If so, we can
1018 * take the shortcut and just return it.
1021 if ((ctdb_data
.dptr
== NULL
) ||
1022 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
1023 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
1025 || (random() % 2 != 0)
1028 SAFE_FREE(ctdb_data
.dptr
);
1029 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1030 talloc_set_destructor(result
, NULL
);
1032 migrate_attempts
+= 1;
1034 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1035 ctdb_data
.dptr
, ctdb_data
.dptr
?
1036 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1039 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
1040 if (!NT_STATUS_IS_OK(status
)) {
1041 DEBUG(5, ("ctdb_migrate failed: %s\n",
1042 nt_errstr(status
)));
1043 TALLOC_FREE(result
);
1046 /* now it's migrated, try again */
1050 if (migrate_attempts
> 10) {
1051 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1055 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1057 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1058 result
->value
.dptr
= NULL
;
1060 if ((result
->value
.dsize
!= 0)
1061 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
1062 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1063 result
->value
.dsize
))) {
1064 DEBUG(0, ("talloc failed\n"));
1065 TALLOC_FREE(result
);
1068 SAFE_FREE(ctdb_data
.dptr
);
1073 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1074 TALLOC_CTX
*mem_ctx
,
1077 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1078 struct db_ctdb_ctx
);
1080 if (ctx
->transaction
!= NULL
) {
1081 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1084 if (db
->persistent
) {
1085 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1088 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
1092 fetch (unlocked, no migration) operation on ctdb
1094 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1095 TDB_DATA key
, TDB_DATA
*data
)
1097 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1098 struct db_ctdb_ctx
);
1102 if (ctx
->transaction
) {
1103 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1106 /* try a direct fetch */
1107 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1110 * See if we have a valid record and we are the dmaster. If so, we can
1111 * take the shortcut and just return it.
1112 * we bypass the dmaster check for persistent databases
1114 if ((ctdb_data
.dptr
!= NULL
) &&
1115 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1117 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
1118 /* we are the dmaster - avoid the ctdb protocol op */
1120 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1121 if (data
->dsize
== 0) {
1122 SAFE_FREE(ctdb_data
.dptr
);
1127 data
->dptr
= (uint8
*)talloc_memdup(
1128 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1131 SAFE_FREE(ctdb_data
.dptr
);
1133 if (data
->dptr
== NULL
) {
1139 SAFE_FREE(ctdb_data
.dptr
);
1141 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1142 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1143 if (!NT_STATUS_IS_OK(status
)) {
1144 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
1151 struct traverse_state
{
1152 struct db_context
*db
;
1153 int (*fn
)(struct db_record
*rec
, void *private_data
);
1157 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1159 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1160 struct db_record
*rec
;
1161 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1162 /* we have to give them a locked record to prevent races */
1163 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1164 if (rec
&& rec
->value
.dsize
> 0) {
1165 state
->fn(rec
, state
->private_data
);
1167 talloc_free(tmp_ctx
);
1170 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1173 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1174 struct db_record
*rec
;
1175 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1177 /* we have to give them a locked record to prevent races */
1178 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1179 if (rec
&& rec
->value
.dsize
> 0) {
1180 ret
= state
->fn(rec
, state
->private_data
);
1182 talloc_free(tmp_ctx
);
1186 static int db_ctdb_traverse(struct db_context
*db
,
1187 int (*fn
)(struct db_record
*rec
,
1188 void *private_data
),
1191 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1192 struct db_ctdb_ctx
);
1193 struct traverse_state state
;
1197 state
.private_data
= private_data
;
1199 if (db
->persistent
) {
1200 /* for persistent databases we don't need to do a ctdb traverse,
1201 we can do a faster local traverse */
1202 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1206 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1210 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1212 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1215 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1217 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1220 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1222 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1223 struct db_record rec
;
1226 rec
.store
= db_ctdb_store_deny
;
1227 rec
.delete_rec
= db_ctdb_delete_deny
;
1228 rec
.private_data
= state
->db
;
1229 state
->fn(&rec
, state
->private_data
);
1232 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1235 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1236 struct db_record rec
;
1239 rec
.store
= db_ctdb_store_deny
;
1240 rec
.delete_rec
= db_ctdb_delete_deny
;
1241 rec
.private_data
= state
->db
;
1243 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1244 /* a deleted record */
1247 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1248 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1250 return state
->fn(&rec
, state
->private_data
);
1253 static int db_ctdb_traverse_read(struct db_context
*db
,
1254 int (*fn
)(struct db_record
*rec
,
1255 void *private_data
),
1258 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1259 struct db_ctdb_ctx
);
1260 struct traverse_state state
;
1264 state
.private_data
= private_data
;
1266 if (db
->persistent
) {
1267 /* for persistent databases we don't need to do a ctdb traverse,
1268 we can do a faster local traverse */
1269 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1272 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1276 static int db_ctdb_get_seqnum(struct db_context
*db
)
1278 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1279 struct db_ctdb_ctx
);
1280 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1283 static int db_ctdb_get_flags(struct db_context
*db
)
1285 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1286 struct db_ctdb_ctx
);
1287 return tdb_get_flags(ctx
->wtdb
->tdb
);
1290 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1292 int hash_size
, int tdb_flags
,
1293 int open_flags
, mode_t mode
)
1295 struct db_context
*result
;
1296 struct db_ctdb_ctx
*db_ctdb
;
1299 if (!lp_clustering()) {
1300 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1304 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1305 DEBUG(0, ("talloc failed\n"));
1306 TALLOC_FREE(result
);
1310 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1311 DEBUG(0, ("talloc failed\n"));
1312 TALLOC_FREE(result
);
1316 db_ctdb
->transaction
= NULL
;
1317 db_ctdb
->db
= result
;
1319 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1320 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1321 TALLOC_FREE(result
);
1325 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1327 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1329 /* only pass through specific flags */
1330 tdb_flags
&= TDB_SEQNUM
;
1332 /* honor permissions if user has specified O_CREAT */
1333 if (open_flags
& O_CREAT
) {
1334 chmod(db_path
, mode
);
1337 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1338 if (db_ctdb
->wtdb
== NULL
) {
1339 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1340 TALLOC_FREE(result
);
1343 talloc_free(db_path
);
1345 result
->private_data
= (void *)db_ctdb
;
1346 result
->fetch_locked
= db_ctdb_fetch_locked
;
1347 result
->fetch
= db_ctdb_fetch
;
1348 result
->traverse
= db_ctdb_traverse
;
1349 result
->traverse_read
= db_ctdb_traverse_read
;
1350 result
->get_seqnum
= db_ctdb_get_seqnum
;
1351 result
->get_flags
= db_ctdb_get_flags
;
1352 result
->transaction_start
= db_ctdb_transaction_start
;
1353 result
->transaction_commit
= db_ctdb_transaction_commit
;
1354 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1356 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1357 name
, db_ctdb
->db_id
));