dbwrap: add dbwrap_tdb2 backend
[Samba.git] / source / lib / dbwrap_tdb2.c
blobcbcbe71d711b3b15c1be60d9956ec51d6bc24fc1
/*
   Unix SMB/CIFS implementation.

   Database interface wrapper around tdb/ctdb

   Copyright (C) Volker Lendecke 2005-2007
   Copyright (C) Stefan Metzmacher 2008

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
23 #include "includes.h"
24 #include "librpc/gen_ndr/ndr_messaging.h"
26 struct db_tdb2_ctx {
27 struct db_context *db;
28 const char *name;
29 struct tdb_wrap *mtdb;
30 const char *mtdb_path;
31 bool master_transaction;
32 struct {
33 int hash_size;
34 int tdb_flags;
35 int open_flags;
36 mode_t mode;
37 } open;
38 struct tdb_wrap *ltdb;
39 const char *ltdb_path;
40 bool local_transaction;
41 int transaction;
42 bool out_of_sync;
43 uint32_t lseqnum;
44 uint32_t mseqnum;
45 #define DB_TDB2_MASTER_SEQNUM_KEYSTR "DB_TDB2_MASTER_SEQNUM_KEYSTR"
46 TDB_DATA mseqkey;
47 uint32_t max_buffer_size;
48 uint32_t current_buffer_size;
49 struct dbwrap_tdb2_changes changes;
53 static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag);
54 static NTSTATUS db_tdb2_delete(struct db_record *rec);
56 static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key);
57 static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx);
59 static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
60 const char *name,
61 int hash_size, int tdb_flags,
62 int open_flags, mode_t mode,
63 const struct dbwrap_tdb2_changes *chgs);
65 static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
66 const struct dbwrap_tdb2_changes *changes);
68 static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
69 const struct dbwrap_tdb2_changes *changes);
70 static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum);
71 static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx);
72 static int db_tdb2_transaction_cancel(struct db_context *db);
74 static void db_tdb2_receive_changes(struct messaging_context *msg,
75 void *private_data,
76 uint32_t msg_type,
77 struct server_id server_id,
78 DATA_BLOB *data);
80 static struct messaging_context *global_tdb2_msg_ctx;
81 static bool global_tdb2_msg_ctx_initialized;
83 void db_tdb2_setup_messaging(struct messaging_context *msg_ctx, bool server)
85 global_tdb2_msg_ctx = msg_ctx;
87 global_tdb2_msg_ctx_initialized = true;
89 if (!server) {
90 return;
93 if (!lp_parm_bool(-1, "dbwrap", "use_tdb2", false)) {
94 return;
97 messaging_register(msg_ctx, NULL, MSG_DBWRAP_TDB2_CHANGES,
98 db_tdb2_receive_changes);
101 static struct messaging_context *db_tdb2_get_global_messaging_context(void)
103 struct messaging_context *msg_ctx;
105 if (global_tdb2_msg_ctx_initialized) {
106 return global_tdb2_msg_ctx;
109 msg_ctx = messaging_init(NULL, procid_self(),
110 event_context_init(NULL));
112 db_tdb2_setup_messaging(msg_ctx, false);
114 return global_tdb2_msg_ctx;
117 struct tdb_fetch_locked_state {
118 TALLOC_CTX *mem_ctx;
119 struct db_record *result;
122 static int db_tdb2_fetchlock_parse(TDB_DATA key, TDB_DATA data,
123 void *private_data)
125 struct tdb_fetch_locked_state *state =
126 (struct tdb_fetch_locked_state *)private_data;
128 state->result = (struct db_record *)talloc_size(
129 state->mem_ctx,
130 sizeof(struct db_record) + key.dsize + data.dsize);
132 if (state->result == NULL) {
133 return 0;
136 state->result->key.dsize = key.dsize;
137 state->result->key.dptr = ((uint8 *)state->result)
138 + sizeof(struct db_record);
139 memcpy(state->result->key.dptr, key.dptr, key.dsize);
141 state->result->value.dsize = data.dsize;
143 if (data.dsize > 0) {
144 state->result->value.dptr = state->result->key.dptr+key.dsize;
145 memcpy(state->result->value.dptr, data.dptr, data.dsize);
147 else {
148 state->result->value.dptr = NULL;
151 return 0;
154 static struct db_record *db_tdb2_fetch_locked(struct db_context *db,
155 TALLOC_CTX *mem_ctx, TDB_DATA key)
157 struct db_tdb2_ctx *ctx = talloc_get_type_abort(db->private_data,
158 struct db_tdb2_ctx);
159 struct tdb_fetch_locked_state state;
161 /* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
162 if(DEBUGLEVEL >= 10) {
163 char *keystr = hex_encode(NULL, (unsigned char*)key.dptr, key.dsize);
164 DEBUG(10, (DEBUGLEVEL > 10
165 ? "Locking key %s\n" : "Locking key %.20s\n",
166 keystr));
167 TALLOC_FREE(keystr);
171 * we only support modifications within a
172 * started transaction.
174 if (ctx->transaction == 0) {
175 DEBUG(0, ("db_tdb2_fetch_locked[%s]: no transaction started\n",
176 ctx->name));
177 smb_panic("no transaction");
178 return NULL;
181 state.mem_ctx = mem_ctx;
182 state.result = NULL;
184 tdb_parse_record(ctx->mtdb->tdb, key, db_tdb2_fetchlock_parse, &state);
186 if (state.result == NULL) {
187 db_tdb2_fetchlock_parse(key, tdb_null, &state);
190 if (state.result == NULL) {
191 return NULL;
194 state.result->private_data = talloc_reference(state.result, ctx);
195 state.result->store = db_tdb2_store;
196 state.result->delete_rec = db_tdb2_delete;
198 DEBUG(10, ("Allocated locked data 0x%p\n", state.result));
200 return state.result;
203 struct tdb_fetch_state {
204 TALLOC_CTX *mem_ctx;
205 int result;
206 TDB_DATA data;
209 static int db_tdb2_fetch_parse(TDB_DATA key, TDB_DATA data,
210 void *private_data)
212 struct tdb_fetch_state *state =
213 (struct tdb_fetch_state *)private_data;
215 state->data.dptr = (uint8 *)talloc_memdup(state->mem_ctx, data.dptr,
216 data.dsize);
217 if (state->data.dptr == NULL) {
218 state->result = -1;
219 return 0;
222 state->data.dsize = data.dsize;
223 return 0;
226 static void db_tdb2_resync_before_read(struct db_tdb2_ctx *db_ctx, TDB_DATA *kbuf)
228 if (db_ctx->mtdb) {
229 return;
232 if (!db_ctx->out_of_sync) {
233 return;
237 * this function operates on the local copy,
238 * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
240 if (kbuf && (db_ctx->mseqkey.dsize == kbuf->dsize) &&
241 (memcmp(db_ctx->mseqkey.dptr, kbuf->dptr, kbuf->dsize) == 0)) {
242 return;
245 DEBUG(0,("resync_before_read[%s/%s]\n",
246 db_ctx->mtdb_path, db_ctx->ltdb_path));
248 db_tdb2_open_master(db_ctx, false, NULL);
249 db_tdb2_close_master(db_ctx);
252 static int db_tdb2_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
253 TDB_DATA key, TDB_DATA *pdata)
255 struct db_tdb2_ctx *ctx = talloc_get_type_abort(
256 db->private_data, struct db_tdb2_ctx);
258 struct tdb_fetch_state state;
260 db_tdb2_resync_before_read(ctx, &key);
262 if (ctx->out_of_sync) {
263 DEBUG(0,("out of sync[%s] failing fetch\n",
264 ctx->ltdb_path));
265 errno = EIO;
266 return -1;
269 state.mem_ctx = mem_ctx;
270 state.result = 0;
271 state.data = tdb_null;
273 tdb_parse_record(ctx->ltdb->tdb, key, db_tdb2_fetch_parse, &state);
275 if (state.result == -1) {
276 return -1;
279 *pdata = state.data;
280 return 0;
283 static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag)
285 struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
286 struct db_tdb2_ctx);
287 int ret;
290 * This has a bug: We need to replace rec->value for correct
291 * operation, but right now brlock and locking don't use the value
292 * anymore after it was stored.
295 /* first store it to the master copy */
296 ret = tdb_store(ctx->mtdb->tdb, rec->key, data, flag);
297 if (ret != 0) {
298 return NT_STATUS_UNSUCCESSFUL;
301 /* then store it to the local copy */
302 ret = tdb_store(ctx->ltdb->tdb, rec->key, data, flag);
303 if (ret != 0) {
304 /* try to restore the old value in the master copy */
305 if (rec->value.dptr) {
306 tdb_store(ctx->mtdb->tdb, rec->key,
307 rec->value, TDB_REPLACE);
308 } else {
309 tdb_delete(ctx->mtdb->tdb, rec->key);
311 return NT_STATUS_INTERNAL_DB_CORRUPTION;
314 db_tdb2_queue_change(ctx, rec->key);
316 return NT_STATUS_OK;
319 static NTSTATUS db_tdb2_delete(struct db_record *rec)
321 struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
322 struct db_tdb2_ctx);
323 int ret;
325 ret = tdb_delete(ctx->mtdb->tdb, rec->key);
326 if (ret != 0) {
327 if (tdb_error(ctx->mtdb->tdb) == TDB_ERR_NOEXIST) {
328 return NT_STATUS_NOT_FOUND;
331 return NT_STATUS_UNSUCCESSFUL;
334 ret = tdb_delete(ctx->ltdb->tdb, rec->key);
335 if (ret != 0) {
336 /* try to restore the value in the master copy */
337 tdb_store(ctx->mtdb->tdb, rec->key,
338 rec->value, TDB_REPLACE);
339 return NT_STATUS_INTERNAL_DB_CORRUPTION;
342 db_tdb2_queue_change(ctx, rec->key);
344 return NT_STATUS_OK;
/* Context passed through the tdb traverse functions to our callbacks. */
struct db_tdb2_traverse_ctx {
	struct db_tdb2_ctx *db_ctx;	/* database being traversed */
	/* user callback and its argument */
	int (*f)(struct db_record *rec, void *private_data);
	void *private_data;
};
353 static int db_tdb2_traverse_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
354 void *private_data)
356 struct db_tdb2_traverse_ctx *ctx =
357 (struct db_tdb2_traverse_ctx *)private_data;
358 struct db_record rec;
360 /* this function operates on the master copy */
362 rec.key = kbuf;
363 rec.value = dbuf;
364 rec.store = db_tdb2_store;
365 rec.delete_rec = db_tdb2_delete;
366 rec.private_data = ctx->db_ctx;
368 return ctx->f(&rec, ctx->private_data);
371 static int db_tdb2_traverse(struct db_context *db,
372 int (*f)(struct db_record *rec, void *private_data),
373 void *private_data)
375 struct db_tdb2_ctx *db_ctx =
376 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
377 struct db_tdb2_traverse_ctx ctx;
380 * we only support modifications within a
381 * started transaction.
383 if (db_ctx->transaction == 0) {
384 DEBUG(0, ("db_tdb2_traverse[%s]: no transaction started\n",
385 db_ctx->name));
386 smb_panic("no transaction");
387 return -1;
390 /* here we traverse the master copy */
391 ctx.db_ctx = db_ctx;
392 ctx.f = f;
393 ctx.private_data = private_data;
394 return tdb_traverse(db_ctx->mtdb->tdb, db_tdb2_traverse_func, &ctx);
397 static NTSTATUS db_tdb2_store_deny(struct db_record *rec, TDB_DATA data, int flag)
399 return NT_STATUS_MEDIA_WRITE_PROTECTED;
402 static NTSTATUS db_tdb2_delete_deny(struct db_record *rec)
404 return NT_STATUS_MEDIA_WRITE_PROTECTED;
407 static int db_tdb2_traverse_read_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
408 void *private_data)
410 struct db_tdb2_traverse_ctx *ctx =
411 (struct db_tdb2_traverse_ctx *)private_data;
412 struct db_record rec;
415 * this function operates on the local copy,
416 * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
418 if ((ctx->db_ctx->mseqkey.dsize == kbuf.dsize) &&
419 (memcmp(ctx->db_ctx->mseqkey.dptr, kbuf.dptr, kbuf.dsize) == 0)) {
420 return 0;
423 rec.key = kbuf;
424 rec.value = dbuf;
425 rec.store = db_tdb2_store_deny;
426 rec.delete_rec = db_tdb2_delete_deny;
427 rec.private_data = ctx->db_ctx;
429 return ctx->f(&rec, ctx->private_data);
432 static int db_tdb2_traverse_read(struct db_context *db,
433 int (*f)(struct db_record *rec, void *private_data),
434 void *private_data)
436 struct db_tdb2_ctx *db_ctx =
437 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
438 struct db_tdb2_traverse_ctx ctx;
439 int ret;
441 db_tdb2_resync_before_read(db_ctx, NULL);
443 if (db_ctx->out_of_sync) {
444 DEBUG(0,("out of sync[%s] failing traverse_read\n",
445 db_ctx->ltdb_path));
446 errno = EIO;
447 return -1;
450 /* here we traverse the local copy */
451 ctx.db_ctx = db_ctx;
452 ctx.f = f;
453 ctx.private_data = private_data;
454 ret = tdb_traverse_read(db_ctx->ltdb->tdb, db_tdb2_traverse_read_func, &ctx);
455 if (ret > 0) {
456 /* we have filtered one entry */
457 ret--;
460 return ret;
463 static int db_tdb2_get_seqnum(struct db_context *db)
466 struct db_tdb2_ctx *db_ctx =
467 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
468 uint32_t nlseq;
469 uint32_t nmseq;
470 bool ok;
472 nlseq = tdb_get_seqnum(db_ctx->ltdb->tdb);
474 if (nlseq == db_ctx->lseqnum) {
475 return db_ctx->mseqnum;
478 ok = tdb_fetch_uint32_byblob(db_ctx->ltdb->tdb,
479 db_ctx->mseqkey,
480 &nmseq);
481 if (!ok) {
482 /* TODO: what should we do here? */
483 return db_ctx->mseqnum;
486 db_ctx->lseqnum = nlseq;
487 db_ctx->mseqnum = nmseq;
489 return db_ctx->mseqnum;
492 static int db_tdb2_transaction_start(struct db_context *db)
494 struct db_tdb2_ctx *db_ctx =
495 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
496 int ret;
498 if (db_ctx->transaction) {
499 db_ctx->transaction++;
500 return 0;
503 /* we need to open the master tdb in order to */
504 ret = db_tdb2_open_master(db_ctx, true, NULL);
505 if (ret != 0) {
506 return ret;
509 ret = tdb_transaction_start(db_ctx->ltdb->tdb);
510 if (ret != 0) {
511 db_tdb2_close_master(db_ctx);
512 return ret;
515 db_ctx->local_transaction = true;
516 db_ctx->transaction = 1;
518 return 0;
521 static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key)
523 size_t size_needed = 4 + key.dsize;
524 size_t size_new = db_ctx->current_buffer_size + size_needed;
525 uint32_t i;
526 DATA_BLOB *keys;
528 db_ctx->changes.num_changes++;
530 if (db_ctx->changes.num_changes > 1 &&
531 db_ctx->changes.keys == NULL) {
533 * this means we already overflowed
535 return;
538 if (db_ctx->changes.num_changes == 1) {
539 db_ctx->changes.old_seqnum = db_ctx->mseqnum;
542 for (i=0; i < db_ctx->changes.num_keys; i++) {
543 int ret;
545 if (key.dsize != db_ctx->changes.keys[i].length) {
546 continue;
548 ret = memcmp(key.dptr, db_ctx->changes.keys[i].data, key.dsize);
549 if (ret != 0) {
550 continue;
554 * the key is already in the list
555 * so we're done
557 return;
560 if (db_ctx->max_buffer_size < size_new) {
561 goto overflow;
564 keys = TALLOC_REALLOC_ARRAY(db_ctx, db_ctx->changes.keys,
565 DATA_BLOB,
566 db_ctx->changes.num_keys + 1);
567 if (!keys) {
568 goto overflow;
570 db_ctx->changes.keys = keys;
572 keys[db_ctx->changes.num_keys].data = talloc_memdup(keys,
573 key.dptr,
574 key.dsize);
575 if (!keys[db_ctx->changes.num_keys].data) {
576 goto overflow;
578 keys[db_ctx->changes.num_keys].length = key.dsize;
579 db_ctx->changes.num_keys++;
580 db_ctx->current_buffer_size = size_new;
582 return;
584 overflow:
586 * on overflow discard the buffer and let
587 * the others reload the whole tdb
589 db_ctx->current_buffer_size = 0;
590 db_ctx->changes.num_keys = 0;
591 TALLOC_FREE(db_ctx->changes.keys);
592 return;
595 static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx)
597 enum ndr_err_code ndr_err;
598 bool ok;
599 DATA_BLOB blob;
600 struct messaging_context *msg_ctx;
601 int num_msgs = 0;
602 struct server_id self = procid_self();
604 msg_ctx = db_tdb2_get_global_messaging_context();
606 db_ctx->changes.name = db_ctx->name;
608 DEBUG(10,("%s[%s] size[%u/%u] changes[%u] keys[%u] seqnum[%u=>%u]\n",
609 __FUNCTION__,
610 db_ctx->changes.name,
611 db_ctx->current_buffer_size,
612 db_ctx->max_buffer_size,
613 db_ctx->changes.num_changes,
614 db_ctx->changes.num_keys,
615 db_ctx->changes.old_seqnum,
616 db_ctx->changes.new_seqnum));
618 if (db_ctx->changes.num_changes == 0) {
619 DEBUG(10,("db_tdb2_send_notify[%s]: no changes\n",
620 db_ctx->changes.name));
621 goto done;
624 if (!msg_ctx) {
625 DEBUG(1,("db_tdb2_send_notify[%s]: skipped (no msg ctx)\n",
626 db_ctx->changes.name));
627 goto done;
630 ndr_err = ndr_push_struct_blob(
631 &blob, talloc_tos(), &db_ctx->changes,
632 (ndr_push_flags_fn_t)ndr_push_dbwrap_tdb2_changes);
633 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
634 DEBUG(0,("db_tdb2_send_notify[%s]: failed to push changes: %s\n",
635 db_ctx->changes.name,
636 nt_errstr(ndr_map_error2ntstatus(ndr_err))));
637 goto done;
640 ok = message_send_all(msg_ctx, MSG_DBWRAP_TDB2_CHANGES,
641 blob.data, blob.length, &num_msgs);
642 if (!ok) {
643 DEBUG(0,("db_tdb2_send_notify[%s]: failed to send changes\n",
644 db_ctx->changes.name));
645 goto done;
648 DEBUG(10,("db_tdb2_send_notify[%s]: pid %s send %u messages\n",
649 db_ctx->name, procid_str_static(&self), num_msgs));
651 done:
652 TALLOC_FREE(db_ctx->changes.keys);
653 ZERO_STRUCT(db_ctx->changes);
655 return;
658 static void db_tdb2_receive_changes(struct messaging_context *msg,
659 void *private_data,
660 uint32_t msg_type,
661 struct server_id server_id,
662 DATA_BLOB *data)
664 enum ndr_err_code ndr_err;
665 struct dbwrap_tdb2_changes changes;
666 struct db_context *db;
667 struct server_id self;
669 if (procid_is_me(&server_id)) {
670 DEBUG(0,("db_tdb2_receive_changes: ignore selfpacket\n"));
671 return;
674 self = procid_self();
676 DEBUG(10,("db_tdb2_receive_changes: from %s to %s\n",
677 procid_str(debug_ctx(), &server_id),
678 procid_str(debug_ctx(), &self)));
680 ndr_err = ndr_pull_struct_blob_all(
681 data, talloc_tos(), &changes,
682 (ndr_pull_flags_fn_t)ndr_pull_dbwrap_tdb2_changes);
683 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
684 DEBUG(0,("db_tdb2_receive_changes: failed to pull changes: %s\n",
685 nt_errstr(ndr_map_error2ntstatus(ndr_err))));
686 goto done;
689 if(DEBUGLEVEL >= 10) {
690 NDR_PRINT_DEBUG(dbwrap_tdb2_changes, &changes);
693 /* open the db, this will sync it */
694 db = db_open_tdb2_ex(talloc_tos(), changes.name, 0,
695 0, O_RDWR, 0600, &changes);
696 TALLOC_FREE(db);
697 done:
698 return;
701 static int db_tdb2_transaction_commit(struct db_context *db)
703 struct db_tdb2_ctx *db_ctx =
704 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
705 int ret;
706 uint32_t mseqnum;
708 if (db_ctx->transaction == 0) {
709 return -1;
710 } else if (db_ctx->transaction > 1) {
711 db_ctx->transaction--;
712 return 0;
715 mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
716 db_ctx->changes.new_seqnum = mseqnum;
718 /* first commit to the master copy */
719 ret = tdb_transaction_commit(db_ctx->mtdb->tdb);
720 db_ctx->master_transaction = false;
721 if (ret != 0) {
722 int saved_errno = errno;
723 db_tdb2_transaction_cancel(db);
724 errno = saved_errno;
725 return ret;
729 * Note: as we've already commited the changes to the master copy
730 * so we ignore errors in the following functions
732 ret = db_tdb2_commit_local(db_ctx, mseqnum);
733 if (ret == 0) {
734 db_ctx->out_of_sync = false;
735 } else {
736 db_ctx->out_of_sync = true;
739 db_ctx->transaction = 0;
741 db_tdb2_close_master(db_ctx);
743 db_tdb2_send_notify(db_ctx);
745 return 0;
748 static int db_tdb2_transaction_cancel(struct db_context *db)
750 struct db_tdb2_ctx *db_ctx =
751 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
752 int saved_errno;
753 int ret;
755 if (db_ctx->transaction == 0) {
756 return -1;
758 if (db_ctx->transaction > 1) {
759 db_ctx->transaction--;
760 return 0;
763 /* cancel the transaction and close the master copy */
764 ret = db_tdb2_close_master(db_ctx);
765 saved_errno = errno;
767 /* now cancel on the local copy and ignore any error */
768 tdb_transaction_cancel(db_ctx->ltdb->tdb);
769 db_ctx->local_transaction = false;
771 db_ctx->transaction = 0;
773 errno = saved_errno;
774 return ret;
777 static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
778 const struct dbwrap_tdb2_changes *changes)
780 int ret;
782 db_ctx->mtdb = tdb_wrap_open(db_ctx,
783 db_ctx->mtdb_path,
784 db_ctx->open.hash_size,
785 db_ctx->open.tdb_flags|TDB_NOMMAP|TDB_SEQNUM,
786 db_ctx->open.open_flags,
787 db_ctx->open.mode);
788 if (db_ctx->mtdb == NULL) {
789 DEBUG(0, ("Could not open master tdb[%s]: %s\n",
790 db_ctx->mtdb_path,
791 strerror(errno)));
792 return -1;
794 DEBUG(10,("open_master[%s]\n", db_ctx->mtdb_path));
796 if (!db_ctx->ltdb) {
797 struct stat st;
799 if (fstat(tdb_fd(db_ctx->mtdb->tdb), &st) == 0) {
800 db_ctx->open.mode = st.st_mode;
803 /* make sure the local one uses the same hash size as the master one */
804 db_ctx->open.hash_size = tdb_hash_size(db_ctx->mtdb->tdb);
806 db_ctx->ltdb = tdb_wrap_open(db_ctx,
807 db_ctx->ltdb_path,
808 db_ctx->open.hash_size,
809 db_ctx->open.tdb_flags|TDB_SEQNUM,
810 db_ctx->open.open_flags|O_CREAT,
811 db_ctx->open.mode);
812 if (db_ctx->ltdb == NULL) {
813 DEBUG(0, ("Could not open local tdb[%s]: %s\n",
814 db_ctx->ltdb_path,
815 strerror(errno)));
816 TALLOC_FREE(db_ctx->mtdb);
817 return -1;
819 DEBUG(10,("open_local[%s]\n", db_ctx->ltdb_path));
822 if (transaction) {
823 ret = tdb_transaction_start(db_ctx->mtdb->tdb);
824 if (ret != 0) {
825 DEBUG(0,("open failed to start transaction[%s]\n",
826 db_ctx->mtdb_path));
827 db_tdb2_close_master(db_ctx);
828 return ret;
830 db_ctx->master_transaction = true;
833 ret = db_tdb2_sync_from_master(db_ctx, changes);
834 if (ret != 0) {
835 DEBUG(0,("open failed to sync from master[%s]\n",
836 db_ctx->ltdb_path));
837 db_tdb2_close_master(db_ctx);
838 return ret;
841 return 0;
844 static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum)
846 bool ok;
847 int ret;
849 /* first fetch the master seqnum */
850 db_ctx->mseqnum = mseqnum;
852 /* now we try to store the master seqnum in the local tdb */
853 ok = tdb_store_uint32_byblob(db_ctx->ltdb->tdb,
854 db_ctx->mseqkey,
855 db_ctx->mseqnum);
856 if (!ok) {
857 tdb_transaction_cancel(db_ctx->ltdb->tdb);
858 db_ctx->local_transaction = false;
859 DEBUG(0,("local failed[%s] store mseq[%u]\n",
860 db_ctx->ltdb_path, db_ctx->mseqnum));
861 return -1;
864 /* now commit all changes to the local tdb */
865 ret = tdb_transaction_commit(db_ctx->ltdb->tdb);
866 db_ctx->local_transaction = false;
867 if (ret != 0) {
868 DEBUG(0,("local failed[%s] commit mseq[%u]\n",
869 db_ctx->ltdb_path, db_ctx->mseqnum));
870 return ret;
874 * and update the cached local seqnum this is needed to
875 * let us cache the master seqnum.
877 db_ctx->lseqnum = tdb_get_seqnum(db_ctx->ltdb->tdb);
878 DEBUG(10,("local updated[%s] mseq[%u]\n",
879 db_ctx->ltdb_path, db_ctx->mseqnum));
881 return 0;
884 static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx)
886 if (db_ctx->master_transaction) {
887 tdb_transaction_cancel(db_ctx->mtdb->tdb);
889 db_ctx->master_transaction = false;
890 /* now we can close the master handle */
891 TALLOC_FREE(db_ctx->mtdb);
893 DEBUG(10,("close_master[%s] ok\n", db_ctx->mtdb_path));
894 return 0;
897 static int db_tdb2_traverse_sync_all_func(TDB_CONTEXT *tdb,
898 TDB_DATA kbuf, TDB_DATA dbuf,
899 void *private_data)
901 struct db_tdb2_traverse_ctx *ctx =
902 (struct db_tdb2_traverse_ctx *)private_data;
903 uint32_t *seqnum = (uint32_t *)ctx->private_data;
904 int ret;
906 DEBUG(10,("sync_entry[%s]\n", ctx->db_ctx->mtdb_path));
908 /* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
909 if(DEBUGLEVEL >= 10) {
910 char *keystr = hex_encode(NULL, (unsigned char*)kbuf.dptr, kbuf.dsize);
911 DEBUG(10, (DEBUGLEVEL > 10
912 ? "Locking key %s\n" : "Locking key %.20s\n",
913 keystr));
914 TALLOC_FREE(keystr);
917 ret = tdb_store(ctx->db_ctx->ltdb->tdb, kbuf, dbuf, TDB_INSERT);
918 if (ret != 0) {
919 DEBUG(0,("sync_entry[%s] %d: %s\n",
920 ctx->db_ctx->ltdb_path, ret,
921 tdb_errorstr(ctx->db_ctx->ltdb->tdb)));
922 return ret;
925 *seqnum = tdb_get_seqnum(ctx->db_ctx->mtdb->tdb);
927 return 0;
930 static int db_tdb2_sync_all(struct db_tdb2_ctx *db_ctx, uint32_t *seqnum)
932 struct db_tdb2_traverse_ctx ctx;
933 int ret;
935 ret = tdb_wipe_all(db_ctx->ltdb->tdb);
936 if (ret != 0) {
937 DEBUG(0,("tdb_wipe_all[%s] failed %d: %s\n",
938 db_ctx->ltdb_path, ret,
939 tdb_errorstr(db_ctx->ltdb->tdb)));
940 return ret;
943 ctx.db_ctx = db_ctx;
944 ctx.f = NULL;
945 ctx.private_data = seqnum;
946 ret = tdb_traverse_read(db_ctx->mtdb->tdb,
947 db_tdb2_traverse_sync_all_func,
948 &ctx);
949 DEBUG(10,("db_tdb2_sync_all[%s] count[%d]\n",
950 db_ctx->mtdb_path, ret));
951 if (ret < 0) {
952 return ret;
955 return 0;
958 static int db_tdb2_sync_changes(struct db_tdb2_ctx *db_ctx,
959 const struct dbwrap_tdb2_changes *changes,
960 uint32_t *seqnum)
962 uint32_t cseqnum;
963 uint32_t mseqnum;
964 uint32_t i;
965 int ret;
966 bool need_full_sync = false;
968 DEBUG(10,("db_tdb2_sync_changes[%s] changes[%u]\n",
969 changes->name, changes->num_changes));
970 if(DEBUGLEVEL >= 10) {
971 NDR_PRINT_DEBUG(dbwrap_tdb2_changes, discard_const(changes));
974 /* for the master tdb for reading */
975 ret = tdb_lockall_read(db_ctx->mtdb->tdb);
976 if (ret != 0) {
977 DEBUG(0,("tdb_lockall_read[%s] %d\n", db_ctx->mtdb_path, ret));
978 return ret;
981 /* first fetch seqnum we know about */
982 cseqnum = db_tdb2_get_seqnum(db_ctx->db);
984 /* then fetch the master seqnum */
985 mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
987 if (cseqnum == mseqnum) {
988 DEBUG(10,("db_tdb2_sync_changes[%s] uptodate[%u]\n",
989 db_ctx->mtdb_path, mseqnum));
990 /* we hit a race before and now noticed we're uptodate */
991 goto done;
994 /* now see if the changes describe what we need */
995 if (changes->old_seqnum != cseqnum) {
996 need_full_sync = true;
999 if (changes->new_seqnum != mseqnum) {
1000 need_full_sync = true;
1003 /* this was the overflow case */
1004 if (changes->num_keys == 0) {
1005 need_full_sync = true;
1008 if (need_full_sync) {
1009 tdb_unlockall_read(db_ctx->mtdb->tdb);
1010 DEBUG(0,("fallback to full sync[%s] seq[%u=>%u] keys[%u]\n",
1011 db_ctx->ltdb_path, cseqnum, mseqnum,
1012 changes->num_keys));
1013 return db_tdb2_sync_all(db_ctx, &mseqnum);
1016 for (i=0; i < changes->num_keys; i++) {
1017 const char *op = NULL;
1018 bool del = false;
1019 TDB_DATA key;
1020 TDB_DATA val;
1022 key.dsize = changes->keys[i].length;
1023 key.dptr = changes->keys[i].data;
1025 val = tdb_fetch(db_ctx->mtdb->tdb, key);
1026 ret = tdb_error(db_ctx->mtdb->tdb);
1027 if (ret == TDB_ERR_NOEXIST) {
1028 del = true;
1029 } else if (ret != 0) {
1030 DEBUG(0,("sync_changes[%s] failure %d\n",
1031 db_ctx->mtdb_path, ret));
1032 goto failed;
1035 if (del) {
1036 op = "delete";
1037 ret = tdb_delete(db_ctx->ltdb->tdb, key);
1038 DEBUG(10,("sync_changes[%s] delete key[%u] %d\n",
1039 db_ctx->mtdb_path, i, ret));
1040 } else {
1041 op = "store";
1042 ret = tdb_store(db_ctx->ltdb->tdb, key,
1043 val, TDB_REPLACE);
1044 DEBUG(10,("sync_changes[%s] store key[%u] %d\n",
1045 db_ctx->mtdb_path, i, ret));
1047 SAFE_FREE(val.dptr);
1048 if (ret != 0) {
1049 DEBUG(0,("sync_changes[%s] %s key[%u] failed %d\n",
1050 db_ctx->mtdb_path, op, i, ret));
1051 goto failed;
1055 done:
1056 tdb_unlockall_read(db_ctx->mtdb->tdb);
1058 *seqnum = mseqnum;
1059 return 0;
1060 failed:
1061 tdb_unlockall_read(db_ctx->mtdb->tdb);
1062 return ret;
1065 static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
1066 const struct dbwrap_tdb2_changes *changes)
1068 int ret;
1069 uint32_t cseqnum;
1070 uint32_t mseqnum;
1071 bool force = false;
1073 /* first fetch seqnum we know about */
1074 cseqnum = db_tdb2_get_seqnum(db_ctx->db);
1076 /* then fetch the master seqnum */
1077 mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
1079 if (db_ctx->lseqnum == 0) {
1080 force = true;
1083 if (!force && cseqnum == mseqnum) {
1084 DEBUG(10,("uptodate[%s] mseq[%u]\n",
1085 db_ctx->ltdb_path, mseqnum));
1086 /* the local copy is uptodate, close the master db */
1087 return 0;
1089 DEBUG(10,("not uptodate[%s] seq[%u=>%u]\n",
1090 db_ctx->ltdb_path, cseqnum, mseqnum));
1092 ret = tdb_transaction_start(db_ctx->ltdb->tdb);
1093 if (ret != 0) {
1094 DEBUG(0,("failed to start transaction[%s] %d: %s\n",
1095 db_ctx->ltdb_path, ret,
1096 tdb_errorstr(db_ctx->ltdb->tdb)));
1097 db_ctx->out_of_sync = true;
1098 return ret;
1100 db_ctx->local_transaction = true;
1102 if (changes && !force) {
1103 ret = db_tdb2_sync_changes(db_ctx, changes, &mseqnum);
1104 if (ret != 0) {
1105 db_ctx->out_of_sync = true;
1106 tdb_transaction_cancel(db_ctx->ltdb->tdb);
1107 db_ctx->local_transaction = false;
1108 return ret;
1110 } else {
1111 ret = db_tdb2_sync_all(db_ctx, &mseqnum);
1112 if (ret != 0) {
1113 db_ctx->out_of_sync = true;
1114 tdb_transaction_cancel(db_ctx->ltdb->tdb);
1115 db_ctx->local_transaction = false;
1116 return ret;
1120 ret = db_tdb2_commit_local(db_ctx, mseqnum);
1121 if (ret != 0) {
1122 db_ctx->out_of_sync = true;
1123 return ret;
1126 db_ctx->out_of_sync = false;
1128 return 0;
1131 static int db_tdb2_ctx_destructor(struct db_tdb2_ctx *db_tdb2)
1133 db_tdb2_close_master(db_tdb2);
1134 if (db_tdb2->local_transaction) {
1135 tdb_transaction_cancel(db_tdb2->ltdb->tdb);
1137 db_tdb2->local_transaction = false;
1138 TALLOC_FREE(db_tdb2->ltdb);
1139 return 0;
1142 static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
1143 const char *name,
1144 int hash_size, int tdb_flags,
1145 int open_flags, mode_t mode,
1146 const struct dbwrap_tdb2_changes *chgs)
1148 struct db_context *result = NULL;
1149 struct db_tdb2_ctx *db_tdb2;
1150 int ret;
1151 const char *md;
1152 const char *ld;
1153 const char *bn;
1155 bn = strrchr_m(name, '/');
1156 if (bn) {
1157 bn++;
1158 DEBUG(3,("db_open_tdb2: use basename[%s] of abspath[%s]:\n",
1159 bn, name));
1160 } else {
1161 bn = name;
1164 md = lp_parm_const_string(-1, "dbwrap_tdb2", "master directory", NULL);
1165 if (!md) {
1166 DEBUG(0,("'dbwrap_tdb2:master directory' empty\n"));
1167 goto fail;
1170 ld = lp_parm_const_string(-1, "dbwrap_tdb2", "local directory", NULL);
1171 if (!ld) {
1172 DEBUG(0,("'dbwrap_tdb2:local directory' empty\n"));
1173 goto fail;
1176 result = TALLOC_ZERO_P(mem_ctx, struct db_context);
1177 if (result == NULL) {
1178 DEBUG(0, ("talloc failed\n"));
1179 goto fail;
1182 result->private_data = db_tdb2 = TALLOC_ZERO_P(result, struct db_tdb2_ctx);
1183 if (db_tdb2 == NULL) {
1184 DEBUG(0, ("talloc failed\n"));
1185 goto fail;
1188 db_tdb2->db = result;
1190 db_tdb2->open.hash_size = hash_size;
1191 db_tdb2->open.tdb_flags = tdb_flags;
1192 db_tdb2->open.open_flags= open_flags;
1193 db_tdb2->open.mode = mode;
1195 db_tdb2->max_buffer_size = lp_parm_ulong(-1, "dbwrap_tdb2",
1196 "notify buffer size", 512);
1198 db_tdb2->name = talloc_strdup(db_tdb2, bn);
1199 if (db_tdb2->name == NULL) {
1200 DEBUG(0, ("talloc_strdup failed\n"));
1201 goto fail;
1204 db_tdb2->mtdb_path = talloc_asprintf(db_tdb2, "%s/%s",
1205 md, bn);
1206 if (db_tdb2->mtdb_path == NULL) {
1207 DEBUG(0, ("talloc_asprintf failed\n"));
1208 goto fail;
1211 db_tdb2->ltdb_path = talloc_asprintf(db_tdb2, "%s/%s.tdb2",
1212 ld, bn);
1213 if (db_tdb2->ltdb_path == NULL) {
1214 DEBUG(0, ("talloc_asprintf failed\n"));
1215 goto fail;
1218 db_tdb2->mseqkey = string_term_tdb_data(DB_TDB2_MASTER_SEQNUM_KEYSTR);
1221 * this implicit opens the local one if as it's not yet open
1222 * it syncs the local copy.
1224 ret = db_tdb2_open_master(db_tdb2, false, chgs);
1225 if (ret != 0) {
1226 goto fail;
1229 ret = db_tdb2_close_master(db_tdb2);
1230 if (ret != 0) {
1231 goto fail;
1234 DEBUG(10,("db_open_tdb2[%s] opened with mseq[%u]\n",
1235 db_tdb2->name, db_tdb2->mseqnum));
1237 result->fetch_locked = db_tdb2_fetch_locked;
1238 result->fetch = db_tdb2_fetch;
1239 result->traverse = db_tdb2_traverse;
1240 result->traverse_read = db_tdb2_traverse_read;
1241 result->get_seqnum = db_tdb2_get_seqnum;
1242 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1243 result->transaction_start = db_tdb2_transaction_start;
1244 result->transaction_commit = db_tdb2_transaction_commit;
1245 result->transaction_cancel = db_tdb2_transaction_cancel;
1247 talloc_set_destructor(db_tdb2, db_tdb2_ctx_destructor);
1249 return result;
1251 fail:
1252 if (result != NULL) {
1253 TALLOC_FREE(result);
1255 return NULL;
1258 struct db_context *db_open_tdb2(TALLOC_CTX *mem_ctx,
1259 const char *name,
1260 int hash_size, int tdb_flags,
1261 int open_flags, mode_t mode)
1263 return db_open_tdb2_ex(mem_ctx, name, hash_size,
1264 tdb_flags, open_flags, mode, NULL);