tdb: Fix blank line endings
[Samba/gebeck_regimport.git] / lib / tdb / common / transaction.c
blobee9beeb3d37e3b9e85e98bf1f3964ddf2a2e7421
1 /*
2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb_private.h"
29 transaction design:
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
50 to the real database.
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 open lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
82 intervention.
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no fsync/msync calls are made. This means we
86 are still proof against a process dying during transaction commit,
87 but not against machine reboot.
89 - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90 tdb_add_flags() transaction nesting is enabled.
91 It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92 The default is that transaction nesting is allowed.
93 Note: this default may change in future versions of tdb.
95 Beware: when transactions are nested, a transaction successfully
96 completed with tdb_transaction_commit() can be silently unrolled later.
98 - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99 tdb_add_flags() transaction nesting is disabled.
100 It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101 An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102 The default is that transaction nesting is allowed.
103 Note: this default may change in future versions of tdb.
108 hold the context of any current transaction
110 struct tdb_transaction {
111 /* we keep a mirrored copy of the tdb hash heads here so
112 tdb_next_hash_chain() can operate efficiently */
113 uint32_t *hash_heads;
115 /* the original io methods - used to do IOs to the real db */
116 const struct tdb_methods *io_methods;
118 /* the list of transaction blocks. When a block is first
119 written to, it gets created in this list */
120 uint8_t **blocks;
121 uint32_t num_blocks;
122 uint32_t block_size; /* bytes in each block */
123 uint32_t last_block_size; /* number of valid bytes in the last block */
125 /* non-zero when an internal transaction error has
126 occurred. All write operations will then fail until the
127 transaction is ended */
128 int transaction_error;
130 /* when inside a transaction we need to keep track of any
131 nested tdb_transaction_start() calls, as these are allowed,
132 but don't create a new transaction */
133 int nesting;
135 /* set when a prepare has already occurred */
136 bool prepared;
137 tdb_off_t magic_offset;
139 /* old file size before transaction */
140 tdb_len_t old_map_size;
142 /* did we expand in this transaction */
143 bool expanded;
148 read while in a transaction. We need to check first if the data is in our list
149 of transaction elements, then if not do a real read
151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152 tdb_len_t len, int cv)
154 uint32_t blk;
156 /* break it down into block sized ops */
157 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160 return -1;
162 len -= len2;
163 off += len2;
164 buf = (void *)(len2 + (char *)buf);
167 if (len == 0) {
168 return 0;
171 blk = off / tdb->transaction->block_size;
173 /* see if we have it in the block list */
174 if (tdb->transaction->num_blocks <= blk ||
175 tdb->transaction->blocks[blk] == NULL) {
176 /* nope, do a real read */
177 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178 goto fail;
180 return 0;
183 /* it is in the block list. Now check for the last block */
184 if (blk == tdb->transaction->num_blocks-1) {
185 if (len > tdb->transaction->last_block_size) {
186 goto fail;
190 /* now copy it out of this block */
191 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192 if (cv) {
193 tdb_convert(buf, len);
195 return 0;
197 fail:
198 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
199 tdb->ecode = TDB_ERR_IO;
200 tdb->transaction->transaction_error = 1;
201 return -1;
206 write while in a transaction
208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209 const void *buf, tdb_len_t len)
211 uint32_t blk;
213 /* Only a commit is allowed on a prepared transaction */
214 if (tdb->transaction->prepared) {
215 tdb->ecode = TDB_ERR_EINVAL;
216 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
217 tdb->transaction->transaction_error = 1;
218 return -1;
221 /* if the write is to a hash head, then update the transaction
222 hash heads */
223 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
224 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
225 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
226 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
229 /* break it up into block sized chunks */
230 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
231 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
232 if (transaction_write(tdb, off, buf, len2) != 0) {
233 return -1;
235 len -= len2;
236 off += len2;
237 if (buf != NULL) {
238 buf = (const void *)(len2 + (const char *)buf);
242 if (len == 0) {
243 return 0;
246 blk = off / tdb->transaction->block_size;
247 off = off % tdb->transaction->block_size;
249 if (tdb->transaction->num_blocks <= blk) {
250 uint8_t **new_blocks;
251 /* expand the blocks array */
252 if (tdb->transaction->blocks == NULL) {
253 new_blocks = (uint8_t **)malloc(
254 (blk+1)*sizeof(uint8_t *));
255 } else {
256 new_blocks = (uint8_t **)realloc(
257 tdb->transaction->blocks,
258 (blk+1)*sizeof(uint8_t *));
260 if (new_blocks == NULL) {
261 tdb->ecode = TDB_ERR_OOM;
262 goto fail;
264 memset(&new_blocks[tdb->transaction->num_blocks], 0,
265 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
266 tdb->transaction->blocks = new_blocks;
267 tdb->transaction->num_blocks = blk+1;
268 tdb->transaction->last_block_size = 0;
271 /* allocate and fill a block? */
272 if (tdb->transaction->blocks[blk] == NULL) {
273 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
274 if (tdb->transaction->blocks[blk] == NULL) {
275 tdb->ecode = TDB_ERR_OOM;
276 tdb->transaction->transaction_error = 1;
277 return -1;
279 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
280 tdb_len_t len2 = tdb->transaction->block_size;
281 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
282 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
284 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
285 tdb->transaction->blocks[blk],
286 len2, 0) != 0) {
287 SAFE_FREE(tdb->transaction->blocks[blk]);
288 tdb->ecode = TDB_ERR_IO;
289 goto fail;
291 if (blk == tdb->transaction->num_blocks-1) {
292 tdb->transaction->last_block_size = len2;
297 /* overwrite part of an existing block */
298 if (buf == NULL) {
299 memset(tdb->transaction->blocks[blk] + off, 0, len);
300 } else {
301 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
303 if (blk == tdb->transaction->num_blocks-1) {
304 if (len + off > tdb->transaction->last_block_size) {
305 tdb->transaction->last_block_size = len + off;
309 return 0;
311 fail:
312 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
313 (blk*tdb->transaction->block_size) + off, len));
314 tdb->transaction->transaction_error = 1;
315 return -1;
320 write while in a transaction - this variant never expands the transaction blocks, it only
321 updates existing blocks. This means it cannot change the recovery size
323 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
324 const void *buf, tdb_len_t len)
326 uint32_t blk;
328 /* break it up into block sized chunks */
329 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
330 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
331 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
332 return -1;
334 len -= len2;
335 off += len2;
336 if (buf != NULL) {
337 buf = (const void *)(len2 + (const char *)buf);
341 if (len == 0) {
342 return 0;
345 blk = off / tdb->transaction->block_size;
346 off = off % tdb->transaction->block_size;
348 if (tdb->transaction->num_blocks <= blk ||
349 tdb->transaction->blocks[blk] == NULL) {
350 return 0;
353 if (blk == tdb->transaction->num_blocks-1 &&
354 off + len > tdb->transaction->last_block_size) {
355 if (off >= tdb->transaction->last_block_size) {
356 return 0;
358 len = tdb->transaction->last_block_size - off;
361 /* overwrite part of an existing block */
362 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
364 return 0;
369 accelerated hash chain head search, using the cached hash heads
371 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
373 uint32_t h = *chain;
374 for (;h < tdb->header.hash_size;h++) {
375 /* the +1 takes account of the freelist */
376 if (0 != tdb->transaction->hash_heads[h+1]) {
377 break;
380 (*chain) = h;
384 out of bounds check during a transaction
386 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
387 tdb_len_t len, int probe)
389 if (off + len >= off && off + len <= tdb->map_size) {
390 return 0;
392 tdb->ecode = TDB_ERR_IO;
393 return -1;
397 transaction version of tdb_expand().
399 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
400 tdb_off_t addition)
402 /* add a write to the transaction elements, so subsequent
403 reads see the zero data */
404 if (transaction_write(tdb, size, NULL, addition) != 0) {
405 return -1;
408 tdb->transaction->expanded = true;
410 return 0;
413 static const struct tdb_methods transaction_methods = {
414 transaction_read,
415 transaction_write,
416 transaction_next_hash_chain,
417 transaction_oob,
418 transaction_expand_file,
423 start a tdb transaction. No token is returned, as only a single
424 transaction is allowed to be pending per tdb_context
426 static int _tdb_transaction_start(struct tdb_context *tdb,
427 enum tdb_lock_flags lockflags)
429 /* some sanity checks */
430 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
431 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
432 tdb->ecode = TDB_ERR_EINVAL;
433 return -1;
436 /* cope with nested tdb_transaction_start() calls */
437 if (tdb->transaction != NULL) {
438 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
439 tdb->ecode = TDB_ERR_NESTING;
440 return -1;
442 tdb->transaction->nesting++;
443 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
444 tdb->transaction->nesting));
445 return 0;
448 if (tdb_have_extra_locks(tdb)) {
449 /* the caller must not have any locks when starting a
450 transaction as otherwise we'll be screwed by lack
451 of nested locks in posix */
452 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
453 tdb->ecode = TDB_ERR_LOCK;
454 return -1;
457 if (tdb->travlocks.next != NULL) {
458 /* you cannot use transactions inside a traverse (although you can use
459 traverse inside a transaction) as otherwise you can end up with
460 deadlock */
461 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
462 tdb->ecode = TDB_ERR_LOCK;
463 return -1;
466 tdb->transaction = (struct tdb_transaction *)
467 calloc(sizeof(struct tdb_transaction), 1);
468 if (tdb->transaction == NULL) {
469 tdb->ecode = TDB_ERR_OOM;
470 return -1;
473 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
474 tdb->transaction->block_size = tdb->page_size;
476 /* get the transaction write lock. This is a blocking lock. As
477 discussed with Volker, there are a number of ways we could
478 make this async, which we will probably do in the future */
479 if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
480 SAFE_FREE(tdb->transaction->blocks);
481 SAFE_FREE(tdb->transaction);
482 if ((lockflags & TDB_LOCK_WAIT) == 0) {
483 tdb->ecode = TDB_ERR_NOLOCK;
485 return -1;
488 /* get a read lock from the freelist to the end of file. This
489 is upgraded to a write lock during the commit */
490 if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
491 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
492 goto fail_allrecord_lock;
495 /* setup a copy of the hash table heads so the hash scan in
496 traverse can be fast */
497 tdb->transaction->hash_heads = (uint32_t *)
498 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
499 if (tdb->transaction->hash_heads == NULL) {
500 tdb->ecode = TDB_ERR_OOM;
501 goto fail;
503 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
504 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
505 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
506 tdb->ecode = TDB_ERR_IO;
507 goto fail;
510 /* make sure we know about any file expansions already done by
511 anyone else */
512 tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
513 tdb->transaction->old_map_size = tdb->map_size;
515 /* finally hook the io methods, replacing them with
516 transaction specific methods */
517 tdb->transaction->io_methods = tdb->methods;
518 tdb->methods = &transaction_methods;
520 /* Trace at the end, so we get sequence number correct. */
521 tdb_trace(tdb, "tdb_transaction_start");
522 return 0;
524 fail:
525 tdb_allrecord_unlock(tdb, F_RDLCK, false);
526 fail_allrecord_lock:
527 tdb_transaction_unlock(tdb, F_WRLCK);
528 SAFE_FREE(tdb->transaction->blocks);
529 SAFE_FREE(tdb->transaction->hash_heads);
530 SAFE_FREE(tdb->transaction);
531 return -1;
534 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
536 return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
539 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
541 return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
545 sync to disk
547 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
549 if (tdb->flags & TDB_NOSYNC) {
550 return 0;
553 #ifdef HAVE_FDATASYNC
554 if (fdatasync(tdb->fd) != 0) {
555 #else
556 if (fsync(tdb->fd) != 0) {
557 #endif
558 tdb->ecode = TDB_ERR_IO;
559 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
560 return -1;
562 #ifdef HAVE_MMAP
563 if (tdb->map_ptr) {
564 tdb_off_t moffset = offset & ~(tdb->page_size-1);
565 if (msync(moffset + (char *)tdb->map_ptr,
566 length + (offset - moffset), MS_SYNC) != 0) {
567 tdb->ecode = TDB_ERR_IO;
568 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
569 strerror(errno)));
570 return -1;
573 #endif
574 return 0;
578 static int _tdb_transaction_cancel(struct tdb_context *tdb)
580 int i, ret = 0;
582 if (tdb->transaction == NULL) {
583 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
584 return -1;
587 if (tdb->transaction->nesting != 0) {
588 tdb->transaction->transaction_error = 1;
589 tdb->transaction->nesting--;
590 return 0;
593 tdb->map_size = tdb->transaction->old_map_size;
595 /* free all the transaction blocks */
596 for (i=0;i<tdb->transaction->num_blocks;i++) {
597 if (tdb->transaction->blocks[i] != NULL) {
598 free(tdb->transaction->blocks[i]);
601 SAFE_FREE(tdb->transaction->blocks);
603 if (tdb->transaction->magic_offset) {
604 const struct tdb_methods *methods = tdb->transaction->io_methods;
605 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
607 /* remove the recovery marker */
608 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
609 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
610 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
611 ret = -1;
615 /* This also removes the OPEN_LOCK, if we have it. */
616 tdb_release_transaction_locks(tdb);
618 /* restore the normal io methods */
619 tdb->methods = tdb->transaction->io_methods;
621 SAFE_FREE(tdb->transaction->hash_heads);
622 SAFE_FREE(tdb->transaction);
624 return ret;
628 cancel the current transaction
630 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
632 tdb_trace(tdb, "tdb_transaction_cancel");
633 return _tdb_transaction_cancel(tdb);
637 work out how much space the linearised recovery data will consume
639 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
641 tdb_len_t recovery_size = 0;
642 int i;
644 recovery_size = sizeof(uint32_t);
645 for (i=0;i<tdb->transaction->num_blocks;i++) {
646 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
647 break;
649 if (tdb->transaction->blocks[i] == NULL) {
650 continue;
652 recovery_size += 2*sizeof(tdb_off_t);
653 if (i == tdb->transaction->num_blocks-1) {
654 recovery_size += tdb->transaction->last_block_size;
655 } else {
656 recovery_size += tdb->transaction->block_size;
660 return recovery_size;
663 int tdb_recovery_area(struct tdb_context *tdb,
664 const struct tdb_methods *methods,
665 tdb_off_t *recovery_offset,
666 struct tdb_record *rec)
668 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
669 return -1;
672 if (*recovery_offset == 0) {
673 rec->rec_len = 0;
674 return 0;
677 if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
678 DOCONV()) == -1) {
679 return -1;
682 /* ignore invalid recovery regions: can happen in crash */
683 if (rec->magic != TDB_RECOVERY_MAGIC &&
684 rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
685 *recovery_offset = 0;
686 rec->rec_len = 0;
688 return 0;
692 allocate the recovery area, or use an existing recovery area if it is
693 large enough
695 static int tdb_recovery_allocate(struct tdb_context *tdb,
696 tdb_len_t *recovery_size,
697 tdb_off_t *recovery_offset,
698 tdb_len_t *recovery_max_size)
700 struct tdb_record rec;
701 const struct tdb_methods *methods = tdb->transaction->io_methods;
702 tdb_off_t recovery_head, new_end;
704 if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
705 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
706 return -1;
709 *recovery_size = tdb_recovery_size(tdb);
711 /* Existing recovery area? */
712 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
713 /* it fits in the existing area */
714 *recovery_max_size = rec.rec_len;
715 *recovery_offset = recovery_head;
716 return 0;
719 /* If recovery area in middle of file, we need a new one. */
720 if (recovery_head == 0
721 || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
722 /* we need to free up the old recovery area, then allocate a
723 new one at the end of the file. Note that we cannot use
724 tdb_allocate() to allocate the new one as that might return
725 us an area that is being currently used (as of the start of
726 the transaction) */
727 if (recovery_head) {
728 if (tdb_free(tdb, recovery_head, &rec) == -1) {
729 TDB_LOG((tdb, TDB_DEBUG_FATAL,
730 "tdb_recovery_allocate: failed to"
731 " free previous recovery area\n"));
732 return -1;
735 /* the tdb_free() call might have increased
736 * the recovery size */
737 *recovery_size = tdb_recovery_size(tdb);
740 /* New head will be at end of file. */
741 recovery_head = tdb->map_size;
744 /* Now we know where it will be. */
745 *recovery_offset = recovery_head;
747 /* Expand by more than we need, so we don't do it often. */
748 *recovery_max_size = tdb_expand_adjust(tdb->map_size,
749 *recovery_size,
750 tdb->page_size)
751 - sizeof(rec);
753 new_end = recovery_head + sizeof(rec) + *recovery_max_size;
755 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
756 new_end - tdb->transaction->old_map_size)
757 == -1) {
758 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
759 return -1;
762 /* remap the file (if using mmap) */
763 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
765 /* we have to reset the old map size so that we don't try to expand the file
766 again in the transaction commit, which would destroy the recovery area */
767 tdb->transaction->old_map_size = tdb->map_size;
769 /* write the recovery header offset and sync - we can sync without a race here
770 as the magic ptr in the recovery record has not been set */
771 CONVERT(recovery_head);
772 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
773 &recovery_head, sizeof(tdb_off_t)) == -1) {
774 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
775 return -1;
777 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
778 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
779 return -1;
782 return 0;
787 setup the recovery data that will be used on a crash during commit
789 static int transaction_setup_recovery(struct tdb_context *tdb,
790 tdb_off_t *magic_offset)
792 tdb_len_t recovery_size;
793 unsigned char *data, *p;
794 const struct tdb_methods *methods = tdb->transaction->io_methods;
795 struct tdb_record *rec;
796 tdb_off_t recovery_offset, recovery_max_size;
797 tdb_off_t old_map_size = tdb->transaction->old_map_size;
798 uint32_t magic, tailer;
799 int i;
802 check that the recovery area has enough space
804 if (tdb_recovery_allocate(tdb, &recovery_size,
805 &recovery_offset, &recovery_max_size) == -1) {
806 return -1;
809 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
810 if (data == NULL) {
811 tdb->ecode = TDB_ERR_OOM;
812 return -1;
815 rec = (struct tdb_record *)data;
816 memset(rec, 0, sizeof(*rec));
818 rec->magic = TDB_RECOVERY_INVALID_MAGIC;
819 rec->data_len = recovery_size;
820 rec->rec_len = recovery_max_size;
821 rec->key_len = old_map_size;
822 CONVERT(*rec);
824 /* build the recovery data into a single blob to allow us to do a single
825 large write, which should be more efficient */
826 p = data + sizeof(*rec);
827 for (i=0;i<tdb->transaction->num_blocks;i++) {
828 tdb_off_t offset;
829 tdb_len_t length;
831 if (tdb->transaction->blocks[i] == NULL) {
832 continue;
835 offset = i * tdb->transaction->block_size;
836 length = tdb->transaction->block_size;
837 if (i == tdb->transaction->num_blocks-1) {
838 length = tdb->transaction->last_block_size;
841 if (offset >= old_map_size) {
842 continue;
844 if (offset + length > tdb->transaction->old_map_size) {
845 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
846 free(data);
847 tdb->ecode = TDB_ERR_CORRUPT;
848 return -1;
850 memcpy(p, &offset, 4);
851 memcpy(p+4, &length, 4);
852 if (DOCONV()) {
853 tdb_convert(p, 8);
855 /* the recovery area contains the old data, not the
856 new data, so we have to call the original tdb_read
857 method to get it */
858 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
859 free(data);
860 tdb->ecode = TDB_ERR_IO;
861 return -1;
863 p += 8 + length;
866 /* and the tailer */
867 tailer = sizeof(*rec) + recovery_max_size;
868 memcpy(p, &tailer, 4);
869 if (DOCONV()) {
870 tdb_convert(p, 4);
873 /* write the recovery data to the recovery area */
874 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
875 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
876 free(data);
877 tdb->ecode = TDB_ERR_IO;
878 return -1;
880 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
881 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
882 free(data);
883 tdb->ecode = TDB_ERR_IO;
884 return -1;
887 /* as we don't have ordered writes, we have to sync the recovery
888 data before we update the magic to indicate that the recovery
889 data is present */
890 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
891 free(data);
892 return -1;
895 free(data);
897 magic = TDB_RECOVERY_MAGIC;
898 CONVERT(magic);
900 *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
902 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
903 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
904 tdb->ecode = TDB_ERR_IO;
905 return -1;
907 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
908 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
909 tdb->ecode = TDB_ERR_IO;
910 return -1;
913 /* ensure the recovery magic marker is on disk */
914 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
915 return -1;
918 return 0;
921 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
923 const struct tdb_methods *methods;
925 if (tdb->transaction == NULL) {
926 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
927 return -1;
930 if (tdb->transaction->prepared) {
931 tdb->ecode = TDB_ERR_EINVAL;
932 _tdb_transaction_cancel(tdb);
933 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
934 return -1;
937 if (tdb->transaction->transaction_error) {
938 tdb->ecode = TDB_ERR_IO;
939 _tdb_transaction_cancel(tdb);
940 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
941 return -1;
945 if (tdb->transaction->nesting != 0) {
946 return 0;
949 /* check for a null transaction */
950 if (tdb->transaction->blocks == NULL) {
951 return 0;
954 methods = tdb->transaction->io_methods;
956 /* if there are any locks pending then the caller has not
957 nested their locks properly, so fail the transaction */
958 if (tdb_have_extra_locks(tdb)) {
959 tdb->ecode = TDB_ERR_LOCK;
960 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
961 _tdb_transaction_cancel(tdb);
962 return -1;
965 /* upgrade the main transaction lock region to a write lock */
966 if (tdb_allrecord_upgrade(tdb) == -1) {
967 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
968 _tdb_transaction_cancel(tdb);
969 return -1;
972 /* get the open lock - this prevents new users attaching to the database
973 during the commit */
974 if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
975 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
976 _tdb_transaction_cancel(tdb);
977 return -1;
980 /* write the recovery data to the end of the file */
981 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
982 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
983 _tdb_transaction_cancel(tdb);
984 return -1;
987 tdb->transaction->prepared = true;
989 /* expand the file to the new size if needed */
990 if (tdb->map_size != tdb->transaction->old_map_size) {
991 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
992 tdb->map_size -
993 tdb->transaction->old_map_size) == -1) {
994 tdb->ecode = TDB_ERR_IO;
995 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
996 _tdb_transaction_cancel(tdb);
997 return -1;
999 tdb->map_size = tdb->transaction->old_map_size;
1000 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1003 /* Keep the open lock until the actual commit */
1005 return 0;
1009 prepare to commit the current transaction
1011 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1013 tdb_trace(tdb, "tdb_transaction_prepare_commit");
1014 return _tdb_transaction_prepare_commit(tdb);
1017 /* A repack is worthwhile if the largest is less than half total free. */
1018 static bool repack_worthwhile(struct tdb_context *tdb)
1020 tdb_off_t ptr;
1021 struct tdb_record rec;
1022 tdb_len_t total = 0, largest = 0;
1024 if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1025 return false;
1028 while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1029 total += rec.rec_len;
1030 if (rec.rec_len > largest) {
1031 largest = rec.rec_len;
1033 ptr = rec.next;
1036 return total > largest * 2;
1040 commit the current transaction
1042 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1044 const struct tdb_methods *methods;
1045 int i;
1046 bool need_repack = false;
1048 if (tdb->transaction == NULL) {
1049 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1050 return -1;
1053 tdb_trace(tdb, "tdb_transaction_commit");
1055 if (tdb->transaction->transaction_error) {
1056 tdb->ecode = TDB_ERR_IO;
1057 _tdb_transaction_cancel(tdb);
1058 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1059 return -1;
1063 if (tdb->transaction->nesting != 0) {
1064 tdb->transaction->nesting--;
1065 return 0;
1068 /* check for a null transaction */
1069 if (tdb->transaction->blocks == NULL) {
1070 _tdb_transaction_cancel(tdb);
1071 return 0;
1074 if (!tdb->transaction->prepared) {
1075 int ret = _tdb_transaction_prepare_commit(tdb);
1076 if (ret)
1077 return ret;
1080 methods = tdb->transaction->io_methods;
1082 /* perform all the writes */
1083 for (i=0;i<tdb->transaction->num_blocks;i++) {
1084 tdb_off_t offset;
1085 tdb_len_t length;
1087 if (tdb->transaction->blocks[i] == NULL) {
1088 continue;
1091 offset = i * tdb->transaction->block_size;
1092 length = tdb->transaction->block_size;
1093 if (i == tdb->transaction->num_blocks-1) {
1094 length = tdb->transaction->last_block_size;
1097 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1098 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1100 /* we've overwritten part of the data and
1101 possibly expanded the file, so we need to
1102 run the crash recovery code */
1103 tdb->methods = methods;
1104 tdb_transaction_recover(tdb);
1106 _tdb_transaction_cancel(tdb);
1108 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1109 return -1;
1111 SAFE_FREE(tdb->transaction->blocks[i]);
1114 /* Do this before we drop lock or blocks. */
1115 if (tdb->transaction->expanded) {
1116 need_repack = repack_worthwhile(tdb);
1119 SAFE_FREE(tdb->transaction->blocks);
1120 tdb->transaction->num_blocks = 0;
1122 /* ensure the new data is on disk */
1123 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1124 return -1;
1128 TODO: maybe write to some dummy hdr field, or write to magic
1129 offset without mmap, before the last sync, instead of the
1130 utime() call
1133 /* on some systems (like Linux 2.6.x) changes via mmap/msync
1134 don't change the mtime of the file, this means the file may
1135 not be backed up (as tdb rounding to block sizes means that
1136 file size changes are quite rare too). The following forces
1137 mtime changes when a transaction completes */
1138 #ifdef HAVE_UTIME
1139 utime(tdb->name, NULL);
1140 #endif
1142 /* use a transaction cancel to free memory and remove the
1143 transaction locks */
1144 _tdb_transaction_cancel(tdb);
1146 if (need_repack) {
1147 return tdb_repack(tdb);
1150 return 0;
1155 recover from an aborted transaction. Must be called with exclusive
1156 database write access already established (including the open
1157 lock to prevent new processes attaching)
1159 int tdb_transaction_recover(struct tdb_context *tdb)
1161 tdb_off_t recovery_head, recovery_eof;
1162 unsigned char *data, *p;
1163 uint32_t zero = 0;
1164 struct tdb_record rec;
1166 /* find the recovery area */
1167 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1168 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1169 tdb->ecode = TDB_ERR_IO;
1170 return -1;
1173 if (recovery_head == 0) {
1174 /* we have never allocated a recovery record */
1175 return 0;
1178 /* read the recovery record */
1179 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1180 sizeof(rec), DOCONV()) == -1) {
1181 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1182 tdb->ecode = TDB_ERR_IO;
1183 return -1;
1186 if (rec.magic != TDB_RECOVERY_MAGIC) {
1187 /* there is no valid recovery data */
1188 return 0;
1191 if (tdb->read_only) {
1192 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1193 tdb->ecode = TDB_ERR_CORRUPT;
1194 return -1;
1197 recovery_eof = rec.key_len;
1199 data = (unsigned char *)malloc(rec.data_len);
1200 if (data == NULL) {
1201 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1202 tdb->ecode = TDB_ERR_OOM;
1203 return -1;
1206 /* read the full recovery data */
1207 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1208 rec.data_len, 0) == -1) {
1209 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1210 tdb->ecode = TDB_ERR_IO;
1211 return -1;
1214 /* recover the file data */
1215 p = data;
1216 while (p+8 < data + rec.data_len) {
1217 uint32_t ofs, len;
1218 if (DOCONV()) {
1219 tdb_convert(p, 8);
1221 memcpy(&ofs, p, 4);
1222 memcpy(&len, p+4, 4);
1224 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1225 free(data);
1226 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1227 tdb->ecode = TDB_ERR_IO;
1228 return -1;
1230 p += 8 + len;
1233 free(data);
1235 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1236 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1237 tdb->ecode = TDB_ERR_IO;
1238 return -1;
1241 /* if the recovery area is after the recovered eof then remove it */
1242 if (recovery_eof <= recovery_head) {
1243 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1244 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1245 tdb->ecode = TDB_ERR_IO;
1246 return -1;
1250 /* remove the recovery magic */
1251 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1252 &zero) == -1) {
1253 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1254 tdb->ecode = TDB_ERR_IO;
1255 return -1;
1258 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1259 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1260 tdb->ecode = TDB_ERR_IO;
1261 return -1;
1264 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1265 recovery_eof));
1267 /* all done */
1268 return 0;
1271 /* Any I/O failures we say "needs recovery". */
1272 bool tdb_needs_recovery(struct tdb_context *tdb)
1274 tdb_off_t recovery_head;
1275 struct tdb_record rec;
1277 /* find the recovery area */
1278 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1279 return true;
1282 if (recovery_head == 0) {
1283 /* we have never allocated a recovery record */
1284 return false;
1287 /* read the recovery record */
1288 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1289 sizeof(rec), DOCONV()) == -1) {
1290 return true;
1293 return (rec.magic == TDB_RECOVERY_MAGIC);