/*
   Unix SMB/CIFS implementation.
   Database interface wrapper around ctdbd
   Copyright (C) Volker Lendecke 2007-2009
   Copyright (C) Michael Adam 2009

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
30 #ifdef CLUSTER_SUPPORT
33 * It is not possible to include ctdb.h and tdb_compat.h (included via
34 * some other include above) without warnings. This fixes those
42 #ifdef typesafe_cb_preargs
43 #undef typesafe_cb_preargs
46 #ifdef typesafe_cb_postargs
47 #undef typesafe_cb_postargs
51 #include "ctdb_private.h"
52 #include "ctdbd_conn.h"
53 #include "dbwrap/dbwrap.h"
54 #include "dbwrap/dbwrap_private.h"
55 #include "dbwrap/dbwrap_ctdb.h"
59 struct db_ctdb_transaction_handle
{
60 struct db_ctdb_ctx
*ctx
;
62 * we store the writes done under a transaction:
64 struct ctdb_marshall_buffer
*m_write
;
71 struct db_context
*db
;
72 struct tdb_wrap
*wtdb
;
74 struct db_ctdb_transaction_handle
*transaction
;
75 struct g_lock_ctx
*lock_ctx
;
79 struct db_ctdb_ctx
*ctdb_ctx
;
80 struct ctdb_ltdb_header header
;
81 struct timeval lock_time
;
84 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
86 enum TDB_ERROR tret
= tdb_error(tdb
);
88 return map_nt_error_from_tdb(tret
);
91 struct db_ctdb_ltdb_parse_state
{
92 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
93 TDB_DATA data
, void *private_data
);
97 static int db_ctdb_ltdb_parser(TDB_DATA key
, TDB_DATA data
,
100 struct db_ctdb_ltdb_parse_state
*state
=
101 (struct db_ctdb_ltdb_parse_state
*)private_data
;
103 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
106 if (data
.dsize
== sizeof(struct ctdb_ltdb_header
)) {
108 * Making this a separate case that needs fixing
109 * separately. This is an empty record. ctdbd does not
110 * distinguish between empty and deleted records. Samba right
111 * now can live without empty records, so lets treat zero-size
112 * (i.e. deleted) records as non-existing.
117 key
, (struct ctdb_ltdb_header
*)data
.dptr
,
118 make_tdb_data(data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
119 data
.dsize
- sizeof(struct ctdb_ltdb_header
)),
120 state
->private_data
);
124 static NTSTATUS
db_ctdb_ltdb_parse(
125 struct db_ctdb_ctx
*db
, TDB_DATA key
,
126 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
127 TDB_DATA data
, void *private_data
),
130 struct db_ctdb_ltdb_parse_state state
;
133 state
.parser
= parser
;
134 state
.private_data
= private_data
;
136 ret
= tdb_parse_record(db
->wtdb
->tdb
, key
, db_ctdb_ltdb_parser
,
139 return NT_STATUS_NOT_FOUND
;
145 * Store a record together with the ctdb record header
146 * in the local copy of the database.
148 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
150 struct ctdb_ltdb_header
*header
,
156 rec
.dsize
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
157 rec
.dptr
= (uint8_t *)talloc_size(talloc_tos(), rec
.dsize
);
159 if (rec
.dptr
== NULL
) {
160 return NT_STATUS_NO_MEMORY
;
163 memcpy(rec
.dptr
, header
, sizeof(struct ctdb_ltdb_header
));
164 memcpy(sizeof(struct ctdb_ltdb_header
) + (uint8_t *)rec
.dptr
, data
.dptr
, data
.dsize
);
166 ret
= tdb_store(db
->wtdb
->tdb
, key
, rec
, TDB_REPLACE
);
168 talloc_free(rec
.dptr
);
170 return (ret
== 0) ? NT_STATUS_OK
171 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
176 form a ctdb_rec_data record from a key/data pair
178 static struct ctdb_rec_data
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
180 struct ctdb_ltdb_header
*header
,
184 struct ctdb_rec_data
*d
;
186 length
= offsetof(struct ctdb_rec_data
, data
) + key
.dsize
+
187 data
.dsize
+ sizeof(*header
);
188 d
= (struct ctdb_rec_data
*)talloc_size(mem_ctx
, length
);
194 d
->keylen
= key
.dsize
;
195 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
197 d
->datalen
= data
.dsize
+ sizeof(*header
);
198 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
199 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
204 /* helper function for marshalling multiple records */
205 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
206 struct ctdb_marshall_buffer
*m
,
210 struct ctdb_ltdb_header
*header
,
213 struct ctdb_rec_data
*r
;
214 size_t m_size
, r_size
;
215 struct ctdb_marshall_buffer
*m2
= NULL
;
217 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
224 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
225 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
232 m_size
= talloc_get_size(m
);
233 r_size
= talloc_get_size(r
);
235 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
236 mem_ctx
, m
, m_size
+ r_size
);
242 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
251 /* we've finished marshalling, return a data blob with the marshalled records */
252 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
255 data
.dptr
= (uint8_t *)m
;
256 data
.dsize
= talloc_get_size(m
);
261 loop over a marshalling buffer
263 - pass r==NULL to start
264 - loop the number of times indicated by m->count
266 static struct ctdb_rec_data
*db_ctdb_marshall_loop_next_key(
267 struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data
*r
, TDB_DATA
*key
)
270 r
= (struct ctdb_rec_data
*)&m
->data
[0];
272 r
= (struct ctdb_rec_data
*)(r
->length
+ (uint8_t *)r
);
275 key
->dptr
= &r
->data
[0];
276 key
->dsize
= r
->keylen
;
280 static bool db_ctdb_marshall_buf_parse(
281 struct ctdb_rec_data
*r
, uint32_t *reqid
,
282 struct ctdb_ltdb_header
**header
, TDB_DATA
*data
)
284 if (r
->datalen
< sizeof(struct ctdb_ltdb_header
)) {
290 data
->dptr
= &r
->data
[r
->keylen
] + sizeof(struct ctdb_ltdb_header
);
291 data
->dsize
= r
->datalen
- sizeof(struct ctdb_ltdb_header
);
293 *header
= (struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
299 * CTDB transaction destructor
301 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
305 status
= g_lock_unlock(h
->ctx
->lock_ctx
, h
->lock_name
);
306 if (!NT_STATUS_IS_OK(status
)) {
307 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h
->lock_name
,
315 * CTDB dbwrap API: transaction_start function
316 * starts a transaction on a persistent database
318 static int db_ctdb_transaction_start(struct db_context
*db
)
320 struct db_ctdb_transaction_handle
*h
;
322 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
325 if (!db
->persistent
) {
326 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
331 if (ctx
->transaction
) {
332 ctx
->transaction
->nesting
++;
333 DEBUG(5, (__location__
" transaction start on db 0x%08x: nesting %d -> %d\n",
334 ctx
->db_id
, ctx
->transaction
->nesting
- 1, ctx
->transaction
->nesting
));
338 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
340 DEBUG(0,(__location__
" oom for transaction handle\n"));
346 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
347 (unsigned int)ctx
->db_id
);
348 if (h
->lock_name
== NULL
) {
349 DEBUG(0, ("talloc_asprintf failed\n"));
355 * Wait a day, i.e. forever...
357 status
= g_lock_lock(ctx
->lock_ctx
, h
->lock_name
, G_LOCK_WRITE
,
358 timeval_set(86400, 0));
359 if (!NT_STATUS_IS_OK(status
)) {
360 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
365 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
367 ctx
->transaction
= h
;
369 DEBUG(5,(__location__
" transaction started on db 0x%08x\n", ctx
->db_id
));
374 static bool parse_newest_in_marshall_buffer(
375 struct ctdb_marshall_buffer
*buf
, TDB_DATA key
,
376 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
377 TDB_DATA data
, void *private_data
),
380 struct ctdb_rec_data
*rec
= NULL
;
381 struct ctdb_ltdb_header
*h
= NULL
;
390 * Walk the list of records written during this
391 * transaction. If we want to read one we have already
392 * written, return the last written sample. Thus we do not do
393 * a "break;" for the first hit, this record might have been
397 for (i
=0; i
<buf
->count
; i
++) {
401 rec
= db_ctdb_marshall_loop_next_key(buf
, rec
, &tkey
);
406 if (!tdb_data_equal(key
, tkey
)) {
410 if (!db_ctdb_marshall_buf_parse(rec
, &reqid
, &h
, &data
)) {
419 parser(key
, h
, data
, private_data
);
424 struct pull_newest_from_marshall_buffer_state
{
425 struct ctdb_ltdb_header
*pheader
;
430 static void pull_newest_from_marshall_buffer_parser(
431 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
432 TDB_DATA data
, void *private_data
)
434 struct pull_newest_from_marshall_buffer_state
*state
=
435 (struct pull_newest_from_marshall_buffer_state
*)private_data
;
437 if (state
->pheader
!= NULL
) {
438 memcpy(state
->pheader
, header
, sizeof(*state
->pheader
));
440 if (state
->pdata
!= NULL
) {
441 state
->pdata
->dsize
= data
.dsize
;
442 state
->pdata
->dptr
= (uint8_t *)talloc_memdup(
443 state
->mem_ctx
, data
.dptr
, data
.dsize
);
447 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
449 struct ctdb_ltdb_header
*pheader
,
453 struct pull_newest_from_marshall_buffer_state state
;
455 state
.pheader
= pheader
;
456 state
.mem_ctx
= mem_ctx
;
459 if (!parse_newest_in_marshall_buffer(
460 buf
, key
, pull_newest_from_marshall_buffer_parser
,
464 if ((pdata
!= NULL
) && (pdata
->dsize
!= 0) && (pdata
->dptr
== NULL
)) {
471 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
);
472 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
474 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
478 struct db_record
*result
;
481 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
482 DEBUG(0, ("talloc failed\n"));
486 result
->db
= ctx
->db
;
487 result
->private_data
= ctx
->transaction
;
489 result
->key
.dsize
= key
.dsize
;
490 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
492 if (result
->key
.dptr
== NULL
) {
493 DEBUG(0, ("talloc failed\n"));
498 result
->store
= db_ctdb_store_transaction
;
499 result
->delete_rec
= db_ctdb_delete_transaction
;
501 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
502 NULL
, result
, &result
->value
)) {
506 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
507 if (ctdb_data
.dptr
== NULL
) {
508 /* create the record */
509 result
->value
= tdb_null
;
513 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
514 result
->value
.dptr
= NULL
;
516 if ((result
->value
.dsize
!= 0)
517 && !(result
->value
.dptr
= (uint8_t *)talloc_memdup(
518 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
519 result
->value
.dsize
))) {
520 DEBUG(0, ("talloc failed\n"));
524 SAFE_FREE(ctdb_data
.dptr
);
529 static int db_ctdb_record_destructor(struct db_record
**recp
)
531 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
532 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
533 rec
->private_data
, struct db_ctdb_transaction_handle
);
534 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
536 DEBUG(0,(__location__
" transaction_commit failed\n"));
542 auto-create a transaction for persistent databases
544 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
549 struct db_record
*rec
, **recp
;
551 res
= db_ctdb_transaction_start(ctx
->db
);
556 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
558 ctx
->db
->transaction_cancel(ctx
->db
);
562 /* destroy this transaction when we release the lock */
563 recp
= talloc(rec
, struct db_record
*);
565 ctx
->db
->transaction_cancel(ctx
->db
);
570 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
576 stores a record inside a transaction
578 static NTSTATUS
db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
579 TDB_DATA key
, TDB_DATA data
)
581 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
583 struct ctdb_ltdb_header header
;
587 /* we need the header so we can update the RSN */
589 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
592 rec
= tdb_fetch_compat(h
->ctx
->wtdb
->tdb
, key
);
594 if (rec
.dptr
!= NULL
) {
595 memcpy(&header
, rec
.dptr
,
596 sizeof(struct ctdb_ltdb_header
));
597 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
600 * a special case, we are writing the same
601 * data that is there now
603 if (data
.dsize
== rec
.dsize
&&
605 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
608 talloc_free(tmp_ctx
);
615 header
.dmaster
= get_my_vnn();
618 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
619 if (h
->m_write
== NULL
) {
620 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
621 talloc_free(tmp_ctx
);
622 return NT_STATUS_NO_MEMORY
;
625 talloc_free(tmp_ctx
);
631 a record store inside a transaction
633 static NTSTATUS
db_ctdb_store_transaction(struct db_record
*rec
, TDB_DATA data
, int flag
)
635 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
636 rec
->private_data
, struct db_ctdb_transaction_handle
);
639 status
= db_ctdb_transaction_store(h
, rec
->key
, data
);
644 a record delete inside a transaction
646 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
648 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
649 rec
->private_data
, struct db_ctdb_transaction_handle
);
652 status
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
656 static void db_ctdb_fetch_db_seqnum_parser(
657 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
658 TDB_DATA data
, void *private_data
)
660 uint64_t *seqnum
= (uint64_t *)private_data
;
662 if (data
.dsize
!= sizeof(uint64_t)) {
666 memcpy(seqnum
, data
.dptr
, sizeof(*seqnum
));
670 * Fetch the db sequence number of a persistent db directly from the db.
672 static NTSTATUS
db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx
*db
,
678 if (seqnum
== NULL
) {
679 return NT_STATUS_INVALID_PARAMETER
;
682 key
= string_term_tdb_data(CTDB_DB_SEQNUM_KEY
);
684 status
= db_ctdb_ltdb_parse(
685 db
, key
, db_ctdb_fetch_db_seqnum_parser
, seqnum
);
687 if (NT_STATUS_IS_OK(status
)) {
690 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
698 * Store the database sequence number inside a transaction.
700 static NTSTATUS
db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle
*h
,
704 const char *keyname
= CTDB_DB_SEQNUM_KEY
;
708 key
= string_term_tdb_data(keyname
);
710 data
.dptr
= (uint8_t *)&seqnum
;
711 data
.dsize
= sizeof(uint64_t);
713 status
= db_ctdb_transaction_store(h
, key
, data
);
721 static int db_ctdb_transaction_commit(struct db_context
*db
)
723 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
727 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
728 uint64_t old_seqnum
, new_seqnum
;
732 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
736 if (h
->nested_cancel
) {
737 db
->transaction_cancel(db
);
738 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
742 if (h
->nesting
!= 0) {
744 DEBUG(5, (__location__
" transaction commit on db 0x%08x: nesting %d -> %d\n",
745 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
749 if (h
->m_write
== NULL
) {
751 * No changes were made, so don't change the seqnum,
752 * don't push to other node, just exit with success.
758 DEBUG(5,(__location__
" transaction commit on db 0x%08x\n", ctx
->db_id
));
761 * As the last db action before committing, bump the database sequence
762 * number. Note that this undoes all changes to the seqnum records
763 * performed under the transaction. This record is not meant to be
764 * modified by user interaction. It is for internal use only...
766 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &old_seqnum
);
767 if (!NT_STATUS_IS_OK(rets
)) {
768 DEBUG(1, (__location__
" failed to fetch the db sequence number "
769 "in transaction commit on db 0x%08x\n", ctx
->db_id
));
774 new_seqnum
= old_seqnum
+ 1;
776 rets
= db_ctdb_store_db_seqnum(h
, new_seqnum
);
777 if (!NT_STATUS_IS_OK(rets
)) {
778 DEBUG(1, (__location__
"failed to store the db sequence number "
779 " in transaction commit on db 0x%08x\n", ctx
->db_id
));
785 /* tell ctdbd to commit to the other nodes */
786 rets
= ctdbd_control_local(messaging_ctdbd_connection(),
787 CTDB_CONTROL_TRANS3_COMMIT
,
789 db_ctdb_marshall_finish(h
->m_write
),
790 NULL
, NULL
, &status
);
791 if (!NT_STATUS_IS_OK(rets
) || status
!= 0) {
793 * The TRANS3_COMMIT control should only possibly fail when a
794 * recovery has been running concurrently. In any case, the db
795 * will be the same on all nodes, either the new copy or the
796 * old copy. This can be detected by comparing the old and new
797 * local sequence numbers.
799 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &new_seqnum
);
800 if (!NT_STATUS_IS_OK(rets
)) {
801 DEBUG(1, (__location__
" failed to refetch db sequence "
802 "number after failed TRANS3_COMMIT\n"));
807 if (new_seqnum
== old_seqnum
) {
808 /* Recovery prevented all our changes: retry. */
811 if (new_seqnum
!= (old_seqnum
+ 1)) {
812 DEBUG(0, (__location__
" ERROR: new_seqnum[%lu] != "
813 "old_seqnum[%lu] + (0 or 1) after failed "
814 "TRANS3_COMMIT - this should not happen!\n",
815 (unsigned long)new_seqnum
,
816 (unsigned long)old_seqnum
));
821 * Recovery propagated our changes to all nodes, completing
822 * our commit for us - succeed.
829 h
->ctx
->transaction
= NULL
;
838 static int db_ctdb_transaction_cancel(struct db_context
*db
)
840 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
842 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
845 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
849 if (h
->nesting
!= 0) {
851 h
->nested_cancel
= true;
852 DEBUG(5, (__location__
" transaction cancel on db 0x%08x: nesting %d -> %d\n",
853 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
857 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
859 ctx
->transaction
= NULL
;
865 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
867 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
868 rec
->private_data
, struct db_ctdb_rec
);
870 return db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
), data
);
875 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
876 static NTSTATUS
db_ctdb_send_schedule_for_deletion(struct db_record
*rec
)
879 struct ctdb_control_schedule_for_deletion
*dd
;
882 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
883 rec
->private_data
, struct db_ctdb_rec
);
885 indata
.dsize
= offsetof(struct ctdb_control_schedule_for_deletion
, key
) + rec
->key
.dsize
;
886 indata
.dptr
= talloc_zero_array(crec
, uint8_t, indata
.dsize
);
887 if (indata
.dptr
== NULL
) {
888 DEBUG(0, (__location__
" talloc failed!\n"));
889 return NT_STATUS_NO_MEMORY
;
892 dd
= (struct ctdb_control_schedule_for_deletion
*)(void *)indata
.dptr
;
893 dd
->db_id
= crec
->ctdb_ctx
->db_id
;
894 dd
->hdr
= crec
->header
;
895 dd
->keylen
= rec
->key
.dsize
;
896 memcpy(dd
->key
, rec
->key
.dptr
, rec
->key
.dsize
);
898 status
= ctdbd_control_local(messaging_ctdbd_connection(),
899 CTDB_CONTROL_SCHEDULE_FOR_DELETION
,
900 crec
->ctdb_ctx
->db_id
,
901 CTDB_CTRL_FLAG_NOREPLY
, /* flags */
906 talloc_free(indata
.dptr
);
908 if (!NT_STATUS_IS_OK(status
) || cstatus
!= 0) {
909 DEBUG(1, (__location__
" Error sending local control "
910 "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
911 nt_errstr(status
), cstatus
));
912 if (NT_STATUS_IS_OK(status
)) {
913 status
= NT_STATUS_UNSUCCESSFUL
;
921 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
926 * We have to store the header with empty data. TODO: Fix the
930 status
= db_ctdb_store(rec
, tdb_null
, 0);
931 if (!NT_STATUS_IS_OK(status
)) {
935 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
936 status
= db_ctdb_send_schedule_for_deletion(rec
);
942 static int db_ctdb_record_destr(struct db_record
* data
)
944 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
945 data
->private_data
, struct db_ctdb_rec
);
948 DEBUG(10, (DEBUGLEVEL
> 10
949 ? "Unlocking db %u key %s\n"
950 : "Unlocking db %u key %.20s\n",
951 (int)crec
->ctdb_ctx
->db_id
,
952 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
955 tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
);
957 threshold
= lp_ctdb_locktime_warn_threshold();
958 if (threshold
!= 0) {
959 double timediff
= timeval_elapsed(&crec
->lock_time
);
960 if ((timediff
* 1000) > threshold
) {
963 key
= hex_encode_talloc(data
,
964 (unsigned char *)data
->key
.dptr
,
966 DEBUG(0, ("Held tdb lock on db %s, key %s %f seconds\n",
967 tdb_name(crec
->ctdb_ctx
->wtdb
->tdb
), key
,
976 * Check whether we have a valid local copy of the given record,
977 * either for reading or for writing.
979 static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header
*hdr
,
982 #ifdef HAVE_CTDB_WANT_READONLY_DECL
983 if (hdr
->dmaster
!= get_my_vnn()) {
984 /* If we're not dmaster, it must be r/o copy. */
985 return read_only
&& (hdr
->flags
& CTDB_REC_RO_HAVE_READONLY
);
989 * If we want write access, no one may have r/o copies.
991 return read_only
|| !(hdr
->flags
& CTDB_REC_RO_HAVE_DELEGATIONS
);
993 return (hdr
->dmaster
== get_my_vnn());
997 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data
, bool read_only
)
999 if (ctdb_data
.dptr
== NULL
) {
1003 if (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1007 return db_ctdb_can_use_local_hdr(
1008 (struct ctdb_ltdb_header
*)ctdb_data
.dptr
, read_only
);
1011 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
1012 TALLOC_CTX
*mem_ctx
,
1016 struct db_record
*result
;
1017 struct db_ctdb_rec
*crec
;
1020 int migrate_attempts
= 0;
1023 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
1024 DEBUG(0, ("talloc failed\n"));
1028 if (!(crec
= talloc_zero(result
, struct db_ctdb_rec
))) {
1029 DEBUG(0, ("talloc failed\n"));
1030 TALLOC_FREE(result
);
1034 result
->db
= ctx
->db
;
1035 result
->private_data
= (void *)crec
;
1036 crec
->ctdb_ctx
= ctx
;
1038 result
->key
.dsize
= key
.dsize
;
1039 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
1041 if (result
->key
.dptr
== NULL
) {
1042 DEBUG(0, ("talloc failed\n"));
1043 TALLOC_FREE(result
);
1048 * Do a blocking lock on the record
1052 if (DEBUGLEVEL
>= 10) {
1053 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
1054 DEBUG(10, (DEBUGLEVEL
> 10
1055 ? "Locking db %u key %s\n"
1056 : "Locking db %u key %.20s\n",
1057 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1058 TALLOC_FREE(keystr
);
1062 ? tdb_chainlock_nonblock(ctx
->wtdb
->tdb
, key
)
1063 : tdb_chainlock(ctx
->wtdb
->tdb
, key
);
1065 DEBUG(3, ("tdb_chainlock failed\n"));
1066 TALLOC_FREE(result
);
1070 result
->store
= db_ctdb_store
;
1071 result
->delete_rec
= db_ctdb_delete
;
1072 talloc_set_destructor(result
, db_ctdb_record_destr
);
1074 ctdb_data
= tdb_fetch_compat(ctx
->wtdb
->tdb
, key
);
1077 * See if we have a valid record and we are the dmaster. If so, we can
1078 * take the shortcut and just return it.
1081 if (!db_ctdb_can_use_local_copy(ctdb_data
, false)) {
1082 SAFE_FREE(ctdb_data
.dptr
);
1083 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1084 talloc_set_destructor(result
, NULL
);
1086 if (tryonly
&& (migrate_attempts
!= 0)) {
1087 DEBUG(5, ("record migrated away again\n"));
1088 TALLOC_FREE(result
);
1092 migrate_attempts
+= 1;
1094 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u) %u\n",
1095 ctdb_data
.dptr
, ctdb_data
.dptr
?
1096 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
1099 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->flags
: 0));
1101 status
= ctdbd_migrate(messaging_ctdbd_connection(), ctx
->db_id
,
1103 if (!NT_STATUS_IS_OK(status
)) {
1104 DEBUG(5, ("ctdb_migrate failed: %s\n",
1105 nt_errstr(status
)));
1106 TALLOC_FREE(result
);
1109 /* now its migrated, try again */
1113 if (migrate_attempts
> 10) {
1114 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s needed %d "
1115 "attempts\n", tdb_name(ctx
->wtdb
->tdb
),
1116 hex_encode_talloc(talloc_tos(),
1117 (unsigned char *)key
.dptr
,
1122 GetTimeOfDay(&crec
->lock_time
);
1124 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1126 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1127 result
->value
.dptr
= NULL
;
1129 if ((result
->value
.dsize
!= 0)
1130 && !(result
->value
.dptr
= (uint8_t *)talloc_memdup(
1131 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1132 result
->value
.dsize
))) {
1133 DEBUG(0, ("talloc failed\n"));
1134 TALLOC_FREE(result
);
1137 SAFE_FREE(ctdb_data
.dptr
);
1142 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1143 TALLOC_CTX
*mem_ctx
,
1146 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1147 struct db_ctdb_ctx
);
1149 if (ctx
->transaction
!= NULL
) {
1150 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1153 if (db
->persistent
) {
1154 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1157 return fetch_locked_internal(ctx
, mem_ctx
, key
, false);
1160 static struct db_record
*db_ctdb_try_fetch_locked(struct db_context
*db
,
1161 TALLOC_CTX
*mem_ctx
,
1164 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1165 struct db_ctdb_ctx
);
1167 if (ctx
->transaction
!= NULL
) {
1168 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1171 if (db
->persistent
) {
1172 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1175 return fetch_locked_internal(ctx
, mem_ctx
, key
, true);
1178 struct db_ctdb_parse_record_state
{
1179 void (*parser
)(TDB_DATA key
, TDB_DATA data
, void *private_data
);
1181 bool ask_for_readonly_copy
;
1185 static void db_ctdb_parse_record_parser(
1186 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
1187 TDB_DATA data
, void *private_data
)
1189 struct db_ctdb_parse_record_state
*state
=
1190 (struct db_ctdb_parse_record_state
*)private_data
;
1191 state
->parser(key
, data
, state
->private_data
);
1194 static void db_ctdb_parse_record_parser_nonpersistent(
1195 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
1196 TDB_DATA data
, void *private_data
)
1198 struct db_ctdb_parse_record_state
*state
=
1199 (struct db_ctdb_parse_record_state
*)private_data
;
1201 if (db_ctdb_can_use_local_hdr(header
, true)) {
1202 state
->parser(key
, data
, state
->private_data
);
1206 * We found something in the db, so it seems that this record,
1207 * while not usable locally right now, is popular. Ask for a
1210 state
->ask_for_readonly_copy
= true;
1214 static NTSTATUS
db_ctdb_parse_record(struct db_context
*db
, TDB_DATA key
,
1215 void (*parser
)(TDB_DATA key
,
1217 void *private_data
),
1220 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1221 db
->private_data
, struct db_ctdb_ctx
);
1222 struct db_ctdb_parse_record_state state
;
1225 state
.parser
= parser
;
1226 state
.private_data
= private_data
;
1228 if (ctx
->transaction
!= NULL
) {
1229 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
1233 * Transactions only happen for persistent db's.
1236 found
= parse_newest_in_marshall_buffer(
1237 h
->m_write
, key
, db_ctdb_parse_record_parser
, &state
);
1240 return NT_STATUS_OK
;
1244 if (db
->persistent
) {
1246 * Persistent db, but not found in the transaction buffer
1248 return db_ctdb_ltdb_parse(
1249 ctx
, key
, db_ctdb_parse_record_parser
, &state
);
1253 state
.ask_for_readonly_copy
= false;
1255 status
= db_ctdb_ltdb_parse(
1256 ctx
, key
, db_ctdb_parse_record_parser_nonpersistent
, &state
);
1257 if (NT_STATUS_IS_OK(status
) && state
.done
) {
1258 return NT_STATUS_OK
;
1261 return ctdbd_parse(messaging_ctdbd_connection(), ctx
->db_id
, key
,
1262 state
.ask_for_readonly_copy
, parser
, private_data
);
1265 struct traverse_state
{
1266 struct db_context
*db
;
1267 int (*fn
)(struct db_record
*rec
, void *private_data
);
1272 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1274 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1275 struct db_record
*rec
;
1276 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1277 /* we have to give them a locked record to prevent races */
1278 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1279 if (rec
&& rec
->value
.dsize
> 0) {
1280 state
->fn(rec
, state
->private_data
);
1282 talloc_free(tmp_ctx
);
1285 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1288 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1289 struct db_record
*rec
;
1290 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1294 * Skip the __db_sequence_number__ key:
1295 * This is used for persistent transactions internally.
1297 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1298 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1303 /* we have to give them a locked record to prevent races */
1304 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1305 if (rec
&& rec
->value
.dsize
> 0) {
1306 ret
= state
->fn(rec
, state
->private_data
);
1310 talloc_free(tmp_ctx
);
1314 /* wrapper to use traverse_persistent_callback with dbwrap */
1315 static int traverse_persistent_callback_dbwrap(struct db_record
*rec
, void* data
)
1317 return traverse_persistent_callback(NULL
, rec
->key
, rec
->value
, data
);
1321 static int db_ctdb_traverse(struct db_context
*db
,
1322 int (*fn
)(struct db_record
*rec
,
1323 void *private_data
),
1327 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1328 struct db_ctdb_ctx
);
1329 struct traverse_state state
;
1333 state
.private_data
= private_data
;
1336 if (db
->persistent
) {
1337 struct tdb_context
*ltdb
= ctx
->wtdb
->tdb
;
1340 /* for persistent databases we don't need to do a ctdb traverse,
1341 we can do a faster local traverse */
1342 ret
= tdb_traverse(ltdb
, traverse_persistent_callback
, &state
);
1346 if (ctx
->transaction
&& ctx
->transaction
->m_write
) {
1348 * we now have to handle keys not yet
1349 * present at transaction start
1351 struct db_context
*newkeys
= db_open_rbt(talloc_tos());
1352 struct ctdb_marshall_buffer
*mbuf
= ctx
->transaction
->m_write
;
1353 struct ctdb_rec_data
*rec
=NULL
;
1357 if (newkeys
== NULL
) {
1361 for (i
=0; i
<mbuf
->count
; i
++) {
1363 rec
= db_ctdb_marshall_loop_next_key(
1365 SMB_ASSERT(rec
!= NULL
);
1367 if (!tdb_exists(ltdb
, key
)) {
1368 dbwrap_store(newkeys
, key
, tdb_null
, 0);
1371 status
= dbwrap_traverse(newkeys
,
1372 traverse_persistent_callback_dbwrap
,
1375 talloc_free(newkeys
);
1376 if (!NT_STATUS_IS_OK(status
)) {
1384 status
= ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1385 if (!NT_STATUS_IS_OK(status
)) {
1391 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
1393 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1396 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1398 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1401 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1403 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1404 struct db_record rec
;
1410 rec
.store
= db_ctdb_store_deny
;
1411 rec
.delete_rec
= db_ctdb_delete_deny
;
1412 rec
.private_data
= NULL
;
1413 state
->fn(&rec
, state
->private_data
);
1417 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1420 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1421 struct db_record rec
;
1424 * Skip the __db_sequence_number__ key:
1425 * This is used for persistent transactions internally.
1427 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1428 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1437 rec
.store
= db_ctdb_store_deny
;
1438 rec
.delete_rec
= db_ctdb_delete_deny
;
1439 rec
.private_data
= NULL
;
1441 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1442 /* a deleted record */
1445 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1446 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1449 return state
->fn(&rec
, state
->private_data
);
1452 static int db_ctdb_traverse_read(struct db_context
*db
,
1453 int (*fn
)(struct db_record
*rec
,
1454 void *private_data
),
1458 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1459 struct db_ctdb_ctx
);
1460 struct traverse_state state
;
1464 state
.private_data
= private_data
;
1467 if (db
->persistent
) {
1468 /* for persistent databases we don't need to do a ctdb traverse,
1469 we can do a faster local traverse */
1470 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
1473 status
= ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1474 if (!NT_STATUS_IS_OK(status
)) {
1480 static int db_ctdb_get_seqnum(struct db_context
*db
)
1482 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1483 struct db_ctdb_ctx
);
1484 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1487 static void db_ctdb_id(struct db_context
*db
, const uint8_t **id
,
1490 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1491 db
->private_data
, struct db_ctdb_ctx
);
1493 *id
= (uint8_t *)&ctx
->db_id
;
1494 *idlen
= sizeof(ctx
->db_id
);
1497 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1499 int hash_size
, int tdb_flags
,
1500 int open_flags
, mode_t mode
,
1501 enum dbwrap_lock_order lock_order
)
1503 struct db_context
*result
;
1504 struct db_ctdb_ctx
*db_ctdb
;
1506 struct ctdbd_connection
*conn
;
1507 struct loadparm_context
*lp_ctx
;
1508 struct ctdb_db_priority prio
;
1512 if (!lp_clustering()) {
1513 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1517 if (!(result
= talloc_zero(mem_ctx
, struct db_context
))) {
1518 DEBUG(0, ("talloc failed\n"));
1519 TALLOC_FREE(result
);
1523 if (!(db_ctdb
= talloc(result
, struct db_ctdb_ctx
))) {
1524 DEBUG(0, ("talloc failed\n"));
1525 TALLOC_FREE(result
);
1529 result
->name
= talloc_strdup(result
, name
);
1530 if (result
->name
== NULL
) {
1531 DEBUG(0, ("talloc failed\n"));
1532 TALLOC_FREE(result
);
1536 db_ctdb
->transaction
= NULL
;
1537 db_ctdb
->db
= result
;
1539 conn
= messaging_ctdbd_connection();
1541 DEBUG(1, ("Could not connect to ctdb\n"));
1542 TALLOC_FREE(result
);
1546 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn
, name
, &db_ctdb
->db_id
, tdb_flags
))) {
1547 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
1548 TALLOC_FREE(result
);
1552 db_path
= ctdbd_dbpath(conn
, db_ctdb
, db_ctdb
->db_id
);
1554 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
1555 result
->lock_order
= lock_order
;
1557 /* only pass through specific flags */
1558 tdb_flags
&= TDB_SEQNUM
;
1560 /* honor permissions if user has specified O_CREAT */
1561 if (open_flags
& O_CREAT
) {
1562 chmod(db_path
, mode
);
1565 prio
.db_id
= db_ctdb
->db_id
;
1566 prio
.priority
= lock_order
;
1568 status
= ctdbd_control_local(
1569 conn
, CTDB_CONTROL_SET_DB_PRIORITY
, 0, 0,
1570 make_tdb_data((uint8_t *)&prio
, sizeof(prio
)),
1571 NULL
, NULL
, &cstatus
);
1573 if (!NT_STATUS_IS_OK(status
) || (cstatus
!= 0)) {
1574 DEBUG(1, ("CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d\n",
1575 nt_errstr(status
), cstatus
));
1576 TALLOC_FREE(result
);
1580 lp_ctx
= loadparm_init_s3(db_path
, loadparm_s3_helpers());
1582 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
,
1584 talloc_unlink(db_path
, lp_ctx
);
1585 if (db_ctdb
->wtdb
== NULL
) {
1586 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1587 TALLOC_FREE(result
);
1590 talloc_free(db_path
);
1592 if (result
->persistent
) {
1593 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
,
1594 ctdb_conn_msg_ctx(conn
));
1595 if (db_ctdb
->lock_ctx
== NULL
) {
1596 DEBUG(0, ("g_lock_ctx_init failed\n"));
1597 TALLOC_FREE(result
);
1602 result
->private_data
= (void *)db_ctdb
;
1603 result
->fetch_locked
= db_ctdb_fetch_locked
;
1604 result
->try_fetch_locked
= db_ctdb_try_fetch_locked
;
1605 result
->parse_record
= db_ctdb_parse_record
;
1606 result
->traverse
= db_ctdb_traverse
;
1607 result
->traverse_read
= db_ctdb_traverse_read
;
1608 result
->get_seqnum
= db_ctdb_get_seqnum
;
1609 result
->transaction_start
= db_ctdb_transaction_start
;
1610 result
->transaction_commit
= db_ctdb_transaction_commit
;
1611 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1612 result
->id
= db_ctdb_id
;
1613 result
->stored_callback
= NULL
;
1615 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1616 name
, db_ctdb
->db_id
));
1621 #else /* CLUSTER_SUPPORT */
1623 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1625 int hash_size
, int tdb_flags
,
1626 int open_flags
, mode_t mode
,
1627 enum dbwrap_lock_order lock_order
)
1629 DEBUG(3, ("db_open_ctdb: no cluster support!\n"));