2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle
{
27 struct db_ctdb_ctx
*ctx
;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer
*m_all
;
33 struct ctdb_marshall_buffer
*m_write
;
39 struct db_context
*db
;
40 struct tdb_wrap
*wtdb
;
42 struct db_ctdb_transaction_handle
*transaction
;
46 struct db_ctdb_ctx
*ctdb_ctx
;
47 struct ctdb_ltdb_header header
;
50 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
55 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
58 enum TDB_ERROR tret
= tdb_error(tdb
);
62 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
65 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
68 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
83 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
85 struct ctdb_ltdb_header
*header
,
89 struct ctdb_rec_data
*d
;
91 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
92 data
.dsize
+ (header
?sizeof(*header
):0);
93 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
99 d
->keylen
= key
.dsize
;
100 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
102 d
->datalen
= data
.dsize
+ sizeof(*header
);
103 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
104 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
106 d
->datalen
= data
.dsize
;
107 memcpy(&d
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
115 struct ctdb_marshall_buffer
*m
,
119 struct ctdb_ltdb_header
*header
,
122 struct ctdb_rec_data
*r
;
123 size_t m_size
, r_size
;
124 struct ctdb_marshall_buffer
*m2
;
126 r
= db_ctdb_marshall_record(mem_ctx
, reqid
, key
, header
, data
);
133 m
= talloc_zero_size(mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
140 m_size
= talloc_get_size(m
);
141 r_size
= talloc_get_size(r
);
143 m2
= talloc_realloc_size(mem_ctx
, m
, m_size
+ r_size
);
149 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
158 /* we've finished marshalling, return a data blob with the marshalled records */
159 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
162 data
.dptr
= (uint8_t *)m
;
163 data
.dsize
= talloc_get_size(m
);
168 loop over a marshalling buffer
170 - pass r==NULL to start
171 - loop the number of times indicated by m->count
173 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
,
175 struct ctdb_ltdb_header
*header
,
176 TDB_DATA
*key
, TDB_DATA
*data
)
179 r
= (struct ctdb_rec_data
*)&m
->data
[0];
181 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
189 key
->dptr
= &r
->data
[0];
190 key
->dsize
= r
->keylen
;
193 data
->dptr
= &r
->data
[r
->keylen
];
194 data
->dsize
= r
->datalen
;
195 if (header
!= NULL
) {
196 data
->dptr
+= sizeof(*header
);
197 data
->dsize
-= sizeof(*header
);
201 if (header
!= NULL
) {
202 if (r
->datalen
< sizeof(*header
)) {
205 *header
= *(struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
213 /* start a transaction on a database */
214 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
216 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
220 /* start a transaction on a database */
221 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle
*h
)
223 struct db_record
*rh
;
226 const char *keyname
= CTDB_TRANSACTION_LOCK_KEY
;
228 struct db_ctdb_ctx
*ctx
= h
->ctx
;
231 key
.dptr
= discard_const(keyname
);
232 key
.dsize
= strlen(keyname
);
235 tmp_ctx
= talloc_new(h
);
237 rh
= fetch_locked_internal(ctx
, tmp_ctx
, key
, true);
239 DEBUG(0,(__location__
" Failed to fetch_lock database\n"));
240 talloc_free(tmp_ctx
);
245 ret
= tdb_transaction_start(ctx
->wtdb
->tdb
);
247 DEBUG(0,(__location__
" Failed to start tdb transaction\n"));
248 talloc_free(tmp_ctx
);
252 data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
253 if ((data
.dptr
== NULL
) ||
254 (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
255 ((struct ctdb_ltdb_header
*)data
.dptr
)->dmaster
!= get_my_vnn()) {
256 SAFE_FREE(data
.dptr
);
257 tdb_transaction_cancel(ctx
->wtdb
->tdb
);
258 talloc_free(tmp_ctx
);
262 SAFE_FREE(data
.dptr
);
263 talloc_free(tmp_ctx
);
269 /* start a transaction on a database */
270 static int db_ctdb_transaction_start(struct db_context
*db
)
272 struct db_ctdb_transaction_handle
*h
;
274 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
277 if (!db
->persistent
) {
278 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
283 if (ctx
->transaction
) {
284 ctx
->transaction
->nesting
++;
288 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
290 DEBUG(0,(__location__
" oom for transaction handle\n"));
296 ret
= db_ctdb_transaction_fetch_start(h
);
302 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
304 ctx
->transaction
= h
;
306 DEBUG(5,(__location__
" Started transaction on db 0x%08x\n", ctx
->db_id
));
314 fetch a record inside a transaction
316 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx
*db
,
318 TDB_DATA key
, TDB_DATA
*data
)
320 struct db_ctdb_transaction_handle
*h
= db
->transaction
;
322 *data
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
324 if (data
->dptr
!= NULL
) {
325 uint8_t *oldptr
= (uint8_t *)data
->dptr
;
326 data
->dsize
-= sizeof(struct ctdb_ltdb_header
);
327 if (data
->dsize
== 0) {
330 data
->dptr
= (uint8
*)
332 mem_ctx
, data
->dptr
+sizeof(struct ctdb_ltdb_header
),
336 if (data
->dptr
== NULL
&& data
->dsize
!= 0) {
342 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 1, key
, NULL
, *data
);
343 if (h
->m_all
== NULL
) {
344 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
346 talloc_free(data
->dptr
);
355 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
356 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
358 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
362 struct db_record
*result
;
365 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
366 DEBUG(0, ("talloc failed\n"));
370 result
->private_data
= ctx
->transaction
;
372 result
->key
.dsize
= key
.dsize
;
373 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
374 if (result
->key
.dptr
== NULL
) {
375 DEBUG(0, ("talloc failed\n"));
380 result
->store
= db_ctdb_store_transaction
;
381 result
->delete_rec
= db_ctdb_delete_transaction
;
383 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
384 if (ctdb_data
.dptr
== NULL
) {
385 /* create the record */
386 result
->value
= tdb_null
;
390 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
391 result
->value
.dptr
= NULL
;
393 if ((result
->value
.dsize
!= 0)
394 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
395 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
396 result
->value
.dsize
))) {
397 DEBUG(0, ("talloc failed\n"));
401 SAFE_FREE(ctdb_data
.dptr
);
406 static int db_ctdb_record_destructor(struct db_record
**recp
)
408 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
409 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
410 rec
->private_data
, struct db_ctdb_transaction_handle
);
411 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
413 DEBUG(0,(__location__
" transaction_commit failed\n"));
419 auto-create a transaction for persistent databases
421 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
426 struct db_record
*rec
, **recp
;
428 res
= db_ctdb_transaction_start(ctx
->db
);
433 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
435 ctx
->db
->transaction_cancel(ctx
->db
);
439 /* destroy this transaction when we release the lock */
440 recp
= talloc(rec
, struct db_record
*);
442 ctx
->db
->transaction_cancel(ctx
->db
);
447 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
453 stores a record inside a transaction
455 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
456 TDB_DATA key
, TDB_DATA data
)
458 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
461 struct ctdb_ltdb_header header
;
463 /* we need the header so we can update the RSN */
464 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
465 if (rec
.dptr
== NULL
) {
466 /* the record doesn't exist - create one with us as dmaster.
467 This is only safe because we are in a transaction and this
468 is a persistent database */
470 header
.dmaster
= get_my_vnn();
472 memcpy(&header
, rec
.dptr
, sizeof(struct ctdb_ltdb_header
));
473 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
474 /* a special case, we are writing the same data that is there now */
475 if (data
.dsize
== rec
.dsize
&&
476 memcmp(data
.dptr
, rec
.dptr
+ sizeof(struct ctdb_ltdb_header
), data
.dsize
) == 0) {
478 talloc_free(tmp_ctx
);
487 h
->m_all
= db_ctdb_marshall_add(h
, h
->m_all
, h
->ctx
->db_id
, 0, key
, NULL
, data
);
488 if (h
->m_all
== NULL
) {
489 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
490 talloc_free(tmp_ctx
);
495 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
496 if (h
->m_write
== NULL
) {
497 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
498 talloc_free(tmp_ctx
);
502 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
503 rec
.dptr
= talloc_size(tmp_ctx
, rec
.dsize
);
504 if (rec
.dptr
== NULL
) {
505 DEBUG(0,(__location__
" Failed to alloc record\n"));
506 talloc_free(tmp_ctx
);
509 memcpy(rec
.dptr
, &header
, sizeof(struct ctdb_ltdb_header
));
510 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
512 ret
= tdb_store(h
->ctx
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
514 talloc_free(tmp_ctx
);
521 a record store inside a transaction
523 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
525 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
526 rec
->private_data
, struct db_ctdb_transaction_handle
);
529 ret
= db_ctdb_transaction_store(h
, rec
->key
, data
);
531 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
537 a record delete inside a transaction
539 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
541 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
542 rec
->private_data
, struct db_ctdb_transaction_handle
);
545 ret
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
547 return tdb_error_to_ntstatus(h
->ctx
->wtdb
->tdb
);
556 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle
*h
)
559 struct ctdb_rec_data
*rec
= NULL
;
562 talloc_free(h
->m_write
);
565 ret
= db_ctdb_transaction_fetch_start(h
);
570 for (i
=0;i
<h
->m_all
->count
;i
++) {
573 rec
= db_ctdb_marshall_loop_next(h
->m_all
, rec
, NULL
, NULL
, &key
, &data
);
575 DEBUG(0, (__location__
" Out of records in ctdb_replay_transaction?\n"));
579 if (rec
->reqid
== 0) {
581 if (db_ctdb_transaction_store(h
, key
, data
) != 0) {
586 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
588 if (db_ctdb_transaction_fetch(h
->ctx
, tmp_ctx
, key
, &data2
) != 0) {
589 talloc_free(tmp_ctx
);
592 if (data2
.dsize
!= data
.dsize
||
593 memcmp(data2
.dptr
, data
.dptr
, data
.dsize
) != 0) {
594 /* the record has changed on us - we have to give up */
595 talloc_free(tmp_ctx
);
598 talloc_free(tmp_ctx
);
605 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
613 static int db_ctdb_transaction_commit(struct db_context
*db
)
615 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
621 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
622 enum ctdb_controls failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
625 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
629 if (h
->nested_cancel
) {
630 db
->transaction_cancel(db
);
631 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
635 if (h
->nesting
!= 0) {
640 DEBUG(5,(__location__
" Commit transaction on db 0x%08x\n", ctx
->db_id
));
642 talloc_set_destructor(h
, NULL
);
644 /* our commit strategy is quite complex.
646 - we first try to commit the changes to all other nodes
648 - if that works, then we commit locally and we are done
650 - if a commit on another node fails, then we need to cancel
651 the transaction, then restart the transaction (thus
652 opening a window of time for a pending recovery to
653 complete), then replay the transaction, checking all the
654 reads and writes (checking that reads give the same data,
655 and writes succeed). Then we retry the transaction to the
660 if (h
->m_write
== NULL
) {
661 /* no changes were made, potentially after a retry */
662 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
664 ctx
->transaction
= NULL
;
668 /* tell ctdbd to commit to the other nodes */
669 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
670 retries
==0?CTDB_CONTROL_TRANS2_COMMIT
:CTDB_CONTROL_TRANS2_COMMIT_RETRY
,
672 db_ctdb_marshall_finish(h
->m_write
), NULL
, NULL
, &status
);
673 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
674 tdb_transaction_cancel(h
->ctx
->wtdb
->tdb
);
677 if (!NT_STATUS_IS_OK(rets
)) {
678 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
680 /* work out what error code we will give if we
681 have to fail the operation */
682 switch ((enum ctdb_trans2_commit_error
)status
) {
683 case CTDB_TRANS2_COMMIT_SUCCESS
:
684 case CTDB_TRANS2_COMMIT_SOMEFAIL
:
685 case CTDB_TRANS2_COMMIT_TIMEOUT
:
686 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
688 case CTDB_TRANS2_COMMIT_ALLFAIL
:
689 failure_control
= CTDB_CONTROL_TRANS2_FINISHED
;
694 if (++retries
== 5) {
695 DEBUG(0,(__location__
" Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
696 h
->ctx
->db_id
, retries
, (unsigned)failure_control
));
697 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
698 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
699 tdb_null
, NULL
, NULL
, NULL
);
700 h
->ctx
->transaction
= NULL
;
702 ctx
->transaction
= NULL
;
706 if (ctdb_replay_transaction(h
) != 0) {
707 DEBUG(0,(__location__
" Failed to replay transaction failure_control=%u\n",
708 (unsigned)failure_control
));
709 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
,
710 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
711 tdb_null
, NULL
, NULL
, NULL
);
712 h
->ctx
->transaction
= NULL
;
714 ctx
->transaction
= NULL
;
719 failure_control
= CTDB_CONTROL_TRANS2_ERROR
;
722 /* do the real commit locally */
723 ret
= tdb_transaction_commit(h
->ctx
->wtdb
->tdb
);
725 DEBUG(0,(__location__
" Failed to commit transaction failure_control=%u\n",
726 (unsigned)failure_control
));
727 ctdbd_control_local(messaging_ctdbd_connection(), failure_control
, h
->ctx
->db_id
,
728 CTDB_CTRL_FLAG_NOREPLY
, tdb_null
, NULL
, NULL
, NULL
);
729 h
->ctx
->transaction
= NULL
;
734 /* tell ctdbd that we are finished with our local commit */
735 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED
,
736 h
->ctx
->db_id
, CTDB_CTRL_FLAG_NOREPLY
,
737 tdb_null
, NULL
, NULL
, NULL
);
738 h
->ctx
->transaction
= NULL
;
747 static int db_ctdb_transaction_cancel(struct db_context
*db
)
749 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
751 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
754 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
758 if (h
->nesting
!= 0) {
760 h
->nested_cancel
= true;
764 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
766 ctx
->transaction
= NULL
;
772 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
774 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
775 rec
->private_data
, struct db_ctdb_rec
);
779 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
781 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
782 return NT_STATUS_NO_MEMORY
;
785 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
786 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
788 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, cdata
, TDB_REPLACE
);
790 SAFE_FREE(cdata
.dptr
);
792 return (ret
== 0) ? NT_STATUS_OK
793 : tdb_error_to_ntstatus(crec
->ctdb_ctx
->wtdb
->tdb
);
798 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
803 * We have to store the header with empty data. TODO: Fix the
809 return db_ctdb_store(rec
, data
, 0);
813 static int db_ctdb_record_destr(struct db_record
* data
)
815 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
816 data
->private_data
, struct db_ctdb_rec
);
818 DEBUG(10, (DEBUGLEVEL
> 10
819 ? "Unlocking db %u key %s\n"
820 : "Unlocking db %u key %.20s\n",
821 (int)crec
->ctdb_ctx
->db_id
,
822 hex_encode(data
, (unsigned char *)data
->key
.dptr
,
825 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
826 DEBUG(0, ("tdb_chainunlock failed\n"));
833 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
838 struct db_record
*result
;
839 struct db_ctdb_rec
*crec
;
842 int migrate_attempts
= 0;
844 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
845 DEBUG(0, ("talloc failed\n"));
849 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
850 DEBUG(0, ("talloc failed\n"));
855 result
->private_data
= (void *)crec
;
856 crec
->ctdb_ctx
= ctx
;
858 result
->key
.dsize
= key
.dsize
;
859 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
860 if (result
->key
.dptr
== NULL
) {
861 DEBUG(0, ("talloc failed\n"));
867 * Do a blocking lock on the record
871 if (DEBUGLEVEL
>= 10) {
872 char *keystr
= hex_encode(result
, key
.dptr
, key
.dsize
);
873 DEBUG(10, (DEBUGLEVEL
> 10
874 ? "Locking db %u key %s\n"
875 : "Locking db %u key %.20s\n",
876 (int)crec
->ctdb_ctx
->db_id
, keystr
));
880 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
881 DEBUG(3, ("tdb_chainlock failed\n"));
886 result
->store
= db_ctdb_store
;
887 result
->delete_rec
= db_ctdb_delete
;
888 talloc_set_destructor(result
, db_ctdb_record_destr
);
890 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
893 * See if we have a valid record and we are the dmaster. If so, we can
894 * take the shortcut and just return it.
897 if ((ctdb_data
.dptr
== NULL
) ||
898 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
899 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
901 || (random() % 2 != 0)
904 SAFE_FREE(ctdb_data
.dptr
);
905 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
906 talloc_set_destructor(result
, NULL
);
908 migrate_attempts
+= 1;
910 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
911 ctdb_data
.dptr
, ctdb_data
.dptr
?
912 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
915 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
916 if (!NT_STATUS_IS_OK(status
)) {
917 DEBUG(5, ("ctdb_migrate failed: %s\n",
922 /* now its migrated, try again */
926 if (migrate_attempts
> 10) {
927 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
931 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
933 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
934 result
->value
.dptr
= NULL
;
936 if ((result
->value
.dsize
!= 0)
937 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
938 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
939 result
->value
.dsize
))) {
940 DEBUG(0, ("talloc failed\n"));
944 SAFE_FREE(ctdb_data
.dptr
);
949 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
953 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
956 if (ctx
->transaction
!= NULL
) {
957 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
960 if (db
->persistent
) {
961 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
964 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
968 fetch (unlocked, no migration) operation on ctdb
970 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
971 TDB_DATA key
, TDB_DATA
*data
)
973 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
978 if (ctx
->transaction
) {
979 return db_ctdb_transaction_fetch(ctx
, mem_ctx
, key
, data
);
982 /* try a direct fetch */
983 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
986 * See if we have a valid record and we are the dmaster. If so, we can
987 * take the shortcut and just return it.
988 * we bypass the dmaster check for persistent databases
990 if ((ctdb_data
.dptr
!= NULL
) &&
991 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
993 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
994 /* we are the dmaster - avoid the ctdb protocol op */
996 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
997 if (data
->dsize
== 0) {
998 SAFE_FREE(ctdb_data
.dptr
);
1003 data
->dptr
= (uint8
*)talloc_memdup(
1004 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
1007 SAFE_FREE(ctdb_data
.dptr
);
1009 if (data
->dptr
== NULL
) {
1015 SAFE_FREE(ctdb_data
.dptr
);
1017 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1018 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
1019 if (!NT_STATUS_IS_OK(status
)) {
1020 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/* State handed to the traverse callbacks through their private pointer */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1033 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1035 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1036 struct db_record
*rec
;
1037 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1038 /* we have to give them a locked record to prevent races */
1039 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1040 if (rec
&& rec
->value
.dsize
> 0) {
1041 state
->fn(rec
, state
->private_data
);
1043 talloc_free(tmp_ctx
);
1046 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1049 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1050 struct db_record
*rec
;
1051 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1053 /* we have to give them a locked record to prevent races */
1054 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1055 if (rec
&& rec
->value
.dsize
> 0) {
1056 ret
= state
->fn(rec
, state
->private_data
);
1058 talloc_free(tmp_ctx
);
1062 static int db_ctdb_traverse(struct db_context
*db
,
1063 int (*fn
)(struct db_record
*rec
,
1064 void *private_data
),
1067 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1068 struct db_ctdb_ctx
);
1069 struct traverse_state state
;
1073 state
.private_data
= private_data
;
1075 if (db
->persistent
) {
1076 /* for persistent databases we don't need to do a ctdb traverse,
1077 we can do a faster local traverse */
1078 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
1082 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1086 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1088 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1091 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1093 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1096 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1098 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1099 struct db_record rec
;
1102 rec
.store
= db_ctdb_store_deny
;
1103 rec
.delete_rec
= db_ctdb_delete_deny
;
1104 rec
.private_data
= state
->db
;
1105 state
->fn(&rec
, state
->private_data
);
1108 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1111 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1112 struct db_record rec
;
1115 rec
.store
= db_ctdb_store_deny
;
1116 rec
.delete_rec
= db_ctdb_delete_deny
;
1117 rec
.private_data
= state
->db
;
1119 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1120 /* a deleted record */
1123 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1124 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1126 return state
->fn(&rec
, state
->private_data
);
1129 static int db_ctdb_traverse_read(struct db_context
*db
,
1130 int (*fn
)(struct db_record
*rec
,
1131 void *private_data
),
1134 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1135 struct db_ctdb_ctx
);
1136 struct traverse_state state
;
1140 state
.private_data
= private_data
;
1142 if (db
->persistent
) {
1143 /* for persistent databases we don't need to do a ctdb traverse,
1144 we can do a faster local traverse */
1145 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1148 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1152 static int db_ctdb_get_seqnum(struct db_context
*db
)
1154 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1155 struct db_ctdb_ctx
);
1156 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1159 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1161 int hash_size
, int tdb_flags
,
1162 int open_flags
, mode_t mode
)
1164 struct db_context
*result
;
1165 struct db_ctdb_ctx
*db_ctdb
;
1168 if (!lp_clustering()) {
1169 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1173 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
1174 DEBUG(0, ("talloc failed\n"));
1175 TALLOC_FREE(result
);
1179 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
1180 DEBUG(0, ("talloc failed\n"));
1181 TALLOC_FREE(result
);
1185 db_ctdb
->transaction
= NULL
;
1186 db_ctdb
->db
= result
;
1188 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
1189 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1190 TALLOC_FREE(result
);
1194 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
1196 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1198 /* only pass through specific flags */
1199 tdb_flags
&= TDB_SEQNUM
;
1201 /* honor permissions if user has specified O_CREAT */
1202 if (open_flags
& O_CREAT
) {
1203 chmod(db_path
, mode
);
1206 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
1207 if (db_ctdb
->wtdb
== NULL
) {
1208 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1209 TALLOC_FREE(result
);
1212 talloc_free(db_path
);
1214 result
->private_data
= (void *)db_ctdb
;
1215 result
->fetch_locked
= db_ctdb_fetch_locked
;
1216 result
->fetch
= db_ctdb_fetch
;
1217 result
->traverse
= db_ctdb_traverse
;
1218 result
->traverse_read
= db_ctdb_traverse_read
;
1219 result
->get_seqnum
= db_ctdb_get_seqnum
;
1220 result
->transaction_start
= db_ctdb_transaction_start
;
1221 result
->transaction_commit
= db_ctdb_transaction_commit
;
1222 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1224 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1225 name
, db_ctdb
->db_id
));