r25068: Older samba3 DCs will return DCERPC_FAULT_OP_RNG_ERROR for every opcode on the
[Samba.git] / source / lib / tdb / common / transaction.c
blob9530b8b24428aa2e97a9d45ab1a4313741ff458e
1 /*
2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 2 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, write to the Free Software
24 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #include "tdb_private.h"
30 transaction design:
32 - only allow a single transaction at a time per database. This makes
33 using the transaction API simpler, as otherwise the caller would
34 have to cope with temporary failures in transactions that conflict
35 with other current transactions
37 - keep the transaction recovery information in the same file as the
38 database, using a special 'transaction recovery' record pointed at
39 by the header. This removes the need for extra journal files as
40 used by some other databases
42 - dynamically allocate the transaction recovery record, re-using it
43 for subsequent transactions. If a larger record is needed then
44 tdb_free() the old record to place it on the normal tdb freelist
45 before allocating the new record
47 - during transactions, keep a linked list of all writes that have
48 been performed by intercepting all tdb_write() calls. The hooked
49 transaction versions of tdb_read() and tdb_write() check this
50 linked list and try to use the elements of the list in preference
51 to the real database.
53 - don't allow any locks to be held when a transaction starts,
54 otherwise we can end up with deadlock (plus lack of lock nesting
55 in posix locks would mean the lock is lost)
57 - if the caller gains a lock during the transaction but doesn't
58 release it then fail the commit
60 - allow for nested calls to tdb_transaction_start(), re-using the
61 existing transaction record. If the inner transaction is cancelled
62 then a subsequent commit will fail
64 - keep a mirrored copy of the tdb hash chain heads to allow for the
65 fast hash heads scan on traverse, updating the mirrored copy in
66 the transaction version of tdb_write
68 - allow callers to mix transaction and non-transaction use of tdb,
69 although once a transaction is started then an exclusive lock is
70 gained until the transaction is committed or cancelled
72 - the commit strategy involves first saving away all modified data
73 into a linearised buffer in the transaction recovery area, then
74 marking the transaction recovery area with a magic value to
75 indicate a valid recovery record. In total 4 fsync/msync calls are
76 needed per commit to prevent race conditions. It might be possible
77 to reduce this to 3 or even 2 with some more work.
79 - check for a valid recovery record on open of the tdb, while the
80 global lock is held. Automatically recover from the transaction
81 recovery area if needed, then continue with the open as
82 usual. This allows for smooth crash recovery with no administrator
83 intervention.
85 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86 still available, but no transaction recovery area is used and no
87 fsync/msync calls are made.
91 struct tdb_transaction_el {
92 struct tdb_transaction_el *next, *prev;
93 tdb_off_t offset;
94 tdb_len_t length;
95 unsigned char *data;
99 hold the context of any current transaction
101 struct tdb_transaction {
102 /* we keep a mirrored copy of the tdb hash heads here so
103 tdb_next_hash_chain() can operate efficiently */
104 uint32_t *hash_heads;
106 /* the original io methods - used to do IOs to the real db */
107 const struct tdb_methods *io_methods;
109 /* the list of transaction elements. We use a doubly linked
110 list with a last pointer to allow us to keep the list
111 ordered, with first element at the front of the list. It
112 needs to be doubly linked as the read/write traversals need
113 to be backwards, while the commit needs to be forwards */
114 struct tdb_transaction_el *elements, *elements_last;
116 /* non-zero when an internal transaction error has
117 occurred. All write operations will then fail until the
118 transaction is ended */
119 int transaction_error;
121 /* when inside a transaction we need to keep track of any
122 nested tdb_transaction_start() calls, as these are allowed,
123 but don't create a new transaction */
124 int nesting;
126 /* old file size before transaction */
127 tdb_len_t old_map_size;
132 read while in a transaction. We need to check first if the data is in our list
133 of transaction elements, then if not do a real read
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
136 tdb_len_t len, int cv)
138 struct tdb_transaction_el *el;
140 /* we need to walk the list backwards to get the most recent data */
141 for (el=tdb->transaction->elements_last;el;el=el->prev) {
142 tdb_len_t partial;
144 if (off+len <= el->offset) {
145 continue;
147 if (off >= el->offset + el->length) {
148 continue;
151 /* an overlapping read - needs to be split into up to
152 2 reads and a memcpy */
153 if (off < el->offset) {
154 partial = el->offset - off;
155 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
156 goto fail;
158 len -= partial;
159 off += partial;
160 buf = (void *)(partial + (char *)buf);
162 if (off + len <= el->offset + el->length) {
163 partial = len;
164 } else {
165 partial = el->offset + el->length - off;
167 memcpy(buf, el->data + (off - el->offset), partial);
168 if (cv) {
169 tdb_convert(buf, len);
171 len -= partial;
172 off += partial;
173 buf = (void *)(partial + (char *)buf);
175 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
176 goto fail;
179 return 0;
182 /* its not in the transaction elements - do a real read */
183 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
185 fail:
186 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
187 tdb->ecode = TDB_ERR_IO;
188 tdb->transaction->transaction_error = 1;
189 return -1;
194 write while in a transaction
196 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
197 const void *buf, tdb_len_t len)
199 struct tdb_transaction_el *el, *best_el=NULL;
201 if (len == 0) {
202 return 0;
205 /* if the write is to a hash head, then update the transaction
206 hash heads */
207 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
208 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
209 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
210 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
213 /* first see if we can replace an existing entry */
214 for (el=tdb->transaction->elements_last;el;el=el->prev) {
215 tdb_len_t partial;
217 if (best_el == NULL && off == el->offset+el->length) {
218 best_el = el;
221 if (off+len <= el->offset) {
222 continue;
224 if (off >= el->offset + el->length) {
225 continue;
228 /* an overlapping write - needs to be split into up to
229 2 writes and a memcpy */
230 if (off < el->offset) {
231 partial = el->offset - off;
232 if (transaction_write(tdb, off, buf, partial) != 0) {
233 goto fail;
235 len -= partial;
236 off += partial;
237 buf = (const void *)(partial + (const char *)buf);
239 if (off + len <= el->offset + el->length) {
240 partial = len;
241 } else {
242 partial = el->offset + el->length - off;
244 memcpy(el->data + (off - el->offset), buf, partial);
245 len -= partial;
246 off += partial;
247 buf = (const void *)(partial + (const char *)buf);
249 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
250 goto fail;
253 return 0;
256 /* see if we can append the new entry to an existing entry */
257 if (best_el && best_el->offset + best_el->length == off &&
258 (off+len < tdb->transaction->old_map_size ||
259 off > tdb->transaction->old_map_size)) {
260 unsigned char *data = best_el->data;
261 el = best_el;
262 el->data = (unsigned char *)realloc(el->data,
263 el->length + len);
264 if (el->data == NULL) {
265 tdb->ecode = TDB_ERR_OOM;
266 tdb->transaction->transaction_error = 1;
267 el->data = data;
268 return -1;
270 if (buf) {
271 memcpy(el->data + el->length, buf, len);
272 } else {
273 memset(el->data + el->length, TDB_PAD_BYTE, len);
275 el->length += len;
276 return 0;
279 /* add a new entry at the end of the list */
280 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
281 if (el == NULL) {
282 tdb->ecode = TDB_ERR_OOM;
283 tdb->transaction->transaction_error = 1;
284 return -1;
286 el->next = NULL;
287 el->prev = tdb->transaction->elements_last;
288 el->offset = off;
289 el->length = len;
290 el->data = (unsigned char *)malloc(len);
291 if (el->data == NULL) {
292 free(el);
293 tdb->ecode = TDB_ERR_OOM;
294 tdb->transaction->transaction_error = 1;
295 return -1;
297 if (buf) {
298 memcpy(el->data, buf, len);
299 } else {
300 memset(el->data, TDB_PAD_BYTE, len);
302 if (el->prev) {
303 el->prev->next = el;
304 } else {
305 tdb->transaction->elements = el;
307 tdb->transaction->elements_last = el;
308 return 0;
310 fail:
311 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
312 tdb->ecode = TDB_ERR_IO;
313 tdb->transaction->transaction_error = 1;
314 return -1;
318 accelerated hash chain head search, using the cached hash heads
320 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
322 uint32_t h = *chain;
323 for (;h < tdb->header.hash_size;h++) {
324 /* the +1 takes account of the freelist */
325 if (0 != tdb->transaction->hash_heads[h+1]) {
326 break;
329 (*chain) = h;
333 out of bounds check during a transaction
335 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
337 if (len <= tdb->map_size) {
338 return 0;
340 return TDB_ERRCODE(TDB_ERR_IO, -1);
344 transaction version of tdb_expand().
346 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
347 tdb_off_t addition)
349 /* add a write to the transaction elements, so subsequent
350 reads see the zero data */
351 if (transaction_write(tdb, size, NULL, addition) != 0) {
352 return -1;
355 return 0;
359 brlock during a transaction - ignore them
361 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
362 int rw_type, int lck_type, int probe, size_t len)
364 return 0;
367 static const struct tdb_methods transaction_methods = {
368 transaction_read,
369 transaction_write,
370 transaction_next_hash_chain,
371 transaction_oob,
372 transaction_expand_file,
373 transaction_brlock
378 start a tdb transaction. No token is returned, as only a single
379 transaction is allowed to be pending per tdb_context
381 int tdb_transaction_start(struct tdb_context *tdb)
383 /* some sanity checks */
384 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
385 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
386 tdb->ecode = TDB_ERR_EINVAL;
387 return -1;
390 /* cope with nested tdb_transaction_start() calls */
391 if (tdb->transaction != NULL) {
392 tdb->transaction->nesting++;
393 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
394 tdb->transaction->nesting));
395 return 0;
398 if (tdb->num_locks != 0 || tdb->global_lock.count) {
399 /* the caller must not have any locks when starting a
400 transaction as otherwise we'll be screwed by lack
401 of nested locks in posix */
402 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
403 tdb->ecode = TDB_ERR_LOCK;
404 return -1;
407 if (tdb->travlocks.next != NULL) {
408 /* you cannot use transactions inside a traverse (although you can use
409 traverse inside a transaction) as otherwise you can end up with
410 deadlock */
411 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
412 tdb->ecode = TDB_ERR_LOCK;
413 return -1;
416 tdb->transaction = (struct tdb_transaction *)
417 calloc(sizeof(struct tdb_transaction), 1);
418 if (tdb->transaction == NULL) {
419 tdb->ecode = TDB_ERR_OOM;
420 return -1;
423 /* get the transaction write lock. This is a blocking lock. As
424 discussed with Volker, there are a number of ways we could
425 make this async, which we will probably do in the future */
426 if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
427 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
428 tdb->ecode = TDB_ERR_LOCK;
429 SAFE_FREE(tdb->transaction);
430 return -1;
433 /* get a read lock from the freelist to the end of file. This
434 is upgraded to a write lock during the commit */
435 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
436 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
437 tdb->ecode = TDB_ERR_LOCK;
438 goto fail;
441 /* setup a copy of the hash table heads so the hash scan in
442 traverse can be fast */
443 tdb->transaction->hash_heads = (uint32_t *)
444 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
445 if (tdb->transaction->hash_heads == NULL) {
446 tdb->ecode = TDB_ERR_OOM;
447 goto fail;
449 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
450 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
451 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
452 tdb->ecode = TDB_ERR_IO;
453 goto fail;
456 /* make sure we know about any file expansions already done by
457 anyone else */
458 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
459 tdb->transaction->old_map_size = tdb->map_size;
461 /* finally hook the io methods, replacing them with
462 transaction specific methods */
463 tdb->transaction->io_methods = tdb->methods;
464 tdb->methods = &transaction_methods;
466 /* by calling this transaction write here, we ensure that we don't grow the
467 transaction linked list due to hash table updates */
468 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
469 TDB_HASHTABLE_SIZE(tdb)) != 0) {
470 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
471 tdb->ecode = TDB_ERR_IO;
472 goto fail;
475 return 0;
477 fail:
478 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
479 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
480 SAFE_FREE(tdb->transaction->hash_heads);
481 SAFE_FREE(tdb->transaction);
482 return -1;
487 cancel the current transaction
489 int tdb_transaction_cancel(struct tdb_context *tdb)
491 if (tdb->transaction == NULL) {
492 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
493 return -1;
496 if (tdb->transaction->nesting != 0) {
497 tdb->transaction->transaction_error = 1;
498 tdb->transaction->nesting--;
499 return 0;
502 tdb->map_size = tdb->transaction->old_map_size;
504 /* free all the transaction elements */
505 while (tdb->transaction->elements) {
506 struct tdb_transaction_el *el = tdb->transaction->elements;
507 tdb->transaction->elements = el->next;
508 free(el->data);
509 free(el);
512 /* remove any global lock created during the transaction */
513 if (tdb->global_lock.count != 0) {
514 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
515 tdb->global_lock.count = 0;
518 /* remove any locks created during the transaction */
519 if (tdb->num_locks != 0) {
520 int i;
521 for (i=0;i<tdb->num_lockrecs;i++) {
522 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
523 F_UNLCK,F_SETLKW, 0, 1);
525 tdb->num_locks = 0;
526 tdb->num_lockrecs = 0;
527 SAFE_FREE(tdb->lockrecs);
530 /* restore the normal io methods */
531 tdb->methods = tdb->transaction->io_methods;
533 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
534 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
535 SAFE_FREE(tdb->transaction->hash_heads);
536 SAFE_FREE(tdb->transaction);
538 return 0;
542 sync to disk
544 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
546 if (fsync(tdb->fd) != 0) {
547 tdb->ecode = TDB_ERR_IO;
548 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
549 return -1;
551 #ifdef MS_SYNC
552 if (tdb->map_ptr) {
553 tdb_off_t moffset = offset & ~(tdb->page_size-1);
554 if (msync(moffset + (char *)tdb->map_ptr,
555 length + (offset - moffset), MS_SYNC) != 0) {
556 tdb->ecode = TDB_ERR_IO;
557 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
558 strerror(errno)));
559 return -1;
562 #endif
563 return 0;
568 work out how much space the linearised recovery data will consume
570 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
572 struct tdb_transaction_el *el;
573 tdb_len_t recovery_size = 0;
575 recovery_size = sizeof(uint32_t);
576 for (el=tdb->transaction->elements;el;el=el->next) {
577 if (el->offset >= tdb->transaction->old_map_size) {
578 continue;
580 recovery_size += 2*sizeof(tdb_off_t) + el->length;
583 return recovery_size;
587 allocate the recovery area, or use an existing recovery area if it is
588 large enough
590 static int tdb_recovery_allocate(struct tdb_context *tdb,
591 tdb_len_t *recovery_size,
592 tdb_off_t *recovery_offset,
593 tdb_len_t *recovery_max_size)
595 struct list_struct rec;
596 const struct tdb_methods *methods = tdb->transaction->io_methods;
597 tdb_off_t recovery_head;
599 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
600 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
601 return -1;
604 rec.rec_len = 0;
606 if (recovery_head != 0 &&
607 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
608 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
609 return -1;
612 *recovery_size = tdb_recovery_size(tdb);
614 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
615 /* it fits in the existing area */
616 *recovery_max_size = rec.rec_len;
617 *recovery_offset = recovery_head;
618 return 0;
621 /* we need to free up the old recovery area, then allocate a
622 new one at the end of the file. Note that we cannot use
623 tdb_allocate() to allocate the new one as that might return
624 us an area that is being currently used (as of the start of
625 the transaction) */
626 if (recovery_head != 0) {
627 if (tdb_free(tdb, recovery_head, &rec) == -1) {
628 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
629 return -1;
633 /* the tdb_free() call might have increased the recovery size */
634 *recovery_size = tdb_recovery_size(tdb);
636 /* round up to a multiple of page size */
637 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
638 *recovery_offset = tdb->map_size;
639 recovery_head = *recovery_offset;
641 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
642 (tdb->map_size - tdb->transaction->old_map_size) +
643 sizeof(rec) + *recovery_max_size) == -1) {
644 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
645 return -1;
648 /* remap the file (if using mmap) */
649 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
651 /* we have to reset the old map size so that we don't try to expand the file
652 again in the transaction commit, which would destroy the recovery area */
653 tdb->transaction->old_map_size = tdb->map_size;
655 /* write the recovery header offset and sync - we can sync without a race here
656 as the magic ptr in the recovery record has not been set */
657 CONVERT(recovery_head);
658 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
659 &recovery_head, sizeof(tdb_off_t)) == -1) {
660 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
661 return -1;
664 return 0;
669 setup the recovery data that will be used on a crash during commit
671 static int transaction_setup_recovery(struct tdb_context *tdb,
672 tdb_off_t *magic_offset)
674 struct tdb_transaction_el *el;
675 tdb_len_t recovery_size;
676 unsigned char *data, *p;
677 const struct tdb_methods *methods = tdb->transaction->io_methods;
678 struct list_struct *rec;
679 tdb_off_t recovery_offset, recovery_max_size;
680 tdb_off_t old_map_size = tdb->transaction->old_map_size;
681 uint32_t magic, tailer;
684 check that the recovery area has enough space
686 if (tdb_recovery_allocate(tdb, &recovery_size,
687 &recovery_offset, &recovery_max_size) == -1) {
688 return -1;
691 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
692 if (data == NULL) {
693 tdb->ecode = TDB_ERR_OOM;
694 return -1;
697 rec = (struct list_struct *)data;
698 memset(rec, 0, sizeof(*rec));
700 rec->magic = 0;
701 rec->data_len = recovery_size;
702 rec->rec_len = recovery_max_size;
703 rec->key_len = old_map_size;
704 CONVERT(rec);
706 /* build the recovery data into a single blob to allow us to do a single
707 large write, which should be more efficient */
708 p = data + sizeof(*rec);
709 for (el=tdb->transaction->elements;el;el=el->next) {
710 if (el->offset >= old_map_size) {
711 continue;
713 if (el->offset + el->length > tdb->transaction->old_map_size) {
714 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
715 free(data);
716 tdb->ecode = TDB_ERR_CORRUPT;
717 return -1;
719 memcpy(p, &el->offset, 4);
720 memcpy(p+4, &el->length, 4);
721 if (DOCONV()) {
722 tdb_convert(p, 8);
724 /* the recovery area contains the old data, not the
725 new data, so we have to call the original tdb_read
726 method to get it */
727 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
728 free(data);
729 tdb->ecode = TDB_ERR_IO;
730 return -1;
732 p += 8 + el->length;
735 /* and the tailer */
736 tailer = sizeof(*rec) + recovery_max_size;
737 memcpy(p, &tailer, 4);
738 CONVERT(p);
740 /* write the recovery data to the recovery area */
741 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
742 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
743 free(data);
744 tdb->ecode = TDB_ERR_IO;
745 return -1;
748 /* as we don't have ordered writes, we have to sync the recovery
749 data before we update the magic to indicate that the recovery
750 data is present */
751 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
752 free(data);
753 return -1;
756 free(data);
758 magic = TDB_RECOVERY_MAGIC;
759 CONVERT(magic);
761 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
763 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
764 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
765 tdb->ecode = TDB_ERR_IO;
766 return -1;
769 /* ensure the recovery magic marker is on disk */
770 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
771 return -1;
774 return 0;
778 commit the current transaction
780 int tdb_transaction_commit(struct tdb_context *tdb)
782 const struct tdb_methods *methods;
783 tdb_off_t magic_offset = 0;
784 uint32_t zero = 0;
786 if (tdb->transaction == NULL) {
787 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
788 return -1;
791 if (tdb->transaction->transaction_error) {
792 tdb->ecode = TDB_ERR_IO;
793 tdb_transaction_cancel(tdb);
794 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
795 return -1;
798 if (tdb->transaction->nesting != 0) {
799 tdb->transaction->nesting--;
800 return 0;
803 /* check for a null transaction */
804 if (tdb->transaction->elements == NULL) {
805 tdb_transaction_cancel(tdb);
806 return 0;
809 methods = tdb->transaction->io_methods;
811 /* if there are any locks pending then the caller has not
812 nested their locks properly, so fail the transaction */
813 if (tdb->num_locks || tdb->global_lock.count) {
814 tdb->ecode = TDB_ERR_LOCK;
815 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
816 tdb_transaction_cancel(tdb);
817 return -1;
820 /* upgrade the main transaction lock region to a write lock */
821 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
822 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
823 tdb->ecode = TDB_ERR_LOCK;
824 tdb_transaction_cancel(tdb);
825 return -1;
828 /* get the global lock - this prevents new users attaching to the database
829 during the commit */
830 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
831 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
832 tdb->ecode = TDB_ERR_LOCK;
833 tdb_transaction_cancel(tdb);
834 return -1;
837 if (!(tdb->flags & TDB_NOSYNC)) {
838 /* write the recovery data to the end of the file */
839 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
840 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
841 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
842 tdb_transaction_cancel(tdb);
843 return -1;
847 /* expand the file to the new size if needed */
848 if (tdb->map_size != tdb->transaction->old_map_size) {
849 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
850 tdb->map_size -
851 tdb->transaction->old_map_size) == -1) {
852 tdb->ecode = TDB_ERR_IO;
853 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
854 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
855 tdb_transaction_cancel(tdb);
856 return -1;
858 tdb->map_size = tdb->transaction->old_map_size;
859 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
862 /* perform all the writes */
863 while (tdb->transaction->elements) {
864 struct tdb_transaction_el *el = tdb->transaction->elements;
866 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
867 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
869 /* we've overwritten part of the data and
870 possibly expanded the file, so we need to
871 run the crash recovery code */
872 tdb->methods = methods;
873 tdb_transaction_recover(tdb);
875 tdb_transaction_cancel(tdb);
876 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
878 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
879 return -1;
881 tdb->transaction->elements = el->next;
882 free(el->data);
883 free(el);
886 if (!(tdb->flags & TDB_NOSYNC)) {
887 /* ensure the new data is on disk */
888 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
889 return -1;
892 /* remove the recovery marker */
893 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
894 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
895 return -1;
898 /* ensure the recovery marker has been removed on disk */
899 if (transaction_sync(tdb, magic_offset, 4) == -1) {
900 return -1;
904 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
907 TODO: maybe write to some dummy hdr field, or write to magic
908 offset without mmap, before the last sync, instead of the
909 utime() call
912 /* on some systems (like Linux 2.6.x) changes via mmap/msync
913 don't change the mtime of the file, this means the file may
914 not be backed up (as tdb rounding to block sizes means that
915 file size changes are quite rare too). The following forces
916 mtime changes when a transaction completes */
917 #ifdef HAVE_UTIME
918 utime(tdb->name, NULL);
919 #endif
921 /* use a transaction cancel to free memory and remove the
922 transaction locks */
923 tdb_transaction_cancel(tdb);
924 return 0;
929 recover from an aborted transaction. Must be called with exclusive
930 database write access already established (including the global
931 lock to prevent new processes attaching)
933 int tdb_transaction_recover(struct tdb_context *tdb)
935 tdb_off_t recovery_head, recovery_eof;
936 unsigned char *data, *p;
937 uint32_t zero = 0;
938 struct list_struct rec;
940 /* find the recovery area */
941 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
942 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
943 tdb->ecode = TDB_ERR_IO;
944 return -1;
947 if (recovery_head == 0) {
948 /* we have never allocated a recovery record */
949 return 0;
952 /* read the recovery record */
953 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
954 sizeof(rec), DOCONV()) == -1) {
955 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
956 tdb->ecode = TDB_ERR_IO;
957 return -1;
960 if (rec.magic != TDB_RECOVERY_MAGIC) {
961 /* there is no valid recovery data */
962 return 0;
965 if (tdb->read_only) {
966 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
967 tdb->ecode = TDB_ERR_CORRUPT;
968 return -1;
971 recovery_eof = rec.key_len;
973 data = (unsigned char *)malloc(rec.data_len);
974 if (data == NULL) {
975 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
976 tdb->ecode = TDB_ERR_OOM;
977 return -1;
980 /* read the full recovery data */
981 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
982 rec.data_len, 0) == -1) {
983 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
984 tdb->ecode = TDB_ERR_IO;
985 return -1;
988 /* recover the file data */
989 p = data;
990 while (p+8 < data + rec.data_len) {
991 uint32_t ofs, len;
992 if (DOCONV()) {
993 tdb_convert(p, 8);
995 memcpy(&ofs, p, 4);
996 memcpy(&len, p+4, 4);
998 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
999 free(data);
1000 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1001 tdb->ecode = TDB_ERR_IO;
1002 return -1;
1004 p += 8 + len;
1007 free(data);
1009 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1010 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1011 tdb->ecode = TDB_ERR_IO;
1012 return -1;
1015 /* if the recovery area is after the recovered eof then remove it */
1016 if (recovery_eof <= recovery_head) {
1017 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1018 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1019 tdb->ecode = TDB_ERR_IO;
1020 return -1;
1024 /* remove the recovery magic */
1025 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1026 &zero) == -1) {
1027 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1028 tdb->ecode = TDB_ERR_IO;
1029 return -1;
1032 /* reduce the file size to the old size */
1033 tdb_munmap(tdb);
1034 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1035 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1036 tdb->ecode = TDB_ERR_IO;
1037 return -1;
1039 tdb->map_size = recovery_eof;
1040 tdb_mmap(tdb);
1042 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1043 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1044 tdb->ecode = TDB_ERR_IO;
1045 return -1;
1048 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1049 recovery_eof));
1051 /* all done */
1052 return 0;