2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
30 * we store the reads and writes done under a transaction:
31 * - one list stores both reads and writes (m_all),
32 * - the other just writes (m_write)
34 struct ctdb_marshall_buffer
*m_all
;
35 struct ctdb_marshall_buffer
*m_write
;
41 struct db_context
*db
;
42 struct tdb_wrap
*wtdb
;
44 struct db_ctdb_transaction_handle
*transaction
;
48 struct db_ctdb_ctx
*ctdb_ctx
;
49 struct ctdb_ltdb_header header
;
52 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
57 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
60 enum TDB_ERROR tret
= tdb_error(tdb
);
64 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
67 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
70 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
84 struct ctdb_ltdb_header
*header
,
91 rec
= tdb_fetch(db
->wtdb
->tdb
, key
);
92 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
93 status
= NT_STATUS_NOT_FOUND
;
98 header
->dmaster
= (uint32_t)-1;
105 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
109 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
110 if (data
->dsize
== 0) {
113 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
115 + sizeof(struct ctdb_ltdb_header
),
117 if (data
->dptr
== NULL
) {
118 status
= NT_STATUS_NO_MEMORY
;
124 status
= NT_STATUS_OK
;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
137 struct ctdb_ltdb_header
*header
,
140 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
144 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
145 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
147 if (rec
.dptr
== NULL
) {
148 talloc_free(tmp_ctx
);
149 return NT_STATUS_NO_MEMORY
;
152 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
153 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
155 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
157 talloc_free(tmp_ctx
);
159 return (ret
== 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
170 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
172 struct ctdb_ltdb_header
*header
,
176 struct ctdb_rec_data
*d
;
178 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
179 data
.dsize
+ (header
?sizeof(*header
):0);
180 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
186 d
->keylen
= key
.dsize
;
187 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
189 d
->datalen
= data
.dsize
+ sizeof(*header
);
190 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
191 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
193 d
->datalen
= data
.dsize
;
194 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
202 struct ctdb_marshall_buffer
*m
,
206 struct ctdb_ltdb_header
*header
,
209 struct ctdb_rec_data
*r
;
210 size_t m_size
, r_size
;
211 struct ctdb_marshall_buffer
*m2
= NULL
;
213 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
220 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
221 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
228 m_size
= talloc_get_size(m
);
229 r_size
= talloc_get_size(r
);
231 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
232 mem_ctx
, m
, m_size
+ r_size
);
238 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
251 data
.dptr
= (uint8_t *)m
;
252 data
.dsize
= talloc_get_size(m
);
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
264 struct ctdb_ltdb_header
*header
,
265 TDB_DATA
*key
, TDB_DATA
*data
)
268 r
= (struct ctdb_rec_data
*)&m
->data
[0];
270 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
278 key
->dptr
= &r
->data
[0];
279 key
->dsize
= r
->keylen
;
282 data
->dptr
= &r
->data
[r
->keylen
];
283 data
->dsize
= r
->datalen
;
284 if (header
!= NULL
) {
285 data
->dptr
+= sizeof(*header
);
286 data
->dsize
-= sizeof(*header
);
290 if (header
!= NULL
) {
291 if (r
->datalen
< sizeof(*header
)) {
294 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
301 static int32_t db_ctdb_transaction_active(uint32_t db_id
)
307 indata
.dptr
= (uint8_t *)&db_id
;
308 indata
.dsize
= sizeof(db_id
);
310 ret
= ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE
, 0, 0,
312 indata
, NULL
, NULL
, &status
);
314 if (!NT_STATUS_IS_OK(ret
)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
324 * CTDB transaction destructor
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
328 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
339 struct db_record
*rh
;
340 struct db_ctdb_rec
*crec
;
343 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
345 struct db_ctdb_ctx
*ctx
= h
->ctx
;
349 struct ctdb_ltdb_header header
;
351 key
.dptr
= (uint8_t *)discard_const(keyname
);
352 key
.dsize
= strlen(keyname
);
355 tmp_ctx
= talloc_new(h
);
357 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
359 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
360 talloc_free(tmp_ctx
);
363 crec
= talloc_get_type_abort(rh
->private_data
, struct db_ctdb_rec
);
366 * store the pid in the database:
367 * it is not enough that the node is dmaster...
370 data
.dptr
= (unsigned char *)&pid
;
371 data
.dsize
= sizeof(pid_t
);
372 status
= db_ctdb_ltdb_store(ctx
, key
, &(crec
->header
), data
);
373 if (!NT_STATUS_IS_OK(status
)) {
374 DEBUG(0, (__location__
" Failed to store pid in transaction "
375 "record: %s\n", nt_errstr(status
)));
376 talloc_free(tmp_ctx
);
382 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
384 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
385 talloc_free(tmp_ctx
);
389 status
= db_ctdb_ltdb_fetch(ctx
, key
, &header
, tmp_ctx
, &data
);
390 if (!NT_STATUS_IS_OK(status
) || header
.dmaster
!= get_my_vnn()) {
391 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
392 talloc_free(tmp_ctx
);
396 if ((data
.dsize
!= sizeof(pid_t
)) || (*(pid_t
*)(data
.dptr
) != pid
)) {
397 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
398 talloc_free(tmp_ctx
);
402 talloc_free(tmp_ctx
);
409 * CTDB dbwrap API: transaction_start function
410 * starts a transaction on a persistent database
412 static int db_ctdb_transaction_start(struct db_context
*db
)
414 struct db_ctdb_transaction_handle
*h
;
416 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
419 if (!db
->persistent
) {
420 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
425 if (ctx
->transaction
) {
426 ctx
->transaction
->nesting
++;
430 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
432 DEBUG(0,(__location__
" oom for transaction handle\n"));
438 ret
= db_ctdb_transaction_fetch_start(h
);
444 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
446 ctx
->transaction
= h
;
448 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
456 fetch a record inside a transaction
458 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
460 TDB_DATA key
, TDB_DATA
*data
)
462 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
465 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
467 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
469 } else if (!NT_STATUS_IS_OK(status
)) {
474 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
475 if (h
->m_all
== NULL
) {
476 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
478 talloc_free(data
->dptr
);
487 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
488 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
490 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
494 struct db_record
*result
;
497 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
498 DEBUG(0, ("talloc failed\n"));
502 result
->private_data
= ctx
->transaction
;
504 result
->key
.dsize
= key
.dsize
;
505 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
506 if (result
->key
.dptr
== NULL
) {
507 DEBUG(0, ("talloc failed\n"));
512 result
->store
= db_ctdb_store_transaction
;
513 result
->delete_rec
= db_ctdb_delete_transaction
;
515 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
516 if (ctdb_data
.dptr
== NULL
) {
517 /* create the record */
518 result
->value
= tdb_null
;
522 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
523 result
->value
.dptr
= NULL
;
525 if ((result
->value
.dsize
!= 0)
526 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
527 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
528 result
->value
.dsize
))) {
529 DEBUG(0, ("talloc failed\n"));
533 SAFE_FREE(ctdb_data
.dptr
);
538 static int db_ctdb_record_destructor(struct db_record
**recp
)
540 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
541 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
542 rec
->private_data
, struct db_ctdb_transaction_handle
);
543 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
545 DEBUG(0,(__location__
" transaction_commit failed\n"));
551 auto-create a transaction for persistent databases
553 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
558 struct db_record
*rec
, **recp
;
560 res
= db_ctdb_transaction_start(ctx
->db
);
565 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
567 ctx
->db
->transaction_cancel(ctx
->db
);
571 /* destroy this transaction when we release the lock */
572 recp
= talloc(rec
, struct db_record
*);
574 ctx
->db
->transaction_cancel(ctx
->db
);
579 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
585 stores a record inside a transaction
587 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
588 TDB_DATA key
, TDB_DATA data
)
590 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
593 struct ctdb_ltdb_header header
;
596 /* we need the header so we can update the RSN */
597 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
598 if (rec
.dptr
== NULL
) {
599 /* the record doesn't exist - create one with us as dmaster.
600 This is only safe because we are in a transaction and this
601 is a persistent database */
604 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
605 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
606 /* a special case, we are writing the same data that is there now */
607 if (data
.dsize
== rec
.dsize
&&
608 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
610 talloc_free(tmp_ctx
);
616 header
.dmaster
= get_my_vnn();
620 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
621 if (h
->m_all
== NULL
) {
622 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
623 talloc_free(tmp_ctx
);
628 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
629 if (h
->m_write
== NULL
) {
630 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
631 talloc_free(tmp_ctx
);
635 status
= db_ctdb_ltdb_store(h
->ctx
, key
, &header
, data
);
636 if (NT_STATUS_IS_OK(status
)) {
642 talloc_free(tmp_ctx
);
649 a record store inside a transaction
651 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
653 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
654 rec
->private_data
, struct db_ctdb_transaction_handle
);
657 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
659 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
665 a record delete inside a transaction
667 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
669 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
670 rec
->private_data
, struct db_ctdb_transaction_handle
);
673 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
675 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
684 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
687 struct ctdb_rec_data
*rec
= NULL
;
690 talloc_free(h
->m_write
);
693 ret
= db_ctdb_transaction_fetch_start(h
);
698 for (i
=0;i
<h
->m_all
->count
;i
++) {
701 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
703 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
707 if (rec
->reqid
== 0) {
709 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
714 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
716 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
717 talloc_free(tmp_ctx
);
720 if (data2
.dsize
!= data
.dsize
||
721 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
722 /* the record has changed on us - we have to give up */
723 talloc_free(tmp_ctx
);
726 talloc_free(tmp_ctx
);
733 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
741 static int db_ctdb_transaction_commit(struct db_context
*db
)
743 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
749 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
750 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
753 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
757 if (h
->nested_cancel
) {
758 db
->transaction_cancel(db
);
759 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
763 if (h
->nesting
!= 0) {
768 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
770 talloc_set_destructor(h
, NULL
);
772 /* our commit strategy is quite complex.
774 - we first try to commit the changes to all other nodes
776 - if that works, then we commit locally and we are done
778 - if a commit on another node fails, then we need to cancel
779 the transaction, then restart the transaction (thus
780 opening a window of time for a pending recovery to
781 complete), then replay the transaction, checking all the
782 reads and writes (checking that reads give the same data,
783 and writes succeed). Then we retry the transaction to the
788 if (h
->m_write
== NULL
) {
789 /* no changes were made, potentially after a retry */
790 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
792 ctx
->transaction
= NULL
;
796 /* tell ctdbd to commit to the other nodes */
797 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
798 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
800 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
801 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
802 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
805 if (!NT_STATUS_IS_OK(rets
)) {
806 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
808 /* work out what error code we will give if we
809 have to fail the operation */
810 switch ((enum ctdb_trans2_commit_error
)status
) {
811 case CTDB_TRANS2_COMMIT_SUCCESS
:
812 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
813 case CTDB_TRANS2_COMMIT_TIMEOUT
:
814 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
816 case CTDB_TRANS2_COMMIT_ALLFAIL
:
817 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
822 if (++retries
== 5) {
823 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
824 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
825 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
826 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
827 tdb_null
, NULL
, NULL
, NULL
);
828 h
->ctx
->transaction
= NULL
;
830 ctx
->transaction
= NULL
;
834 if (ctdb_replay_transaction(h
) != 0) {
835 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
836 (unsigned)failure_control
));
837 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
838 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
839 tdb_null
, NULL
, NULL
, NULL
);
840 h
->ctx
->transaction
= NULL
;
842 ctx
->transaction
= NULL
;
847 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
850 /* do the real commit locally */
851 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
853 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
854 (unsigned)failure_control
));
855 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
856 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
857 h
->ctx
->transaction
= NULL
;
862 /* tell ctdbd that we are finished with our local commit */
863 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
864 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
865 tdb_null
, NULL
, NULL
, NULL
);
866 h
->ctx
->transaction
= NULL
;
875 static int db_ctdb_transaction_cancel(struct db_context
*db
)
877 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
879 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
882 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
886 if (h
->nesting
!= 0) {
888 h
->nested_cancel
= true;
892 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
894 ctx
->transaction
= NULL
;
900 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
902 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
903 rec
->private_data
, struct db_ctdb_rec
);
905 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
910 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
915 * We have to store the header with empty data. TODO: Fix the
921 return db_ctdb_store(rec
, data
, 0);
925 static int db_ctdb_record_destr(struct db_record
* data
)
927 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
928 data
->private_data
, struct db_ctdb_rec
);
930 DEBUG(10, (DEBUGLEVEL
> 10
931 ? "Unlocking db %u key %s\n"
932 : "Unlocking db %u key %.20s\n",
933 (int)crec
->ctdb_ctx
->db_id
,
934 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
937 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
938 DEBUG(0, ("tdb_chainunlock failed\n"));
945 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
950 struct db_record
*result
;
951 struct db_ctdb_rec
*crec
;
954 int migrate_attempts
= 0;
956 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
957 DEBUG(0, ("talloc failed\n"));
961 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
962 DEBUG(0, ("talloc failed\n"));
967 result
->private_data
= (void *)crec
;
968 crec
->ctdb_ctx
= ctx
;
970 result
->key
.dsize
= key
.dsize
;
971 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
972 if (result
->key
.dptr
== NULL
) {
973 DEBUG(0, ("talloc failed\n"));
979 * Do a blocking lock on the record
983 if (DEBUGLEVEL
>= 10) {
984 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
985 DEBUG(10, (DEBUGLEVEL
> 10
986 ? "Locking db %u key %s\n"
987 : "Locking db %u key %.20s\n",
988 (int)crec
->ctdb_ctx
->db_id
, keystr
));
992 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
993 DEBUG(3, ("tdb_chainlock failed\n"));
998 result
->store
= db_ctdb_store
;
999 result
->delete_rec
= db_ctdb_delete
;
1000 talloc_set_destructor(result
, db_ctdb_record_destr
);
1002 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1005 * See if we have a valid record and we are the dmaster. If so, we can
1006 * take the shortcut and just return it.
1009 if ((ctdb_data
.dptr
== NULL
) ||
1010 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
1011 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
1013 || (random() % 2 != 0)
1016 SAFE_FREE(ctdb_data
.dptr
);
1017 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1018 talloc_set_destructor(result
, NULL
);
1020 migrate_attempts
+= 1;
1022 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1023 ctdb_data
.dptr
, ctdb_data
.dptr
?
1024 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1027 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
1028 if (!NT_STATUS_IS_OK(status
)) {
1029 DEBUG(5, ("ctdb_migrate failed: %s\n",
1030 nt_errstr(status
)));
1031 TALLOC_FREE(result
);
1034 /* now it's migrated, try again */
1038 if (migrate_attempts
> 10) {
1039 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1043 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1045 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1046 result
->value
.dptr
= NULL
;
1048 if ((result
->value
.dsize
!= 0)
1049 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
1050 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1051 result
->value
.dsize
))) {
1052 DEBUG(0, ("talloc failed\n"));
1053 TALLOC_FREE(result
);
1056 SAFE_FREE(ctdb_data
.dptr
);
1061 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1062 TALLOC_CTX
*mem_ctx
,
1065 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1066 struct db_ctdb_ctx
);
1068 if (ctx
->transaction
!= NULL
) {
1069 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1072 if (db
->persistent
) {
1073 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1076 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
1080 fetch (unlocked, no migration) operation on ctdb
1082 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1083 TDB_DATA key
, TDB_DATA
*data
)
1085 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1086 struct db_ctdb_ctx
);
1090 if (ctx
->transaction
) {
1091 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1094 /* try a direct fetch */
1095 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1098 * See if we have a valid record and we are the dmaster. If so, we can
1099 * take the shortcut and just return it.
1100 * we bypass the dmaster check for persistent databases
1102 if ((ctdb_data
.dptr
!= NULL
) &&
1103 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1105 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
1106 /* we are the dmaster - avoid the ctdb protocol op */
1108 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1109 if (data
->dsize
== 0) {
1110 SAFE_FREE(ctdb_data
.dptr
);
1115 data
->dptr
= (uint8
*)talloc_memdup(
1116 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1119 SAFE_FREE(ctdb_data
.dptr
);
1121 if (data
->dptr
== NULL
) {
1127 SAFE_FREE(ctdb_data
.dptr
);
1129 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1130 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1131 if (!NT_STATUS_IS_OK(status
)) {
1132 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/*
 * State handed to the traverse callbacks: the database being walked plus
 * the caller's per-record function and its opaque argument.
 */
struct traverse_state {
	struct db_context *db;
	int (*fn)(struct db_record *rec, void *private_data);
	void *private_data;
};
1145 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1147 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1148 struct db_record
*rec
;
1149 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1150 /* we have to give them a locked record to prevent races */
1151 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1152 if (rec
&& rec
->value
.dsize
> 0) {
1153 state
->fn(rec
, state
->private_data
);
1155 talloc_free(tmp_ctx
);
1158 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1161 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1162 struct db_record
*rec
;
1163 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1165 /* we have to give them a locked record to prevent races */
1166 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1167 if (rec
&& rec
->value
.dsize
> 0) {
1168 ret
= state
->fn(rec
, state
->private_data
);
1170 talloc_free(tmp_ctx
);
1174 static int db_ctdb_traverse(struct db_context
*db
,
1175 int (*fn
)(struct db_record
*rec
,
1176 void *private_data
),
1179 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1180 struct db_ctdb_ctx
);
1181 struct traverse_state state
;
1185 state
.private_data
= private_data
;
1187 if (db
->persistent
) {
1188 /* for persistent databases we don't need to do a ctdb traverse,
1189 we can do a faster local traverse */
1190 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1194 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1198 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1200 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1203 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1205 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1208 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1210 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1211 struct db_record rec
;
1214 rec
.store
= db_ctdb_store_deny
;
1215 rec
.delete_rec
= db_ctdb_delete_deny
;
1216 rec
.private_data
= state
->db
;
1217 state
->fn(&rec
, state
->private_data
);
1220 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1223 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1224 struct db_record rec
;
1227 rec
.store
= db_ctdb_store_deny
;
1228 rec
.delete_rec
= db_ctdb_delete_deny
;
1229 rec
.private_data
= state
->db
;
1231 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1232 /* a deleted record */
1235 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1236 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1238 return state
->fn(&rec
, state
->private_data
);
1241 static int db_ctdb_traverse_read(struct db_context
*db
,
1242 int (*fn
)(struct db_record
*rec
,
1243 void *private_data
),
1246 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1247 struct db_ctdb_ctx
);
1248 struct traverse_state state
;
1252 state
.private_data
= private_data
;
1254 if (db
->persistent
) {
1255 /* for persistent databases we don't need to do a ctdb traverse,
1256 we can do a faster local traverse */
1257 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1260 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1264 static int db_ctdb_get_seqnum(struct db_context
*db
)
1266 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1267 struct db_ctdb_ctx
);
1268 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1271 static int db_ctdb_get_flags(struct db_context
*db
)
1273 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1274 struct db_ctdb_ctx
);
1275 return tdb_get_flags(ctx
->wtdb
->tdb
);
1278 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1280 int hash_size
, int tdb_flags
,
1281 int open_flags
, mode_t mode
)
1283 struct db_context
*result
;
1284 struct db_ctdb_ctx
*db_ctdb
;
1287 if (!lp_clustering()) {
1288 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1292 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1293 DEBUG(0, ("talloc failed\n"));
1294 TALLOC_FREE(result
);
1298 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1299 DEBUG(0, ("talloc failed\n"));
1300 TALLOC_FREE(result
);
1304 db_ctdb
->transaction
= NULL
;
1305 db_ctdb
->db
= result
;
1307 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1308 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1309 TALLOC_FREE(result
);
1313 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1315 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1317 /* only pass through specific flags */
1318 tdb_flags
&= TDB_SEQNUM
;
1320 /* honor permissions if user has specified O_CREAT */
1321 if (open_flags
& O_CREAT
) {
1322 chmod(db_path
, mode
);
1325 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1326 if (db_ctdb
->wtdb
== NULL
) {
1327 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1328 TALLOC_FREE(result
);
1331 talloc_free(db_path
);
1333 result
->private_data
= (void *)db_ctdb
;
1334 result
->fetch_locked
= db_ctdb_fetch_locked
;
1335 result
->fetch
= db_ctdb_fetch
;
1336 result
->traverse
= db_ctdb_traverse
;
1337 result
->traverse_read
= db_ctdb_traverse_read
;
1338 result
->get_seqnum
= db_ctdb_get_seqnum
;
1339 result
->get_flags
= db_ctdb_get_flags
;
1340 result
->transaction_start
= db_ctdb_transaction_start
;
1341 result
->transaction_commit
= db_ctdb_transaction_commit
;
1342 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1344 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1345 name
, db_ctdb
->db_id
));