2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer
*m_all
;
33 struct ctdb_marshall_buffer
*m_write
;
39 struct db_context
*db
;
40 struct tdb_wrap
*wtdb
;
42 struct db_ctdb_transaction_handle
*transaction
;
46 struct db_ctdb_ctx
*ctdb_ctx
;
47 struct ctdb_ltdb_header header
;
50 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
55 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
58 enum TDB_ERROR tret
= tdb_error(tdb
);
62 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
65 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
68 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
83 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
85 struct ctdb_ltdb_header
*header
,
89 struct ctdb_rec_data
*d
;
91 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
92 data
.dsize
+ (header
?sizeof(*header
):0);
93 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
99 d
->keylen
= key
.dsize
;
100 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
102 d
->datalen
= data
.dsize
+ sizeof(*header
);
103 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
104 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
106 d
->datalen
= data
.dsize
;
107 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
115 struct ctdb_marshall_buffer
*m
,
119 struct ctdb_ltdb_header
*header
,
122 struct ctdb_rec_data
*r
;
123 size_t m_size
, r_size
;
124 struct ctdb_marshall_buffer
*m2
;
126 r
= db_ctdb_marshall_record(mem_ctx
, reqid
, key
, header
, data
);
133 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
134 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
141 m_size
= talloc_get_size(m
);
142 r_size
= talloc_get_size(r
);
144 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
145 mem_ctx
, m
, m_size
+ r_size
);
151 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
164 data
.dptr
= (uint8_t *)m
;
165 data
.dsize
= talloc_get_size(m
);
170 loop over a marshalling buffer
172 - pass r==NULL to start
173 - loop the number of times indicated by m->count
175 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
177 struct ctdb_ltdb_header
*header
,
178 TDB_DATA
*key
, TDB_DATA
*data
)
181 r
= (struct ctdb_rec_data
*)&m
->data
[0];
183 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
191 key
->dptr
= &r
->data
[0];
192 key
->dsize
= r
->keylen
;
195 data
->dptr
= &r
->data
[r
->keylen
];
196 data
->dsize
= r
->datalen
;
197 if (header
!= NULL
) {
198 data
->dptr
+= sizeof(*header
);
199 data
->dsize
-= sizeof(*header
);
203 if (header
!= NULL
) {
204 if (r
->datalen
< sizeof(*header
)) {
207 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
215 /* start a transaction on a database */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
218 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
222 /* start a transaction on a database */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
225 struct db_record
*rh
;
228 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
230 struct db_ctdb_ctx
*ctx
= h
->ctx
;
233 key
.dptr
= (uint8_t *)discard_const(keyname
);
234 key
.dsize
= strlen(keyname
);
237 tmp_ctx
= talloc_new(h
);
239 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
241 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
242 talloc_free(tmp_ctx
);
247 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
249 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
250 talloc_free(tmp_ctx
);
254 data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
255 if ((data
.dptr
== NULL
) ||
256 (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
257 ((struct ctdb_ltdb_header
*)data
.dptr
)->dmaster
!= get_my_vnn()) {
258 SAFE_FREE(data
.dptr
);
259 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
260 talloc_free(tmp_ctx
);
264 SAFE_FREE(data
.dptr
);
265 talloc_free(tmp_ctx
);
271 /* start a transaction on a database */
272 static int db_ctdb_transaction_start(struct db_context
*db
)
274 struct db_ctdb_transaction_handle
*h
;
276 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
279 if (!db
->persistent
) {
280 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
285 if (ctx
->transaction
) {
286 ctx
->transaction
->nesting
++;
290 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
292 DEBUG(0,(__location__
" oom for transaction handle\n"));
298 ret
= db_ctdb_transaction_fetch_start(h
);
304 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
306 ctx
->transaction
= h
;
308 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
316 fetch a record inside a transaction
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
320 TDB_DATA key
, TDB_DATA
*data
)
322 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
324 *data
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
326 if (data
->dptr
!= NULL
) {
327 uint8_t *oldptr
= (uint8_t *)data
->dptr
;
328 data
->dsize
-= sizeof(struct ctdb_ltdb_header
);
329 if (data
->dsize
== 0) {
332 data
->dptr
= (uint8
*)
334 mem_ctx
, data
->dptr
+sizeof(struct ctdb_ltdb_header
),
338 if (data
->dptr
== NULL
&& data
->dsize
!= 0) {
344 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
345 if (h
->m_all
== NULL
) {
346 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
348 talloc_free(data
->dptr
);
357 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
358 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
360 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
364 struct db_record
*result
;
367 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
368 DEBUG(0, ("talloc failed\n"));
372 result
->private_data
= ctx
->transaction
;
374 result
->key
.dsize
= key
.dsize
;
375 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
376 if (result
->key
.dptr
== NULL
) {
377 DEBUG(0, ("talloc failed\n"));
382 result
->store
= db_ctdb_store_transaction
;
383 result
->delete_rec
= db_ctdb_delete_transaction
;
385 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
386 if (ctdb_data
.dptr
== NULL
) {
387 /* create the record */
388 result
->value
= tdb_null
;
392 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
393 result
->value
.dptr
= NULL
;
395 if ((result
->value
.dsize
!= 0)
396 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
397 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
398 result
->value
.dsize
))) {
399 DEBUG(0, ("talloc failed\n"));
403 SAFE_FREE(ctdb_data
.dptr
);
408 static int db_ctdb_record_destructor(struct db_record
*rec
)
410 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
411 rec
->private_data
, struct db_ctdb_transaction_handle
);
412 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
414 DEBUG(0,(__location__
" transaction_commit failed\n"));
420 auto-create a transaction for persistent databases
422 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
427 struct db_record
*rec
;
429 res
= db_ctdb_transaction_start(ctx
->db
);
434 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
436 ctx
->db
->transaction_cancel(ctx
->db
);
440 /* destroy this transaction when we release the lock */
441 talloc_set_destructor((struct db_record
*)talloc_new(rec
), db_ctdb_record_destructor
);
447 stores a record inside a transaction
449 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
450 TDB_DATA key
, TDB_DATA data
)
452 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
455 struct ctdb_ltdb_header header
;
457 /* we need the header so we can update the RSN */
458 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
459 if (rec
.dptr
== NULL
) {
460 /* the record doesn't exist - create one with us as dmaster.
461 This is only safe because we are in a transaction and this
462 is a persistent database */
464 header
.dmaster
= get_my_vnn();
466 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
467 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
468 /* a special case, we are writing the same data that is there now */
469 if (data
.dsize
== rec
.dsize
&&
470 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
472 talloc_free(tmp_ctx
);
481 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
482 if (h
->m_all
== NULL
) {
483 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
484 talloc_free(tmp_ctx
);
489 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
490 if (h
->m_write
== NULL
) {
491 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
492 talloc_free(tmp_ctx
);
496 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
497 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
498 if (rec
.dptr
== NULL
) {
499 DEBUG(0,(__location__
" Failed to alloc record\n"));
500 talloc_free(tmp_ctx
);
503 memcpy(rec
.dptr
, &header
, sizeof(struct ctdb_ltdb_header
));
504 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
506 ret
= tdb_store(h
->ctx
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
508 talloc_free(tmp_ctx
);
515 a record store inside a transaction
517 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
519 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
520 rec
->private_data
, struct db_ctdb_transaction_handle
);
523 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
525 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
531 a record delete inside a transaction
533 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
535 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
536 rec
->private_data
, struct db_ctdb_transaction_handle
);
539 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
541 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
550 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
553 struct ctdb_rec_data
*rec
= NULL
;
556 talloc_free(h
->m_write
);
559 ret
= db_ctdb_transaction_fetch_start(h
);
564 for (i
=0;i
<h
->m_all
->count
;i
++) {
567 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
569 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
573 if (rec
->reqid
== 0) {
575 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
580 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
582 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
583 talloc_free(tmp_ctx
);
586 if (data2
.dsize
!= data
.dsize
||
587 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
588 /* the record has changed on us - we have to give up */
589 talloc_free(tmp_ctx
);
592 talloc_free(tmp_ctx
);
599 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
607 static int db_ctdb_transaction_commit(struct db_context
*db
)
609 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
615 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
616 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
619 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
623 if (h
->nested_cancel
) {
624 db
->transaction_cancel(db
);
625 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
629 if (h
->nesting
!= 0) {
634 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
636 talloc_set_destructor(h
, NULL
);
638 /* our commit strategy is quite complex.
640 - we first try to commit the changes to all other nodes
642 - if that works, then we commit locally and we are done
644 - if a commit on another node fails, then we need to cancel
645 the transaction, then restart the transaction (thus
646 opening a window of time for a pending recovery to
647 complete), then replay the transaction, checking all the
648 reads and writes (checking that reads give the same data,
649 and writes succeed). Then we retry the transaction to the
654 if (h
->m_write
== NULL
) {
655 /* no changes were made, potentially after a retry */
656 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
658 ctx
->transaction
= NULL
;
662 /* tell ctdbd to commit to the other nodes */
663 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
664 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
666 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
667 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
668 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
671 if (!NT_STATUS_IS_OK(rets
)) {
672 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
674 /* work out what error code we will give if we
675 have to fail the operation */
676 switch ((enum ctdb_trans2_commit_error
)status
) {
677 case CTDB_TRANS2_COMMIT_SUCCESS
:
678 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
679 case CTDB_TRANS2_COMMIT_TIMEOUT
:
680 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
682 case CTDB_TRANS2_COMMIT_ALLFAIL
:
683 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
688 if (++retries
== 5) {
689 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
690 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
691 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
692 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
693 tdb_null
, NULL
, NULL
, NULL
);
694 h
->ctx
->transaction
= NULL
;
696 ctx
->transaction
= NULL
;
700 if (ctdb_replay_transaction(h
) != 0) {
701 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
702 (unsigned)failure_control
));
703 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
704 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
705 tdb_null
, NULL
, NULL
, NULL
);
706 h
->ctx
->transaction
= NULL
;
708 ctx
->transaction
= NULL
;
713 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
716 /* do the real commit locally */
717 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
719 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
720 (unsigned)failure_control
));
721 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
722 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
723 h
->ctx
->transaction
= NULL
;
728 /* tell ctdbd that we are finished with our local commit */
729 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
730 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
731 tdb_null
, NULL
, NULL
, NULL
);
732 h
->ctx
->transaction
= NULL
;
741 static int db_ctdb_transaction_cancel(struct db_context
*db
)
743 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
745 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
748 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
752 if (h
->nesting
!= 0) {
754 h
->nested_cancel
= true;
758 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
760 ctx
->transaction
= NULL
;
766 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
768 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
769 rec
->private_data
, struct db_ctdb_rec
);
773 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
775 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
776 return NT_STATUS_NO_MEMORY
;
779 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
780 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
782 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, cdata
, TDB_REPLACE
);
784 SAFE_FREE(cdata
.dptr
);
786 return (ret
== 0) ? NT_STATUS_OK
787 : tdb_error_to_ntstatus(crec
->ctdb_ctx
->wtdb
->tdb
);
792 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
797 * We have to store the header with empty data. TODO: Fix the
803 return db_ctdb_store(rec
, data
, 0);
807 static int db_ctdb_record_destr(struct db_record
* data
)
809 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
810 data
->private_data
, struct db_ctdb_rec
);
812 DEBUG(10, (DEBUGLEVEL
> 10
813 ? "Unlocking db %u key %s\n"
814 : "Unlocking db %u key %.20s\n",
815 (int)crec
->ctdb_ctx
->db_id
,
816 hex_encode(data
, (unsigned char *)data
->key
.dptr
,
819 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
820 DEBUG(0, ("tdb_chainunlock failed\n"));
827 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
832 struct db_record
*result
;
833 struct db_ctdb_rec
*crec
;
836 int migrate_attempts
= 0;
838 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
839 DEBUG(0, ("talloc failed\n"));
843 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
844 DEBUG(0, ("talloc failed\n"));
849 result
->private_data
= (void *)crec
;
850 crec
->ctdb_ctx
= ctx
;
852 result
->key
.dsize
= key
.dsize
;
853 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
854 if (result
->key
.dptr
== NULL
) {
855 DEBUG(0, ("talloc failed\n"));
861 * Do a blocking lock on the record
865 if (DEBUGLEVEL
>= 10) {
866 char *keystr
= hex_encode(result
, key
.dptr
, key
.dsize
);
867 DEBUG(10, (DEBUGLEVEL
> 10
868 ? "Locking db %u key %s\n"
869 : "Locking db %u key %.20s\n",
870 (int)crec
->ctdb_ctx
->db_id
, keystr
));
874 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
875 DEBUG(3, ("tdb_chainlock failed\n"));
880 result
->store
= db_ctdb_store
;
881 result
->delete_rec
= db_ctdb_delete
;
882 talloc_set_destructor(result
, db_ctdb_record_destr
);
884 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
887 * See if we have a valid record and we are the dmaster. If so, we can
888 * take the shortcut and just return it.
891 if ((ctdb_data
.dptr
== NULL
) ||
892 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
893 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
895 || (random() % 2 != 0)
898 SAFE_FREE(ctdb_data
.dptr
);
899 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
900 talloc_set_destructor(result
, NULL
);
902 migrate_attempts
+= 1;
904 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
905 ctdb_data
.dptr
, ctdb_data
.dptr
?
906 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
909 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
910 if (!NT_STATUS_IS_OK(status
)) {
911 DEBUG(5, ("ctdb_migrate failed: %s\n",
916 /* now its migrated, try again */
920 if (migrate_attempts
> 10) {
921 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
925 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
927 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
928 result
->value
.dptr
= NULL
;
930 if ((result
->value
.dsize
!= 0)
931 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
932 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
933 result
->value
.dsize
))) {
934 DEBUG(0, ("talloc failed\n"));
938 SAFE_FREE(ctdb_data
.dptr
);
943 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
947 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
950 if (ctx
->transaction
!= NULL
) {
951 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
954 if (db
->persistent
) {
955 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
958 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
962 fetch (unlocked, no migration) operation on ctdb
964 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
965 TDB_DATA key
, TDB_DATA
*data
)
967 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
972 if (ctx
->transaction
) {
973 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
976 /* try a direct fetch */
977 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
980 * See if we have a valid record and we are the dmaster. If so, we can
981 * take the shortcut and just return it.
982 * we bypass the dmaster check for persistent databases
984 if ((ctdb_data
.dptr
!= NULL
) &&
985 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
987 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
988 /* we are the dmaster - avoid the ctdb protocol op */
990 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
991 if (data
->dsize
== 0) {
992 SAFE_FREE(ctdb_data
.dptr
);
997 data
->dptr
= (uint8
*)talloc_memdup(
998 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1001 SAFE_FREE(ctdb_data
.dptr
);
1003 if (data
->dptr
== NULL
) {
1009 SAFE_FREE(ctdb_data
.dptr
);
1011 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1012 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1013 if (!NT_STATUS_IS_OK(status
)) {
1014 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
1021 struct traverse_state
{
1022 struct db_context
*db
;
1023 int (*fn
)(struct db_record
*rec
, void *private_data
);
1027 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1029 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1030 struct db_record
*rec
;
1031 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1032 /* we have to give them a locked record to prevent races */
1033 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1034 if (rec
&& rec
->value
.dsize
> 0) {
1035 state
->fn(rec
, state
->private_data
);
1037 talloc_free(tmp_ctx
);
1040 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1043 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1044 struct db_record
*rec
;
1045 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1047 /* we have to give them a locked record to prevent races */
1048 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1049 if (rec
&& rec
->value
.dsize
> 0) {
1050 ret
= state
->fn(rec
, state
->private_data
);
1052 talloc_free(tmp_ctx
);
1056 static int db_ctdb_traverse(struct db_context
*db
,
1057 int (*fn
)(struct db_record
*rec
,
1058 void *private_data
),
1061 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1062 struct db_ctdb_ctx
);
1063 struct traverse_state state
;
1067 state
.private_data
= private_data
;
1069 if (db
->persistent
) {
1070 /* for persistent databases we don't need to do a ctdb traverse,
1071 we can do a faster local traverse */
1072 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1076 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1080 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1082 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1085 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1087 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1090 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1092 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1093 struct db_record rec
;
1096 rec
.store
= db_ctdb_store_deny
;
1097 rec
.delete_rec
= db_ctdb_delete_deny
;
1098 rec
.private_data
= state
->db
;
1099 state
->fn(&rec
, state
->private_data
);
1102 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1105 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1106 struct db_record rec
;
1109 rec
.store
= db_ctdb_store_deny
;
1110 rec
.delete_rec
= db_ctdb_delete_deny
;
1111 rec
.private_data
= state
->db
;
1113 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1114 /* a deleted record */
1117 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1118 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1120 return state
->fn(&rec
, state
->private_data
);
1123 static int db_ctdb_traverse_read(struct db_context
*db
,
1124 int (*fn
)(struct db_record
*rec
,
1125 void *private_data
),
1128 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1129 struct db_ctdb_ctx
);
1130 struct traverse_state state
;
1134 state
.private_data
= private_data
;
1136 if (db
->persistent
) {
1137 /* for persistent databases we don't need to do a ctdb traverse,
1138 we can do a faster local traverse */
1139 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1142 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1146 static int db_ctdb_get_seqnum(struct db_context
*db
)
1148 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1149 struct db_ctdb_ctx
);
1150 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1153 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1155 int hash_size
, int tdb_flags
,
1156 int open_flags
, mode_t mode
)
1158 struct db_context
*result
;
1159 struct db_ctdb_ctx
*db_ctdb
;
1162 if (!lp_clustering()) {
1163 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1167 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1168 DEBUG(0, ("talloc failed\n"));
1169 TALLOC_FREE(result
);
1173 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1174 DEBUG(0, ("talloc failed\n"));
1175 TALLOC_FREE(result
);
1179 db_ctdb
->transaction
= NULL
;
1180 db_ctdb
->db
= result
;
1182 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1183 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1184 TALLOC_FREE(result
);
1188 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1190 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1192 /* only pass through specific flags */
1193 tdb_flags
&= TDB_SEQNUM
;
1195 /* honor permissions if user has specified O_CREAT */
1196 if (open_flags
& O_CREAT
) {
1197 chmod(db_path
, mode
);
1200 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1201 if (db_ctdb
->wtdb
== NULL
) {
1202 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1203 TALLOC_FREE(result
);
1206 talloc_free(db_path
);
1208 result
->private_data
= (void *)db_ctdb
;
1209 result
->fetch_locked
= db_ctdb_fetch_locked
;
1210 result
->fetch
= db_ctdb_fetch
;
1211 result
->traverse
= db_ctdb_traverse
;
1212 result
->traverse_read
= db_ctdb_traverse_read
;
1213 result
->get_seqnum
= db_ctdb_get_seqnum
;
1214 result
->transaction_start
= db_ctdb_transaction_start
;
1215 result
->transaction_commit
= db_ctdb_transaction_commit
;
1216 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1218 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1219 name
, db_ctdb
->db_id
));