2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb_private.h"
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 open lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no fsync/msync calls are made. This means we
86 are still proof against a process dying during transaction commit,
87 but not against machine reboot.
89 - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90 tdb_add_flags() transaction nesting is enabled.
91 It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92 The default is that transaction nesting is allowed.
93 Note: this default may change in future versions of tdb.
95 Beware: when transactions are nested, a transaction successfully
96 completed with tdb_transaction_commit() can be silently unrolled later.
98 - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99 tdb_add_flags() transaction nesting is disabled.
100 It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101 An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102 The default is that transaction nesting is allowed.
103 Note: this default may change in future versions of tdb.
108 hold the context of any current transaction
/*
 * State kept for the transaction currently open on a tdb_context.
 * NOTE(review): this extract is missing some member declarations and the
 * closing brace of the struct; only the members shown below are documented.
 */
110 struct tdb_transaction
{
111 /* we keep a mirrored copy of the tdb hash heads here so
112 tdb_next_hash_chain() can operate efficiently */
113 uint32_t *hash_heads
;
115 /* the original io methods - used to do IOs to the real db */
116 const struct tdb_methods
*io_methods
;
118 /* the list of transaction blocks. When a block is first
119 written to, it gets created in this list */
122 uint32_t block_size
; /* bytes in each block */
123 uint32_t last_block_size
; /* number of valid bytes in the last block */
125 /* non-zero when an internal transaction error has
126 occurred. All write operations will then fail until the
127 transaction is ended */
128 int transaction_error
;
130 /* when inside a transaction we need to keep track of any
131 nested tdb_transaction_start() calls, as these are allowed,
132 but don't create a new transaction */
135 /* set when a prepare has already occurred */
/* file offset of the recovery record's magic field; filled in while the
   recovery data is set up and used on cancel to invalidate the marker */
137 tdb_off_t magic_offset
;
139 /* old file size before transaction */
140 tdb_len_t old_map_size
;
142 /* did we expand in this transaction */
148 read while in a transaction. We need to check first if the data is in our list
149 of transaction elements, then if not do a real read
/*
 * Transaction-aware read of len bytes at file offset off into buf.
 *
 * A request that crosses a block boundary is split: the leading part up to
 * the boundary is handled by a recursive call (a non-zero result from that
 * call is treated as failure) and buf is advanced past it.  For the
 * remaining single-block request, a block that is not held in
 * tdb->transaction->blocks is read through the saved io_methods->tdb_read
 * (cv is forwarded unchanged); otherwise the bytes are copied out of the
 * in-memory block.
 *
 * NOTE(review): the local declarations, the return statements, the
 * len/off updates inside the loop, the condition guarding tdb_convert()
 * and the failure label are not present in this extract - confirm against
 * the complete file.
 */
151 static int transaction_read(struct tdb_context
*tdb
, tdb_off_t off
, void *buf
,
152 tdb_len_t len
, int cv
)
156 /* break it down into block sized ops */
157 while (len
+ (off
% tdb
->transaction
->block_size
) > tdb
->transaction
->block_size
) {
158 tdb_len_t len2
= tdb
->transaction
->block_size
- (off
% tdb
->transaction
->block_size
);
159 if (transaction_read(tdb
, off
, buf
, len2
, cv
) != 0) {
164 buf
= (void *)(len2
+ (char *)buf
);
171 blk
= off
/ tdb
->transaction
->block_size
;
173 /* see if we have it in the block list */
174 if (tdb
->transaction
->num_blocks
<= blk
||
175 tdb
->transaction
->blocks
[blk
] == NULL
) {
176 /* nope, do a real read */
177 if (tdb
->transaction
->io_methods
->tdb_read(tdb
, off
, buf
, len
, cv
) != 0) {
183 /* it is in the block list. Now check for the last block */
184 if (blk
== tdb
->transaction
->num_blocks
-1) {
185 if (len
> tdb
->transaction
->last_block_size
) {
190 /* now copy it out of this block */
191 memcpy(buf
, tdb
->transaction
->blocks
[blk
] + (off
% tdb
->transaction
->block_size
), len
);
193 tdb_convert(buf
, len
);
/* failure handling: log the offset/length, report TDB_ERR_IO and mark the
   whole transaction as failed */
198 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "transaction_read: failed at off=%d len=%d\n", off
, len
));
199 tdb
->ecode
= TDB_ERR_IO
;
200 tdb
->transaction
->transaction_error
= 1;
206 write while in a transaction
/*
 * Transaction-aware write of len bytes from buf at file offset off.
 *
 * Visible behaviour, in order:
 *  - a write on an already prepared transaction is rejected with
 *    TDB_ERR_EINVAL and flags transaction_error;
 *  - a write of exactly sizeof(tdb_off_t) bytes that lands inside the
 *    range starting at FREELIST_TOP and spanning TDB_HASHTABLE_SIZE(tdb)
 *    is also copied into the mirrored hash_heads array;
 *  - a request crossing a block boundary is split through recursive calls;
 *  - the blocks pointer array is grown (new slots zeroed) when blk is
 *    beyond num_blocks, which also resets last_block_size to 0;
 *  - a block seen for the first time is calloc'ed and, when it overlaps
 *    the pre-transaction file (old_map_size), loaded through the saved
 *    io_methods->tdb_read;
 *  - the data is then placed in the block and last_block_size is raised
 *    when the write extends the last block.
 *
 * NOTE(review): local declarations, return statements, the len/off
 * updates of the split loop, some else-branches and the failure label are
 * not present in this extract - confirm against the complete file.
 */
208 static int transaction_write(struct tdb_context
*tdb
, tdb_off_t off
,
209 const void *buf
, tdb_len_t len
)
213 /* Only a commit is allowed on a prepared transaction */
214 if (tdb
->transaction
->prepared
) {
215 tdb
->ecode
= TDB_ERR_EINVAL
;
216 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "transaction_write: transaction already prepared, write not allowed\n"));
217 tdb
->transaction
->transaction_error
= 1;
221 /* if the write is to a hash head, then update the transaction
223 if (len
== sizeof(tdb_off_t
) && off
>= FREELIST_TOP
&&
224 off
< FREELIST_TOP
+TDB_HASHTABLE_SIZE(tdb
)) {
225 uint32_t chain
= (off
-FREELIST_TOP
) / sizeof(tdb_off_t
);
226 memcpy(&tdb
->transaction
->hash_heads
[chain
], buf
, len
);
229 /* break it up into block sized chunks */
230 while (len
+ (off
% tdb
->transaction
->block_size
) > tdb
->transaction
->block_size
) {
231 tdb_len_t len2
= tdb
->transaction
->block_size
- (off
% tdb
->transaction
->block_size
);
232 if (transaction_write(tdb
, off
, buf
, len2
) != 0) {
238 buf
= (const void *)(len2
+ (const char *)buf
);
/* from here on blk is the block index and off is reduced to the offset
   inside that block */
246 blk
= off
/ tdb
->transaction
->block_size
;
247 off
= off
% tdb
->transaction
->block_size
;
249 if (tdb
->transaction
->num_blocks
<= blk
) {
250 uint8_t **new_blocks
;
251 /* expand the blocks array */
252 if (tdb
->transaction
->blocks
== NULL
) {
253 new_blocks
= (uint8_t **)malloc(
254 (blk
+1)*sizeof(uint8_t *));
256 new_blocks
= (uint8_t **)realloc(
257 tdb
->transaction
->blocks
,
258 (blk
+1)*sizeof(uint8_t *));
260 if (new_blocks
== NULL
) {
261 tdb
->ecode
= TDB_ERR_OOM
;
/* clear every slot from the old num_blocks up to and including blk */
264 memset(&new_blocks
[tdb
->transaction
->num_blocks
], 0,
265 (1+(blk
- tdb
->transaction
->num_blocks
))*sizeof(uint8_t *));
266 tdb
->transaction
->blocks
= new_blocks
;
267 tdb
->transaction
->num_blocks
= blk
+1;
268 tdb
->transaction
->last_block_size
= 0;
271 /* allocate and fill a block? */
272 if (tdb
->transaction
->blocks
[blk
] == NULL
) {
273 tdb
->transaction
->blocks
[blk
] = (uint8_t *)calloc(tdb
->transaction
->block_size
, 1);
274 if (tdb
->transaction
->blocks
[blk
] == NULL
) {
275 tdb
->ecode
= TDB_ERR_OOM
;
276 tdb
->transaction
->transaction_error
= 1;
/* the block starts inside the pre-transaction file: compute how much of it
   existed before (len2, clamped to old_map_size) and load it through the
   original io methods; NOTE(review): the length/convert arguments of the
   tdb_read call are not present in this extract */
279 if (tdb
->transaction
->old_map_size
> blk
* tdb
->transaction
->block_size
) {
280 tdb_len_t len2
= tdb
->transaction
->block_size
;
281 if (len2
+ (blk
* tdb
->transaction
->block_size
) > tdb
->transaction
->old_map_size
) {
282 len2
= tdb
->transaction
->old_map_size
- (blk
* tdb
->transaction
->block_size
);
284 if (tdb
->transaction
->io_methods
->tdb_read(tdb
, blk
* tdb
->transaction
->block_size
,
285 tdb
->transaction
->blocks
[blk
],
287 SAFE_FREE(tdb
->transaction
->blocks
[blk
]);
288 tdb
->ecode
= TDB_ERR_IO
;
291 if (blk
== tdb
->transaction
->num_blocks
-1) {
292 tdb
->transaction
->last_block_size
= len2
;
297 /* overwrite part of an existing block */
/* NOTE(review): both a zero-fill and a copy from buf appear below; the
   condition choosing between them is not present in this extract */
299 memset(tdb
->transaction
->blocks
[blk
] + off
, 0, len
);
301 memcpy(tdb
->transaction
->blocks
[blk
] + off
, buf
, len
);
/* a write that extends the last block raises its count of valid bytes */
303 if (blk
== tdb
->transaction
->num_blocks
-1) {
304 if (len
+ off
> tdb
->transaction
->last_block_size
) {
305 tdb
->transaction
->last_block_size
= len
+ off
;
/* failure handling: log the absolute offset (rebuilt from blk and off) and
   mark the transaction as failed */
312 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "transaction_write: failed at off=%d len=%d\n",
313 (blk
*tdb
->transaction
->block_size
) + off
, len
));
314 tdb
->transaction
->transaction_error
= 1;
320 write while in a transaction - this variant never expands the transaction blocks, it only
321 updates existing blocks. This means it cannot change the recovery size
/*
 * Update data that is already held in transaction blocks, without ever
 * creating or growing a block.
 *
 * Requests crossing a block boundary are split through recursive calls.
 * For the single-block remainder, the code tests whether the block is
 * absent (index beyond num_blocks or NULL slot), trims a write that would
 * run past the valid bytes of the last block down to last_block_size, and
 * then copies buf into the block.
 *
 * NOTE(review): local declarations, return statements and the len/off
 * updates of the split loop are not present in this extract, so what is
 * returned for an absent block cannot be stated from here.
 */
323 static int transaction_write_existing(struct tdb_context
*tdb
, tdb_off_t off
,
324 const void *buf
, tdb_len_t len
)
328 /* break it up into block sized chunks */
329 while (len
+ (off
% tdb
->transaction
->block_size
) > tdb
->transaction
->block_size
) {
330 tdb_len_t len2
= tdb
->transaction
->block_size
- (off
% tdb
->transaction
->block_size
);
331 if (transaction_write_existing(tdb
, off
, buf
, len2
) != 0) {
337 buf
= (const void *)(len2
+ (const char *)buf
);
/* blk is the block index, off becomes the offset inside that block */
345 blk
= off
/ tdb
->transaction
->block_size
;
346 off
= off
% tdb
->transaction
->block_size
;
/* is the block missing from the transaction's block list? */
348 if (tdb
->transaction
->num_blocks
<= blk
||
349 tdb
->transaction
->blocks
[blk
] == NULL
) {
/* last block: do not write beyond its valid bytes */
353 if (blk
== tdb
->transaction
->num_blocks
-1 &&
354 off
+ len
> tdb
->transaction
->last_block_size
) {
355 if (off
>= tdb
->transaction
->last_block_size
) {
358 len
= tdb
->transaction
->last_block_size
- off
;
361 /* overwrite part of an existing block */
362 memcpy(tdb
->transaction
->blocks
[blk
] + off
, buf
, len
);
369 accelerated hash chain head search, using the cached hash heads
/*
 * Scan the mirrored hash heads kept in the transaction, starting at h, for
 * the next chain whose head entry is non-zero.  Entry h+1 is examined for
 * chain h because slot 0 of the mirror belongs to the freelist.
 * NOTE(review): the declaration/initialisation of h and the statements
 * that leave the loop and store the result through chain are not present
 * in this extract.
 */
371 static void transaction_next_hash_chain(struct tdb_context
*tdb
, uint32_t *chain
)
374 for (;h
< tdb
->header
.hash_size
;h
++) {
375 /* the +1 takes account of the freelist */
376 if (0 != tdb
->transaction
->hash_heads
[h
+1]) {
384 out of bounds check during a transaction
/*
 * Bounds check used while a transaction is active.  The range is accepted
 * when off + len does not wrap around (off + len >= off) and ends at or
 * before tdb->map_size; the other visible path records TDB_ERR_IO.
 * NOTE(review): the return statements are not present in this extract,
 * and probe is not referenced by any line shown here.
 */
386 static int transaction_oob(struct tdb_context
*tdb
, tdb_off_t off
,
387 tdb_len_t len
, int probe
)
389 if (off
+ len
>= off
&& off
+ len
<= tdb
->map_size
) {
392 tdb
->ecode
= TDB_ERR_IO
;
397 transaction version of tdb_expand().
/*
 * File expansion while a transaction is active: instead of touching the
 * file, a write with a NULL buffer is queued through transaction_write()
 * at offset size, and the transaction is flagged as expanded.
 * NOTE(review): the rest of the parameter list (the declaration of
 * addition) and the return statements are not present in this extract.
 */
399 static int transaction_expand_file(struct tdb_context
*tdb
, tdb_off_t size
,
402 /* add a write to the transaction elements, so subsequent
403 reads see the zero data */
404 if (transaction_write(tdb
, size
, NULL
, addition
) != 0) {
408 tdb
->transaction
->expanded
= true;
/*
 * Method table that is installed into tdb->methods while a transaction is
 * active.
 * NOTE(review): only two of the initialisers are present in this extract;
 * the remaining entries and the closing brace are missing.
 */
413 static const struct tdb_methods transaction_methods
= {
416 transaction_next_hash_chain
,
418 transaction_expand_file
,
423 start a tdb transaction. No token is returned, as only a single
424 transaction is allowed to be pending per tdb_context
/*
 * Start a transaction on tdb, or account for a nested start when one is
 * already open.  lockflags is handed to tdb_transaction_lock().
 *
 * Visible steps, in order:
 *  - contexts that are read_only, TDB_INTERNAL or in traverse_read get
 *    TDB_ERR_EINVAL;
 *  - with a transaction already open: TDB_ERR_NESTING is set when
 *    TDB_ALLOW_NESTING is not in tdb->flags, and the nesting counter is
 *    incremented and traced;
 *  - held extra locks or an active traverse give TDB_ERR_LOCK;
 *  - the zero-filled transaction struct is allocated and block_size is
 *    taken from tdb->page_size;
 *  - the transaction write lock is taken, then a read lock over all
 *    records;
 *  - the hash heads are mirrored, old_map_size is recorded and
 *    tdb->methods is replaced by transaction_methods (the previous table
 *    is kept in io_methods).
 * The tail of the function releases the locks and frees the allocations.
 *
 * NOTE(review): return statements, closing braces and all labels are not
 * present in this extract - confirm the exact control flow against the
 * complete file.
 */
426 static int _tdb_transaction_start(struct tdb_context
*tdb
,
427 enum tdb_lock_flags lockflags
)
429 /* some sanity checks */
430 if (tdb
->read_only
|| (tdb
->flags
& TDB_INTERNAL
) || tdb
->traverse_read
) {
431 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
432 tdb
->ecode
= TDB_ERR_EINVAL
;
436 /* cope with nested tdb_transaction_start() calls */
437 if (tdb
->transaction
!= NULL
) {
438 if (!(tdb
->flags
& TDB_ALLOW_NESTING
)) {
439 tdb
->ecode
= TDB_ERR_NESTING
;
442 tdb
->transaction
->nesting
++;
443 TDB_LOG((tdb
, TDB_DEBUG_TRACE
, "tdb_transaction_start: nesting %d\n",
444 tdb
->transaction
->nesting
));
448 if (tdb_have_extra_locks(tdb
)) {
449 /* the caller must not have any locks when starting a
450 transaction as otherwise we'll be screwed by lack
451 of nested locks in posix */
452 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_start: cannot start a transaction with locks held\n"));
453 tdb
->ecode
= TDB_ERR_LOCK
;
457 if (tdb
->travlocks
.next
!= NULL
) {
458 /* you cannot use transactions inside a traverse (although you can use
459 traverse inside a transaction) as otherwise you can end up with
461 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
462 tdb
->ecode
= TDB_ERR_LOCK
;
466 tdb
->transaction
= (struct tdb_transaction
*)
467 calloc(sizeof(struct tdb_transaction
), 1);
468 if (tdb
->transaction
== NULL
) {
469 tdb
->ecode
= TDB_ERR_OOM
;
473 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
474 tdb
->transaction
->block_size
= tdb
->page_size
;
476 /* get the transaction write lock. This is a blocking lock. As
477 discussed with Volker, there are a number of ways we could
478 make this async, which we will probably do in the future */
479 if (tdb_transaction_lock(tdb
, F_WRLCK
, lockflags
) == -1) {
480 SAFE_FREE(tdb
->transaction
->blocks
);
481 SAFE_FREE(tdb
->transaction
);
/* when the caller did not ask to wait, a lock failure is reported as
   TDB_ERR_NOLOCK */
482 if ((lockflags
& TDB_LOCK_WAIT
) == 0) {
483 tdb
->ecode
= TDB_ERR_NOLOCK
;
488 /* get a read lock from the freelist to the end of file. This
489 is upgraded to a write lock during the commit */
490 if (tdb_allrecord_lock(tdb
, F_RDLCK
, TDB_LOCK_WAIT
, true) == -1) {
491 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_start: failed to get hash locks\n"));
492 goto fail_allrecord_lock
;
495 /* setup a copy of the hash table heads so the hash scan in
496 traverse can be fast */
/* hash_size+1 entries: one extra slot in front for the freelist head */
497 tdb
->transaction
->hash_heads
= (uint32_t *)
498 calloc(tdb
->header
.hash_size
+1, sizeof(uint32_t));
499 if (tdb
->transaction
->hash_heads
== NULL
) {
500 tdb
->ecode
= TDB_ERR_OOM
;
503 if (tdb
->methods
->tdb_read(tdb
, FREELIST_TOP
, tdb
->transaction
->hash_heads
,
504 TDB_HASHTABLE_SIZE(tdb
), 0) != 0) {
505 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_start: failed to read hash heads\n"));
506 tdb
->ecode
= TDB_ERR_IO
;
510 /* make sure we know about any file expansions already done by
512 tdb
->methods
->tdb_oob(tdb
, tdb
->map_size
, 1, 1);
513 tdb
->transaction
->old_map_size
= tdb
->map_size
;
515 /* finally hook the io methods, replacing them with
516 transaction specific methods */
517 tdb
->transaction
->io_methods
= tdb
->methods
;
518 tdb
->methods
= &transaction_methods
;
520 /* Trace at the end, so we get sequence number correct. */
521 tdb_trace(tdb
, "tdb_transaction_start");
/* failure unwinding: release the all-record read lock, then the
   transaction lock, then free everything allocated above */
525 tdb_allrecord_unlock(tdb
, F_RDLCK
, false);
527 tdb_transaction_unlock(tdb
, F_WRLCK
);
528 SAFE_FREE(tdb
->transaction
->blocks
);
529 SAFE_FREE(tdb
->transaction
->hash_heads
);
530 SAFE_FREE(tdb
->transaction
);
/*
 * Public entry point: start a transaction, waiting for the transaction
 * lock (TDB_LOCK_WAIT).  Returns whatever _tdb_transaction_start() returns.
 */
534 _PUBLIC_
int tdb_transaction_start(struct tdb_context
*tdb
)
536 return _tdb_transaction_start(tdb
, TDB_LOCK_WAIT
);
/*
 * Public entry point: start a transaction without waiting for the
 * transaction lock (TDB_LOCK_NOWAIT|TDB_LOCK_PROBE).  Returns whatever
 * _tdb_transaction_start() returns.
 */
539 _PUBLIC_
int tdb_transaction_start_nonblock(struct tdb_context
*tdb
)
541 return _tdb_transaction_start(tdb
, TDB_LOCK_NOWAIT
|TDB_LOCK_PROBE
);
/*
 * Push a range of the database to stable storage.
 *
 * The TDB_NOSYNC flag is tested first.  Otherwise the file descriptor is
 * synced with fdatasync() when HAVE_FDATASYNC is defined and with fsync()
 * when it is not, and the mapped range is flushed with msync(MS_SYNC).
 * msync is given a start rounded down to a page boundary (this masking
 * assumes tdb->page_size is a power of two) and a length extended by the
 * same amount.  Failures set TDB_ERR_IO and are logged.
 *
 * NOTE(review): the #else/#endif lines, the return statements and the
 * condition guarding the msync branch are not present in this extract.
 */
547 static int transaction_sync(struct tdb_context
*tdb
, tdb_off_t offset
, tdb_len_t length
)
549 if (tdb
->flags
& TDB_NOSYNC
) {
553 #ifdef HAVE_FDATASYNC
554 if (fdatasync(tdb
->fd
) != 0) {
556 if (fsync(tdb
->fd
) != 0) {
558 tdb
->ecode
= TDB_ERR_IO
;
559 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction: fsync failed\n"));
/* moffset is offset rounded down to the start of its page */
564 tdb_off_t moffset
= offset
& ~(tdb
->page_size
-1);
565 if (msync(moffset
+ (char *)tdb
->map_ptr
,
566 length
+ (offset
- moffset
), MS_SYNC
) != 0) {
567 tdb
->ecode
= TDB_ERR_IO
;
568 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction: msync failed - %s\n",
/*
 * Abandon the current transaction.
 *
 * Visible steps, in order:
 *  - a missing transaction is logged as an error;
 *  - inside a nested transaction only the nesting counter is decremented
 *    and transaction_error is set, so the outer transaction is poisoned;
 *  - otherwise map_size is reset to old_map_size, every transaction block
 *    and the block array are freed;
 *  - if a recovery marker was written (magic_offset non-zero) it is
 *    overwritten with TDB_RECOVERY_INVALID_MAGIC through the original io
 *    methods and synced;
 *  - the transaction locks are released, the original io methods are
 *    restored and the transaction state is freed.
 *
 * NOTE(review): local declarations and return statements are not present
 * in this extract.
 */
578 static int _tdb_transaction_cancel(struct tdb_context
*tdb
)
582 if (tdb
->transaction
== NULL
) {
583 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_cancel: no transaction\n"));
587 if (tdb
->transaction
->nesting
!= 0) {
588 tdb
->transaction
->transaction_error
= 1;
589 tdb
->transaction
->nesting
--;
/* forget any growth of the map that happened inside the transaction */
593 tdb
->map_size
= tdb
->transaction
->old_map_size
;
595 /* free all the transaction blocks */
596 for (i
=0;i
<tdb
->transaction
->num_blocks
;i
++) {
597 if (tdb
->transaction
->blocks
[i
] != NULL
) {
598 free(tdb
->transaction
->blocks
[i
]);
601 SAFE_FREE(tdb
->transaction
->blocks
);
603 if (tdb
->transaction
->magic_offset
) {
604 const struct tdb_methods
*methods
= tdb
->transaction
->io_methods
;
605 const uint32_t invalid
= TDB_RECOVERY_INVALID_MAGIC
;
607 /* remove the recovery marker */
608 if (methods
->tdb_write(tdb
, tdb
->transaction
->magic_offset
, &invalid
, 4) == -1 ||
609 transaction_sync(tdb
, tdb
->transaction
->magic_offset
, 4) == -1) {
610 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_cancel: failed to remove recovery magic\n"));
615 /* This also removes the OPEN_LOCK, if we have it. */
616 tdb_release_transaction_locks(tdb
);
618 /* restore the normal io methods */
619 tdb
->methods
= tdb
->transaction
->io_methods
;
621 SAFE_FREE(tdb
->transaction
->hash_heads
);
622 SAFE_FREE(tdb
->transaction
);
628 cancel the current transaction
/*
 * Public entry point: emit a trace record, then cancel the current
 * transaction.  Returns whatever _tdb_transaction_cancel() returns.
 */
630 _PUBLIC_
int tdb_transaction_cancel(struct tdb_context
*tdb
)
632 tdb_trace(tdb
, "tdb_transaction_cancel");
633 return _tdb_transaction_cancel(tdb
);
637 work out how much space the linearised recovery data will consume
/*
 * Compute how many bytes the linearised recovery data needs.
 *
 * The total starts at sizeof(uint32_t).  For each transaction block the
 * code tests whether the block starts at or beyond old_map_size and
 * whether its slot is NULL; a counted block contributes two tdb_off_t
 * values plus its data length (last_block_size for the final block,
 * block_size appears for the others).
 *
 * NOTE(review): the statements executed when those two tests are true,
 * the else keyword and the declaration of i are not present in this
 * extract.
 */
639 static tdb_len_t
tdb_recovery_size(struct tdb_context
*tdb
)
641 tdb_len_t recovery_size
= 0;
644 recovery_size
= sizeof(uint32_t);
645 for (i
=0;i
<tdb
->transaction
->num_blocks
;i
++) {
/* block begins at or after the end of the pre-transaction file */
646 if (i
* tdb
->transaction
->block_size
>= tdb
->transaction
->old_map_size
) {
/* block was never written in this transaction */
649 if (tdb
->transaction
->blocks
[i
] == NULL
) {
/* per-block header: two tdb_off_t sized fields */
652 recovery_size
+= 2*sizeof(tdb_off_t
);
653 if (i
== tdb
->transaction
->num_blocks
-1) {
654 recovery_size
+= tdb
->transaction
->last_block_size
;
656 recovery_size
+= tdb
->transaction
->block_size
;
660 return recovery_size
;
/*
 * Locate the recovery area of the database.
 *
 * The offset stored at TDB_RECOVERY_HEAD is read into *recovery_offset.
 * A zero offset is tested for explicitly.  Otherwise the record header at
 * that offset is read into *rec through methods->tdb_read, and when its
 * magic is neither TDB_RECOVERY_MAGIC nor TDB_RECOVERY_INVALID_MAGIC the
 * area is ignored by resetting *recovery_offset to 0.
 *
 * NOTE(review): the return statements and the trailing arguments of the
 * tdb_read call are not present in this extract; a caller in this file
 * compares the result against -1.
 */
663 int tdb_recovery_area(struct tdb_context
*tdb
,
664 const struct tdb_methods
*methods
,
665 tdb_off_t
*recovery_offset
,
666 struct tdb_record
*rec
)
668 if (tdb_ofs_read(tdb
, TDB_RECOVERY_HEAD
, recovery_offset
) == -1) {
672 if (*recovery_offset
== 0) {
677 if (methods
->tdb_read(tdb
, *recovery_offset
, rec
, sizeof(*rec
),
682 /* ignore invalid recovery regions: can happen in crash */
683 if (rec
->magic
!= TDB_RECOVERY_MAGIC
&&
684 rec
->magic
!= TDB_RECOVERY_INVALID_MAGIC
) {
685 *recovery_offset
= 0;
692 allocate the recovery area, or use an existing recovery area if it is
/*
 * Find or create the recovery area for the current transaction.
 *
 * Outputs: *recovery_size (bytes needed, from tdb_recovery_size()),
 * *recovery_offset (where the area lives) and *recovery_max_size (its
 * capacity).
 *
 * Visible steps, in order:
 *  - look up the existing area with tdb_recovery_area();
 *  - reuse it when it exists and rec.rec_len is large enough;
 *  - when there is no area, or the existing one does not end exactly at
 *    map_size, the old area is handed to tdb_free(), the required size is
 *    recomputed and the new head is placed at the current end of file;
 *  - the file is grown through the original io methods, starting from
 *    old_map_size, up to new_end;
 *  - old_map_size is then moved up to the new map_size;
 *  - the (converted) head offset is written to TDB_RECOVERY_HEAD both in
 *    the file and in any transaction block already covering it.
 *
 * NOTE(review): return statements, the remaining arguments of
 * tdb_expand_adjust() and the guard in front of tdb_free() are not
 * present in this extract.
 */
695 static int tdb_recovery_allocate(struct tdb_context
*tdb
,
696 tdb_len_t
*recovery_size
,
697 tdb_off_t
*recovery_offset
,
698 tdb_len_t
*recovery_max_size
)
700 struct tdb_record rec
;
701 const struct tdb_methods
*methods
= tdb
->transaction
->io_methods
;
702 tdb_off_t recovery_head
, new_end
;
704 if (tdb_recovery_area(tdb
, methods
, &recovery_head
, &rec
) == -1) {
705 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_recovery_allocate: failed to read recovery head\n"));
709 *recovery_size
= tdb_recovery_size(tdb
);
711 /* Existing recovery area? */
712 if (recovery_head
!= 0 && *recovery_size
<= rec
.rec_len
) {
713 /* it fits in the existing area */
714 *recovery_max_size
= rec
.rec_len
;
715 *recovery_offset
= recovery_head
;
719 /* If recovery area in middle of file, we need a new one. */
720 if (recovery_head
== 0
721 || recovery_head
+ sizeof(rec
) + rec
.rec_len
!= tdb
->map_size
) {
722 /* we need to free up the old recovery area, then allocate a
723 new one at the end of the file. Note that we cannot use
724 tdb_allocate() to allocate the new one as that might return
725 us an area that is being currently used (as of the start of
728 if (tdb_free(tdb
, recovery_head
, &rec
) == -1) {
729 TDB_LOG((tdb
, TDB_DEBUG_FATAL
,
730 "tdb_recovery_allocate: failed to"
731 " free previous recovery area\n"));
735 /* the tdb_free() call might have increased
736 * the recovery size */
737 *recovery_size
= tdb_recovery_size(tdb
);
740 /* New head will be at end of file. */
741 recovery_head
= tdb
->map_size
;
744 /* Now we know where it will be. */
745 *recovery_offset
= recovery_head
;
747 /* Expand by more than we need, so we don't do it often. */
748 *recovery_max_size
= tdb_expand_adjust(tdb
->map_size
,
/* end of file once the record header plus the enlarged area are in place */
753 new_end
= recovery_head
+ sizeof(rec
) + *recovery_max_size
;
755 if (methods
->tdb_expand_file(tdb
, tdb
->transaction
->old_map_size
,
756 new_end
- tdb
->transaction
->old_map_size
)
758 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_recovery_allocate: failed to create recovery area\n"));
762 /* remap the file (if using mmap) */
763 methods
->tdb_oob(tdb
, tdb
->map_size
, 1, 1);
765 /* we have to reset the old map size so that we don't try to expand the file
766 again in the transaction commit, which would destroy the recovery area */
767 tdb
->transaction
->old_map_size
= tdb
->map_size
;
769 /* write the recovery header offset and sync - we can sync without a race here
770 as the magic ptr in the recovery record has not been set */
771 CONVERT(recovery_head
);
772 if (methods
->tdb_write(tdb
, TDB_RECOVERY_HEAD
,
773 &recovery_head
, sizeof(tdb_off_t
)) == -1) {
774 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_recovery_allocate: failed to write recovery head\n"));
/* keep any in-memory transaction block covering the header in step with
   what was just written to the file */
777 if (transaction_write_existing(tdb
, TDB_RECOVERY_HEAD
, &recovery_head
, sizeof(tdb_off_t
)) == -1) {
778 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_recovery_allocate: failed to write recovery head\n"));
787 setup the recovery data that will be used on a crash during commit
/*
 * Build and store the recovery record used to undo a commit that is
 * interrupted by a crash.  On success *magic_offset is set to the file
 * offset of the record's magic field.
 *
 * Visible steps, in order:
 *  - obtain the recovery area with tdb_recovery_allocate();
 *  - allocate one buffer for the record header plus recovery_size bytes;
 *  - fill the header: magic = TDB_RECOVERY_INVALID_MAGIC, data_len =
 *    recovery_size, rec_len = recovery_max_size, key_len = old_map_size;
 *  - for every written block that lies inside the old file, append its
 *    offset, its length and the ORIGINAL on-disk bytes (read through the
 *    saved io methods);
 *  - append a tailer holding sizeof(*rec) + recovery_max_size;
 *  - write the buffer to the recovery area (file and any overlapping
 *    transaction block) and sync it;
 *  - only then write TDB_RECOVERY_MAGIC at *magic_offset (file and
 *    overlapping transaction block) and sync again.
 *
 * NOTE(review): return statements, the declarations of i/offset/length,
 * the advancing of p, the freeing of data and any byte-order conversion
 * calls are not present in this extract.
 */
789 static int transaction_setup_recovery(struct tdb_context
*tdb
,
790 tdb_off_t
*magic_offset
)
792 tdb_len_t recovery_size
;
793 unsigned char *data
, *p
;
794 const struct tdb_methods
*methods
= tdb
->transaction
->io_methods
;
795 struct tdb_record
*rec
;
796 tdb_off_t recovery_offset
, recovery_max_size
;
797 tdb_off_t old_map_size
= tdb
->transaction
->old_map_size
;
798 uint32_t magic
, tailer
;
802 check that the recovery area has enough space
804 if (tdb_recovery_allocate(tdb
, &recovery_size
,
805 &recovery_offset
, &recovery_max_size
) == -1) {
809 data
= (unsigned char *)malloc(recovery_size
+ sizeof(*rec
));
811 tdb
->ecode
= TDB_ERR_OOM
;
/* the record header sits at the start of the buffer */
815 rec
= (struct tdb_record
*)data
;
816 memset(rec
, 0, sizeof(*rec
));
/* the record stays marked invalid until all of its data is safely on disk */
818 rec
->magic
= TDB_RECOVERY_INVALID_MAGIC
;
819 rec
->data_len
= recovery_size
;
820 rec
->rec_len
= recovery_max_size
;
821 rec
->key_len
= old_map_size
;
824 /* build the recovery data into a single blob to allow us to do a single
825 large write, which should be more efficient */
826 p
= data
+ sizeof(*rec
);
827 for (i
=0;i
<tdb
->transaction
->num_blocks
;i
++) {
831 if (tdb
->transaction
->blocks
[i
] == NULL
) {
835 offset
= i
* tdb
->transaction
->block_size
;
836 length
= tdb
->transaction
->block_size
;
837 if (i
== tdb
->transaction
->num_blocks
-1) {
838 length
= tdb
->transaction
->last_block_size
;
841 if (offset
>= old_map_size
) {
844 if (offset
+ length
> tdb
->transaction
->old_map_size
) {
845 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
847 tdb
->ecode
= TDB_ERR_CORRUPT
;
/* per-block entry: 4-byte offset, 4-byte length, then the old data */
850 memcpy(p
, &offset
, 4);
851 memcpy(p
+4, &length
, 4);
855 /* the recovery area contains the old data, not the
856 new data, so we have to call the original tdb_read
858 if (methods
->tdb_read(tdb
, offset
, p
+ 8, length
, 0) != 0) {
860 tdb
->ecode
= TDB_ERR_IO
;
867 tailer
= sizeof(*rec
) + recovery_max_size
;
868 memcpy(p
, &tailer
, 4);
873 /* write the recovery data to the recovery area */
874 if (methods
->tdb_write(tdb
, recovery_offset
, data
, sizeof(*rec
) + recovery_size
) == -1) {
875 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
877 tdb
->ecode
= TDB_ERR_IO
;
880 if (transaction_write_existing(tdb
, recovery_offset
, data
, sizeof(*rec
) + recovery_size
) == -1) {
881 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
883 tdb
->ecode
= TDB_ERR_IO
;
887 /* as we don't have ordered writes, we have to sync the recovery
888 data before we update the magic to indicate that the recovery
890 if (transaction_sync(tdb
, recovery_offset
, sizeof(*rec
) + recovery_size
) == -1) {
897 magic
= TDB_RECOVERY_MAGIC
;
900 *magic_offset
= recovery_offset
+ offsetof(struct tdb_record
, magic
);
902 if (methods
->tdb_write(tdb
, *magic_offset
, &magic
, sizeof(magic
)) == -1) {
903 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
904 tdb
->ecode
= TDB_ERR_IO
;
907 if (transaction_write_existing(tdb
, *magic_offset
, &magic
, sizeof(magic
)) == -1) {
908 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
909 tdb
->ecode
= TDB_ERR_IO
;
913 /* ensure the recovery magic marker is on disk */
914 if (transaction_sync(tdb
, *magic_offset
, sizeof(magic
)) == -1) {
/*
 * First phase of a commit: make the transaction recoverable on disk.
 *
 * Visible steps, in order:
 *  - a missing transaction is logged as an error;
 *  - an already prepared transaction is cancelled with TDB_ERR_EINVAL;
 *  - a transaction with a pending error is cancelled with TDB_ERR_IO;
 *  - nested transactions and transactions without any blocks are tested
 *    for explicitly;
 *  - extra locks still held by the caller cancel the transaction with
 *    TDB_ERR_LOCK;
 *  - the all-record lock is upgraded to a write lock and the open lock is
 *    taken;
 *  - the recovery data is written and prepared is set;
 *  - when the map grew during the transaction, the real file is expanded
 *    through the original io methods, and map_size is then set to
 *    old_map_size before tdb_oob() is called on it.
 *
 * NOTE(review): return statements, closing braces and the length argument
 * of the tdb_expand_file() call are not present in this extract, so what
 * is returned for the nested and empty cases cannot be stated from here.
 */
921 static int _tdb_transaction_prepare_commit(struct tdb_context
*tdb
)
923 const struct tdb_methods
*methods
;
925 if (tdb
->transaction
== NULL
) {
926 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_prepare_commit: no transaction\n"));
930 if (tdb
->transaction
->prepared
) {
931 tdb
->ecode
= TDB_ERR_EINVAL
;
932 _tdb_transaction_cancel(tdb
);
933 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_prepare_commit: transaction already prepared\n"));
937 if (tdb
->transaction
->transaction_error
) {
938 tdb
->ecode
= TDB_ERR_IO
;
939 _tdb_transaction_cancel(tdb
);
940 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_prepare_commit: transaction error pending\n"));
945 if (tdb
->transaction
->nesting
!= 0) {
949 /* check for a null transaction */
950 if (tdb
->transaction
->blocks
== NULL
) {
/* all real I/O below goes through the original (non-transaction) methods */
954 methods
= tdb
->transaction
->io_methods
;
956 /* if there are any locks pending then the caller has not
957 nested their locks properly, so fail the transaction */
958 if (tdb_have_extra_locks(tdb
)) {
959 tdb
->ecode
= TDB_ERR_LOCK
;
960 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_prepare_commit: locks pending on commit\n"));
961 _tdb_transaction_cancel(tdb
);
965 /* upgrade the main transaction lock region to a write lock */
966 if (tdb_allrecord_upgrade(tdb
) == -1) {
967 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
968 _tdb_transaction_cancel(tdb
);
972 /* get the open lock - this prevents new users attaching to the database
974 if (tdb_nest_lock(tdb
, OPEN_LOCK
, F_WRLCK
, TDB_LOCK_WAIT
) == -1) {
975 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_prepare_commit: failed to get open lock\n"));
976 _tdb_transaction_cancel(tdb
);
980 /* write the recovery data to the end of the file */
981 if (transaction_setup_recovery(tdb
, &tdb
->transaction
->magic_offset
) == -1) {
982 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
983 _tdb_transaction_cancel(tdb
);
/* from here on transaction_write() refuses further writes */
987 tdb
->transaction
->prepared
= true;
989 /* expand the file to the new size if needed */
990 if (tdb
->map_size
!= tdb
->transaction
->old_map_size
) {
991 if (methods
->tdb_expand_file(tdb
, tdb
->transaction
->old_map_size
,
993 tdb
->transaction
->old_map_size
) == -1) {
994 tdb
->ecode
= TDB_ERR_IO
;
995 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_prepare_commit: expansion failed\n"));
996 _tdb_transaction_cancel(tdb
);
999 tdb
->map_size
= tdb
->transaction
->old_map_size
;
1000 methods
->tdb_oob(tdb
, tdb
->map_size
, 1, 1);
1003 /* Keep the open lock until the actual commit */
1009 prepare to commit the current transaction
/*
 * Public entry point: emit a trace record, then run the prepare phase.
 * Returns whatever _tdb_transaction_prepare_commit() returns.
 */
1011 _PUBLIC_
int tdb_transaction_prepare_commit(struct tdb_context
*tdb
)
1013 tdb_trace(tdb
, "tdb_transaction_prepare_commit");
1014 return _tdb_transaction_prepare_commit(tdb
);
1017 /* A repack is worthwhile if the largest is less than half total free. */
/*
 * Decide whether repacking the database looks useful by walking the
 * freelist from FREELIST_TOP.  The lengths of the free records are summed
 * in total and the biggest single length is kept in largest; the result is
 * true when total exceeds twice largest.
 * The walk stops at a zero pointer or when tdb_rec_free_read() fails.
 * NOTE(review): the declaration of ptr, the statement advancing ptr and
 * the return taken when the initial tdb_ofs_read() fails are not present
 * in this extract.
 */
1018 static bool repack_worthwhile(struct tdb_context
*tdb
)
1021 struct tdb_record rec
;
1022 tdb_len_t total
= 0, largest
= 0;
1024 if (tdb_ofs_read(tdb
, FREELIST_TOP
, &ptr
) == -1) {
1028 while (ptr
!= 0 && tdb_rec_free_read(tdb
, ptr
, &rec
) == 0) {
1029 total
+= rec
.rec_len
;
1030 if (rec
.rec_len
> largest
) {
1031 largest
= rec
.rec_len
;
1036 return total
> largest
* 2;
1040 commit the current transaction
1042 _PUBLIC_
int tdb_transaction_commit(struct tdb_context
*tdb
)
1044 const struct tdb_methods
*methods
;
1046 bool need_repack
= false;
1048 if (tdb
->transaction
== NULL
) {
1049 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_commit: no transaction\n"));
1053 tdb_trace(tdb
, "tdb_transaction_commit");
1055 if (tdb
->transaction
->transaction_error
) {
1056 tdb
->ecode
= TDB_ERR_IO
;
1057 _tdb_transaction_cancel(tdb
);
1058 TDB_LOG((tdb
, TDB_DEBUG_ERROR
, "tdb_transaction_commit: transaction error pending\n"));
1063 if (tdb
->transaction
->nesting
!= 0) {
1064 tdb
->transaction
->nesting
--;
1068 /* check for a null transaction */
1069 if (tdb
->transaction
->blocks
== NULL
) {
1070 _tdb_transaction_cancel(tdb
);
1074 if (!tdb
->transaction
->prepared
) {
1075 int ret
= _tdb_transaction_prepare_commit(tdb
);
1080 methods
= tdb
->transaction
->io_methods
;
1082 /* perform all the writes */
1083 for (i
=0;i
<tdb
->transaction
->num_blocks
;i
++) {
1087 if (tdb
->transaction
->blocks
[i
] == NULL
) {
1091 offset
= i
* tdb
->transaction
->block_size
;
1092 length
= tdb
->transaction
->block_size
;
1093 if (i
== tdb
->transaction
->num_blocks
-1) {
1094 length
= tdb
->transaction
->last_block_size
;
1097 if (methods
->tdb_write(tdb
, offset
, tdb
->transaction
->blocks
[i
], length
) == -1) {
1098 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_commit: write failed during commit\n"));
1100 /* we've overwritten part of the data and
1101 possibly expanded the file, so we need to
1102 run the crash recovery code */
1103 tdb
->methods
= methods
;
1104 tdb_transaction_recover(tdb
);
1106 _tdb_transaction_cancel(tdb
);
1108 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_commit: write failed\n"));
1111 SAFE_FREE(tdb
->transaction
->blocks
[i
]);
1114 /* Do this before we drop lock or blocks. */
1115 if (tdb
->transaction
->expanded
) {
1116 need_repack
= repack_worthwhile(tdb
);
1119 SAFE_FREE(tdb
->transaction
->blocks
);
1120 tdb
->transaction
->num_blocks
= 0;
1122 /* ensure the new data is on disk */
1123 if (transaction_sync(tdb
, 0, tdb
->map_size
) == -1) {
1128 TODO: maybe write to some dummy hdr field, or write to magic
1129 offset without mmap, before the last sync, instead of the
1133 /* on some systems (like Linux 2.6.x) changes via mmap/msync
1134 don't change the mtime of the file, this means the file may
1135 not be backed up (as tdb rounding to block sizes means that
1136 file size changes are quite rare too). The following forces
1137 mtime changes when a transaction completes */
1139 utime(tdb
->name
, NULL
);
1142 /* use a transaction cancel to free memory and remove the
1143 transaction locks */
1144 _tdb_transaction_cancel(tdb
);
1147 return tdb_repack(tdb
);
1155 recover from an aborted transaction. Must be called with exclusive
1156 database write access already established (including the open
1157 lock to prevent new processes attaching)
1159 int tdb_transaction_recover(struct tdb_context
*tdb
)
1161 tdb_off_t recovery_head
, recovery_eof
;
1162 unsigned char *data
, *p
;
1164 struct tdb_record rec
;
1166 /* find the recovery area */
1167 if (tdb_ofs_read(tdb
, TDB_RECOVERY_HEAD
, &recovery_head
) == -1) {
1168 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to read recovery head\n"));
1169 tdb
->ecode
= TDB_ERR_IO
;
1173 if (recovery_head
== 0) {
1174 /* we have never allocated a recovery record */
1178 /* read the recovery record */
1179 if (tdb
->methods
->tdb_read(tdb
, recovery_head
, &rec
,
1180 sizeof(rec
), DOCONV()) == -1) {
1181 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to read recovery record\n"));
1182 tdb
->ecode
= TDB_ERR_IO
;
1186 if (rec
.magic
!= TDB_RECOVERY_MAGIC
) {
1187 /* there is no valid recovery data */
1191 if (tdb
->read_only
) {
1192 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: attempt to recover read only database\n"));
1193 tdb
->ecode
= TDB_ERR_CORRUPT
;
1197 recovery_eof
= rec
.key_len
;
1199 data
= (unsigned char *)malloc(rec
.data_len
);
1201 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to allocate recovery data\n"));
1202 tdb
->ecode
= TDB_ERR_OOM
;
1206 /* read the full recovery data */
1207 if (tdb
->methods
->tdb_read(tdb
, recovery_head
+ sizeof(rec
), data
,
1208 rec
.data_len
, 0) == -1) {
1209 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to read recovery data\n"));
1210 tdb
->ecode
= TDB_ERR_IO
;
1214 /* recover the file data */
1216 while (p
+8 < data
+ rec
.data_len
) {
1222 memcpy(&len
, p
+4, 4);
1224 if (tdb
->methods
->tdb_write(tdb
, ofs
, p
+8, len
) == -1) {
1226 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len
, ofs
));
1227 tdb
->ecode
= TDB_ERR_IO
;
1235 if (transaction_sync(tdb
, 0, tdb
->map_size
) == -1) {
1236 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to sync recovery\n"));
1237 tdb
->ecode
= TDB_ERR_IO
;
1241 /* if the recovery area is after the recovered eof then remove it */
1242 if (recovery_eof
<= recovery_head
) {
1243 if (tdb_ofs_write(tdb
, TDB_RECOVERY_HEAD
, &zero
) == -1) {
1244 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to remove recovery head\n"));
1245 tdb
->ecode
= TDB_ERR_IO
;
1250 /* remove the recovery magic */
1251 if (tdb_ofs_write(tdb
, recovery_head
+ offsetof(struct tdb_record
, magic
),
1253 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to remove recovery magic\n"));
1254 tdb
->ecode
= TDB_ERR_IO
;
1258 if (transaction_sync(tdb
, 0, recovery_eof
) == -1) {
1259 TDB_LOG((tdb
, TDB_DEBUG_FATAL
, "tdb_transaction_recover: failed to sync2 recovery\n"));
1260 tdb
->ecode
= TDB_ERR_IO
;
1264 TDB_LOG((tdb
, TDB_DEBUG_TRACE
, "tdb_transaction_recover: recovered %d byte database\n",
1271 /* Any I/O failures we say "needs recovery". */
1272 bool tdb_needs_recovery(struct tdb_context
*tdb
)
1274 tdb_off_t recovery_head
;
1275 struct tdb_record rec
;
1277 /* find the recovery area */
1278 if (tdb_ofs_read(tdb
, TDB_RECOVERY_HEAD
, &recovery_head
) == -1) {
1282 if (recovery_head
== 0) {
1283 /* we have never allocated a recovery record */
1287 /* read the recovery record */
1288 if (tdb
->methods
->tdb_read(tdb
, recovery_head
, &rec
,
1289 sizeof(rec
), DOCONV()) == -1) {
1293 return (rec
.magic
== TDB_RECOVERY_MAGIC
);