2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
30 * we store the reads and writes done under a transaction:
31 * - one list stores both reads and writes (m_all),
32 * - the other just writes (m_write)
34 struct ctdb_marshall_buffer
*m_all
;
35 struct ctdb_marshall_buffer
*m_write
;
41 struct db_context
*db
;
42 struct tdb_wrap
*wtdb
;
44 struct db_ctdb_transaction_handle
*transaction
;
48 struct db_ctdb_ctx
*ctdb_ctx
;
49 struct ctdb_ltdb_header header
;
52 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
57 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
60 enum TDB_ERROR tret
= tdb_error(tdb
);
64 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
67 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
70 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
84 struct ctdb_ltdb_header
*header
,
91 rec
= tdb_fetch(db
->wtdb
->tdb
, key
);
92 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
93 status
= NT_STATUS_NOT_FOUND
;
98 header
->dmaster
= (uint32_t)-1;
105 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
109 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
110 if (data
->dsize
== 0) {
113 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
115 + sizeof(struct ctdb_ltdb_header
),
117 if (data
->dptr
== NULL
) {
118 status
= NT_STATUS_NO_MEMORY
;
124 status
= NT_STATUS_OK
;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
137 struct ctdb_ltdb_header
*header
,
140 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
144 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
145 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
147 if (rec
.dptr
== NULL
) {
148 talloc_free(tmp_ctx
);
149 return NT_STATUS_NO_MEMORY
;
152 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
153 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
155 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
157 talloc_free(tmp_ctx
);
159 return (ret
== 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
170 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
172 struct ctdb_ltdb_header
*header
,
176 struct ctdb_rec_data
*d
;
178 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
179 data
.dsize
+ (header
?sizeof(*header
):0);
180 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
186 d
->keylen
= key
.dsize
;
187 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
189 d
->datalen
= data
.dsize
+ sizeof(*header
);
190 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
191 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
193 d
->datalen
= data
.dsize
;
194 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
202 struct ctdb_marshall_buffer
*m
,
206 struct ctdb_ltdb_header
*header
,
209 struct ctdb_rec_data
*r
;
210 size_t m_size
, r_size
;
211 struct ctdb_marshall_buffer
*m2
= NULL
;
213 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
220 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
221 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
228 m_size
= talloc_get_size(m
);
229 r_size
= talloc_get_size(r
);
231 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
232 mem_ctx
, m
, m_size
+ r_size
);
238 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
251 data
.dptr
= (uint8_t *)m
;
252 data
.dsize
= talloc_get_size(m
);
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
264 struct ctdb_ltdb_header
*header
,
265 TDB_DATA
*key
, TDB_DATA
*data
)
268 r
= (struct ctdb_rec_data
*)&m
->data
[0];
270 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
278 key
->dptr
= &r
->data
[0];
279 key
->dsize
= r
->keylen
;
282 data
->dptr
= &r
->data
[r
->keylen
];
283 data
->dsize
= r
->datalen
;
284 if (header
!= NULL
) {
285 data
->dptr
+= sizeof(*header
);
286 data
->dsize
-= sizeof(*header
);
290 if (header
!= NULL
) {
291 if (r
->datalen
< sizeof(*header
)) {
294 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
303 * CTDB transaction destructor
305 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
307 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
312 * start a transaction on a ctdb database:
313 * - lock the transaction lock key
314 * - start the tdb transaction
316 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
318 struct db_record
*rh
;
319 struct db_ctdb_rec
*crec
;
322 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
324 struct db_ctdb_ctx
*ctx
= h
->ctx
;
328 struct ctdb_ltdb_header header
;
330 key
.dptr
= (uint8_t *)discard_const(keyname
);
331 key
.dsize
= strlen(keyname
);
334 tmp_ctx
= talloc_new(h
);
336 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
338 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
339 talloc_free(tmp_ctx
);
342 crec
= talloc_get_type_abort(rh
->private_data
, struct db_ctdb_rec
);
345 * store the pid in the database:
346 * it is not enough that the node is dmaster...
349 data
.dptr
= (unsigned char *)&pid
;
350 data
.dsize
= sizeof(pid_t
);
351 status
= db_ctdb_ltdb_store(ctx
, key
, &(crec
->header
), data
);
352 if (!NT_STATUS_IS_OK(status
)) {
353 DEBUG(0, (__location__
" Failed to store pid in transaction "
354 "record: %s\n", nt_errstr(status
)));
355 talloc_free(tmp_ctx
);
361 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
363 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
364 talloc_free(tmp_ctx
);
368 status
= db_ctdb_ltdb_fetch(ctx
, key
, &header
, tmp_ctx
, &data
);
369 if (!NT_STATUS_IS_OK(status
) || header
.dmaster
!= get_my_vnn()) {
370 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
371 talloc_free(tmp_ctx
);
375 if ((data
.dsize
!= sizeof(pid_t
)) || (*(pid_t
*)(data
.dptr
) != pid
)) {
376 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
377 talloc_free(tmp_ctx
);
381 talloc_free(tmp_ctx
);
388 * CTDB dbwrap API: transaction_start function
389 * starts a transaction on a persistent database
391 static int db_ctdb_transaction_start(struct db_context
*db
)
393 struct db_ctdb_transaction_handle
*h
;
395 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
398 if (!db
->persistent
) {
399 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
404 if (ctx
->transaction
) {
405 ctx
->transaction
->nesting
++;
409 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
411 DEBUG(0,(__location__
" oom for transaction handle\n"));
417 ret
= db_ctdb_transaction_fetch_start(h
);
423 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
425 ctx
->transaction
= h
;
427 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
435 fetch a record inside a transaction
437 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
439 TDB_DATA key
, TDB_DATA
*data
)
441 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
444 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
446 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
448 } else if (!NT_STATUS_IS_OK(status
)) {
453 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
454 if (h
->m_all
== NULL
) {
455 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
457 talloc_free(data
->dptr
);
466 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
467 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
469 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
473 struct db_record
*result
;
476 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
477 DEBUG(0, ("talloc failed\n"));
481 result
->private_data
= ctx
->transaction
;
483 result
->key
.dsize
= key
.dsize
;
484 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
485 if (result
->key
.dptr
== NULL
) {
486 DEBUG(0, ("talloc failed\n"));
491 result
->store
= db_ctdb_store_transaction
;
492 result
->delete_rec
= db_ctdb_delete_transaction
;
494 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
495 if (ctdb_data
.dptr
== NULL
) {
496 /* create the record */
497 result
->value
= tdb_null
;
501 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
502 result
->value
.dptr
= NULL
;
504 if ((result
->value
.dsize
!= 0)
505 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
506 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
507 result
->value
.dsize
))) {
508 DEBUG(0, ("talloc failed\n"));
512 SAFE_FREE(ctdb_data
.dptr
);
517 static int db_ctdb_record_destructor(struct db_record
**recp
)
519 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
520 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
521 rec
->private_data
, struct db_ctdb_transaction_handle
);
522 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
524 DEBUG(0,(__location__
" transaction_commit failed\n"));
530 auto-create a transaction for persistent databases
532 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
537 struct db_record
*rec
, **recp
;
539 res
= db_ctdb_transaction_start(ctx
->db
);
544 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
546 ctx
->db
->transaction_cancel(ctx
->db
);
550 /* destroy this transaction when we release the lock */
551 recp
= talloc(rec
, struct db_record
*);
553 ctx
->db
->transaction_cancel(ctx
->db
);
558 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
564 stores a record inside a transaction
566 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
567 TDB_DATA key
, TDB_DATA data
)
569 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
572 struct ctdb_ltdb_header header
;
575 /* we need the header so we can update the RSN */
576 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
577 if (rec
.dptr
== NULL
) {
578 /* the record doesn't exist - create one with us as dmaster.
579 This is only safe because we are in a transaction and this
580 is a persistent database */
583 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
584 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
585 /* a special case, we are writing the same data that is there now */
586 if (data
.dsize
== rec
.dsize
&&
587 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
589 talloc_free(tmp_ctx
);
595 header
.dmaster
= get_my_vnn();
599 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
600 if (h
->m_all
== NULL
) {
601 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
602 talloc_free(tmp_ctx
);
607 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
608 if (h
->m_write
== NULL
) {
609 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
610 talloc_free(tmp_ctx
);
614 status
= db_ctdb_ltdb_store(h
->ctx
, key
, &header
, data
);
615 if (NT_STATUS_IS_OK(status
)) {
621 talloc_free(tmp_ctx
);
628 a record store inside a transaction
630 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
632 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
633 rec
->private_data
, struct db_ctdb_transaction_handle
);
636 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
638 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
644 a record delete inside a transaction
646 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
648 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
649 rec
->private_data
, struct db_ctdb_transaction_handle
);
652 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
654 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
663 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
666 struct ctdb_rec_data
*rec
= NULL
;
669 talloc_free(h
->m_write
);
672 ret
= db_ctdb_transaction_fetch_start(h
);
677 for (i
=0;i
<h
->m_all
->count
;i
++) {
680 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
682 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
686 if (rec
->reqid
== 0) {
688 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
693 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
695 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
696 talloc_free(tmp_ctx
);
699 if (data2
.dsize
!= data
.dsize
||
700 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
701 /* the record has changed on us - we have to give up */
702 talloc_free(tmp_ctx
);
705 talloc_free(tmp_ctx
);
712 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
720 static int db_ctdb_transaction_commit(struct db_context
*db
)
722 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
728 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
729 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
732 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
736 if (h
->nested_cancel
) {
737 db
->transaction_cancel(db
);
738 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
742 if (h
->nesting
!= 0) {
747 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
749 talloc_set_destructor(h
, NULL
);
751 /* our commit strategy is quite complex.
753 - we first try to commit the changes to all other nodes
755 - if that works, then we commit locally and we are done
757 - if a commit on another node fails, then we need to cancel
758 the transaction, then restart the transaction (thus
759 opening a window of time for a pending recovery to
760 complete), then replay the transaction, checking all the
761 reads and writes (checking that reads give the same data,
762 and writes succeed). Then we retry the transaction to the
767 if (h
->m_write
== NULL
) {
768 /* no changes were made, potentially after a retry */
769 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
771 ctx
->transaction
= NULL
;
775 /* tell ctdbd to commit to the other nodes */
776 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
777 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
779 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
780 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
781 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
784 if (!NT_STATUS_IS_OK(rets
)) {
785 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
787 /* work out what error code we will give if we
788 have to fail the operation */
789 switch ((enum ctdb_trans2_commit_error
)status
) {
790 case CTDB_TRANS2_COMMIT_SUCCESS
:
791 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
792 case CTDB_TRANS2_COMMIT_TIMEOUT
:
793 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
795 case CTDB_TRANS2_COMMIT_ALLFAIL
:
796 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
801 if (++retries
== 5) {
802 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
803 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
804 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
805 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
806 tdb_null
, NULL
, NULL
, NULL
);
807 h
->ctx
->transaction
= NULL
;
809 ctx
->transaction
= NULL
;
813 if (ctdb_replay_transaction(h
) != 0) {
814 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
815 (unsigned)failure_control
));
816 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
817 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
818 tdb_null
, NULL
, NULL
, NULL
);
819 h
->ctx
->transaction
= NULL
;
821 ctx
->transaction
= NULL
;
826 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
829 /* do the real commit locally */
830 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
832 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
833 (unsigned)failure_control
));
834 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
835 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
836 h
->ctx
->transaction
= NULL
;
841 /* tell ctdbd that we are finished with our local commit */
842 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
843 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
844 tdb_null
, NULL
, NULL
, NULL
);
845 h
->ctx
->transaction
= NULL
;
854 static int db_ctdb_transaction_cancel(struct db_context
*db
)
856 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
858 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
861 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
865 if (h
->nesting
!= 0) {
867 h
->nested_cancel
= true;
871 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
873 ctx
->transaction
= NULL
;
879 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
881 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
882 rec
->private_data
, struct db_ctdb_rec
);
884 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
889 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
894 * We have to store the header with empty data. TODO: Fix the
900 return db_ctdb_store(rec
, data
, 0);
904 static int db_ctdb_record_destr(struct db_record
* data
)
906 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
907 data
->private_data
, struct db_ctdb_rec
);
909 DEBUG(10, (DEBUGLEVEL
> 10
910 ? "Unlocking db %u key %s\n"
911 : "Unlocking db %u key %.20s\n",
912 (int)crec
->ctdb_ctx
->db_id
,
913 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
916 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
917 DEBUG(0, ("tdb_chainunlock failed\n"));
924 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
929 struct db_record
*result
;
930 struct db_ctdb_rec
*crec
;
933 int migrate_attempts
= 0;
935 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
936 DEBUG(0, ("talloc failed\n"));
940 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
941 DEBUG(0, ("talloc failed\n"));
946 result
->private_data
= (void *)crec
;
947 crec
->ctdb_ctx
= ctx
;
949 result
->key
.dsize
= key
.dsize
;
950 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
951 if (result
->key
.dptr
== NULL
) {
952 DEBUG(0, ("talloc failed\n"));
958 * Do a blocking lock on the record
962 if (DEBUGLEVEL
>= 10) {
963 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
964 DEBUG(10, (DEBUGLEVEL
> 10
965 ? "Locking db %u key %s\n"
966 : "Locking db %u key %.20s\n",
967 (int)crec
->ctdb_ctx
->db_id
, keystr
));
971 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
972 DEBUG(3, ("tdb_chainlock failed\n"));
977 result
->store
= db_ctdb_store
;
978 result
->delete_rec
= db_ctdb_delete
;
979 talloc_set_destructor(result
, db_ctdb_record_destr
);
981 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
984 * See if we have a valid record and we are the dmaster. If so, we can
985 * take the shortcut and just return it.
988 if ((ctdb_data
.dptr
== NULL
) ||
989 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
990 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
992 || (random() % 2 != 0)
995 SAFE_FREE(ctdb_data
.dptr
);
996 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
997 talloc_set_destructor(result
, NULL
);
999 migrate_attempts
+= 1;
1001 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1002 ctdb_data
.dptr
, ctdb_data
.dptr
?
1003 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1006 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
1007 if (!NT_STATUS_IS_OK(status
)) {
1008 DEBUG(5, ("ctdb_migrate failed: %s\n",
1009 nt_errstr(status
)));
1010 TALLOC_FREE(result
);
1013 /* now it's migrated, try again */
1017 if (migrate_attempts
> 10) {
1018 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1022 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1024 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1025 result
->value
.dptr
= NULL
;
1027 if ((result
->value
.dsize
!= 0)
1028 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
1029 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1030 result
->value
.dsize
))) {
1031 DEBUG(0, ("talloc failed\n"));
1032 TALLOC_FREE(result
);
1035 SAFE_FREE(ctdb_data
.dptr
);
1040 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1041 TALLOC_CTX
*mem_ctx
,
1044 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1045 struct db_ctdb_ctx
);
1047 if (ctx
->transaction
!= NULL
) {
1048 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1051 if (db
->persistent
) {
1052 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1055 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
1059 fetch (unlocked, no migration) operation on ctdb
1061 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
1062 TDB_DATA key
, TDB_DATA
*data
)
1064 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1065 struct db_ctdb_ctx
);
1069 if (ctx
->transaction
) {
1070 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
1073 /* try a direct fetch */
1074 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1077 * See if we have a valid record and we are the dmaster. If so, we can
1078 * take the shortcut and just return it.
1079 * we bypass the dmaster check for persistent databases
1081 if ((ctdb_data
.dptr
!= NULL
) &&
1082 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1084 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
1085 /* we are the dmaster - avoid the ctdb protocol op */
1087 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1088 if (data
->dsize
== 0) {
1089 SAFE_FREE(ctdb_data
.dptr
);
1094 data
->dptr
= (uint8
*)talloc_memdup(
1095 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1098 SAFE_FREE(ctdb_data
.dptr
);
1100 if (data
->dptr
== NULL
) {
1106 SAFE_FREE(ctdb_data
.dptr
);
1108 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1109 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1110 if (!NT_STATUS_IS_OK(status
)) {
1111 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/* State handed to the traverse callbacks through their private_data. */
struct traverse_state {
	/* database whose records are being visited */
	struct db_context *db;
	/* per-record callback */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque argument passed through to fn */
	void *private_data;
};
1124 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1126 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1127 struct db_record
*rec
;
1128 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1129 /* we have to give them a locked record to prevent races */
1130 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1131 if (rec
&& rec
->value
.dsize
> 0) {
1132 state
->fn(rec
, state
->private_data
);
1134 talloc_free(tmp_ctx
);
1137 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1140 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1141 struct db_record
*rec
;
1142 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1144 /* we have to give them a locked record to prevent races */
1145 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1146 if (rec
&& rec
->value
.dsize
> 0) {
1147 ret
= state
->fn(rec
, state
->private_data
);
1149 talloc_free(tmp_ctx
);
1153 static int db_ctdb_traverse(struct db_context
*db
,
1154 int (*fn
)(struct db_record
*rec
,
1155 void *private_data
),
1158 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1159 struct db_ctdb_ctx
);
1160 struct traverse_state state
;
1164 state
.private_data
= private_data
;
1166 if (db
->persistent
) {
1167 /* for persistent databases we don't need to do a ctdb traverse,
1168 we can do a faster local traverse */
1169 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1173 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1177 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1179 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1182 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1184 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1187 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1189 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1190 struct db_record rec
;
1193 rec
.store
= db_ctdb_store_deny
;
1194 rec
.delete_rec
= db_ctdb_delete_deny
;
1195 rec
.private_data
= state
->db
;
1196 state
->fn(&rec
, state
->private_data
);
1199 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1202 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1203 struct db_record rec
;
1206 rec
.store
= db_ctdb_store_deny
;
1207 rec
.delete_rec
= db_ctdb_delete_deny
;
1208 rec
.private_data
= state
->db
;
1210 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1211 /* a deleted record */
1214 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1215 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1217 return state
->fn(&rec
, state
->private_data
);
1220 static int db_ctdb_traverse_read(struct db_context
*db
,
1221 int (*fn
)(struct db_record
*rec
,
1222 void *private_data
),
1225 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1226 struct db_ctdb_ctx
);
1227 struct traverse_state state
;
1231 state
.private_data
= private_data
;
1233 if (db
->persistent
) {
1234 /* for persistent databases we don't need to do a ctdb traverse,
1235 we can do a faster local traverse */
1236 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1239 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1243 static int db_ctdb_get_seqnum(struct db_context
*db
)
1245 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1246 struct db_ctdb_ctx
);
1247 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1250 static int db_ctdb_get_flags(struct db_context
*db
)
1252 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1253 struct db_ctdb_ctx
);
1254 return tdb_get_flags(ctx
->wtdb
->tdb
);
1257 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1259 int hash_size
, int tdb_flags
,
1260 int open_flags
, mode_t mode
)
1262 struct db_context
*result
;
1263 struct db_ctdb_ctx
*db_ctdb
;
1266 if (!lp_clustering()) {
1267 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1271 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1272 DEBUG(0, ("talloc failed\n"));
1273 TALLOC_FREE(result
);
1277 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1278 DEBUG(0, ("talloc failed\n"));
1279 TALLOC_FREE(result
);
1283 db_ctdb
->transaction
= NULL
;
1284 db_ctdb
->db
= result
;
1286 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1287 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1288 TALLOC_FREE(result
);
1292 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1294 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1296 /* only pass through specific flags */
1297 tdb_flags
&= TDB_SEQNUM
;
1299 /* honor permissions if user has specified O_CREAT */
1300 if (open_flags
& O_CREAT
) {
1301 chmod(db_path
, mode
);
1304 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1305 if (db_ctdb
->wtdb
== NULL
) {
1306 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1307 TALLOC_FREE(result
);
1310 talloc_free(db_path
);
1312 result
->private_data
= (void *)db_ctdb
;
1313 result
->fetch_locked
= db_ctdb_fetch_locked
;
1314 result
->fetch
= db_ctdb_fetch
;
1315 result
->traverse
= db_ctdb_traverse
;
1316 result
->traverse_read
= db_ctdb_traverse_read
;
1317 result
->get_seqnum
= db_ctdb_get_seqnum
;
1318 result
->get_flags
= db_ctdb_get_flags
;
1319 result
->transaction_start
= db_ctdb_transaction_start
;
1320 result
->transaction_commit
= db_ctdb_transaction_commit
;
1321 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1323 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1324 name
, db_ctdb
->db_id
));