2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Handle to the wrapped local tdb.  The functions below reach the raw
 * tdb context through it as ctx->wtdb->tdb.
 * NOTE(review): the enclosing struct header is not visible in this
 * listing; from its use this is presumably a member of struct db_ctdb_ctx.
 */
27 struct tdb_wrap
*wtdb
;
/*
 * Per-record private state, reached below as crec->ctdb_ctx and
 * crec->header where crec is a struct db_ctdb_rec pointer:
 *  - ctdb_ctx: the database context the record belongs to
 *  - header:   the ctdb ltdb header that is copied in front of every
 *              stored value and read back from the front of fetched data
 * NOTE(review): the enclosing struct header is not visible in this
 * listing; presumably these are the members of struct db_ctdb_rec.
 */
32 struct db_ctdb_ctx
*ctdb_ctx
;
33 struct ctdb_ltdb_header header
;
/*
 * Forward declaration of fetch_locked_internal(), needed because the
 * persistent store path calls it before its definition.
 * NOTE(review): the parameters after ctx are cut off in this listing.
 */
36 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
/*
 * Translate the last error recorded on a tdb context into an NTSTATUS.
 *
 * tdb: context whose tdb_error() value is read.
 *
 * The visible assignments produce one of
 * NT_STATUS_OBJECT_NAME_COLLISION, NT_STATUS_OBJECT_NAME_NOT_FOUND or
 * NT_STATUS_INTERNAL_DB_CORRUPTION.
 * NOTE(review): the lines selecting between them (presumably a switch
 * on tret) and the return are not visible in this listing; confirm which
 * tdb error maps to which status.
 */
41 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
44 enum TDB_ERROR tret
= tdb_error(tdb
);
48 status
= NT_STATUS_OBJECT_NAME_COLLISION
;
51 status
= NT_STATUS_OBJECT_NAME_NOT_FOUND
;
54 status
= NT_STATUS_INTERNAL_DB_CORRUPTION
;
/*
 * Store a value for an already fetched record in the local tdb.
 *
 * One buffer of sizeof(crec->header) + data.dsize bytes is built, holding
 * the ctdb header of the record followed by the payload, and is written
 * under rec->key with tdb_store().
 *
 * rec:  record whose private_data is a struct db_ctdb_rec
 * data: payload placed after the header
 * flag: NOTE(review): never referenced in the visible code; the store
 *       always passes TDB_REPLACE regardless of what the caller asked for
 *
 * Returns NT_STATUS_NO_MEMORY when the buffer cannot be allocated,
 * NT_STATUS_OK when tdb_store() returns 0, and otherwise the tdb error
 * mapped through tdb_error_to_ntstatus().
 */
61 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
63 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
64 rec
->private_data
, struct db_ctdb_rec
);
/* on-disk value = ctdb header + caller payload */
68 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
70 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
71 return NT_STATUS_NO_MEMORY
;
74 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
75 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
77 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, cdata
, TDB_REPLACE
);
/* the temporary buffer is released before the result is examined */
79 SAFE_FREE(cdata
.dptr
);
81 return (ret
== 0) ? NT_STATUS_OK
82 : tdb_error_to_ntstatus(crec
->ctdb_ctx
->wtdb
->tdb
);
/*
 * Store routine for persistent databases (assigned to result->store in
 * fetch_locked_internal).
 *
 * Visible flow: a retry loop runs while count < max_retries and status
 * is not OK, where max_retries comes from the parametric option
 * "dbwrap ctdb":"max store retries" (default 5).  Each turn
 *  - obtains a locked record through fetch_locked_internal() with the
 *    persistent argument set to true,
 *  - builds a malloc buffer of header + data,
 *  - calls ctdbd_start_persistent_update(),
 *  - on success writes the buffer with tdb_store() under rec->key,
 *  - unlocks the chain for rec->key and clears the talloc destructor of
 *    the record so the lock is not released a second time,
 *  - on success asks ctdbd to distribute the record with
 *    ctdbd_persistent_store(); ctdbd_cancel_persistent_update() is also
 *    called (NOTE(review): the condition guarding it is not visible),
 *  - frees the buffer.
 * After the loop a level 5 message is logged when status is still not OK.
 *
 * NOTE(review): the allocation failure path returns NT_STATUS_NO_MEMORY
 * directly from inside the loop instead of going through the unlock and
 * cancel steps; confirm that is intended.
 * NOTE(review): SAFE_FREE(cdata.dptr) appears twice near the end;
 * presumably harmless if SAFE_FREE resets the pointer - confirm.
 * NOTE(review): the in-code discussion says the first variant (re-lock)
 * was chosen, but the visible code unlocks and removes the destructor,
 * which matches the second variant it describes; the comment looks stale.
 * NOTE(review): flag is not referenced in the visible code.
 */
86 /* for persistent databases the store is a bit different. We have to
87 ask the ctdb daemon to push the record to all nodes after the
89 static NTSTATUS
db_ctdb_store_persistent(struct db_record
*rec
, TDB_DATA data
, int flag
)
91 struct db_ctdb_rec
*crec
;
92 struct db_record
*record
;
97 int max_retries
= lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
99 for (count
= 0, status
= NT_STATUS_UNSUCCESSFUL
, record
= rec
;
100 (count
< max_retries
) && !NT_STATUS_IS_OK(status
);
106 * There is a hack here: We use rec as a memory
107 * context and re-use it as the record struct ptr.
108 * We don't free the record data allocated
109 * in each turn. So all gets freed when the caller
110 * releases the original record. This is because
111 * we don't get the record passed in by reference
112 * in the first place and the caller relies on
113 * having to free the record himself.
115 record
= fetch_locked_internal(crec
->ctdb_ctx
,
118 true /* persistent */);
119 if (record
== NULL
) {
120 DEBUG(5, ("fetch_locked_internal failed.\n"));
121 status
= NT_STATUS_NO_MEMORY
;
126 crec
= talloc_get_type_abort(record
->private_data
,
129 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
131 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
132 return NT_STATUS_NO_MEMORY
;
137 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
138 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
140 status
= ctdbd_start_persistent_update(
141 messaging_ctdbd_connection(),
142 crec
->ctdb_ctx
->db_id
,
146 if (NT_STATUS_IS_OK(status
)) {
147 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
,
151 : tdb_error_to_ntstatus(
152 crec
->ctdb_ctx
->wtdb
->tdb
);
156 * release the lock *now* in order to prevent deadlocks.
158 * There is a tradeoff: Usually, the record is still locked
159 * after db->store operation. This lock is usually released
160 * via the talloc destructor with the TALLOC_FREE to
161 * the record. So we have two choices:
163 * - Either re-lock the record after the call to persistent_store
164 * or cancel_persistent update and this way not changing any
165 * assumptions callers may have about the state, but possibly
166 * introducing new race conditions.
168 * - Or don't lock the record again but just remove the
169 * talloc_destructor. This is less racy but assumes that
170 * the lock is always released via TALLOC_FREE of the record.
172 * I choose the first variant for now since it seems less racy.
173 * We can't guarantee that we succeed in getting the lock
174 * anyways. The only real danger here is that a caller
175 * performs multiple store operations after a fetch_locked()
176 * which is currently not the case.
178 tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
);
179 talloc_set_destructor(record
, NULL
);
181 /* now tell ctdbd to update this record on all other nodes */
182 if (NT_STATUS_IS_OK(status
)) {
183 status
= ctdbd_persistent_store(
184 messaging_ctdbd_connection(),
185 crec
->ctdb_ctx
->db_id
,
189 ctdbd_cancel_persistent_update(
190 messaging_ctdbd_connection(),
191 crec
->ctdb_ctx
->db_id
,
197 SAFE_FREE(cdata
.dptr
);
200 if (!NT_STATUS_IS_OK(status
)) {
201 DEBUG(5, ("ctdbd_persistent_store failed after "
202 "%d retries with error %s - giving up.\n",
203 count
, nt_errstr(status
)));
206 SAFE_FREE(cdata
.dptr
);
/*
 * Delete a record by forwarding to db_ctdb_store() with flag 0.
 *
 * rec: the record to delete.
 *
 * Returns whatever db_ctdb_store() returns.
 * NOTE(review): the set-up of data is not visible in this listing; per
 * the comment fragment below it is presumably an empty value, so only
 * the ctdb header remains stored.
 */
211 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
216 * We have to store the header with empty data. TODO: Fix the
222 return db_ctdb_store(rec
, data
, 0);
/*
 * Delete a record of a persistent database by forwarding to
 * db_ctdb_store_persistent() with flag 0.
 *
 * rec: the record to delete.
 *
 * Returns whatever db_ctdb_store_persistent() returns.
 * NOTE(review): the set-up of data is not visible in this listing; per
 * the comment fragment below it is presumably an empty value.
 */
226 static NTSTATUS
db_ctdb_delete_persistent(struct db_record
*rec
)
231 * We have to store the header with empty data. TODO: Fix the
237 return db_ctdb_store_persistent(rec
, data
, 0);
/*
 * talloc destructor for locked records (installed by
 * fetch_locked_internal); releases the chain lock taken on the key.
 *
 * data: the db_record being freed; its private_data must be a
 *       struct db_ctdb_rec.
 *
 * Logs the db id and the hex encoded key at debug level 10 (the key is
 * cut to 20 characters unless DEBUGLEVEL is above 10), then calls
 * tdb_chainunlock() and logs at level 0 if that fails.
 * NOTE(review): the return statements are not visible in this listing.
 */
241 static int db_ctdb_record_destr(struct db_record
* data
)
243 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
244 data
->private_data
, struct db_ctdb_rec
);
246 DEBUG(10, (DEBUGLEVEL
> 10
247 ? "Unlocking db %u key %s\n"
248 : "Unlocking db %u key %.20s\n",
249 (int)crec
->ctdb_ctx
->db_id
,
250 hex_encode(data
, (unsigned char *)data
->key
.dptr
,
253 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
254 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
 * Fetch a record with its hash chain locked, asking ctdbd to migrate the
 * record to this node first when this node is not its dmaster.
 *
 * ctx: database context.  The remaining parameters are cut off in this
 *      listing; the call in db_ctdb_fetch_locked passes
 *      (ctx, mem_ctx, key, db->persistent).
 *
 * Steps visible in this listing:
 *  - allocate the db_record on mem_ctx and a zeroed db_ctdb_rec below it
 *  - duplicate the key below the record
 *  - tdb_chainlock() the key (blocking)
 *  - install either the persistent or the plain store/delete_rec
 *    handlers (NOTE(review): the selecting condition is not visible)
 *  - install db_ctdb_record_destr so freeing the record unlocks the chain
 *  - tdb_fetch() the raw value; when it is missing, shorter than a
 *    struct ctdb_ltdb_header, or its dmaster differs from get_my_vnn(),
 *    the data is freed, the chain unlocked, the destructor cleared, the
 *    attempt counted and ctdbd_migrate() called before trying again
 *  - otherwise the header is copied into crec->header and the bytes
 *    after it are duplicated below the record as result->value
 *
 * NOTE(review): the "random() % 2" clause forces a migration half of the
 * time; presumably it sits inside a disabled preprocessor block that is
 * not visible here - confirm.
 * NOTE(review): the debug statement passes -1 for a %u conversion when
 * ctdb_data.dptr is NULL.
 * NOTE(review): error-path cleanup and return statements are not visible
 * in this listing.
 */
261 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
266 struct db_record
*result
;
267 struct db_ctdb_rec
*crec
;
270 int migrate_attempts
= 0;
272 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
273 DEBUG(0, ("talloc failed\n"));
277 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
278 DEBUG(0, ("talloc failed\n"));
283 result
->private_data
= (void *)crec
;
284 crec
->ctdb_ctx
= ctx
;
286 result
->key
.dsize
= key
.dsize
;
287 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
288 if (result
->key
.dptr
== NULL
) {
289 DEBUG(0, ("talloc failed\n"));
295 * Do a blocking lock on the record
299 if (DEBUGLEVEL
>= 10) {
300 char *keystr
= hex_encode(result
, key
.dptr
, key
.dsize
);
301 DEBUG(10, (DEBUGLEVEL
> 10
302 ? "Locking db %u key %s\n"
303 : "Locking db %u key %.20s\n",
304 (int)crec
->ctdb_ctx
->db_id
, keystr
));
308 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
309 DEBUG(3, ("tdb_chainlock failed\n"));
315 result
->store
= db_ctdb_store_persistent
;
316 result
->delete_rec
= db_ctdb_delete_persistent
;
318 result
->store
= db_ctdb_store
;
319 result
->delete_rec
= db_ctdb_delete
;
321 talloc_set_destructor(result
, db_ctdb_record_destr
);
323 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
326 * See if we have a valid record and we are the dmaster. If so, we can
327 * take the shortcut and just return it.
330 if ((ctdb_data
.dptr
== NULL
) ||
331 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
332 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
334 || (random() % 2 != 0)
337 SAFE_FREE(ctdb_data
.dptr
);
338 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
339 talloc_set_destructor(result
, NULL
);
341 migrate_attempts
+= 1;
343 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
344 ctdb_data
.dptr
, ctdb_data
.dptr
?
345 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
348 status
= ctdbd_migrate(messaging_ctdbd_connection(),ctx
->db_id
, key
);
349 if (!NT_STATUS_IS_OK(status
)) {
350 DEBUG(5, ("ctdb_migrate failed: %s\n",
355 /* now its migrated, try again */
359 if (migrate_attempts
> 10) {
360 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
364 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
366 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
367 result
->value
.dptr
= NULL
;
369 if ((result
->value
.dsize
!= 0)
370 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
371 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
372 result
->value
.dsize
))) {
373 DEBUG(0, ("talloc failed\n"));
377 SAFE_FREE(ctdb_data
.dptr
);
/*
 * fetch_locked method of the db_context: unwraps the ctdb context from
 * db->private_data and forwards to fetch_locked_internal(), passing
 * db->persistent so the matching store/delete handlers are installed.
 *
 * db: database handle.  The remaining parameters are cut off in this
 *     listing; mem_ctx and key are passed straight through.
 *
 * Returns the result of fetch_locked_internal().
 */
382 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
386 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
389 return fetch_locked_internal(ctx
, mem_ctx
, key
, db
->persistent
);
/*
 * fetch method of the db_context: read a value without taking the record
 * lock and without migrating the record.
 *
 * db:      database handle
 * mem_ctx: talloc context that owns the returned data
 * key:     key to look up
 * data:    out parameter receiving the value without the ctdb header
 *
 * Visible flow: tdb_fetch() from the local tdb; when the value is present,
 * at least one struct ctdb_ltdb_header long and passes the dmaster test,
 * the payload behind the header is duplicated onto mem_ctx (an empty
 * payload frees the raw data without duplicating).  Otherwise the raw data
 * is freed and ctdbd_fetch() is asked for the value; a failure is logged
 * at level 5.
 * NOTE(review): one clause of the condition (the persistent bypass named
 * in the comment) and all return statements are not visible in this
 * listing.
 */
393 fetch (unlocked, no migration) operation on ctdb
395 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
396 TDB_DATA key
, TDB_DATA
*data
)
398 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
403 /* try a direct fetch */
404 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
407 * See if we have a valid record and we are the dmaster. If so, we can
408 * take the shortcut and just return it.
409 * we bypass the dmaster check for persistent databases
411 if ((ctdb_data
.dptr
!= NULL
) &&
412 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
414 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn())) {
415 /* we are the dmaster - avoid the ctdb protocol op */
417 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
418 if (data
->dsize
== 0) {
419 SAFE_FREE(ctdb_data
.dptr
);
424 data
->dptr
= (uint8
*)talloc_memdup(
425 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
428 SAFE_FREE(ctdb_data
.dptr
);
430 if (data
->dptr
== NULL
) {
436 SAFE_FREE(ctdb_data
.dptr
);
438 /* we weren't able to get it locally - ask ctdb to fetch it for us */
439 status
= ctdbd_fetch(messaging_ctdbd_connection(),ctx
->db_id
, key
, mem_ctx
, data
);
440 if (!NT_STATUS_IS_OK(status
)) {
441 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/*
 * State handed to the traverse callbacks through their private_data
 * argument:
 *  - db: the database being traversed
 *  - fn: the caller supplied per-record function
 * The callbacks also read state->private_data and pass it to fn.
 * NOTE(review): that member and the closing brace are not visible in
 * this listing.
 */
448 struct traverse_state
{
449 struct db_context
*db
;
450 int (*fn
)(struct db_record
*rec
, void *private_data
);
/*
 * Per-record callback given to ctdbd_traverse() by db_ctdb_traverse().
 *
 * key:          key of the record being visited
 * data:         value delivered by the traverse; not referenced in the
 *               visible code, the record is fetched again under lock
 * private_data: the struct traverse_state of this traverse
 *
 * Fetches the record locked on a temporary talloc context, calls
 * state->fn for records with a non-empty value, then frees the temporary
 * context (which frees the record).  The return value of state->fn is
 * not used.
 */
454 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
456 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
457 struct db_record
*rec
;
458 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
459 /* we have to give them a locked record to prevent races */
460 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
461 if (rec
&& rec
->value
.dsize
> 0) {
462 state
->fn(rec
, state
->private_data
);
464 talloc_free(tmp_ctx
);
/*
 * Per-record callback given to tdb_traverse() by db_ctdb_traverse() for
 * persistent databases.
 *
 * tdb:  the tdb being traversed (not referenced in the visible code)
 * kbuf: key of the record being visited
 * dbuf: raw value (not referenced in the visible code; the record is
 *       fetched again under lock)
 * The last parameter is cut off in this listing; it is used as
 * private_data and carries the struct traverse_state.
 *
 * Fetches the record locked on a temporary talloc context, stores the
 * result of state->fn in ret for records with a non-empty value, then
 * frees the temporary context.
 * NOTE(review): the declaration of ret and the return statement are not
 * visible in this listing.
 */
467 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
470 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
471 struct db_record
*rec
;
472 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
474 /* we have to give them a locked record to prevent races */
475 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
476 if (rec
&& rec
->value
.dsize
> 0) {
477 ret
= state
->fn(rec
, state
->private_data
);
479 talloc_free(tmp_ctx
);
/*
 * traverse method of the db_context: call fn for the records of the
 * database, handing it locked records.
 *
 * db: database handle
 * fn: per-record function; the rest of the parameter list is cut off in
 *     this listing, private_data is forwarded to fn through the state
 *
 * Persistent databases are walked locally with tdb_traverse() and
 * traverse_persistent_callback, and that result is returned.  Other
 * databases go through ctdbd_traverse() with traverse_callback.
 * NOTE(review): the remaining state initialisation and the return after
 * ctdbd_traverse() are not visible in this listing.
 */
483 static int db_ctdb_traverse(struct db_context
*db
,
484 int (*fn
)(struct db_record
*rec
,
488 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
490 struct traverse_state state
;
494 state
.private_data
= private_data
;
496 if (db
->persistent
) {
497 /* for persistent databases we don't need to do a ctdb traverse,
498 we can do a faster local traverse */
499 return tdb_traverse(ctx
->wtdb
->tdb
, traverse_persistent_callback
, &state
);
503 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
/*
 * store handler installed on records handed out by the read-only
 * traverse callbacks: refuses every write.
 *
 * rec, data, flag: ignored.
 *
 * Always returns NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
507 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
509 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
/*
 * delete_rec handler installed on records handed out by the read-only
 * traverse callbacks: refuses every delete.
 *
 * rec: ignored.
 *
 * Always returns NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
512 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
514 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
/*
 * Per-record callback given to ctdbd_traverse() by
 * db_ctdb_traverse_read().
 *
 * key, data:    the record being visited
 * private_data: the struct traverse_state of this traverse
 *
 * Builds a db_record on the stack whose store and delete_rec handlers
 * deny all modifications, sets its private_data to the database handle
 * and passes it to state->fn.  No lock is taken.  The return value of
 * state->fn is not used.
 * NOTE(review): the lines filling rec.key and rec.value are not visible
 * in this listing.
 */
517 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
519 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
520 struct db_record rec
;
523 rec
.store
= db_ctdb_store_deny
;
524 rec
.delete_rec
= db_ctdb_delete_deny
;
525 rec
.private_data
= state
->db
;
526 state
->fn(&rec
, state
->private_data
);
/*
 * Per-record callback given to tdb_traverse_read() by
 * db_ctdb_traverse_read() for persistent databases.
 *
 * tdb:  the tdb being traversed (not referenced in the visible code)
 * kbuf: key of the record being visited
 * dbuf: raw value including the ctdb header
 * The last parameter is cut off in this listing; it is used as
 * private_data and carries the struct traverse_state.
 *
 * Builds a db_record on the stack whose handlers deny all modifications.
 * A value no longer than a struct ctdb_ltdb_header is treated as a
 * deleted record; otherwise the header is stripped by advancing
 * rec.value.dptr and shrinking rec.value.dsize before state->fn is
 * called.  Returns the result of state->fn for live records.
 * NOTE(review): the lines filling rec.key and rec.value and the return
 * taken for deleted records are not visible in this listing.
 */
529 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
532 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
533 struct db_record rec
;
536 rec
.store
= db_ctdb_store_deny
;
537 rec
.delete_rec
= db_ctdb_delete_deny
;
538 rec
.private_data
= state
->db
;
540 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
541 /* a deleted record */
/* skip the ctdb header so the callback only sees the payload */
544 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
545 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
547 return state
->fn(&rec
, state
->private_data
);
/*
 * traverse_read method of the db_context: call fn for the records of the
 * database with records that cannot be modified.
 *
 * db: database handle
 * fn: per-record function; the rest of the parameter list is cut off in
 *     this listing, private_data is forwarded to fn through the state
 *
 * Persistent databases are walked locally with tdb_traverse_read() and
 * traverse_persistent_callback_read, and that result is returned.  Other
 * databases go through ctdbd_traverse() with traverse_read_callback.
 * NOTE(review): the remaining state initialisation and the return after
 * ctdbd_traverse() are not visible in this listing.
 */
550 static int db_ctdb_traverse_read(struct db_context
*db
,
551 int (*fn
)(struct db_record
*rec
,
555 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
557 struct traverse_state state
;
561 state
.private_data
= private_data
;
563 if (db
->persistent
) {
564 /* for persistent databases we don't need to do a ctdb traverse,
565 we can do a faster local traverse */
566 return tdb_traverse_read(ctx
->wtdb
->tdb
, traverse_persistent_callback_read
, &state
);
569 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
/*
 * get_seqnum method of the db_context.
 *
 * db: database handle whose private_data holds the ctdb context.
 *
 * Returns tdb_get_seqnum() of the local tdb.
 */
573 static int db_ctdb_get_seqnum(struct db_context
*db
)
575 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
577 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
/*
 * Placeholder installed for transaction_start, transaction_commit and
 * transaction_cancel by db_open_ctdb(); transactions are not implemented
 * for this backend.
 *
 * db: database handle.
 *
 * NOTE(review): the return statement is not visible in this listing; per
 * the comment fragment below it reports success.
 */
580 static int db_ctdb_trans_dummy(struct db_context
*db
)
583 * Not implemented yet, just return ok
/*
 * Open a database through ctdbd and return a db_context whose methods
 * are the db_ctdb_* functions of this file.
 *
 * mem_ctx:    talloc parent of the returned db_context
 * hash_size:  passed to tdb_wrap_open()
 * tdb_flags:  passed to ctdbd_db_attach(); only TDB_SEQNUM is kept for
 *             the local open.  A database opened without
 *             TDB_CLEAR_IF_FIRST is marked persistent.
 * open_flags: only O_CREAT is examined; when set, mode is applied to the
 *             database file with chmod()
 * mode:       permissions for the chmod() above
 * NOTE(review): one parameter line is cut off in this listing; name is
 * used below as the database name.
 *
 * Visible flow: bail out when clustering is disabled, allocate the
 * db_context and the ctdb context, attach to the database in ctdbd,
 * look up its file path, open that file read/write with tdb_wrap_open(),
 * then fill in the method table.
 *
 * NOTE(review): the result of chmod() is ignored.
 * NOTE(review): db_ctdb is allocated with TALLOC_P, which presumably
 * does not zero the memory; the visible code sets db_id and wtdb.
 * NOTE(review): the error-path cleanup and all return statements are not
 * visible in this listing.
 */
588 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
590 int hash_size
, int tdb_flags
,
591 int open_flags
, mode_t mode
)
593 struct db_context
*result
;
594 struct db_ctdb_ctx
*db_ctdb
;
597 if (!lp_clustering()) {
598 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
602 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
603 DEBUG(0, ("talloc failed\n"));
608 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
609 DEBUG(0, ("talloc failed\n"));
614 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name
, &db_ctdb
->db_id
, tdb_flags
))) {
615 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name
));
620 db_path
= ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb
, db_ctdb
->db_id
);
/* must be evaluated before tdb_flags is masked below */
622 result
->persistent
= ((tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0);
624 /* only pass through specific flags */
625 tdb_flags
&= TDB_SEQNUM
;
627 /* honor permissions if user has specified O_CREAT */
628 if (open_flags
& O_CREAT
) {
629 chmod(db_path
, mode
);
632 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
633 if (db_ctdb
->wtdb
== NULL
) {
634 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
638 talloc_free(db_path
);
640 result
->private_data
= (void *)db_ctdb
;
641 result
->fetch_locked
= db_ctdb_fetch_locked
;
642 result
->fetch
= db_ctdb_fetch
;
643 result
->traverse
= db_ctdb_traverse
;
644 result
->traverse_read
= db_ctdb_traverse_read
;
645 result
->get_seqnum
= db_ctdb_get_seqnum
;
646 result
->transaction_start
= db_ctdb_trans_dummy
;
647 result
->transaction_commit
= db_ctdb_trans_dummy
;
648 result
->transaction_cancel
= db_ctdb_trans_dummy
;
650 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
651 name
, db_ctdb
->db_id
));