2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
30 * we store the reads and writes done under a transaction:
31 * - one list stores both reads and writes (m_all),
32 * - the other just writes (m_write)
34 struct ctdb_marshall_buffer
*m_all
;
35 struct ctdb_marshall_buffer
*m_write
;
41 struct db_context
*db
;
42 struct tdb_wrap
*wtdb
;
44 struct db_ctdb_transaction_handle
*transaction
;
48 struct db_ctdb_ctx
*ctdb_ctx
;
49 struct ctdb_ltdb_header header
;
52 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
57 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
60 enum TDB_ERROR tret
= tdb_error(tdb
);
64 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
67 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
70 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
84 struct ctdb_ltdb_header
*header
,
91 rec
= tdb_fetch(db
->wtdb
->tdb
, key
);
92 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
93 status
= NT_STATUS_NOT_FOUND
;
98 header
->dmaster
= (uint32_t)-1;
105 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
109 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
110 if (data
->dsize
== 0) {
113 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
115 + sizeof(struct ctdb_ltdb_header
),
117 if (data
->dptr
== NULL
) {
118 status
= NT_STATUS_NO_MEMORY
;
124 status
= NT_STATUS_OK
;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
137 struct ctdb_ltdb_header
*header
,
140 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
144 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
145 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
147 if (rec
.dptr
== NULL
) {
148 talloc_free(tmp_ctx
);
149 return NT_STATUS_NO_MEMORY
;
152 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
153 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
155 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
157 talloc_free(tmp_ctx
);
159 return (ret
== 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
170 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
172 struct ctdb_ltdb_header
*header
,
176 struct ctdb_rec_data
*d
;
178 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
179 data
.dsize
+ (header
?sizeof(*header
):0);
180 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
186 d
->keylen
= key
.dsize
;
187 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
189 d
->datalen
= data
.dsize
+ sizeof(*header
);
190 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
191 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
193 d
->datalen
= data
.dsize
;
194 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
202 struct ctdb_marshall_buffer
*m
,
206 struct ctdb_ltdb_header
*header
,
209 struct ctdb_rec_data
*r
;
210 size_t m_size
, r_size
;
211 struct ctdb_marshall_buffer
*m2
= NULL
;
213 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
220 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
221 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
228 m_size
= talloc_get_size(m
);
229 r_size
= talloc_get_size(r
);
231 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
232 mem_ctx
, m
, m_size
+ r_size
);
238 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
251 data
.dptr
= (uint8_t *)m
;
252 data
.dsize
= talloc_get_size(m
);
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
264 struct ctdb_ltdb_header
*header
,
265 TDB_DATA
*key
, TDB_DATA
*data
)
268 r
= (struct ctdb_rec_data
*)&m
->data
[0];
270 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
278 key
->dptr
= &r
->data
[0];
279 key
->dsize
= r
->keylen
;
282 data
->dptr
= &r
->data
[r
->keylen
];
283 data
->dsize
= r
->datalen
;
284 if (header
!= NULL
) {
285 data
->dptr
+= sizeof(*header
);
286 data
->dsize
-= sizeof(*header
);
290 if (header
!= NULL
) {
291 if (r
->datalen
< sizeof(*header
)) {
294 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
301 static int32_t db_ctdb_transaction_active(uint32_t db_id
)
307 indata
.dptr
= (uint8_t *)&db_id
;
308 indata
.dsize
= sizeof(db_id
);
310 ret
= ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE
, 0, 0,
312 indata
, NULL
, NULL
, &status
);
314 if (!NT_STATUS_IS_OK(ret
)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
324 * CTDB transaction destructor
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
328 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
339 struct db_record
*rh
;
340 struct db_ctdb_rec
*crec
;
343 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
345 struct db_ctdb_ctx
*ctx
= h
->ctx
;
349 struct ctdb_ltdb_header header
;
350 int32_t transaction_status
;
352 key
.dptr
= (uint8_t *)discard_const(keyname
);
353 key
.dsize
= strlen(keyname
);
356 tmp_ctx
= talloc_new(h
);
358 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
360 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
361 talloc_free(tmp_ctx
);
364 crec
= talloc_get_type_abort(rh
->private_data
, struct db_ctdb_rec
);
366 transaction_status
= db_ctdb_transaction_active(ctx
->db_id
);
367 if (transaction_status
== 1) {
368 unsigned long int usec
= (1000 + random()) % 100000;
369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370 "Re-trying after %lu microseconds...",
372 talloc_free(tmp_ctx
);
378 * store the pid in the database:
379 * it is not enough that the node is dmaster...
382 data
.dptr
= (unsigned char *)&pid
;
383 data
.dsize
= sizeof(pid_t
);
385 crec
->header
.dmaster
= get_my_vnn();
386 status
= db_ctdb_ltdb_store(ctx
, key
, &(crec
->header
), data
);
387 if (!NT_STATUS_IS_OK(status
)) {
388 DEBUG(0, (__location__
" Failed to store pid in transaction "
389 "record: %s\n", nt_errstr(status
)));
390 talloc_free(tmp_ctx
);
396 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
398 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
399 talloc_free(tmp_ctx
);
403 status
= db_ctdb_ltdb_fetch(ctx
, key
, &header
, tmp_ctx
, &data
);
404 if (!NT_STATUS_IS_OK(status
)) {
405 DEBUG(0, (__location__
" failed to refetch transaction lock "
406 "record inside transaction: %s - retrying\n",
408 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
409 talloc_free(tmp_ctx
);
413 if (header
.dmaster
!= get_my_vnn()) {
414 DEBUG(3, (__location__
" refetch transaction lock record : "
415 "we are not dmaster any more "
416 "(dmaster[%u] != my_vnn[%u]) - retrying\n",
417 header
.dmaster
, get_my_vnn()));
418 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
419 talloc_free(tmp_ctx
);
423 if ((data
.dsize
!= sizeof(pid_t
)) || (*(pid_t
*)(data
.dptr
) != pid
)) {
424 DEBUG(3, (__location__
" refetch transaction lock record: "
425 "another local process has started a transaction "
426 "(stored pid [%u] != my pid [%u]) - retrying\n",
427 *(pid_t
*)(data
.dptr
), pid
));
428 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
429 talloc_free(tmp_ctx
);
433 talloc_free(tmp_ctx
);
440 * CTDB dbwrap API: transaction_start function
441 * starts a transaction on a persistent database
443 static int db_ctdb_transaction_start(struct db_context
*db
)
445 struct db_ctdb_transaction_handle
*h
;
447 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
450 if (!db
->persistent
) {
451 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
456 if (ctx
->transaction
) {
457 ctx
->transaction
->nesting
++;
461 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
463 DEBUG(0,(__location__
" oom for transaction handle\n"));
469 ret
= db_ctdb_transaction_fetch_start(h
);
475 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
477 ctx
->transaction
= h
;
479 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
487 fetch a record inside a transaction
489 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
491 TDB_DATA key
, TDB_DATA
*data
)
493 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
496 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
498 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
500 } else if (!NT_STATUS_IS_OK(status
)) {
505 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
506 if (h
->m_all
== NULL
) {
507 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
509 talloc_free(data
->dptr
);
518 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
519 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
521 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
525 struct db_record
*result
;
528 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
529 DEBUG(0, ("talloc failed\n"));
533 result
->private_data
= ctx
->transaction
;
535 result
->key
.dsize
= key
.dsize
;
536 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
537 if (result
->key
.dptr
== NULL
) {
538 DEBUG(0, ("talloc failed\n"));
543 result
->store
= db_ctdb_store_transaction
;
544 result
->delete_rec
= db_ctdb_delete_transaction
;
546 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
547 if (ctdb_data
.dptr
== NULL
) {
548 /* create the record */
549 result
->value
= tdb_null
;
553 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
554 result
->value
.dptr
= NULL
;
556 if ((result
->value
.dsize
!= 0)
557 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
558 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
559 result
->value
.dsize
))) {
560 DEBUG(0, ("talloc failed\n"));
564 SAFE_FREE(ctdb_data
.dptr
);
569 static int db_ctdb_record_destructor(struct db_record
**recp
)
571 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
572 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
573 rec
->private_data
, struct db_ctdb_transaction_handle
);
574 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
576 DEBUG(0,(__location__
" transaction_commit failed\n"));
582 auto-create a transaction for persistent databases
584 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
589 struct db_record
*rec
, **recp
;
591 res
= db_ctdb_transaction_start(ctx
->db
);
596 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
598 ctx
->db
->transaction_cancel(ctx
->db
);
602 /* destroy this transaction when we release the lock */
603 recp
= talloc(rec
, struct db_record
*);
605 ctx
->db
->transaction_cancel(ctx
->db
);
610 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
616 stores a record inside a transaction
618 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
619 TDB_DATA key
, TDB_DATA data
)
621 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
624 struct ctdb_ltdb_header header
;
627 /* we need the header so we can update the RSN */
628 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
629 if (rec
.dptr
== NULL
) {
630 /* the record doesn't exist - create one with us as dmaster.
631 This is only safe because we are in a transaction and this
632 is a persistent database */
635 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
636 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
637 /* a special case, we are writing the same data that is there now */
638 if (data
.dsize
== rec
.dsize
&&
639 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
641 talloc_free(tmp_ctx
);
647 header
.dmaster
= get_my_vnn();
651 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
652 if (h
->m_all
== NULL
) {
653 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
654 talloc_free(tmp_ctx
);
659 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
660 if (h
->m_write
== NULL
) {
661 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
662 talloc_free(tmp_ctx
);
666 status
= db_ctdb_ltdb_store(h
->ctx
, key
, &header
, data
);
667 if (NT_STATUS_IS_OK(status
)) {
673 talloc_free(tmp_ctx
);
680 a record store inside a transaction
682 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
684 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
685 rec
->private_data
, struct db_ctdb_transaction_handle
);
688 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
690 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
696 a record delete inside a transaction
698 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
700 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
701 rec
->private_data
, struct db_ctdb_transaction_handle
);
704 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
706 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
715 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
718 struct ctdb_rec_data
*rec
= NULL
;
721 talloc_free(h
->m_write
);
724 ret
= db_ctdb_transaction_fetch_start(h
);
729 for (i
=0;i
<h
->m_all
->count
;i
++) {
732 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
734 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
738 if (rec
->reqid
== 0) {
740 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
745 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
747 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
748 talloc_free(tmp_ctx
);
751 if (data2
.dsize
!= data
.dsize
||
752 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
753 /* the record has changed on us - we have to give up */
754 talloc_free(tmp_ctx
);
757 talloc_free(tmp_ctx
);
764 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
772 static int db_ctdb_transaction_commit(struct db_context
*db
)
774 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
780 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
781 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
784 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
788 if (h
->nested_cancel
) {
789 db
->transaction_cancel(db
);
790 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
794 if (h
->nesting
!= 0) {
799 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
801 talloc_set_destructor(h
, NULL
);
803 /* our commit strategy is quite complex.
805 - we first try to commit the changes to all other nodes
807 - if that works, then we commit locally and we are done
809 - if a commit on another node fails, then we need to cancel
810 the transaction, then restart the transaction (thus
811 opening a window of time for a pending recovery to
812 complete), then replay the transaction, checking all the
813 reads and writes (checking that reads give the same data,
814 and writes succeed). Then we retry the transaction to the
819 if (h
->m_write
== NULL
) {
820 /* no changes were made, potentially after a retry */
821 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
823 ctx
->transaction
= NULL
;
827 /* tell ctdbd to commit to the other nodes */
828 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
829 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
831 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
832 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
833 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
836 if (!NT_STATUS_IS_OK(rets
)) {
837 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
839 /* work out what error code we will give if we
840 have to fail the operation */
841 switch ((enum ctdb_trans2_commit_error
)status
) {
842 case CTDB_TRANS2_COMMIT_SUCCESS
:
843 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
844 case CTDB_TRANS2_COMMIT_TIMEOUT
:
845 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
847 case CTDB_TRANS2_COMMIT_ALLFAIL
:
848 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
853 if (++retries
== 100) {
854 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
855 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
856 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
857 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
858 tdb_null
, NULL
, NULL
, NULL
);
859 h
->ctx
->transaction
= NULL
;
861 ctx
->transaction
= NULL
;
865 if (ctdb_replay_transaction(h
) != 0) {
866 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
867 (unsigned)failure_control
));
868 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
869 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
870 tdb_null
, NULL
, NULL
, NULL
);
871 h
->ctx
->transaction
= NULL
;
873 ctx
->transaction
= NULL
;
878 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
881 /* do the real commit locally */
882 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
884 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
885 (unsigned)failure_control
));
886 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
887 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
888 h
->ctx
->transaction
= NULL
;
893 /* tell ctdbd that we are finished with our local commit */
894 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
895 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
896 tdb_null
, NULL
, NULL
, NULL
);
897 h
->ctx
->transaction
= NULL
;
906 static int db_ctdb_transaction_cancel(struct db_context
*db
)
908 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
910 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
913 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
917 if (h
->nesting
!= 0) {
919 h
->nested_cancel
= true;
923 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
925 ctx
->transaction
= NULL
;
931 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
933 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
934 rec
->private_data
, struct db_ctdb_rec
);
936 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
941 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
946 * We have to store the header with empty data. TODO: Fix the
952 return db_ctdb_store(rec
, data
, 0);
956 static int db_ctdb_record_destr(struct db_record
* data
)
958 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
959 data
->private_data
, struct db_ctdb_rec
);
961 DEBUG(10, (DEBUGLEVEL
> 10
962 ? "Unlocking db %u key %s\n"
963 : "Unlocking db %u key %.20s\n",
964 (int)crec
->ctdb_ctx
->db_id
,
965 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
968 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
969 DEBUG(0, ("tdb_chainunlock failed\n"));
976 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
981 struct db_record
*result
;
982 struct db_ctdb_rec
*crec
;
985 int migrate_attempts
= 0;
987 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
988 DEBUG(0, ("talloc failed\n"));
992 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
993 DEBUG(0, ("talloc failed\n"));
998 result
->private_data
= (void *)crec
;
999 crec
->ctdb_ctx
= ctx
;
1001 result
->key
.dsize
= key
.dsize
;
1002 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
1003 if (result
->key
.dptr
== NULL
) {
1004 DEBUG(0, ("talloc failed\n"));
1005 TALLOC_FREE(result
);
1010 * Do a blocking lock on the record
1014 if (DEBUGLEVEL
>= 10) {
1015 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
1016 DEBUG(10, (DEBUGLEVEL
> 10
1017 ? "Locking db %u key %s\n"
1018 : "Locking db %u key %.20s\n",
1019 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1020 TALLOC_FREE(keystr
);
1023 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
1024 DEBUG(3, ("tdb_chainlock failed\n"));
1025 TALLOC_FREE(result
);
1029 result
->store
= db_ctdb_store
;
1030 result
->delete_rec
= db_ctdb_delete
;
1031 talloc_set_destructor(result
, db_ctdb_record_destr
);
1033 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1036 * See if we have a valid record and we are the dmaster. If so, we can
1037 * take the shortcut and just return it.
1040 if ((ctdb_data
.dptr
== NULL
) ||
1041 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
1042 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
1044 || (random() % 2 != 0)
1047 SAFE_FREE(ctdb_data
.dptr
);
1048 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1049 talloc_set_destructor(result
, NULL
);
1051 migrate_attempts
+= 1;
1053 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1054 ctdb_data
.dptr
, ctdb_data
.dptr
?
1055 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1058 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
1059 if (!NT_STATUS_IS_OK(status
)) {
1060 DEBUG(5, ("ctdb_migrate failed: %s\n",
1061 nt_errstr(status
)));
1062 TALLOC_FREE(result
);
1065 /* now its migrated, try again */
1069 if (migrate_attempts
> 10) {
1070 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1074 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1076 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1077 result
->value
.dptr
= NULL
;
1079 if ((result
->value
.dsize
!= 0)
1080 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
1081 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1082 result
->value
.dsize
))) {
1083 DEBUG(0, ("talloc failed\n"));
1084 TALLOC_FREE(result
);
1087 SAFE_FREE(ctdb_data
.dptr
);
1092 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1093 TALLOC_CTX
*mem_ctx
,
1096 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1097 struct db_ctdb_ctx
);
1099 if (ctx
->transaction
!= NULL
) {
1100 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1103 if (db
->persistent
) {
1104 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1107 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
1111 fetch (unlocked, no migration) operation on ctdb
1113 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1114 TDB_DATA key
, TDB_DATA
*data
)
1116 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1117 struct db_ctdb_ctx
);
1121 if (ctx
->transaction
) {
1122 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1125 /* try a direct fetch */
1126 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1129 * See if we have a valid record and we are the dmaster. If so, we can
1130 * take the shortcut and just return it.
1131 * we bypass the dmaster check for persistent databases
1133 if ((ctdb_data
.dptr
!= NULL
) &&
1134 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1136 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
1137 /* we are the dmaster - avoid the ctdb protocol op */
1139 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1140 if (data
->dsize
== 0) {
1141 SAFE_FREE(ctdb_data
.dptr
);
1146 data
->dptr
= (uint8
*)talloc_memdup(
1147 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1150 SAFE_FREE(ctdb_data
.dptr
);
1152 if (data
->dptr
== NULL
) {
1158 SAFE_FREE(ctdb_data
.dptr
);
1160 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1161 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1162 if (!NT_STATUS_IS_OK(status
)) {
1163 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/* State handed to the traverse callbacks through their private pointer */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller-supplied per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1176 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1178 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1179 struct db_record
*rec
;
1180 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1181 /* we have to give them a locked record to prevent races */
1182 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1183 if (rec
&& rec
->value
.dsize
> 0) {
1184 state
->fn(rec
, state
->private_data
);
1186 talloc_free(tmp_ctx
);
1189 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1192 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1193 struct db_record
*rec
;
1194 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1196 /* we have to give them a locked record to prevent races */
1197 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1198 if (rec
&& rec
->value
.dsize
> 0) {
1199 ret
= state
->fn(rec
, state
->private_data
);
1201 talloc_free(tmp_ctx
);
1205 static int db_ctdb_traverse(struct db_context
*db
,
1206 int (*fn
)(struct db_record
*rec
,
1207 void *private_data
),
1210 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1211 struct db_ctdb_ctx
);
1212 struct traverse_state state
;
1216 state
.private_data
= private_data
;
1218 if (db
->persistent
) {
1219 /* for persistent databases we don't need to do a ctdb traverse,
1220 we can do a faster local traverse */
1221 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1225 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1229 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1231 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1234 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1236 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1239 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1241 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1242 struct db_record rec
;
1245 rec
.store
= db_ctdb_store_deny
;
1246 rec
.delete_rec
= db_ctdb_delete_deny
;
1247 rec
.private_data
= state
->db
;
1248 state
->fn(&rec
, state
->private_data
);
1251 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1254 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1255 struct db_record rec
;
1258 rec
.store
= db_ctdb_store_deny
;
1259 rec
.delete_rec
= db_ctdb_delete_deny
;
1260 rec
.private_data
= state
->db
;
1262 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1263 /* a deleted record */
1266 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1267 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1269 return state
->fn(&rec
, state
->private_data
);
1272 static int db_ctdb_traverse_read(struct db_context
*db
,
1273 int (*fn
)(struct db_record
*rec
,
1274 void *private_data
),
1277 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1278 struct db_ctdb_ctx
);
1279 struct traverse_state state
;
1283 state
.private_data
= private_data
;
1285 if (db
->persistent
) {
1286 /* for persistent databases we don't need to do a ctdb traverse,
1287 we can do a faster local traverse */
1288 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1291 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1295 static int db_ctdb_get_seqnum(struct db_context
*db
)
1297 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1298 struct db_ctdb_ctx
);
1299 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1302 static int db_ctdb_get_flags(struct db_context
*db
)
1304 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1305 struct db_ctdb_ctx
);
1306 return tdb_get_flags(ctx
->wtdb
->tdb
);
1309 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1311 int hash_size
, int tdb_flags
,
1312 int open_flags
, mode_t mode
)
1314 struct db_context
*result
;
1315 struct db_ctdb_ctx
*db_ctdb
;
1318 if (!lp_clustering()) {
1319 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1323 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1324 DEBUG(0, ("talloc failed\n"));
1325 TALLOC_FREE(result
);
1329 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1330 DEBUG(0, ("talloc failed\n"));
1331 TALLOC_FREE(result
);
1335 db_ctdb
->transaction
= NULL
;
1336 db_ctdb
->db
= result
;
1338 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1339 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1340 TALLOC_FREE(result
);
1344 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1346 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1348 /* only pass through specific flags */
1349 tdb_flags
&= TDB_SEQNUM
;
1351 /* honor permissions if user has specified O_CREAT */
1352 if (open_flags
& O_CREAT
) {
1353 chmod(db_path
, mode
);
1356 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1357 if (db_ctdb
->wtdb
== NULL
) {
1358 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1359 TALLOC_FREE(result
);
1362 talloc_free(db_path
);
1364 result
->private_data
= (void *)db_ctdb
;
1365 result
->fetch_locked
= db_ctdb_fetch_locked
;
1366 result
->fetch
= db_ctdb_fetch
;
1367 result
->traverse
= db_ctdb_traverse
;
1368 result
->traverse_read
= db_ctdb_traverse_read
;
1369 result
->get_seqnum
= db_ctdb_get_seqnum
;
1370 result
->get_flags
= db_ctdb_get_flags
;
1371 result
->transaction_start
= db_ctdb_transaction_start
;
1372 result
->transaction_commit
= db_ctdb_transaction_commit
;
1373 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1375 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1376 name
, db_ctdb
->db_id
));