/*
   Unix SMB/CIFS implementation.
   Database interface wrapper around ctdbd
   Copyright (C) Volker Lendecke 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/* State of one open transaction on a ctdb-backed database. */
struct db_ctdb_transaction_handle {
	/* the database context this transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/*
	 * NOTE(review): this member is restored from the layout of the
	 * damaged original; it is presumably set while a failed commit is
	 * being replayed so that replayed operations are not logged into
	 * m_all a second time -- confirm against the replay code.
	 */
	bool in_replay;
	/* we store the reads and writes done under a transaction:
	   one list stores both reads and writes, the other just writes
	*/
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* depth of nested transaction_start calls below the outermost one */
	uint32_t nesting;
	/* set when a nested transaction was cancelled */
	bool nested_cancel;
};
39 struct db_context
*db
;
40 struct tdb_wrap
*wtdb
;
42 struct db_ctdb_transaction_handle
*transaction
;
46 struct db_ctdb_ctx
*ctdb_ctx
;
47 struct ctdb_ltdb_header header
;
50 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
55 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
58 enum TDB_ERROR tret
= tdb_error(tdb
);
62 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
65 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
68 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
83 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
85 struct ctdb_ltdb_header
*header
,
89 struct ctdb_rec_data
*d
;
91 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
92 data
.dsize
+ (header
?sizeof(*header
):0);
93 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
99 d
->keylen
= key
.dsize
;
100 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
102 d
->datalen
= data
.dsize
+ sizeof(*header
);
103 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
104 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
106 d
->datalen
= data
.dsize
;
107 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
115 struct ctdb_marshall_buffer
*m
,
119 struct ctdb_ltdb_header
*header
,
122 struct ctdb_rec_data
*r
;
123 size_t m_size
, r_size
;
124 struct ctdb_marshall_buffer
*m2
;
126 r
= db_ctdb_marshall_record(mem_ctx
, reqid
, key
, header
, data
);
133 m
= talloc_zero_size(mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
140 m_size
= talloc_get_size(m
);
141 r_size
= talloc_get_size(r
);
143 m2
= talloc_realloc_size(mem_ctx
, m
, m_size
+ r_size
);
149 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
158 /* we've finished marshalling, return a data blob with the marshalled records */
159 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
162 data
.dptr
= (uint8_t *)m
;
163 data
.dsize
= talloc_get_size(m
);
168 loop over a marshalling buffer
170 - pass r==NULL to start
171 - loop the number of times indicated by m->count
173 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
175 struct ctdb_ltdb_header
*header
,
176 TDB_DATA
*key
, TDB_DATA
*data
)
179 r
= (struct ctdb_rec_data
*)&m
->data
[0];
181 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
189 key
->dptr
= &r
->data
[0];
190 key
->dsize
= r
->keylen
;
193 data
->dptr
= &r
->data
[r
->keylen
];
194 data
->dsize
= r
->datalen
;
195 if (header
!= NULL
) {
196 data
->dptr
+= sizeof(*header
);
197 data
->dsize
-= sizeof(*header
);
201 if (header
!= NULL
) {
202 if (r
->datalen
< sizeof(*header
)) {
205 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
213 /* start a transaction on a database */
214 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
216 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
220 /* start a transaction on a database */
221 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
223 struct db_record
*rh
;
226 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
228 struct db_ctdb_ctx
*ctx
= h
->ctx
;
231 key
.dptr
= discard_const(keyname
);
232 key
.dsize
= strlen(keyname
);
235 tmp_ctx
= talloc_new(h
);
237 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
239 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
240 talloc_free(tmp_ctx
);
245 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
247 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
248 talloc_free(tmp_ctx
);
252 data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
253 if ((data
.dptr
== NULL
) ||
254 (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
255 ((struct ctdb_ltdb_header
*)data
.dptr
)->dmaster
!= get_my_vnn()) {
256 SAFE_FREE(data
.dptr
);
257 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
258 talloc_free(tmp_ctx
);
262 SAFE_FREE(data
.dptr
);
263 talloc_free(tmp_ctx
);
269 /* start a transaction on a database */
270 static int db_ctdb_transaction_start(struct db_context
*db
)
272 struct db_ctdb_transaction_handle
*h
;
274 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
277 if (!db
->persistent
) {
278 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
283 if (ctx
->transaction
) {
284 ctx
->transaction
->nesting
++;
288 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
290 DEBUG(0,(__location__
" oom for transaction handle\n"));
296 ret
= db_ctdb_transaction_fetch_start(h
);
302 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
304 ctx
->transaction
= h
;
306 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
314 fetch a record inside a transaction
316 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
318 TDB_DATA key
, TDB_DATA
*data
)
320 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
322 *data
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
324 if (data
->dptr
!= NULL
) {
325 uint8_t *oldptr
= (uint8_t *)data
->dptr
;
326 data
->dsize
-= sizeof(struct ctdb_ltdb_header
);
327 if (data
->dsize
== 0) {
330 data
->dptr
= (uint8
*)
332 mem_ctx
, data
->dptr
+sizeof(struct ctdb_ltdb_header
),
336 if (data
->dptr
== NULL
&& data
->dsize
!= 0) {
342 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
343 if (h
->m_all
== NULL
) {
344 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
346 talloc_free(data
->dptr
);
355 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
356 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
358 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
362 struct db_record
*result
;
365 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
366 DEBUG(0, ("talloc failed\n"));
370 result
->private_data
= ctx
->transaction
;
372 result
->key
.dsize
= key
.dsize
;
373 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
374 if (result
->key
.dptr
== NULL
) {
375 DEBUG(0, ("talloc failed\n"));
380 result
->store
= db_ctdb_store_transaction
;
381 result
->delete_rec
= db_ctdb_delete_transaction
;
383 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
384 if (ctdb_data
.dptr
== NULL
) {
385 /* create the record */
386 result
->value
= tdb_null
;
390 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
391 result
->value
.dptr
= NULL
;
393 if ((result
->value
.dsize
!= 0)
394 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
395 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
396 result
->value
.dsize
))) {
397 DEBUG(0, ("talloc failed\n"));
401 SAFE_FREE(ctdb_data
.dptr
);
406 static int db_ctdb_record_destructor(struct db_record
*rec
)
408 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
409 rec
->private_data
, struct db_ctdb_transaction_handle
);
410 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
412 DEBUG(0,(__location__
" transaction_commit failed\n"));
418 auto-create a transaction for persistent databases
420 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
425 struct db_record
*rec
;
427 res
= db_ctdb_transaction_start(ctx
->db
);
432 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
434 ctx
->db
->transaction_cancel(ctx
->db
);
438 /* destroy this transaction when we release the lock */
439 talloc_set_destructor((struct db_record
*)talloc_new(rec
), db_ctdb_record_destructor
);
445 stores a record inside a transaction
447 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
448 TDB_DATA key
, TDB_DATA data
)
450 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
453 struct ctdb_ltdb_header header
;
455 /* we need the header so we can update the RSN */
456 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
457 if (rec
.dptr
== NULL
) {
458 /* the record doesn't exist - create one with us as dmaster.
459 This is only safe because we are in a transaction and this
460 is a persistent database */
462 header
.dmaster
= get_my_vnn();
464 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
465 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
466 /* a special case, we are writing the same data that is there now */
467 if (data
.dsize
== rec
.dsize
&&
468 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
470 talloc_free(tmp_ctx
);
479 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
480 if (h
->m_all
== NULL
) {
481 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
482 talloc_free(tmp_ctx
);
487 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
488 if (h
->m_write
== NULL
) {
489 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
490 talloc_free(tmp_ctx
);
494 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
495 rec
.dptr
= talloc_size(tmp_ctx
, rec
.dsize
);
496 if (rec
.dptr
== NULL
) {
497 DEBUG(0,(__location__
" Failed to alloc record\n"));
498 talloc_free(tmp_ctx
);
501 memcpy(rec
.dptr
, &header
, sizeof(struct ctdb_ltdb_header
));
502 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
504 ret
= tdb_store(h
->ctx
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
506 talloc_free(tmp_ctx
);
513 a record store inside a transaction
515 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
517 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
518 rec
->private_data
, struct db_ctdb_transaction_handle
);
521 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
523 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
529 a record delete inside a transaction
531 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
533 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
534 rec
->private_data
, struct db_ctdb_transaction_handle
);
537 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
539 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
548 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
551 struct ctdb_rec_data
*rec
= NULL
;
554 talloc_free(h
->m_write
);
557 ret
= db_ctdb_transaction_fetch_start(h
);
562 for (i
=0;i
<h
->m_all
->count
;i
++) {
565 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
567 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
571 if (rec
->reqid
== 0) {
573 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
578 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
580 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
581 talloc_free(tmp_ctx
);
584 if (data2
.dsize
!= data
.dsize
||
585 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
586 /* the record has changed on us - we have to give up */
587 talloc_free(tmp_ctx
);
590 talloc_free(tmp_ctx
);
597 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
605 static int db_ctdb_transaction_commit(struct db_context
*db
)
607 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
613 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
614 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
617 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
621 if (h
->nested_cancel
) {
622 db
->transaction_cancel(db
);
623 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
627 if (h
->nesting
!= 0) {
632 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
634 talloc_set_destructor(h
, NULL
);
636 /* our commit strategy is quite complex.
638 - we first try to commit the changes to all other nodes
640 - if that works, then we commit locally and we are done
642 - if a commit on another node fails, then we need to cancel
643 the transaction, then restart the transaction (thus
644 opening a window of time for a pending recovery to
645 complete), then replay the transaction, checking all the
646 reads and writes (checking that reads give the same data,
647 and writes succeed). Then we retry the transaction to the
652 if (h
->m_write
== NULL
) {
653 /* no changes were made, potentially after a retry */
654 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
656 ctx
->transaction
= NULL
;
660 /* tell ctdbd to commit to the other nodes */
661 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
662 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
664 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
665 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
666 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
669 if (!NT_STATUS_IS_OK(rets
)) {
670 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
672 /* work out what error code we will give if we
673 have to fail the operation */
674 switch ((enum ctdb_trans2_commit_error
)status
) {
675 case CTDB_TRANS2_COMMIT_SUCCESS
:
676 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
677 case CTDB_TRANS2_COMMIT_TIMEOUT
:
678 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
680 case CTDB_TRANS2_COMMIT_ALLFAIL
:
681 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
686 if (++retries
== 5) {
687 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
688 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
689 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
690 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
691 tdb_null
, NULL
, NULL
, NULL
);
692 h
->ctx
->transaction
= NULL
;
694 ctx
->transaction
= NULL
;
698 if (ctdb_replay_transaction(h
) != 0) {
699 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
700 (unsigned)failure_control
));
701 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
702 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
703 tdb_null
, NULL
, NULL
, NULL
);
704 h
->ctx
->transaction
= NULL
;
706 ctx
->transaction
= NULL
;
711 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
714 /* do the real commit locally */
715 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
717 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
718 (unsigned)failure_control
));
719 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
720 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
721 h
->ctx
->transaction
= NULL
;
726 /* tell ctdbd that we are finished with our local commit */
727 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
728 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
729 tdb_null
, NULL
, NULL
, NULL
);
730 h
->ctx
->transaction
= NULL
;
739 static int db_ctdb_transaction_cancel(struct db_context
*db
)
741 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
743 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
746 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
750 if (h
->nesting
!= 0) {
752 h
->nested_cancel
= true;
756 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
758 ctx
->transaction
= NULL
;
764 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
766 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
767 rec
->private_data
, struct db_ctdb_rec
);
771 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
773 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
774 return NT_STATUS_NO_MEMORY
;
777 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
778 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
780 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, cdata
, TDB_REPLACE
);
782 SAFE_FREE(cdata
.dptr
);
784 return (ret
== 0) ? NT_STATUS_OK
785 : tdb_error_to_ntstatus(crec
->ctdb_ctx
->wtdb
->tdb
);
790 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
795 * We have to store the header with empty data. TODO: Fix the
801 return db_ctdb_store(rec
, data
, 0);
805 static int db_ctdb_record_destr(struct db_record
* data
)
807 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
808 data
->private_data
, struct db_ctdb_rec
);
810 DEBUG(10, (DEBUGLEVEL
> 10
811 ? "Unlocking db %u key %s\n"
812 : "Unlocking db %u key %.20s\n",
813 (int)crec
->ctdb_ctx
->db_id
,
814 hex_encode(data
, (unsigned char *)data
->key
.dptr
,
817 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
818 DEBUG(0, ("tdb_chainunlock failed\n"));
825 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
830 struct db_record
*result
;
831 struct db_ctdb_rec
*crec
;
834 int migrate_attempts
= 0;
836 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
837 DEBUG(0, ("talloc failed\n"));
841 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
842 DEBUG(0, ("talloc failed\n"));
847 result
->private_data
= (void *)crec
;
848 crec
->ctdb_ctx
= ctx
;
850 result
->key
.dsize
= key
.dsize
;
851 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
852 if (result
->key
.dptr
== NULL
) {
853 DEBUG(0, ("talloc failed\n"));
859 * Do a blocking lock on the record
863 if (DEBUGLEVEL
>= 10) {
864 char *keystr
= hex_encode(result
, key
.dptr
, key
.dsize
);
865 DEBUG(10, (DEBUGLEVEL
> 10
866 ? "Locking db %u key %s\n"
867 : "Locking db %u key %.20s\n",
868 (int)crec
->ctdb_ctx
->db_id
, keystr
));
872 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
873 DEBUG(3, ("tdb_chainlock failed\n"));
878 result
->store
= db_ctdb_store
;
879 result
->delete_rec
= db_ctdb_delete
;
880 talloc_set_destructor(result
, db_ctdb_record_destr
);
882 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
885 * See if we have a valid record and we are the dmaster. If so, we can
886 * take the shortcut and just return it.
889 if ((ctdb_data
.dptr
== NULL
) ||
890 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
891 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
893 || (random() % 2 != 0)
896 SAFE_FREE(ctdb_data
.dptr
);
897 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
898 talloc_set_destructor(result
, NULL
);
900 migrate_attempts
+= 1;
902 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
903 ctdb_data
.dptr
, ctdb_data
.dptr
?
904 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
907 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
908 if (!NT_STATUS_IS_OK(status
)) {
909 DEBUG(5, ("ctdb_migrate failed: %s\n",
914 /* now its migrated, try again */
918 if (migrate_attempts
> 10) {
919 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
923 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
925 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
926 result
->value
.dptr
= NULL
;
928 if ((result
->value
.dsize
!= 0)
929 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
930 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
931 result
->value
.dsize
))) {
932 DEBUG(0, ("talloc failed\n"));
936 SAFE_FREE(ctdb_data
.dptr
);
941 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
945 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
948 if (ctx
->transaction
!= NULL
) {
949 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
952 if (db
->persistent
) {
953 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
956 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
960 fetch (unlocked, no migration) operation on ctdb
962 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
963 TDB_DATA key
, TDB_DATA
*data
)
965 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
970 if (ctx
->transaction
) {
971 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
974 /* try a direct fetch */
975 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
978 * See if we have a valid record and we are the dmaster. If so, we can
979 * take the shortcut and just return it.
980 * we bypass the dmaster check for persistent databases
982 if ((ctdb_data
.dptr
!= NULL
) &&
983 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
985 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
986 /* we are the dmaster - avoid the ctdb protocol op */
988 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
989 if (data
->dsize
== 0) {
990 SAFE_FREE(ctdb_data
.dptr
);
995 data
->dptr
= (uint8
*)talloc_memdup(
996 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
999 SAFE_FREE(ctdb_data
.dptr
);
1001 if (data
->dptr
== NULL
) {
1007 SAFE_FREE(ctdb_data
.dptr
);
1009 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1010 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1011 if (!NT_STATUS_IS_OK(status
)) {
1012 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/* Context handed to the traverse callbacks below. */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* user callback invoked for each record */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1025 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1027 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1028 struct db_record
*rec
;
1029 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1030 /* we have to give them a locked record to prevent races */
1031 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1032 if (rec
&& rec
->value
.dsize
> 0) {
1033 state
->fn(rec
, state
->private_data
);
1035 talloc_free(tmp_ctx
);
1038 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1041 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1042 struct db_record
*rec
;
1043 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1045 /* we have to give them a locked record to prevent races */
1046 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1047 if (rec
&& rec
->value
.dsize
> 0) {
1048 ret
= state
->fn(rec
, state
->private_data
);
1050 talloc_free(tmp_ctx
);
1054 static int db_ctdb_traverse(struct db_context
*db
,
1055 int (*fn
)(struct db_record
*rec
,
1056 void *private_data
),
1059 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1060 struct db_ctdb_ctx
);
1061 struct traverse_state state
;
1065 state
.private_data
= private_data
;
1067 if (db
->persistent
) {
1068 /* for persistent databases we don't need to do a ctdb traverse,
1069 we can do a faster local traverse */
1070 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1074 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1078 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1080 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1083 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1085 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1088 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1090 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1091 struct db_record rec
;
1094 rec
.store
= db_ctdb_store_deny
;
1095 rec
.delete_rec
= db_ctdb_delete_deny
;
1096 rec
.private_data
= state
->db
;
1097 state
->fn(&rec
, state
->private_data
);
1100 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1103 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1104 struct db_record rec
;
1107 rec
.store
= db_ctdb_store_deny
;
1108 rec
.delete_rec
= db_ctdb_delete_deny
;
1109 rec
.private_data
= state
->db
;
1111 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1112 /* a deleted record */
1115 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1116 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1118 return state
->fn(&rec
, state
->private_data
);
1121 static int db_ctdb_traverse_read(struct db_context
*db
,
1122 int (*fn
)(struct db_record
*rec
,
1123 void *private_data
),
1126 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1127 struct db_ctdb_ctx
);
1128 struct traverse_state state
;
1132 state
.private_data
= private_data
;
1134 if (db
->persistent
) {
1135 /* for persistent databases we don't need to do a ctdb traverse,
1136 we can do a faster local traverse */
1137 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1140 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1144 static int db_ctdb_get_seqnum(struct db_context
*db
)
1146 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1147 struct db_ctdb_ctx
);
1148 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1151 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1153 int hash_size
, int tdb_flags
,
1154 int open_flags
, mode_t mode
)
1156 struct db_context
*result
;
1157 struct db_ctdb_ctx
*db_ctdb
;
1160 if (!lp_clustering()) {
1161 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1165 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1166 DEBUG(0, ("talloc failed\n"));
1167 TALLOC_FREE(result
);
1171 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1172 DEBUG(0, ("talloc failed\n"));
1173 TALLOC_FREE(result
);
1177 db_ctdb
->transaction
= NULL
;
1178 db_ctdb
->db
= result
;
1180 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1181 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1182 TALLOC_FREE(result
);
1186 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1188 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1190 /* only pass through specific flags */
1191 tdb_flags
&= TDB_SEQNUM
;
1193 /* honor permissions if user has specified O_CREAT */
1194 if (open_flags
& O_CREAT
) {
1195 chmod(db_path
, mode
);
1198 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1199 if (db_ctdb
->wtdb
== NULL
) {
1200 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1201 TALLOC_FREE(result
);
1204 talloc_free(db_path
);
1206 result
->private_data
= (void *)db_ctdb
;
1207 result
->fetch_locked
= db_ctdb_fetch_locked
;
1208 result
->fetch
= db_ctdb_fetch
;
1209 result
->traverse
= db_ctdb_traverse
;
1210 result
->traverse_read
= db_ctdb_traverse_read
;
1211 result
->get_seqnum
= db_ctdb_get_seqnum
;
1212 result
->transaction_start
= db_ctdb_transaction_start
;
1213 result
->transaction_commit
= db_ctdb_transaction_commit
;
1214 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1216 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1217 name
, db_ctdb
->db_id
));