2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
30 * we store the reads and writes done under a transaction:
31 * - one list stores both reads and writes (m_all),
32 * - the other just writes (m_write)
34 struct ctdb_marshall_buffer
*m_all
;
35 struct ctdb_marshall_buffer
*m_write
;
41 struct db_context
*db
;
42 struct tdb_wrap
*wtdb
;
44 struct db_ctdb_transaction_handle
*transaction
;
48 struct db_ctdb_ctx
*ctdb_ctx
;
49 struct ctdb_ltdb_header header
;
52 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
57 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
60 enum TDB_ERROR tret
= tdb_error(tdb
);
64 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
67 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
70 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
80 form a ctdb_rec_data record from a key/data pair
82 note that header may be NULL. If not NULL then it is included in the data portion
85 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
87 struct ctdb_ltdb_header
*header
,
91 struct ctdb_rec_data
*d
;
93 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
94 data
.dsize
+ (header
?sizeof(*header
):0);
95 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
101 d
->keylen
= key
.dsize
;
102 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
104 d
->datalen
= data
.dsize
+ sizeof(*header
);
105 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
106 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
108 d
->datalen
= data
.dsize
;
109 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
115 /* helper function for marshalling multiple records */
116 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
117 struct ctdb_marshall_buffer
*m
,
121 struct ctdb_ltdb_header
*header
,
124 struct ctdb_rec_data
*r
;
125 size_t m_size
, r_size
;
126 struct ctdb_marshall_buffer
*m2
= NULL
;
128 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
135 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
136 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
143 m_size
= talloc_get_size(m
);
144 r_size
= talloc_get_size(r
);
146 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
147 mem_ctx
, m
, m_size
+ r_size
);
153 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
162 /* we've finished marshalling, return a data blob with the marshalled records */
163 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
166 data
.dptr
= (uint8_t *)m
;
167 data
.dsize
= talloc_get_size(m
);
172 loop over a marshalling buffer
174 - pass r==NULL to start
175 - loop the number of times indicated by m->count
177 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
179 struct ctdb_ltdb_header
*header
,
180 TDB_DATA
*key
, TDB_DATA
*data
)
183 r
= (struct ctdb_rec_data
*)&m
->data
[0];
185 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
193 key
->dptr
= &r
->data
[0];
194 key
->dsize
= r
->keylen
;
197 data
->dptr
= &r
->data
[r
->keylen
];
198 data
->dsize
= r
->datalen
;
199 if (header
!= NULL
) {
200 data
->dptr
+= sizeof(*header
);
201 data
->dsize
-= sizeof(*header
);
205 if (header
!= NULL
) {
206 if (r
->datalen
< sizeof(*header
)) {
209 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
218 * CTDB transaction destructor
220 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
222 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
227 * start a transaction on a ctdb database:
228 * - lock the transaction lock key
229 * - start the tdb transaction
231 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
233 struct db_record
*rh
;
236 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
238 struct db_ctdb_ctx
*ctx
= h
->ctx
;
241 key
.dptr
= (uint8_t *)discard_const(keyname
);
242 key
.dsize
= strlen(keyname
);
245 tmp_ctx
= talloc_new(h
);
247 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
249 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
250 talloc_free(tmp_ctx
);
255 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
257 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
258 talloc_free(tmp_ctx
);
262 data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
263 if ((data
.dptr
== NULL
) ||
264 (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
265 ((struct ctdb_ltdb_header
*)data
.dptr
)->dmaster
!= get_my_vnn()) {
266 SAFE_FREE(data
.dptr
);
267 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
268 talloc_free(tmp_ctx
);
272 SAFE_FREE(data
.dptr
);
273 talloc_free(tmp_ctx
);
280 * CTDB dbwrap API: transaction_start function
281 * starts a transaction on a persistent database
283 static int db_ctdb_transaction_start(struct db_context
*db
)
285 struct db_ctdb_transaction_handle
*h
;
287 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
290 if (!db
->persistent
) {
291 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
296 if (ctx
->transaction
) {
297 ctx
->transaction
->nesting
++;
301 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
303 DEBUG(0,(__location__
" oom for transaction handle\n"));
309 ret
= db_ctdb_transaction_fetch_start(h
);
315 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
317 ctx
->transaction
= h
;
319 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
327 fetch a record inside a transaction
329 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
331 TDB_DATA key
, TDB_DATA
*data
)
333 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
335 *data
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
337 if (data
->dptr
!= NULL
) {
338 uint8_t *oldptr
= (uint8_t *)data
->dptr
;
339 data
->dsize
-= sizeof(struct ctdb_ltdb_header
);
340 if (data
->dsize
== 0) {
343 data
->dptr
= (uint8
*)
345 mem_ctx
, data
->dptr
+sizeof(struct ctdb_ltdb_header
),
349 if (data
->dptr
== NULL
&& data
->dsize
!= 0) {
355 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
356 if (h
->m_all
== NULL
) {
357 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
359 talloc_free(data
->dptr
);
368 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
369 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
371 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
375 struct db_record
*result
;
378 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
379 DEBUG(0, ("talloc failed\n"));
383 result
->private_data
= ctx
->transaction
;
385 result
->key
.dsize
= key
.dsize
;
386 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
387 if (result
->key
.dptr
== NULL
) {
388 DEBUG(0, ("talloc failed\n"));
393 result
->store
= db_ctdb_store_transaction
;
394 result
->delete_rec
= db_ctdb_delete_transaction
;
396 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
397 if (ctdb_data
.dptr
== NULL
) {
398 /* create the record */
399 result
->value
= tdb_null
;
403 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
404 result
->value
.dptr
= NULL
;
406 if ((result
->value
.dsize
!= 0)
407 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
408 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
409 result
->value
.dsize
))) {
410 DEBUG(0, ("talloc failed\n"));
414 SAFE_FREE(ctdb_data
.dptr
);
419 static int db_ctdb_record_destructor(struct db_record
**recp
)
421 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
422 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
423 rec
->private_data
, struct db_ctdb_transaction_handle
);
424 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
426 DEBUG(0,(__location__
" transaction_commit failed\n"));
432 auto-create a transaction for persistent databases
434 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
439 struct db_record
*rec
, **recp
;
441 res
= db_ctdb_transaction_start(ctx
->db
);
446 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
448 ctx
->db
->transaction_cancel(ctx
->db
);
452 /* destroy this transaction when we release the lock */
453 recp
= talloc(rec
, struct db_record
*);
455 ctx
->db
->transaction_cancel(ctx
->db
);
460 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
466 stores a record inside a transaction
468 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
469 TDB_DATA key
, TDB_DATA data
)
471 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
474 struct ctdb_ltdb_header header
;
476 /* we need the header so we can update the RSN */
477 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
478 if (rec
.dptr
== NULL
) {
479 /* the record doesn't exist - create one with us as dmaster.
480 This is only safe because we are in a transaction and this
481 is a persistent database */
484 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
485 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
486 /* a special case, we are writing the same data that is there now */
487 if (data
.dsize
== rec
.dsize
&&
488 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
490 talloc_free(tmp_ctx
);
496 header
.dmaster
= get_my_vnn();
500 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
501 if (h
->m_all
== NULL
) {
502 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
503 talloc_free(tmp_ctx
);
508 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
509 if (h
->m_write
== NULL
) {
510 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
511 talloc_free(tmp_ctx
);
515 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
516 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
517 if (rec
.dptr
== NULL
) {
518 DEBUG(0,(__location__
" Failed to alloc record\n"));
519 talloc_free(tmp_ctx
);
522 memcpy(rec
.dptr
, &header
, sizeof(struct ctdb_ltdb_header
));
523 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
525 ret
= tdb_store(h
->ctx
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
527 talloc_free(tmp_ctx
);
534 a record store inside a transaction
536 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
538 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
539 rec
->private_data
, struct db_ctdb_transaction_handle
);
542 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
544 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
550 a record delete inside a transaction
552 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
554 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
555 rec
->private_data
, struct db_ctdb_transaction_handle
);
558 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
560 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
569 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
572 struct ctdb_rec_data
*rec
= NULL
;
575 talloc_free(h
->m_write
);
578 ret
= db_ctdb_transaction_fetch_start(h
);
583 for (i
=0;i
<h
->m_all
->count
;i
++) {
586 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
588 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
592 if (rec
->reqid
== 0) {
594 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
599 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
601 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
602 talloc_free(tmp_ctx
);
605 if (data2
.dsize
!= data
.dsize
||
606 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
607 /* the record has changed on us - we have to give up */
608 talloc_free(tmp_ctx
);
611 talloc_free(tmp_ctx
);
618 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
626 static int db_ctdb_transaction_commit(struct db_context
*db
)
628 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
634 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
635 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
638 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
642 if (h
->nested_cancel
) {
643 db
->transaction_cancel(db
);
644 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
648 if (h
->nesting
!= 0) {
653 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
655 talloc_set_destructor(h
, NULL
);
657 /* our commit strategy is quite complex.
659 - we first try to commit the changes to all other nodes
661 - if that works, then we commit locally and we are done
663 - if a commit on another node fails, then we need to cancel
664 the transaction, then restart the transaction (thus
665 opening a window of time for a pending recovery to
666 complete), then replay the transaction, checking all the
667 reads and writes (checking that reads give the same data,
668 and writes succeed). Then we retry the transaction to the
673 if (h
->m_write
== NULL
) {
674 /* no changes were made, potentially after a retry */
675 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
677 ctx
->transaction
= NULL
;
681 /* tell ctdbd to commit to the other nodes */
682 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
683 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
685 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
686 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
687 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
690 if (!NT_STATUS_IS_OK(rets
)) {
691 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
693 /* work out what error code we will give if we
694 have to fail the operation */
695 switch ((enum ctdb_trans2_commit_error
)status
) {
696 case CTDB_TRANS2_COMMIT_SUCCESS
:
697 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
698 case CTDB_TRANS2_COMMIT_TIMEOUT
:
699 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
701 case CTDB_TRANS2_COMMIT_ALLFAIL
:
702 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
707 if (++retries
== 5) {
708 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
709 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
710 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
711 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
712 tdb_null
, NULL
, NULL
, NULL
);
713 h
->ctx
->transaction
= NULL
;
715 ctx
->transaction
= NULL
;
719 if (ctdb_replay_transaction(h
) != 0) {
720 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
721 (unsigned)failure_control
));
722 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
723 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
724 tdb_null
, NULL
, NULL
, NULL
);
725 h
->ctx
->transaction
= NULL
;
727 ctx
->transaction
= NULL
;
732 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
735 /* do the real commit locally */
736 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
738 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
739 (unsigned)failure_control
));
740 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
741 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
742 h
->ctx
->transaction
= NULL
;
747 /* tell ctdbd that we are finished with our local commit */
748 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
749 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
750 tdb_null
, NULL
, NULL
, NULL
);
751 h
->ctx
->transaction
= NULL
;
760 static int db_ctdb_transaction_cancel(struct db_context
*db
)
762 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
764 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
767 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
771 if (h
->nesting
!= 0) {
773 h
->nested_cancel
= true;
777 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
779 ctx
->transaction
= NULL
;
785 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
787 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
788 rec
->private_data
, struct db_ctdb_rec
);
792 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
794 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
795 return NT_STATUS_NO_MEMORY
;
798 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
799 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
801 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, cdata
, TDB_REPLACE
);
803 SAFE_FREE(cdata
.dptr
);
805 return (ret
== 0) ? NT_STATUS_OK
806 : tdb_error_to_ntstatus(crec
->ctdb_ctx
->wtdb
->tdb
);
811 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
816 * We have to store the header with empty data. TODO: Fix the
822 return db_ctdb_store(rec
, data
, 0);
826 static int db_ctdb_record_destr(struct db_record
* data
)
828 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
829 data
->private_data
, struct db_ctdb_rec
);
831 DEBUG(10, (DEBUGLEVEL
> 10
832 ? "Unlocking db %u key %s\n"
833 : "Unlocking db %u key %.20s\n",
834 (int)crec
->ctdb_ctx
->db_id
,
835 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
838 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
839 DEBUG(0, ("tdb_chainunlock failed\n"));
846 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
851 struct db_record
*result
;
852 struct db_ctdb_rec
*crec
;
855 int migrate_attempts
= 0;
857 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
858 DEBUG(0, ("talloc failed\n"));
862 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
863 DEBUG(0, ("talloc failed\n"));
868 result
->private_data
= (void *)crec
;
869 crec
->ctdb_ctx
= ctx
;
871 result
->key
.dsize
= key
.dsize
;
872 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
873 if (result
->key
.dptr
== NULL
) {
874 DEBUG(0, ("talloc failed\n"));
880 * Do a blocking lock on the record
884 if (DEBUGLEVEL
>= 10) {
885 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
886 DEBUG(10, (DEBUGLEVEL
> 10
887 ? "Locking db %u key %s\n"
888 : "Locking db %u key %.20s\n",
889 (int)crec
->ctdb_ctx
->db_id
, keystr
));
893 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
894 DEBUG(3, ("tdb_chainlock failed\n"));
899 result
->store
= db_ctdb_store
;
900 result
->delete_rec
= db_ctdb_delete
;
901 talloc_set_destructor(result
, db_ctdb_record_destr
);
903 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
906 * See if we have a valid record and we are the dmaster. If so, we can
907 * take the shortcut and just return it.
910 if ((ctdb_data
.dptr
== NULL
) ||
911 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
912 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
914 || (random() % 2 != 0)
917 SAFE_FREE(ctdb_data
.dptr
);
918 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
919 talloc_set_destructor(result
, NULL
);
921 migrate_attempts
+= 1;
923 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
924 ctdb_data
.dptr
, ctdb_data
.dptr
?
925 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
928 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
929 if (!NT_STATUS_IS_OK(status
)) {
930 DEBUG(5, ("ctdb_migrate failed: %s\n",
935 /* now it's migrated, try again */
939 if (migrate_attempts
> 10) {
940 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
944 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
946 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
947 result
->value
.dptr
= NULL
;
949 if ((result
->value
.dsize
!= 0)
950 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
951 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
952 result
->value
.dsize
))) {
953 DEBUG(0, ("talloc failed\n"));
957 SAFE_FREE(ctdb_data
.dptr
);
962 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
966 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
969 if (ctx
->transaction
!= NULL
) {
970 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
973 if (db
->persistent
) {
974 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
977 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
981 fetch (unlocked, no migration) operation on ctdb
983 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
984 TDB_DATA key
, TDB_DATA
*data
)
986 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
991 if (ctx
->transaction
) {
992 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
995 /* try a direct fetch */
996 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
999 * See if we have a valid record and we are the dmaster. If so, we can
1000 * take the shortcut and just return it.
1001 * we bypass the dmaster check for persistent databases
1003 if ((ctdb_data
.dptr
!= NULL
) &&
1004 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
1006 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
1007 /* we are the dmaster - avoid the ctdb protocol op */
1009 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
1010 if (data
->dsize
== 0) {
1011 SAFE_FREE(ctdb_data
.dptr
);
1016 data
->dptr
= (uint8
*)talloc_memdup(
1017 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1020 SAFE_FREE(ctdb_data
.dptr
);
1022 if (data
->dptr
== NULL
) {
1028 SAFE_FREE(ctdb_data
.dptr
);
1030 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1031 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1032 if (!NT_STATUS_IS_OK(status
)) {
1033 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
1040 struct traverse_state
{
1041 struct db_context
*db
;
1042 int (*fn
)(struct db_record
*rec
, void *private_data
);
1046 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1048 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1049 struct db_record
*rec
;
1050 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1051 /* we have to give them a locked record to prevent races */
1052 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1053 if (rec
&& rec
->value
.dsize
> 0) {
1054 state
->fn(rec
, state
->private_data
);
1056 talloc_free(tmp_ctx
);
1059 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1062 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1063 struct db_record
*rec
;
1064 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1066 /* we have to give them a locked record to prevent races */
1067 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1068 if (rec
&& rec
->value
.dsize
> 0) {
1069 ret
= state
->fn(rec
, state
->private_data
);
1071 talloc_free(tmp_ctx
);
1075 static int db_ctdb_traverse(struct db_context
*db
,
1076 int (*fn
)(struct db_record
*rec
,
1077 void *private_data
),
1080 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1081 struct db_ctdb_ctx
);
1082 struct traverse_state state
;
1086 state
.private_data
= private_data
;
1088 if (db
->persistent
) {
1089 /* for persistent databases we don't need to do a ctdb traverse,
1090 we can do a faster local traverse */
1091 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1095 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1099 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1101 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1104 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1106 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1109 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1111 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1112 struct db_record rec
;
1115 rec
.store
= db_ctdb_store_deny
;
1116 rec
.delete_rec
= db_ctdb_delete_deny
;
1117 rec
.private_data
= state
->db
;
1118 state
->fn(&rec
, state
->private_data
);
1121 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1124 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1125 struct db_record rec
;
1128 rec
.store
= db_ctdb_store_deny
;
1129 rec
.delete_rec
= db_ctdb_delete_deny
;
1130 rec
.private_data
= state
->db
;
1132 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1133 /* a deleted record */
1136 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1137 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1139 return state
->fn(&rec
, state
->private_data
);
1142 static int db_ctdb_traverse_read(struct db_context
*db
,
1143 int (*fn
)(struct db_record
*rec
,
1144 void *private_data
),
1147 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1148 struct db_ctdb_ctx
);
1149 struct traverse_state state
;
1153 state
.private_data
= private_data
;
1155 if (db
->persistent
) {
1156 /* for persistent databases we don't need to do a ctdb traverse,
1157 we can do a faster local traverse */
1158 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1161 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1165 static int db_ctdb_get_seqnum(struct db_context
*db
)
1167 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1168 struct db_ctdb_ctx
);
1169 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1172 static int db_ctdb_get_flags(struct db_context
*db
)
1174 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1175 struct db_ctdb_ctx
);
1176 return tdb_get_flags(ctx
->wtdb
->tdb
);
1179 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1181 int hash_size
, int tdb_flags
,
1182 int open_flags
, mode_t mode
)
1184 struct db_context
*result
;
1185 struct db_ctdb_ctx
*db_ctdb
;
1188 if (!lp_clustering()) {
1189 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1193 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1194 DEBUG(0, ("talloc failed\n"));
1195 TALLOC_FREE(result
);
1199 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1200 DEBUG(0, ("talloc failed\n"));
1201 TALLOC_FREE(result
);
1205 db_ctdb
->transaction
= NULL
;
1206 db_ctdb
->db
= result
;
1208 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1209 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1210 TALLOC_FREE(result
);
1214 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1216 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1218 /* only pass through specific flags */
1219 tdb_flags
&= TDB_SEQNUM
;
1221 /* honor permissions if user has specified O_CREAT */
1222 if (open_flags
& O_CREAT
) {
1223 chmod(db_path
, mode
);
1226 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1227 if (db_ctdb
->wtdb
== NULL
) {
1228 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1229 TALLOC_FREE(result
);
1232 talloc_free(db_path
);
1234 result
->private_data
= (void *)db_ctdb
;
1235 result
->fetch_locked
= db_ctdb_fetch_locked
;
1236 result
->fetch
= db_ctdb_fetch
;
1237 result
->traverse
= db_ctdb_traverse
;
1238 result
->traverse_read
= db_ctdb_traverse_read
;
1239 result
->get_seqnum
= db_ctdb_get_seqnum
;
1240 result
->get_flags
= db_ctdb_get_flags
;
1241 result
->transaction_start
= db_ctdb_transaction_start
;
1242 result
->transaction_commit
= db_ctdb_transaction_commit
;
1243 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1245 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1246 name
, db_ctdb
->db_id
));