2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer
*m_all
;
33 struct ctdb_marshall_buffer
*m_write
;
39 struct db_context
*db
;
40 struct tdb_wrap
*wtdb
;
42 struct db_ctdb_transaction_handle
*transaction
;
46 struct db_ctdb_ctx
*ctdb_ctx
;
47 struct ctdb_ltdb_header header
;
50 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
55 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
58 enum TDB_ERROR tret
= tdb_error(tdb
);
62 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
65 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
68 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
/*
 * Form a ctdb_rec_data record from a key/data pair.
 *
 * Note that header may be NULL. If it is not NULL, it is included in the
 * data portion.
 */
83 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
85 struct ctdb_ltdb_header
*header
,
89 struct ctdb_rec_data
*d
;
91 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
92 data
.dsize
+ (header
?sizeof(*header
):0);
93 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
99 d
->keylen
= key
.dsize
;
100 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
102 d
->datalen
= data
.dsize
+ sizeof(*header
);
103 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
104 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
106 d
->datalen
= data
.dsize
;
107 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
115 struct ctdb_marshall_buffer
*m
,
119 struct ctdb_ltdb_header
*header
,
122 struct ctdb_rec_data
*r
;
123 size_t m_size
, r_size
;
124 struct ctdb_marshall_buffer
*m2
= NULL
;
126 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
133 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
134 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
141 m_size
= talloc_get_size(m
);
142 r_size
= talloc_get_size(r
);
144 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
145 mem_ctx
, m
, m_size
+ r_size
);
151 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
164 data
.dptr
= (uint8_t *)m
;
165 data
.dsize
= talloc_get_size(m
);
/*
 * Loop over a marshalling buffer:
 *
 *   - pass r==NULL to start
 *   - loop the number of times indicated by m->count
 */
175 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
177 struct ctdb_ltdb_header
*header
,
178 TDB_DATA
*key
, TDB_DATA
*data
)
181 r
= (struct ctdb_rec_data
*)&m
->data
[0];
183 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
191 key
->dptr
= &r
->data
[0];
192 key
->dsize
= r
->keylen
;
195 data
->dptr
= &r
->data
[r
->keylen
];
196 data
->dsize
= r
->datalen
;
197 if (header
!= NULL
) {
198 data
->dptr
+= sizeof(*header
);
199 data
->dsize
-= sizeof(*header
);
203 if (header
!= NULL
) {
204 if (r
->datalen
< sizeof(*header
)) {
207 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
/* talloc destructor for a transaction handle: cancels the tdb transaction */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
218 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
222 /* start a transaction on a database */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
225 struct db_record
*rh
;
228 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
230 struct db_ctdb_ctx
*ctx
= h
->ctx
;
233 key
.dptr
= (uint8_t *)discard_const(keyname
);
234 key
.dsize
= strlen(keyname
);
237 tmp_ctx
= talloc_new(h
);
239 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
241 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
242 talloc_free(tmp_ctx
);
247 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
249 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
250 talloc_free(tmp_ctx
);
254 data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
255 if ((data
.dptr
== NULL
) ||
256 (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
257 ((struct ctdb_ltdb_header
*)data
.dptr
)->dmaster
!= get_my_vnn()) {
258 SAFE_FREE(data
.dptr
);
259 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
260 talloc_free(tmp_ctx
);
264 SAFE_FREE(data
.dptr
);
265 talloc_free(tmp_ctx
);
271 /* start a transaction on a database */
272 static int db_ctdb_transaction_start(struct db_context
*db
)
274 struct db_ctdb_transaction_handle
*h
;
276 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
279 if (!db
->persistent
) {
280 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
285 if (ctx
->transaction
) {
286 ctx
->transaction
->nesting
++;
290 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
292 DEBUG(0,(__location__
" oom for transaction handle\n"));
298 ret
= db_ctdb_transaction_fetch_start(h
);
304 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
306 ctx
->transaction
= h
;
308 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
316 fetch a record inside a transaction
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
320 TDB_DATA key
, TDB_DATA
*data
)
322 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
324 *data
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
326 if (data
->dptr
!= NULL
) {
327 uint8_t *oldptr
= (uint8_t *)data
->dptr
;
328 data
->dsize
-= sizeof(struct ctdb_ltdb_header
);
329 if (data
->dsize
== 0) {
332 data
->dptr
= (uint8
*)
334 mem_ctx
, data
->dptr
+sizeof(struct ctdb_ltdb_header
),
338 if (data
->dptr
== NULL
&& data
->dsize
!= 0) {
344 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
345 if (h
->m_all
== NULL
) {
346 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
348 talloc_free(data
->dptr
);
357 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
358 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
360 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
364 struct db_record
*result
;
367 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
368 DEBUG(0, ("talloc failed\n"));
372 result
->private_data
= ctx
->transaction
;
374 result
->key
.dsize
= key
.dsize
;
375 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
376 if (result
->key
.dptr
== NULL
) {
377 DEBUG(0, ("talloc failed\n"));
382 result
->store
= db_ctdb_store_transaction
;
383 result
->delete_rec
= db_ctdb_delete_transaction
;
385 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
386 if (ctdb_data
.dptr
== NULL
) {
387 /* create the record */
388 result
->value
= tdb_null
;
392 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
393 result
->value
.dptr
= NULL
;
395 if ((result
->value
.dsize
!= 0)
396 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
397 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
398 result
->value
.dsize
))) {
399 DEBUG(0, ("talloc failed\n"));
403 SAFE_FREE(ctdb_data
.dptr
);
408 static int db_ctdb_record_destructor(struct db_record
**recp
)
410 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
411 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
412 rec
->private_data
, struct db_ctdb_transaction_handle
);
413 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
415 DEBUG(0,(__location__
" transaction_commit failed\n"));
421 auto-create a transaction for persistent databases
423 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
428 struct db_record
*rec
, **recp
;
430 res
= db_ctdb_transaction_start(ctx
->db
);
435 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
437 ctx
->db
->transaction_cancel(ctx
->db
);
441 /* destroy this transaction when we release the lock */
442 recp
= talloc(rec
, struct db_record
*);
444 ctx
->db
->transaction_cancel(ctx
->db
);
449 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
455 stores a record inside a transaction
457 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
458 TDB_DATA key
, TDB_DATA data
)
460 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
463 struct ctdb_ltdb_header header
;
465 /* we need the header so we can update the RSN */
466 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
467 if (rec
.dptr
== NULL
) {
468 /* the record doesn't exist - create one with us as dmaster.
469 This is only safe because we are in a transaction and this
470 is a persistent database */
472 header
.dmaster
= get_my_vnn();
474 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
475 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
476 /* a special case, we are writing the same data that is there now */
477 if (data
.dsize
== rec
.dsize
&&
478 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
480 talloc_free(tmp_ctx
);
489 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
490 if (h
->m_all
== NULL
) {
491 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
492 talloc_free(tmp_ctx
);
497 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
498 if (h
->m_write
== NULL
) {
499 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
500 talloc_free(tmp_ctx
);
504 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
505 rec
.dptr
= (uint8_t *)talloc_size(tmp_ctx
, rec
.dsize
);
506 if (rec
.dptr
== NULL
) {
507 DEBUG(0,(__location__
" Failed to alloc record\n"));
508 talloc_free(tmp_ctx
);
511 memcpy(rec
.dptr
, &header
, sizeof(struct ctdb_ltdb_header
));
512 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
514 ret
= tdb_store(h
->ctx
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
516 talloc_free(tmp_ctx
);
523 a record store inside a transaction
525 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
527 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
528 rec
->private_data
, struct db_ctdb_transaction_handle
);
531 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
533 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
539 a record delete inside a transaction
541 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
543 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
544 rec
->private_data
, struct db_ctdb_transaction_handle
);
547 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
549 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
558 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
561 struct ctdb_rec_data
*rec
= NULL
;
564 talloc_free(h
->m_write
);
567 ret
= db_ctdb_transaction_fetch_start(h
);
572 for (i
=0;i
<h
->m_all
->count
;i
++) {
575 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
577 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
581 if (rec
->reqid
== 0) {
583 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
588 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
590 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
591 talloc_free(tmp_ctx
);
594 if (data2
.dsize
!= data
.dsize
||
595 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
596 /* the record has changed on us - we have to give up */
597 talloc_free(tmp_ctx
);
600 talloc_free(tmp_ctx
);
607 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
615 static int db_ctdb_transaction_commit(struct db_context
*db
)
617 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
623 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
624 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
627 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
631 if (h
->nested_cancel
) {
632 db
->transaction_cancel(db
);
633 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
637 if (h
->nesting
!= 0) {
642 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
644 talloc_set_destructor(h
, NULL
);
646 /* our commit strategy is quite complex.
648 - we first try to commit the changes to all other nodes
650 - if that works, then we commit locally and we are done
652 - if a commit on another node fails, then we need to cancel
653 the transaction, then restart the transaction (thus
654 opening a window of time for a pending recovery to
655 complete), then replay the transaction, checking all the
656 reads and writes (checking that reads give the same data,
657 and writes succeed). Then we retry the transaction to the
662 if (h
->m_write
== NULL
) {
663 /* no changes were made, potentially after a retry */
664 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
666 ctx
->transaction
= NULL
;
670 /* tell ctdbd to commit to the other nodes */
671 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
672 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
674 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
675 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
676 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
679 if (!NT_STATUS_IS_OK(rets
)) {
680 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
682 /* work out what error code we will give if we
683 have to fail the operation */
684 switch ((enum ctdb_trans2_commit_error
)status
) {
685 case CTDB_TRANS2_COMMIT_SUCCESS
:
686 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
687 case CTDB_TRANS2_COMMIT_TIMEOUT
:
688 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
690 case CTDB_TRANS2_COMMIT_ALLFAIL
:
691 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
696 if (++retries
== 5) {
697 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
698 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
699 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
700 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
701 tdb_null
, NULL
, NULL
, NULL
);
702 h
->ctx
->transaction
= NULL
;
704 ctx
->transaction
= NULL
;
708 if (ctdb_replay_transaction(h
) != 0) {
709 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
710 (unsigned)failure_control
));
711 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
712 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
713 tdb_null
, NULL
, NULL
, NULL
);
714 h
->ctx
->transaction
= NULL
;
716 ctx
->transaction
= NULL
;
721 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
724 /* do the real commit locally */
725 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
727 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
728 (unsigned)failure_control
));
729 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
730 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
731 h
->ctx
->transaction
= NULL
;
736 /* tell ctdbd that we are finished with our local commit */
737 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
738 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
739 tdb_null
, NULL
, NULL
, NULL
);
740 h
->ctx
->transaction
= NULL
;
749 static int db_ctdb_transaction_cancel(struct db_context
*db
)
751 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
753 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
756 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
760 if (h
->nesting
!= 0) {
762 h
->nested_cancel
= true;
766 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
768 ctx
->transaction
= NULL
;
774 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
776 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
777 rec
->private_data
, struct db_ctdb_rec
);
781 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
783 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
784 return NT_STATUS_NO_MEMORY
;
787 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
788 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
790 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, cdata
, TDB_REPLACE
);
792 SAFE_FREE(cdata
.dptr
);
794 return (ret
== 0) ? NT_STATUS_OK
795 : tdb_error_to_ntstatus(crec
->ctdb_ctx
->wtdb
->tdb
);
800 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
805 * We have to store the header with empty data. TODO: Fix the
811 return db_ctdb_store(rec
, data
, 0);
815 static int db_ctdb_record_destr(struct db_record
* data
)
817 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
818 data
->private_data
, struct db_ctdb_rec
);
820 DEBUG(10, (DEBUGLEVEL
> 10
821 ? "Unlocking db %u key %s\n"
822 : "Unlocking db %u key %.20s\n",
823 (int)crec
->ctdb_ctx
->db_id
,
824 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
827 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
828 DEBUG(0, ("tdb_chainunlock failed\n"));
835 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
840 struct db_record
*result
;
841 struct db_ctdb_rec
*crec
;
844 int migrate_attempts
= 0;
846 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
847 DEBUG(0, ("talloc failed\n"));
851 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
852 DEBUG(0, ("talloc failed\n"));
857 result
->private_data
= (void *)crec
;
858 crec
->ctdb_ctx
= ctx
;
860 result
->key
.dsize
= key
.dsize
;
861 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
862 if (result
->key
.dptr
== NULL
) {
863 DEBUG(0, ("talloc failed\n"));
869 * Do a blocking lock on the record
873 if (DEBUGLEVEL
>= 10) {
874 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
875 DEBUG(10, (DEBUGLEVEL
> 10
876 ? "Locking db %u key %s\n"
877 : "Locking db %u key %.20s\n",
878 (int)crec
->ctdb_ctx
->db_id
, keystr
));
882 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
883 DEBUG(3, ("tdb_chainlock failed\n"));
888 result
->store
= db_ctdb_store
;
889 result
->delete_rec
= db_ctdb_delete
;
890 talloc_set_destructor(result
, db_ctdb_record_destr
);
892 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
895 * See if we have a valid record and we are the dmaster. If so, we can
896 * take the shortcut and just return it.
899 if ((ctdb_data
.dptr
== NULL
) ||
900 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
901 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
903 || (random() % 2 != 0)
906 SAFE_FREE(ctdb_data
.dptr
);
907 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
908 talloc_set_destructor(result
, NULL
);
910 migrate_attempts
+= 1;
912 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
913 ctdb_data
.dptr
, ctdb_data
.dptr
?
914 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
917 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
918 if (!NT_STATUS_IS_OK(status
)) {
919 DEBUG(5, ("ctdb_migrate failed: %s\n",
924 /* now its migrated, try again */
928 if (migrate_attempts
> 10) {
929 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
933 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
935 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
936 result
->value
.dptr
= NULL
;
938 if ((result
->value
.dsize
!= 0)
939 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
940 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
941 result
->value
.dsize
))) {
942 DEBUG(0, ("talloc failed\n"));
946 SAFE_FREE(ctdb_data
.dptr
);
951 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
955 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
958 if (ctx
->transaction
!= NULL
) {
959 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
962 if (db
->persistent
) {
963 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
966 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
970 fetch (unlocked, no migration) operation on ctdb
972 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
973 TDB_DATA key
, TDB_DATA
*data
)
975 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
980 if (ctx
->transaction
) {
981 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
984 /* try a direct fetch */
985 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
988 * See if we have a valid record and we are the dmaster. If so, we can
989 * take the shortcut and just return it.
990 * we bypass the dmaster check for persistent databases
992 if ((ctdb_data
.dptr
!= NULL
) &&
993 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
995 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
996 /* we are the dmaster - avoid the ctdb protocol op */
998 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
999 if (data
->dsize
== 0) {
1000 SAFE_FREE(ctdb_data
.dptr
);
1005 data
->dptr
= (uint8
*)talloc_memdup(
1006 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1009 SAFE_FREE(ctdb_data
.dptr
);
1011 if (data
->dptr
== NULL
) {
1017 SAFE_FREE(ctdb_data
.dptr
);
1019 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1020 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1021 if (!NT_STATUS_IS_OK(status
)) {
1022 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/* State passed to the traverse callbacks through their private_data pointer. */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller-supplied per-record callback */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer handed through to fn unchanged */
	void *private_data;
};
1035 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1037 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1038 struct db_record
*rec
;
1039 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1040 /* we have to give them a locked record to prevent races */
1041 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1042 if (rec
&& rec
->value
.dsize
> 0) {
1043 state
->fn(rec
, state
->private_data
);
1045 talloc_free(tmp_ctx
);
1048 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1051 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1052 struct db_record
*rec
;
1053 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1055 /* we have to give them a locked record to prevent races */
1056 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1057 if (rec
&& rec
->value
.dsize
> 0) {
1058 ret
= state
->fn(rec
, state
->private_data
);
1060 talloc_free(tmp_ctx
);
1064 static int db_ctdb_traverse(struct db_context
*db
,
1065 int (*fn
)(struct db_record
*rec
,
1066 void *private_data
),
1069 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1070 struct db_ctdb_ctx
);
1071 struct traverse_state state
;
1075 state
.private_data
= private_data
;
1077 if (db
->persistent
) {
1078 /* for persistent databases we don't need to do a ctdb traverse,
1079 we can do a faster local traverse */
1080 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1084 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1088 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1090 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1093 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1095 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1098 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1100 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1101 struct db_record rec
;
1104 rec
.store
= db_ctdb_store_deny
;
1105 rec
.delete_rec
= db_ctdb_delete_deny
;
1106 rec
.private_data
= state
->db
;
1107 state
->fn(&rec
, state
->private_data
);
1110 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1113 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1114 struct db_record rec
;
1117 rec
.store
= db_ctdb_store_deny
;
1118 rec
.delete_rec
= db_ctdb_delete_deny
;
1119 rec
.private_data
= state
->db
;
1121 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1122 /* a deleted record */
1125 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1126 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1128 return state
->fn(&rec
, state
->private_data
);
1131 static int db_ctdb_traverse_read(struct db_context
*db
,
1132 int (*fn
)(struct db_record
*rec
,
1133 void *private_data
),
1136 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1137 struct db_ctdb_ctx
);
1138 struct traverse_state state
;
1142 state
.private_data
= private_data
;
1144 if (db
->persistent
) {
1145 /* for persistent databases we don't need to do a ctdb traverse,
1146 we can do a faster local traverse */
1147 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1150 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1154 static int db_ctdb_get_seqnum(struct db_context
*db
)
1156 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1157 struct db_ctdb_ctx
);
1158 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1161 static int db_ctdb_get_flags(struct db_context
*db
)
1163 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1164 struct db_ctdb_ctx
);
1165 return tdb_get_flags(ctx
->wtdb
->tdb
);
1168 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1170 int hash_size
, int tdb_flags
,
1171 int open_flags
, mode_t mode
)
1173 struct db_context
*result
;
1174 struct db_ctdb_ctx
*db_ctdb
;
1177 if (!lp_clustering()) {
1178 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1182 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1183 DEBUG(0, ("talloc failed\n"));
1184 TALLOC_FREE(result
);
1188 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1189 DEBUG(0, ("talloc failed\n"));
1190 TALLOC_FREE(result
);
1194 db_ctdb
->transaction
= NULL
;
1195 db_ctdb
->db
= result
;
1197 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1198 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1199 TALLOC_FREE(result
);
1203 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1205 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1207 /* only pass through specific flags */
1208 tdb_flags
&= TDB_SEQNUM
;
1210 /* honor permissions if user has specified O_CREAT */
1211 if (open_flags
& O_CREAT
) {
1212 chmod(db_path
, mode
);
1215 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1216 if (db_ctdb
->wtdb
== NULL
) {
1217 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1218 TALLOC_FREE(result
);
1221 talloc_free(db_path
);
1223 result
->private_data
= (void *)db_ctdb
;
1224 result
->fetch_locked
= db_ctdb_fetch_locked
;
1225 result
->fetch
= db_ctdb_fetch
;
1226 result
->traverse
= db_ctdb_traverse
;
1227 result
->traverse_read
= db_ctdb_traverse_read
;
1228 result
->get_seqnum
= db_ctdb_get_seqnum
;
1229 result
->get_flags
= db_ctdb_get_flags
;
1230 result
->transaction_start
= db_ctdb_transaction_start
;
1231 result
->transaction_commit
= db_ctdb_transaction_commit
;
1232 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1234 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1235 name
, db_ctdb
->db_id
));