Port from ctdb:
[Samba/vl.git] / source3 / lib / tdb / common / transaction.c
blobea0e3a93f3d46c2ccd835b95a3313afb3911f987
1 /*
2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb_private.h"
29 transaction design:
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
50 to the real database.
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 global lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
82 intervention.
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
92 hold the context of any current transaction
94 struct tdb_transaction {
95 /* we keep a mirrored copy of the tdb hash heads here so
96 tdb_next_hash_chain() can operate efficiently */
97 uint32_t *hash_heads;
99 /* the original io methods - used to do IOs to the real db */
100 const struct tdb_methods *io_methods;
102 /* the list of transaction blocks. When a block is first
103 written to, it gets created in this list */
104 uint8_t **blocks;
105 uint32_t num_blocks;
106 uint32_t block_size; /* bytes in each block */
107 uint32_t last_block_size; /* number of valid bytes in the last block */
109 /* non-zero when an internal transaction error has
110 occurred. All write operations will then fail until the
111 transaction is ended */
112 int transaction_error;
114 /* when inside a transaction we need to keep track of any
115 nested tdb_transaction_start() calls, as these are allowed,
116 but don't create a new transaction */
117 int nesting;
119 /* old file size before transaction */
120 tdb_len_t old_map_size;
125 read while in a transaction. We need to check first if the data is in our list
126 of transaction elements, then if not do a real read
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129 tdb_len_t len, int cv)
131 uint32_t blk;
133 /* break it down into block sized ops */
134 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
137 return -1;
139 len -= len2;
140 off += len2;
141 buf = (void *)(len2 + (char *)buf);
144 if (len == 0) {
145 return 0;
148 blk = off / tdb->transaction->block_size;
150 /* see if we have it in the block list */
151 if (tdb->transaction->num_blocks <= blk ||
152 tdb->transaction->blocks[blk] == NULL) {
153 /* nope, do a real read */
154 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
155 goto fail;
157 return 0;
160 /* it is in the block list. Now check for the last block */
161 if (blk == tdb->transaction->num_blocks-1) {
162 if (len > tdb->transaction->last_block_size) {
163 goto fail;
167 /* now copy it out of this block */
168 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
169 if (cv) {
170 tdb_convert(buf, len);
172 return 0;
174 fail:
175 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176 tdb->ecode = TDB_ERR_IO;
177 tdb->transaction->transaction_error = 1;
178 return -1;
183 write while in a transaction
185 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
186 const void *buf, tdb_len_t len)
188 uint32_t blk;
190 /* if the write is to a hash head, then update the transaction
191 hash heads */
192 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
198 /* break it up into block sized chunks */
199 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201 if (transaction_write(tdb, off, buf, len2) != 0) {
202 return -1;
204 len -= len2;
205 off += len2;
206 if (buf != NULL) {
207 buf = (const void *)(len2 + (const char *)buf);
211 if (len == 0) {
212 return 0;
215 blk = off / tdb->transaction->block_size;
216 off = off % tdb->transaction->block_size;
218 if (tdb->transaction->num_blocks <= blk) {
219 uint8_t **new_blocks;
220 /* expand the blocks array */
221 if (tdb->transaction->blocks == NULL) {
222 new_blocks = malloc((blk+1)*sizeof(uint8_t *));
223 } else {
224 new_blocks = realloc(tdb->transaction->blocks, (blk+1)*sizeof(uint8_t *));
226 if (new_blocks == NULL) {
227 tdb->ecode = TDB_ERR_OOM;
228 goto fail;
230 memset(&new_blocks[tdb->transaction->num_blocks], 0,
231 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
232 tdb->transaction->blocks = new_blocks;
233 tdb->transaction->num_blocks = blk+1;
234 tdb->transaction->last_block_size = 0;
237 /* allocate and fill a block? */
238 if (tdb->transaction->blocks[blk] == NULL) {
239 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
240 if (tdb->transaction->blocks[blk] == NULL) {
241 tdb->ecode = TDB_ERR_OOM;
242 tdb->transaction->transaction_error = 1;
243 return -1;
245 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
246 tdb_len_t len2 = tdb->transaction->block_size;
247 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
248 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
250 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
251 tdb->transaction->blocks[blk],
252 len2, 0) != 0) {
253 SAFE_FREE(tdb->transaction->blocks[blk]);
254 tdb->ecode = TDB_ERR_IO;
255 goto fail;
257 if (blk == tdb->transaction->num_blocks-1) {
258 tdb->transaction->last_block_size = len2;
263 /* overwrite part of an existing block */
264 if (buf == NULL) {
265 memset(tdb->transaction->blocks[blk] + off, 0, len);
266 } else {
267 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
269 if (blk == tdb->transaction->num_blocks-1) {
270 if (len + off > tdb->transaction->last_block_size) {
271 tdb->transaction->last_block_size = len + off;
275 return 0;
277 fail:
278 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
279 (blk*tdb->transaction->block_size) + off, len));
280 tdb->transaction->transaction_error = 1;
281 return -1;
286 write while in a transaction - this varient never expands the transaction blocks, it only
287 updates existing blocks. This means it cannot change the recovery size
289 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
290 const void *buf, tdb_len_t len)
292 uint32_t blk;
294 /* break it up into block sized chunks */
295 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
296 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
297 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
298 return -1;
300 len -= len2;
301 off += len2;
302 if (buf != NULL) {
303 buf = (const void *)(len2 + (const char *)buf);
307 if (len == 0) {
308 return 0;
311 blk = off / tdb->transaction->block_size;
312 off = off % tdb->transaction->block_size;
314 if (tdb->transaction->num_blocks <= blk ||
315 tdb->transaction->blocks[blk] == NULL) {
316 return 0;
319 if (blk == tdb->transaction->num_blocks-1 &&
320 off + len > tdb->transaction->last_block_size) {
321 len = tdb->transaction->last_block_size - off;
324 /* overwrite part of an existing block */
325 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
327 return 0;
332 accelerated hash chain head search, using the cached hash heads
334 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
336 uint32_t h = *chain;
337 for (;h < tdb->header.hash_size;h++) {
338 /* the +1 takes account of the freelist */
339 if (0 != tdb->transaction->hash_heads[h+1]) {
340 break;
343 (*chain) = h;
347 out of bounds check during a transaction
349 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
351 if (len <= tdb->map_size) {
352 return 0;
354 return TDB_ERRCODE(TDB_ERR_IO, -1);
358 transaction version of tdb_expand().
360 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
361 tdb_off_t addition)
363 /* add a write to the transaction elements, so subsequent
364 reads see the zero data */
365 if (transaction_write(tdb, size, NULL, addition) != 0) {
366 return -1;
369 return 0;
373 brlock during a transaction - ignore them
375 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
376 int rw_type, int lck_type, int probe, size_t len)
378 return 0;
381 static const struct tdb_methods transaction_methods = {
382 transaction_read,
383 transaction_write,
384 transaction_next_hash_chain,
385 transaction_oob,
386 transaction_expand_file,
387 transaction_brlock
392 start a tdb transaction. No token is returned, as only a single
393 transaction is allowed to be pending per tdb_context
395 int tdb_transaction_start(struct tdb_context *tdb)
397 /* some sanity checks */
398 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
399 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
400 tdb->ecode = TDB_ERR_EINVAL;
401 return -1;
404 /* cope with nested tdb_transaction_start() calls */
405 if (tdb->transaction != NULL) {
406 tdb->transaction->nesting++;
407 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
408 tdb->transaction->nesting));
409 return 0;
412 if (tdb->num_locks != 0 || tdb->global_lock.count) {
413 /* the caller must not have any locks when starting a
414 transaction as otherwise we'll be screwed by lack
415 of nested locks in posix */
416 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
417 tdb->ecode = TDB_ERR_LOCK;
418 return -1;
421 if (tdb->travlocks.next != NULL) {
422 /* you cannot use transactions inside a traverse (although you can use
423 traverse inside a transaction) as otherwise you can end up with
424 deadlock */
425 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
426 tdb->ecode = TDB_ERR_LOCK;
427 return -1;
430 tdb->transaction = (struct tdb_transaction *)
431 calloc(sizeof(struct tdb_transaction), 1);
432 if (tdb->transaction == NULL) {
433 tdb->ecode = TDB_ERR_OOM;
434 return -1;
437 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
438 tdb->transaction->block_size = tdb->page_size;
440 /* get the transaction write lock. This is a blocking lock. As
441 discussed with Volker, there are a number of ways we could
442 make this async, which we will probably do in the future */
443 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
444 SAFE_FREE(tdb->transaction->blocks);
445 SAFE_FREE(tdb->transaction);
446 return -1;
449 /* get a read lock from the freelist to the end of file. This
450 is upgraded to a write lock during the commit */
451 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
452 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
453 tdb->ecode = TDB_ERR_LOCK;
454 goto fail;
457 /* setup a copy of the hash table heads so the hash scan in
458 traverse can be fast */
459 tdb->transaction->hash_heads = (uint32_t *)
460 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
461 if (tdb->transaction->hash_heads == NULL) {
462 tdb->ecode = TDB_ERR_OOM;
463 goto fail;
465 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
466 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
467 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
468 tdb->ecode = TDB_ERR_IO;
469 goto fail;
472 /* make sure we know about any file expansions already done by
473 anyone else */
474 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
475 tdb->transaction->old_map_size = tdb->map_size;
477 /* finally hook the io methods, replacing them with
478 transaction specific methods */
479 tdb->transaction->io_methods = tdb->methods;
480 tdb->methods = &transaction_methods;
482 return 0;
484 fail:
485 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
486 tdb_transaction_unlock(tdb);
487 SAFE_FREE(tdb->transaction->blocks);
488 SAFE_FREE(tdb->transaction->hash_heads);
489 SAFE_FREE(tdb->transaction);
490 return -1;
495 cancel the current transaction
497 int tdb_transaction_cancel(struct tdb_context *tdb)
499 int i;
501 if (tdb->transaction == NULL) {
502 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
503 return -1;
506 if (tdb->transaction->nesting != 0) {
507 tdb->transaction->transaction_error = 1;
508 tdb->transaction->nesting--;
509 return 0;
512 tdb->map_size = tdb->transaction->old_map_size;
514 /* free all the transaction blocks */
515 for (i=0;i<tdb->transaction->num_blocks;i++) {
516 if (tdb->transaction->blocks[i] != NULL) {
517 free(tdb->transaction->blocks[i]);
520 SAFE_FREE(tdb->transaction->blocks);
522 /* remove any global lock created during the transaction */
523 if (tdb->global_lock.count != 0) {
524 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
525 tdb->global_lock.count = 0;
528 /* remove any locks created during the transaction */
529 if (tdb->num_locks != 0) {
530 for (i=0;i<tdb->num_lockrecs;i++) {
531 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
532 F_UNLCK,F_SETLKW, 0, 1);
534 tdb->num_locks = 0;
535 tdb->num_lockrecs = 0;
536 SAFE_FREE(tdb->lockrecs);
539 /* restore the normal io methods */
540 tdb->methods = tdb->transaction->io_methods;
542 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
543 tdb_transaction_unlock(tdb);
544 SAFE_FREE(tdb->transaction->hash_heads);
545 SAFE_FREE(tdb->transaction);
547 return 0;
551 sync to disk
553 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
555 if (fsync(tdb->fd) != 0) {
556 tdb->ecode = TDB_ERR_IO;
557 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
558 return -1;
560 #ifdef MS_SYNC
561 if (tdb->map_ptr) {
562 tdb_off_t moffset = offset & ~(tdb->page_size-1);
563 if (msync(moffset + (char *)tdb->map_ptr,
564 length + (offset - moffset), MS_SYNC) != 0) {
565 tdb->ecode = TDB_ERR_IO;
566 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
567 strerror(errno)));
568 return -1;
571 #endif
572 return 0;
577 work out how much space the linearised recovery data will consume
579 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
581 tdb_len_t recovery_size = 0;
582 int i;
584 recovery_size = sizeof(uint32_t);
585 for (i=0;i<tdb->transaction->num_blocks;i++) {
586 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
587 break;
589 if (tdb->transaction->blocks[i] == NULL) {
590 continue;
592 recovery_size += 2*sizeof(tdb_off_t);
593 if (i == tdb->transaction->num_blocks-1) {
594 recovery_size += tdb->transaction->last_block_size;
595 } else {
596 recovery_size += tdb->transaction->block_size;
600 return recovery_size;
604 allocate the recovery area, or use an existing recovery area if it is
605 large enough
607 static int tdb_recovery_allocate(struct tdb_context *tdb,
608 tdb_len_t *recovery_size,
609 tdb_off_t *recovery_offset,
610 tdb_len_t *recovery_max_size)
612 struct list_struct rec;
613 const struct tdb_methods *methods = tdb->transaction->io_methods;
614 tdb_off_t recovery_head;
616 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
617 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
618 return -1;
621 rec.rec_len = 0;
623 if (recovery_head != 0 &&
624 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
625 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
626 return -1;
629 *recovery_size = tdb_recovery_size(tdb);
631 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
632 /* it fits in the existing area */
633 *recovery_max_size = rec.rec_len;
634 *recovery_offset = recovery_head;
635 return 0;
638 /* we need to free up the old recovery area, then allocate a
639 new one at the end of the file. Note that we cannot use
640 tdb_allocate() to allocate the new one as that might return
641 us an area that is being currently used (as of the start of
642 the transaction) */
643 if (recovery_head != 0) {
644 if (tdb_free(tdb, recovery_head, &rec) == -1) {
645 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
646 return -1;
650 /* the tdb_free() call might have increased the recovery size */
651 *recovery_size = tdb_recovery_size(tdb);
653 /* round up to a multiple of page size */
654 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
655 *recovery_offset = tdb->map_size;
656 recovery_head = *recovery_offset;
658 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
659 (tdb->map_size - tdb->transaction->old_map_size) +
660 sizeof(rec) + *recovery_max_size) == -1) {
661 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
662 return -1;
665 /* remap the file (if using mmap) */
666 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
668 /* we have to reset the old map size so that we don't try to expand the file
669 again in the transaction commit, which would destroy the recovery area */
670 tdb->transaction->old_map_size = tdb->map_size;
672 /* write the recovery header offset and sync - we can sync without a race here
673 as the magic ptr in the recovery record has not been set */
674 CONVERT(recovery_head);
675 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
676 &recovery_head, sizeof(tdb_off_t)) == -1) {
677 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
678 return -1;
680 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
681 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
682 return -1;
685 return 0;
690 setup the recovery data that will be used on a crash during commit
692 static int transaction_setup_recovery(struct tdb_context *tdb,
693 tdb_off_t *magic_offset)
695 tdb_len_t recovery_size;
696 unsigned char *data, *p;
697 const struct tdb_methods *methods = tdb->transaction->io_methods;
698 struct list_struct *rec;
699 tdb_off_t recovery_offset, recovery_max_size;
700 tdb_off_t old_map_size = tdb->transaction->old_map_size;
701 uint32_t magic, tailer;
702 int i;
705 check that the recovery area has enough space
707 if (tdb_recovery_allocate(tdb, &recovery_size,
708 &recovery_offset, &recovery_max_size) == -1) {
709 return -1;
712 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
713 if (data == NULL) {
714 tdb->ecode = TDB_ERR_OOM;
715 return -1;
718 rec = (struct list_struct *)data;
719 memset(rec, 0, sizeof(*rec));
721 rec->magic = 0;
722 rec->data_len = recovery_size;
723 rec->rec_len = recovery_max_size;
724 rec->key_len = old_map_size;
725 CONVERT(rec);
727 /* build the recovery data into a single blob to allow us to do a single
728 large write, which should be more efficient */
729 p = data + sizeof(*rec);
730 for (i=0;i<tdb->transaction->num_blocks;i++) {
731 tdb_off_t offset;
732 tdb_len_t length;
734 if (tdb->transaction->blocks[i] == NULL) {
735 continue;
738 offset = i * tdb->transaction->block_size;
739 length = tdb->transaction->block_size;
740 if (i == tdb->transaction->num_blocks-1) {
741 length = tdb->transaction->last_block_size;
744 if (offset >= old_map_size) {
745 continue;
747 if (offset + length > tdb->transaction->old_map_size) {
748 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
749 free(data);
750 tdb->ecode = TDB_ERR_CORRUPT;
751 return -1;
753 memcpy(p, &offset, 4);
754 memcpy(p+4, &length, 4);
755 if (DOCONV()) {
756 tdb_convert(p, 8);
758 /* the recovery area contains the old data, not the
759 new data, so we have to call the original tdb_read
760 method to get it */
761 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
762 free(data);
763 tdb->ecode = TDB_ERR_IO;
764 return -1;
766 p += 8 + length;
769 /* and the tailer */
770 tailer = sizeof(*rec) + recovery_max_size;
771 memcpy(p, &tailer, 4);
772 CONVERT(p);
774 /* write the recovery data to the recovery area */
775 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
776 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
777 free(data);
778 tdb->ecode = TDB_ERR_IO;
779 return -1;
781 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
782 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
783 free(data);
784 tdb->ecode = TDB_ERR_IO;
785 return -1;
788 /* as we don't have ordered writes, we have to sync the recovery
789 data before we update the magic to indicate that the recovery
790 data is present */
791 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
792 free(data);
793 return -1;
796 free(data);
798 magic = TDB_RECOVERY_MAGIC;
799 CONVERT(magic);
801 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
803 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
804 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
805 tdb->ecode = TDB_ERR_IO;
806 return -1;
808 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
809 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
810 tdb->ecode = TDB_ERR_IO;
811 return -1;
814 /* ensure the recovery magic marker is on disk */
815 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
816 return -1;
819 return 0;
823 commit the current transaction
825 int tdb_transaction_commit(struct tdb_context *tdb)
827 const struct tdb_methods *methods;
828 tdb_off_t magic_offset = 0;
829 uint32_t zero = 0;
830 int i;
832 if (tdb->transaction == NULL) {
833 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
834 return -1;
837 if (tdb->transaction->transaction_error) {
838 tdb->ecode = TDB_ERR_IO;
839 tdb_transaction_cancel(tdb);
840 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
841 return -1;
845 if (tdb->transaction->nesting != 0) {
846 tdb->transaction->nesting--;
847 return 0;
850 /* check for a null transaction */
851 if (tdb->transaction->blocks == NULL) {
852 tdb_transaction_cancel(tdb);
853 return 0;
856 methods = tdb->transaction->io_methods;
858 /* if there are any locks pending then the caller has not
859 nested their locks properly, so fail the transaction */
860 if (tdb->num_locks || tdb->global_lock.count) {
861 tdb->ecode = TDB_ERR_LOCK;
862 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
863 tdb_transaction_cancel(tdb);
864 return -1;
867 /* upgrade the main transaction lock region to a write lock */
868 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
869 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
870 tdb->ecode = TDB_ERR_LOCK;
871 tdb_transaction_cancel(tdb);
872 return -1;
875 /* get the global lock - this prevents new users attaching to the database
876 during the commit */
877 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
878 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
879 tdb->ecode = TDB_ERR_LOCK;
880 tdb_transaction_cancel(tdb);
881 return -1;
884 if (!(tdb->flags & TDB_NOSYNC)) {
885 /* write the recovery data to the end of the file */
886 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
887 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
888 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
889 tdb_transaction_cancel(tdb);
890 return -1;
894 /* expand the file to the new size if needed */
895 if (tdb->map_size != tdb->transaction->old_map_size) {
896 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
897 tdb->map_size -
898 tdb->transaction->old_map_size) == -1) {
899 tdb->ecode = TDB_ERR_IO;
900 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
901 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
902 tdb_transaction_cancel(tdb);
903 return -1;
905 tdb->map_size = tdb->transaction->old_map_size;
906 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
909 /* perform all the writes */
910 for (i=0;i<tdb->transaction->num_blocks;i++) {
911 tdb_off_t offset;
912 tdb_len_t length;
914 if (tdb->transaction->blocks[i] == NULL) {
915 continue;
918 offset = i * tdb->transaction->block_size;
919 length = tdb->transaction->block_size;
920 if (i == tdb->transaction->num_blocks-1) {
921 length = tdb->transaction->last_block_size;
924 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
925 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
927 /* we've overwritten part of the data and
928 possibly expanded the file, so we need to
929 run the crash recovery code */
930 tdb->methods = methods;
931 tdb_transaction_recover(tdb);
933 tdb_transaction_cancel(tdb);
934 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
936 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
937 return -1;
939 SAFE_FREE(tdb->transaction->blocks[i]);
942 SAFE_FREE(tdb->transaction->blocks);
943 tdb->transaction->num_blocks = 0;
945 if (!(tdb->flags & TDB_NOSYNC)) {
946 /* ensure the new data is on disk */
947 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
948 return -1;
951 /* remove the recovery marker */
952 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
953 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
954 return -1;
957 /* ensure the recovery marker has been removed on disk */
958 if (transaction_sync(tdb, magic_offset, 4) == -1) {
959 return -1;
963 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
966 TODO: maybe write to some dummy hdr field, or write to magic
967 offset without mmap, before the last sync, instead of the
968 utime() call
971 /* on some systems (like Linux 2.6.x) changes via mmap/msync
972 don't change the mtime of the file, this means the file may
973 not be backed up (as tdb rounding to block sizes means that
974 file size changes are quite rare too). The following forces
975 mtime changes when a transaction completes */
976 #ifdef HAVE_UTIME
977 utime(tdb->name, NULL);
978 #endif
980 /* use a transaction cancel to free memory and remove the
981 transaction locks */
982 tdb_transaction_cancel(tdb);
984 return 0;
989 recover from an aborted transaction. Must be called with exclusive
990 database write access already established (including the global
991 lock to prevent new processes attaching)
993 int tdb_transaction_recover(struct tdb_context *tdb)
995 tdb_off_t recovery_head, recovery_eof;
996 unsigned char *data, *p;
997 uint32_t zero = 0;
998 struct list_struct rec;
1000 /* find the recovery area */
1001 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1002 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1003 tdb->ecode = TDB_ERR_IO;
1004 return -1;
1007 if (recovery_head == 0) {
1008 /* we have never allocated a recovery record */
1009 return 0;
1012 /* read the recovery record */
1013 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1014 sizeof(rec), DOCONV()) == -1) {
1015 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1016 tdb->ecode = TDB_ERR_IO;
1017 return -1;
1020 if (rec.magic != TDB_RECOVERY_MAGIC) {
1021 /* there is no valid recovery data */
1022 return 0;
1025 if (tdb->read_only) {
1026 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1027 tdb->ecode = TDB_ERR_CORRUPT;
1028 return -1;
1031 recovery_eof = rec.key_len;
1033 data = (unsigned char *)malloc(rec.data_len);
1034 if (data == NULL) {
1035 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1036 tdb->ecode = TDB_ERR_OOM;
1037 return -1;
1040 /* read the full recovery data */
1041 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1042 rec.data_len, 0) == -1) {
1043 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1044 tdb->ecode = TDB_ERR_IO;
1045 return -1;
1048 /* recover the file data */
1049 p = data;
1050 while (p+8 < data + rec.data_len) {
1051 uint32_t ofs, len;
1052 if (DOCONV()) {
1053 tdb_convert(p, 8);
1055 memcpy(&ofs, p, 4);
1056 memcpy(&len, p+4, 4);
1058 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1059 free(data);
1060 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1061 tdb->ecode = TDB_ERR_IO;
1062 return -1;
1064 p += 8 + len;
1067 free(data);
1069 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1070 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1071 tdb->ecode = TDB_ERR_IO;
1072 return -1;
1075 /* if the recovery area is after the recovered eof then remove it */
1076 if (recovery_eof <= recovery_head) {
1077 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1078 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1079 tdb->ecode = TDB_ERR_IO;
1080 return -1;
1084 /* remove the recovery magic */
1085 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1086 &zero) == -1) {
1087 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1088 tdb->ecode = TDB_ERR_IO;
1089 return -1;
1092 /* reduce the file size to the old size */
1093 tdb_munmap(tdb);
1094 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1095 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1096 tdb->ecode = TDB_ERR_IO;
1097 return -1;
1099 tdb->map_size = recovery_eof;
1100 tdb_mmap(tdb);
1102 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1103 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1104 tdb->ecode = TDB_ERR_IO;
1105 return -1;
1108 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1109 recovery_eof));
1111 /* all done */
1112 return 0;