2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007-2009
5 Copyright (C) Michael Adam 2009
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #ifdef CLUSTER_SUPPORT
24 #include "ctdb_private.h"
25 #include "ctdbd_conn.h"
28 struct db_ctdb_transaction_handle
{
29 struct db_ctdb_ctx
*ctx
;
31 * we store the reads and writes done under a transaction:
32 * - one list stores both reads and writes (m_all),
33 * - the other just writes (m_write)
35 struct ctdb_marshall_buffer
*m_all
;
36 struct ctdb_marshall_buffer
*m_write
;
43 struct db_context
*db
;
44 struct tdb_wrap
*wtdb
;
46 struct db_ctdb_transaction_handle
*transaction
;
47 struct g_lock_ctx
*lock_ctx
;
51 struct db_ctdb_ctx
*ctdb_ctx
;
52 struct ctdb_ltdb_header header
;
55 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
60 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
63 enum TDB_ERROR tret
= tdb_error(tdb
);
67 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
70 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
73 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
82 * fetch a record from the tdb, separating out the header
83 * information and returning the body of the record.
85 static NTSTATUS
db_ctdb_ltdb_fetch(struct db_ctdb_ctx
*db
,
87 struct ctdb_ltdb_header
*header
,
94 rec
= tdb_fetch(db
->wtdb
->tdb
, key
);
95 if (rec
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
96 status
= NT_STATUS_NOT_FOUND
;
101 header
->dmaster
= (uint32_t)-1;
108 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
112 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
113 if (data
->dsize
== 0) {
116 data
->dptr
= (unsigned char *)talloc_memdup(mem_ctx
,
118 + sizeof(struct ctdb_ltdb_header
),
120 if (data
->dptr
== NULL
) {
121 status
= NT_STATUS_NO_MEMORY
;
127 status
= NT_STATUS_OK
;
135 * Store a record together with the ctdb record header
136 * in the local copy of the database.
138 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
140 struct ctdb_ltdb_header
*header
,
143 TALLOC_CTX
*tmp_ctx
= talloc_stackframe();
147 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
148 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
150 if (rec
.dptr
== NULL
) {
151 talloc_free(tmp_ctx
);
152 return NT_STATUS_NO_MEMORY
;
155 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
156 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
158 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
160 talloc_free(tmp_ctx
);
162 return (ret
== 0) ? NT_STATUS_OK
163 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
168 form a ctdb_rec_data record from a key/data pair
170 note that header may be NULL. If not NULL then it is included in the data portion
173 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
175 struct ctdb_ltdb_header
*header
,
179 struct ctdb_rec_data
*d
;
181 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
182 data
.dsize
+ (header
?sizeof(*header
):0);
183 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
189 d
->keylen
= key
.dsize
;
190 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
192 d
->datalen
= data
.dsize
+ sizeof(*header
);
193 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
194 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
196 d
->datalen
= data
.dsize
;
197 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
203 /* helper function for marshalling multiple records */
204 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
205 struct ctdb_marshall_buffer
*m
,
209 struct ctdb_ltdb_header
*header
,
212 struct ctdb_rec_data
*r
;
213 size_t m_size
, r_size
;
214 struct ctdb_marshall_buffer
*m2
= NULL
;
216 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
223 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
224 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
231 m_size
= talloc_get_size(m
);
232 r_size
= talloc_get_size(r
);
234 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
235 mem_ctx
, m
, m_size
+ r_size
);
241 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
250 /* we've finished marshalling, return a data blob with the marshalled records */
251 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
254 data
.dptr
= (uint8_t *)m
;
255 data
.dsize
= talloc_get_size(m
);
260 loop over a marshalling buffer
262 - pass r==NULL to start
263 - loop the number of times indicated by m->count
265 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
267 struct ctdb_ltdb_header
*header
,
268 TDB_DATA
*key
, TDB_DATA
*data
)
271 r
= (struct ctdb_rec_data
*)&m
->data
[0];
273 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
281 key
->dptr
= &r
->data
[0];
282 key
->dsize
= r
->keylen
;
285 data
->dptr
= &r
->data
[r
->keylen
];
286 data
->dsize
= r
->datalen
;
287 if (header
!= NULL
) {
288 data
->dptr
+= sizeof(*header
);
289 data
->dsize
-= sizeof(*header
);
293 if (header
!= NULL
) {
294 if (r
->datalen
< sizeof(*header
)) {
297 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
304 * CTDB transaction destructor
306 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
310 status
= g_lock_unlock(h
->ctx
->lock_ctx
, h
->lock_name
);
311 if (!NT_STATUS_IS_OK(status
)) {
312 DEBUG(0, ("g_lock_unlock failed: %s\n", nt_errstr(status
)));
319 * CTDB dbwrap API: transaction_start function
320 * starts a transaction on a persistent database
322 static int db_ctdb_transaction_start(struct db_context
*db
)
324 struct db_ctdb_transaction_handle
*h
;
326 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
329 if (!db
->persistent
) {
330 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
335 if (ctx
->transaction
) {
336 ctx
->transaction
->nesting
++;
340 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
342 DEBUG(0,(__location__
" oom for transaction handle\n"));
348 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
349 (unsigned int)ctx
->db_id
);
350 if (h
->lock_name
== NULL
) {
351 DEBUG(0, ("talloc_asprintf failed\n"));
357 * Wait a day, i.e. forever...
359 status
= g_lock_lock(ctx
->lock_ctx
, h
->lock_name
, G_LOCK_WRITE
,
360 timeval_set(86400, 0));
361 if (!NT_STATUS_IS_OK(status
)) {
362 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
367 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
369 ctx
->transaction
= h
;
371 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
376 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
378 struct ctdb_ltdb_header
*pheader
,
382 struct ctdb_rec_data
*rec
= NULL
;
383 struct ctdb_ltdb_header h
;
393 * Walk the list of records written during this
394 * transaction. If we want to read one we have already
395 * written, return the last written sample. Thus we do not do
396 * a "break;" for the first hit, this record might have been
400 for (i
=0; i
<buf
->count
; i
++) {
401 TDB_DATA tkey
, tdata
;
404 rec
= db_ctdb_marshall_loop_next(buf
, rec
, &reqid
, &h
, &tkey
,
410 if (tdb_data_equal(key
, tkey
)) {
421 data
.dptr
= (uint8_t *)talloc_memdup(mem_ctx
, data
.dptr
,
423 if ((data
.dsize
!= 0) && (data
.dptr
== NULL
)) {
429 if (pheader
!= NULL
) {
437 fetch a record inside a transaction
439 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
441 TDB_DATA key
, TDB_DATA
*data
)
443 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
447 found
= pull_newest_from_marshall_buffer(h
->m_write
, key
, NULL
,
453 status
= db_ctdb_ltdb_fetch(h
->ctx
, key
, NULL
, mem_ctx
, data
);
455 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
457 } else if (!NT_STATUS_IS_OK(status
)) {
461 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
,
463 if (h
->m_all
== NULL
) {
464 DEBUG(0,(__location__
" Failed to add to marshalling "
467 talloc_free(data
->dptr
);
475 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
476 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
478 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
482 struct db_record
*result
;
485 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
486 DEBUG(0, ("talloc failed\n"));
490 result
->private_data
= ctx
->transaction
;
492 result
->key
.dsize
= key
.dsize
;
493 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
494 if (result
->key
.dptr
== NULL
) {
495 DEBUG(0, ("talloc failed\n"));
500 result
->store
= db_ctdb_store_transaction
;
501 result
->delete_rec
= db_ctdb_delete_transaction
;
503 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
504 NULL
, result
, &result
->value
)) {
508 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
509 if (ctdb_data
.dptr
== NULL
) {
510 /* create the record */
511 result
->value
= tdb_null
;
515 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
516 result
->value
.dptr
= NULL
;
518 if ((result
->value
.dsize
!= 0)
519 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
520 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
521 result
->value
.dsize
))) {
522 DEBUG(0, ("talloc failed\n"));
526 SAFE_FREE(ctdb_data
.dptr
);
531 static int db_ctdb_record_destructor(struct db_record
**recp
)
533 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
534 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
535 rec
->private_data
, struct db_ctdb_transaction_handle
);
536 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
538 DEBUG(0,(__location__
" transaction_commit failed\n"));
544 auto-create a transaction for persistent databases
546 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
551 struct db_record
*rec
, **recp
;
553 res
= db_ctdb_transaction_start(ctx
->db
);
558 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
560 ctx
->db
->transaction_cancel(ctx
->db
);
564 /* destroy this transaction when we release the lock */
565 recp
= talloc(rec
, struct db_record
*);
567 ctx
->db
->transaction_cancel(ctx
->db
);
572 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
578 stores a record inside a transaction
580 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
581 TDB_DATA key
, TDB_DATA data
)
583 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
585 struct ctdb_ltdb_header header
;
589 /* we need the header so we can update the RSN */
591 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
594 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
596 if (rec
.dptr
!= NULL
) {
597 memcpy(&header
, rec
.dptr
,
598 sizeof(struct ctdb_ltdb_header
));
599 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
602 * a special case, we are writing the same
603 * data that is there now
605 if (data
.dsize
== rec
.dsize
&&
607 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
610 talloc_free(tmp_ctx
);
617 header
.dmaster
= get_my_vnn();
620 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
,
622 if (h
->m_all
== NULL
) {
623 DEBUG(0,(__location__
" Failed to add to marshalling "
625 talloc_free(tmp_ctx
);
629 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
630 if (h
->m_write
== NULL
) {
631 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
632 talloc_free(tmp_ctx
);
636 talloc_free(tmp_ctx
);
642 a record store inside a transaction
644 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
646 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
647 rec
->private_data
, struct db_ctdb_transaction_handle
);
650 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
652 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
658 a record delete inside a transaction
660 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
662 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
663 rec
->private_data
, struct db_ctdb_transaction_handle
);
666 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
668 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
676 static int db_ctdb_transaction_commit(struct db_context
*db
)
678 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
682 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
685 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
689 if (h
->nested_cancel
) {
690 db
->transaction_cancel(db
);
691 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
695 if (h
->nesting
!= 0) {
700 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
703 if (h
->m_write
== NULL
) {
704 /* no changes were made, potentially after a retry */
708 /* tell ctdbd to commit to the other nodes */
709 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
710 CTDB_CONTROL_TRANS3_COMMIT
,
712 db_ctdb_marshall_finish(h
->m_write
),
713 NULL
, NULL
, &status
);
714 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
717 * check the database sequence number and
718 * compare it to the seqnum after applying the
719 * marshall buffer. If it is the same: return success.
725 h
->ctx
->transaction
= NULL
;
734 static int db_ctdb_transaction_cancel(struct db_context
*db
)
736 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
738 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
741 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
745 if (h
->nesting
!= 0) {
747 h
->nested_cancel
= true;
751 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
753 ctx
->transaction
= NULL
;
759 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
761 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
762 rec
->private_data
, struct db_ctdb_rec
);
764 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
769 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
774 * We have to store the header with empty data. TODO: Fix the
780 return db_ctdb_store(rec
, data
, 0);
784 static int db_ctdb_record_destr(struct db_record
* data
)
786 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
787 data
->private_data
, struct db_ctdb_rec
);
789 DEBUG(10, (DEBUGLEVEL
> 10
790 ? "Unlocking db %u key %s\n"
791 : "Unlocking db %u key %.20s\n",
792 (int)crec
->ctdb_ctx
->db_id
,
793 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
796 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
797 DEBUG(0, ("tdb_chainunlock failed\n"));
804 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
809 struct db_record
*result
;
810 struct db_ctdb_rec
*crec
;
813 int migrate_attempts
= 0;
815 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
816 DEBUG(0, ("talloc failed\n"));
820 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
821 DEBUG(0, ("talloc failed\n"));
826 result
->private_data
= (void *)crec
;
827 crec
->ctdb_ctx
= ctx
;
829 result
->key
.dsize
= key
.dsize
;
830 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
831 if (result
->key
.dptr
== NULL
) {
832 DEBUG(0, ("talloc failed\n"));
838 * Do a blocking lock on the record
842 if (DEBUGLEVEL
>= 10) {
843 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
844 DEBUG(10, (DEBUGLEVEL
> 10
845 ? "Locking db %u key %s\n"
846 : "Locking db %u key %.20s\n",
847 (int)crec
->ctdb_ctx
->db_id
, keystr
));
851 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
852 DEBUG(3, ("tdb_chainlock failed\n"));
857 result
->store
= db_ctdb_store
;
858 result
->delete_rec
= db_ctdb_delete
;
859 talloc_set_destructor(result
, db_ctdb_record_destr
);
861 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
864 * See if we have a valid record and we are the dmaster. If so, we can
865 * take the shortcut and just return it.
868 if ((ctdb_data
.dptr
== NULL
) ||
869 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
870 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
872 || (random() % 2 != 0)
875 SAFE_FREE(ctdb_data
.dptr
);
876 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
877 talloc_set_destructor(result
, NULL
);
879 migrate_attempts
+= 1;
881 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
882 ctdb_data
.dptr
, ctdb_data
.dptr
?
883 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
886 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
887 if (!NT_STATUS_IS_OK(status
)) {
888 DEBUG(5, ("ctdb_migrate failed: %s\n",
893 /* now its migrated, try again */
897 if (migrate_attempts
> 10) {
898 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
902 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
904 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
905 result
->value
.dptr
= NULL
;
907 if ((result
->value
.dsize
!= 0)
908 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
909 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
910 result
->value
.dsize
))) {
911 DEBUG(0, ("talloc failed\n"));
915 SAFE_FREE(ctdb_data
.dptr
);
920 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
924 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
927 if (ctx
->transaction
!= NULL
) {
928 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
931 if (db
->persistent
) {
932 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
935 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
939 fetch (unlocked, no migration) operation on ctdb
941 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
942 TDB_DATA key
, TDB_DATA
*data
)
944 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
949 if (ctx
->transaction
) {
950 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
953 /* try a direct fetch */
954 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
957 * See if we have a valid record and we are the dmaster. If so, we can
958 * take the shortcut and just return it.
959 * we bypass the dmaster check for persistent databases
961 if ((ctdb_data
.dptr
!= NULL
) &&
962 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
964 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
965 /* we are the dmaster - avoid the ctdb protocol op */
967 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
968 if (data
->dsize
== 0) {
969 SAFE_FREE(ctdb_data
.dptr
);
974 data
->dptr
= (uint8
*)talloc_memdup(
975 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
978 SAFE_FREE(ctdb_data
.dptr
);
980 if (data
->dptr
== NULL
) {
986 SAFE_FREE(ctdb_data
.dptr
);
988 /* we weren't able to get it locally - ask ctdb to fetch it for us */
989 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
990 if (!NT_STATUS_IS_OK(status
)) {
991 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
998 struct traverse_state
{
999 struct db_context
*db
;
1000 int (*fn
)(struct db_record
*rec
, void *private_data
);
1004 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1006 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1007 struct db_record
*rec
;
1008 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1009 /* we have to give them a locked record to prevent races */
1010 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1011 if (rec
&& rec
->value
.dsize
> 0) {
1012 state
->fn(rec
, state
->private_data
);
1014 talloc_free(tmp_ctx
);
1017 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1020 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1021 struct db_record
*rec
;
1022 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1024 /* we have to give them a locked record to prevent races */
1025 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1026 if (rec
&& rec
->value
.dsize
> 0) {
1027 ret
= state
->fn(rec
, state
->private_data
);
1029 talloc_free(tmp_ctx
);
1033 static int db_ctdb_traverse(struct db_context
*db
,
1034 int (*fn
)(struct db_record
*rec
,
1035 void *private_data
),
1038 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1039 struct db_ctdb_ctx
);
1040 struct traverse_state state
;
1044 state
.private_data
= private_data
;
1046 if (db
->persistent
) {
1047 /* for persistent databases we don't need to do a ctdb traverse,
1048 we can do a faster local traverse */
1049 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1053 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1057 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1059 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1062 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1064 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1067 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1069 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1070 struct db_record rec
;
1073 rec
.store
= db_ctdb_store_deny
;
1074 rec
.delete_rec
= db_ctdb_delete_deny
;
1075 rec
.private_data
= state
->db
;
1076 state
->fn(&rec
, state
->private_data
);
1079 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1082 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1083 struct db_record rec
;
1086 rec
.store
= db_ctdb_store_deny
;
1087 rec
.delete_rec
= db_ctdb_delete_deny
;
1088 rec
.private_data
= state
->db
;
1090 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1091 /* a deleted record */
1094 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1095 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1097 return state
->fn(&rec
, state
->private_data
);
1100 static int db_ctdb_traverse_read(struct db_context
*db
,
1101 int (*fn
)(struct db_record
*rec
,
1102 void *private_data
),
1105 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1106 struct db_ctdb_ctx
);
1107 struct traverse_state state
;
1111 state
.private_data
= private_data
;
1113 if (db
->persistent
) {
1114 /* for persistent databases we don't need to do a ctdb traverse,
1115 we can do a faster local traverse */
1116 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1119 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1123 static int db_ctdb_get_seqnum(struct db_context
*db
)
1125 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1126 struct db_ctdb_ctx
);
1127 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1130 static int db_ctdb_get_flags(struct db_context
*db
)
1132 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1133 struct db_ctdb_ctx
);
1134 return tdb_get_flags(ctx
->wtdb
->tdb
);
1137 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1139 int hash_size
, int tdb_flags
,
1140 int open_flags
, mode_t mode
)
1142 struct db_context
*result
;
1143 struct db_ctdb_ctx
*db_ctdb
;
1145 struct ctdbd_connection
*conn
;
1147 if (!lp_clustering()) {
1148 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1152 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1153 DEBUG(0, ("talloc failed\n"));
1154 TALLOC_FREE(result
);
1158 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1159 DEBUG(0, ("talloc failed\n"));
1160 TALLOC_FREE(result
);
1164 db_ctdb
->transaction
= NULL
;
1165 db_ctdb
->db
= result
;
1167 conn
= messaging_ctdbd_connection();
1169 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn
, name
, &db_ctdb
->db_id
, tdb_flags
))) {
1170 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1171 TALLOC_FREE(result
);
1175 db_path
= ctdbd_dbpath(conn
, db_ctdb
, db_ctdb
->db_id
);
1177 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1179 /* only pass through specific flags */
1180 tdb_flags
&= TDB_SEQNUM
;
1182 /* honor permissions if user has specified O_CREAT */
1183 if (open_flags
& O_CREAT
) {
1184 chmod(db_path
, mode
);
1187 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1188 if (db_ctdb
->wtdb
== NULL
) {
1189 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1190 TALLOC_FREE(result
);
1193 talloc_free(db_path
);
1195 if (result
->persistent
) {
1196 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
,
1197 ctdb_conn_msg_ctx(conn
));
1198 if (db_ctdb
->lock_ctx
== NULL
) {
1199 DEBUG(0, ("g_lock_ctx_init failed\n"));
1200 TALLOC_FREE(result
);
1205 result
->private_data
= (void *)db_ctdb
;
1206 result
->fetch_locked
= db_ctdb_fetch_locked
;
1207 result
->fetch
= db_ctdb_fetch
;
1208 result
->traverse
= db_ctdb_traverse
;
1209 result
->traverse_read
= db_ctdb_traverse_read
;
1210 result
->get_seqnum
= db_ctdb_get_seqnum
;
1211 result
->get_flags
= db_ctdb_get_flags
;
1212 result
->transaction_start
= db_ctdb_transaction_start
;
1213 result
->transaction_commit
= db_ctdb_transaction_commit
;
1214 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1216 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1217 name
, db_ctdb
->db_id
));