/* source/tdb/common/transaction.c */
/*
   Unix SMB/CIFS implementation.

   trivial database library

   Copyright (C) Andrew Tridgell 2005

   ** NOTE! The following LGPL license applies to the tdb
   ** library. This does NOT imply that all of Samba is released
   ** under the LGPL

   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2 of the License, or (at your option) any later version.

   This library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with this library; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "tdb_private.h"
/*
  transaction design:

  - only allow a single transaction at a time per database. This makes
    using the transaction API simpler, as otherwise the caller would
    have to cope with temporary failures in transactions that conflict
    with other current transactions

  - keep the transaction recovery information in the same file as the
    database, using a special 'transaction recovery' record pointed at
    by the header. This removes the need for extra journal files as
    used by some other databases

  - dynamically allocate the transaction recovery record, re-using it
    for subsequent transactions. If a larger record is needed then
    tdb_free() the old record to place it on the normal tdb freelist
    before allocating the new record

  - during transactions, keep a linked list of all writes that have
    been performed by intercepting all tdb_write() calls. The hooked
    transaction versions of tdb_read() and tdb_write() check this
    linked list and try to use the elements of the list in preference
    to the real database.

  - don't allow any locks to be held when a transaction starts,
    otherwise we can end up with deadlock (plus lack of lock nesting
    in posix locks would mean the lock is lost)

  - if the caller gains a lock during the transaction but doesn't
    release it then fail the commit

  - allow for nested calls to tdb_transaction_start(), re-using the
    existing transaction record. If the inner transaction is cancelled
    then a subsequent commit will fail

  - keep a mirrored copy of the tdb hash chain heads to allow for the
    fast hash heads scan on traverse, updating the mirrored copy in
    the transaction version of tdb_write

  - allow callers to mix transaction and non-transaction use of tdb,
    although once a transaction is started then an exclusive lock is
    gained until the transaction is committed or cancelled

  - the commit strategy involves first saving away all modified data
    into a linearised buffer in the transaction recovery area, then
    marking the transaction recovery area with a magic value to
    indicate a valid recovery record. In total 4 fsync/msync calls are
    needed per commit to prevent race conditions. It might be possible
    to reduce this to 3 or even 2 with some more work.

  - check for a valid recovery record on open of the tdb, while the
    global lock is held. Automatically recover from the transaction
    recovery area if needed, then continue with the open as
    usual. This allows for smooth crash recovery with no administrator
    intervention.

  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
    still available, but no transaction recovery area is used and no
    fsync/msync calls are made.

  A short usage sketch of the transaction API follows this comment.
*/
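/*
  Usage sketch (illustration only, not part of the original file): a
  caller brackets a group of updates with the transaction calls
  defined below. The tdb_open() and tdb_store() calls follow the
  public tdb API; the database name and the key/value contents are
  made up for the example. Note that if tdb_transaction_commit()
  fails it cancels the transaction itself, so the caller must not
  cancel again.

      struct tdb_context *db = tdb_open("example.tdb", 0, TDB_DEFAULT,
                                        O_RDWR|O_CREAT, 0600);
      TDB_DATA key, val;   // filled in by the caller

      if (tdb_transaction_start(db) == 0) {
              if (tdb_store(db, key, val, TDB_REPLACE) != 0) {
                      tdb_transaction_cancel(db);
              } else if (tdb_transaction_commit(db) != 0) {
                      // commit failed and has already cancelled the transaction
              }
      }
*/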
int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
		       int rw_type, int lck_type, int probe, size_t len);

struct tdb_transaction_el {
	struct tdb_transaction_el *next, *prev;
	tdb_off_t offset;
	tdb_len_t length;
	unsigned char *data;
};

/*
  hold the context of any current transaction
*/
struct tdb_transaction {
	/* we keep a mirrored copy of the tdb hash heads here so
	   tdb_next_hash_chain() can operate efficiently */
	u32 *hash_heads;

	/* the original io methods - used to do IOs to the real db */
	const struct tdb_methods *io_methods;

	/* the list of transaction elements. We use a doubly linked
	   list with a last pointer to allow us to keep the list
	   ordered, with first element at the front of the list. It
	   needs to be doubly linked as the read/write traversals need
	   to be backwards, while the commit needs to be forwards */
	struct tdb_transaction_el *elements, *elements_last;

	/* non-zero when an internal transaction error has
	   occurred. All write operations will then fail until the
	   transaction is ended */
	int transaction_error;

	/* when inside a transaction we need to keep track of any
	   nested tdb_transaction_start() calls, as these are allowed,
	   but don't create a new transaction */
	int nesting;

	/* old file size before transaction */
	tdb_len_t old_map_size;
};
/*
  read while in a transaction. We need to check first if the data is in our list
  of transaction elements, then if not do a real read
*/
static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
			    tdb_len_t len, int cv)
{
	struct tdb_transaction_el *el;

	/* we need to walk the list backwards to get the most recent data */
	for (el=tdb->transaction->elements_last;el;el=el->prev) {
		tdb_len_t partial;

		if (off+len <= el->offset) {
			continue;
		}
		if (off >= el->offset + el->length) {
			continue;
		}

		/* an overlapping read - needs to be split into up to
		   2 reads and a memcpy */
		if (off < el->offset) {
			partial = el->offset - off;
			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
				goto fail;
			}
			len -= partial;
			off += partial;
			buf = (void *)(partial + (char *)buf);
		}
		if (off + len <= el->offset + el->length) {
			partial = len;
		} else {
			partial = el->offset + el->length - off;
		}
		memcpy(buf, el->data + (off - el->offset), partial);
		if (cv) {
			tdb_convert(buf, len);
		}
		len -= partial;
		off += partial;
		buf = (void *)(partial + (char *)buf);

		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
			goto fail;
		}

		return 0;
	}

	/* it's not in the transaction elements - do a real read */
	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);

fail:
	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
	tdb->ecode = TDB_ERR_IO;
	tdb->transaction->transaction_error = 1;
	return -1;
}
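/*
  Illustration (added for clarity, not in the original): how
  transaction_read() above and transaction_write() below split a
  request that partially overlaps a stored element. The offsets are
  arbitrary example values.

      element in list:            [off=100 ............. off+length=200]
      request off=80 len=160:  [80 ....................................240]

      step 1: bytes  80..99  - recurse for the part before the element
      step 2: bytes 100..199 - memcpy to/from el->data
      step 3: bytes 200..239 - recurse for the remainder after the element
*/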
/*
  write while in a transaction
*/
static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
			     const void *buf, tdb_len_t len)
{
	struct tdb_transaction_el *el, *best_el=NULL;

	if (len == 0) {
		return 0;
	}

	/* if the write is to a hash head, then update the transaction
	   hash heads */
	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
	}

	/* first see if we can replace an existing entry */
	for (el=tdb->transaction->elements_last;el;el=el->prev) {
		tdb_len_t partial;

		if (best_el == NULL && off == el->offset+el->length) {
			best_el = el;
		}

		if (off+len <= el->offset) {
			continue;
		}
		if (off >= el->offset + el->length) {
			continue;
		}

		/* an overlapping write - needs to be split into up to
		   2 writes and a memcpy */
		if (off < el->offset) {
			partial = el->offset - off;
			if (transaction_write(tdb, off, buf, partial) != 0) {
				goto fail;
			}
			len -= partial;
			off += partial;
			buf = (const void *)(partial + (const char *)buf);
		}
		if (off + len <= el->offset + el->length) {
			partial = len;
		} else {
			partial = el->offset + el->length - off;
		}
		memcpy(el->data + (off - el->offset), buf, partial);
		len -= partial;
		off += partial;
		buf = (const void *)(partial + (const char *)buf);

		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
			goto fail;
		}

		return 0;
	}

	/* see if we can append the new entry to an existing entry */
	if (best_el && best_el->offset + best_el->length == off &&
	    (off+len < tdb->transaction->old_map_size ||
	     off > tdb->transaction->old_map_size)) {
		unsigned char *data = best_el->data;
		el = best_el;
		el->data = (unsigned char *)realloc(el->data,
						    el->length + len);
		if (el->data == NULL) {
			tdb->ecode = TDB_ERR_OOM;
			tdb->transaction->transaction_error = 1;
			el->data = data;
			return -1;
		}
		if (buf) {
			memcpy(el->data + el->length, buf, len);
		} else {
			memset(el->data + el->length, TDB_PAD_BYTE, len);
		}
		el->length += len;
		return 0;
	}

	/* add a new entry at the end of the list */
	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
	if (el == NULL) {
		tdb->ecode = TDB_ERR_OOM;
		tdb->transaction->transaction_error = 1;
		return -1;
	}
	el->next = NULL;
	el->prev = tdb->transaction->elements_last;
	el->offset = off;
	el->length = len;
	el->data = (unsigned char *)malloc(len);
	if (el->data == NULL) {
		free(el);
		tdb->ecode = TDB_ERR_OOM;
		tdb->transaction->transaction_error = 1;
		return -1;
	}
	if (buf) {
		memcpy(el->data, buf, len);
	} else {
		memset(el->data, TDB_PAD_BYTE, len);
	}
	if (el->prev) {
		el->prev->next = el;
	} else {
		tdb->transaction->elements = el;
	}
	tdb->transaction->elements_last = el;
	return 0;

fail:
	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
	tdb->ecode = TDB_ERR_IO;
	tdb->transaction->transaction_error = 1;
	return -1;
}
/*
  accelerated hash chain head search, using the cached hash heads
*/
static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
{
	u32 h = *chain;
	for (;h < tdb->header.hash_size;h++) {
		/* the +1 takes account of the freelist */
		if (0 != tdb->transaction->hash_heads[h+1]) {
			break;
		}
	}
	(*chain) = h;
}
/*
  out of bounds check during a transaction
*/
static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
{
	if (len <= tdb->map_size) {
		return 0;
	}
	return TDB_ERRCODE(TDB_ERR_IO, -1);
}
/*
  transaction version of tdb_expand().
*/
static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
				   tdb_off_t addition)
{
	/* add a write to the transaction elements, so subsequent
	   reads see the zero data */
	if (transaction_write(tdb, size, NULL, addition) != 0) {
		return -1;
	}

	return 0;
}
/*
  brlock during a transaction - ignore them
*/
int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
		       int rw_type, int lck_type, int probe, size_t len)
{
	return 0;
}

static const struct tdb_methods transaction_methods = {
	transaction_read,
	transaction_write,
	transaction_next_hash_chain,
	transaction_oob,
	transaction_expand_file,
	transaction_brlock
};
/*
  start a tdb transaction. No token is returned, as only a single
  transaction is allowed to be pending per tdb_context
*/
int tdb_transaction_start(struct tdb_context *tdb)
{
	/* some sanity checks */
	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
		tdb->ecode = TDB_ERR_EINVAL;
		return -1;
	}

	/* cope with nested tdb_transaction_start() calls */
	if (tdb->transaction != NULL) {
		tdb->transaction->nesting++;
		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
			 tdb->transaction->nesting));
		return 0;
	}

	if (tdb->num_locks != 0 || tdb->global_lock.count) {
		/* the caller must not have any locks when starting a
		   transaction as otherwise we'll be screwed by lack
		   of nested locks in posix */
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	if (tdb->travlocks.next != NULL) {
		/* you cannot use transactions inside a traverse (although you can use
		   traverse inside a transaction) as otherwise you can end up with
		   deadlock */
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
		tdb->ecode = TDB_ERR_LOCK;
		return -1;
	}

	tdb->transaction = (struct tdb_transaction *)
		calloc(sizeof(struct tdb_transaction), 1);
	if (tdb->transaction == NULL) {
		tdb->ecode = TDB_ERR_OOM;
		return -1;
	}

	/* get the transaction write lock. This is a blocking lock. As
	   discussed with Volker, there are a number of ways we could
	   make this async, which we will probably do in the future */
	if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
		tdb->ecode = TDB_ERR_LOCK;
		SAFE_FREE(tdb->transaction);
		return -1;
	}

	/* get a read lock from the freelist to the end of file. This
	   is upgraded to a write lock during the commit */
	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
		tdb->ecode = TDB_ERR_LOCK;
		goto fail;
	}

	/* setup a copy of the hash table heads so the hash scan in
	   traverse can be fast */
	tdb->transaction->hash_heads = (u32 *)
		calloc(tdb->header.hash_size+1, sizeof(u32));
	if (tdb->transaction->hash_heads == NULL) {
		tdb->ecode = TDB_ERR_OOM;
		goto fail;
	}
	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
		tdb->ecode = TDB_ERR_IO;
		goto fail;
	}

	/* make sure we know about any file expansions already done by
	   anyone else */
	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
	tdb->transaction->old_map_size = tdb->map_size;

	/* finally hook the io methods, replacing them with
	   transaction specific methods */
	tdb->transaction->io_methods = tdb->methods;
	tdb->methods = &transaction_methods;

	/* by calling this transaction write here, we ensure that we don't grow the
	   transaction linked list due to hash table updates */
	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
		tdb->ecode = TDB_ERR_IO;
		goto fail;
	}

	return 0;

fail:
	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
	tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
	SAFE_FREE(tdb->transaction->hash_heads);
	SAFE_FREE(tdb->transaction);
	return -1;
}
/*
  cancel the current transaction
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
	if (tdb->transaction == NULL) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
		return -1;
	}

	if (tdb->transaction->nesting != 0) {
		tdb->transaction->transaction_error = 1;
		tdb->transaction->nesting--;
		return 0;
	}

	tdb->map_size = tdb->transaction->old_map_size;

	/* free all the transaction elements */
	while (tdb->transaction->elements) {
		struct tdb_transaction_el *el = tdb->transaction->elements;
		tdb->transaction->elements = el->next;
		free(el->data);
		free(el);
	}

	/* remove any global lock created during the transaction */
	if (tdb->global_lock.count != 0) {
		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
		tdb->global_lock.count = 0;
	}

	/* remove any locks created during the transaction */
	if (tdb->num_locks != 0) {
		int i;
		for (i=0;i<tdb->num_lockrecs;i++) {
			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
				   F_UNLCK,F_SETLKW, 0, 1);
		}
		tdb->num_locks = 0;
		tdb->num_lockrecs = 0;
		SAFE_FREE(tdb->lockrecs);
	}

	/* restore the normal io methods */
	tdb->methods = tdb->transaction->io_methods;

	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
	tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
	SAFE_FREE(tdb->transaction->hash_heads);
	SAFE_FREE(tdb->transaction);

	return 0;
}
/*
  sync to disk
*/
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
{
	if (fsync(tdb->fd) != 0) {
		tdb->ecode = TDB_ERR_IO;
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
		return -1;
	}
#ifdef MS_SYNC
	if (tdb->map_ptr) {
		tdb_off_t moffset = offset & ~(tdb->page_size-1);
		if (msync(moffset + (char *)tdb->map_ptr,
			  length + (offset - moffset), MS_SYNC) != 0) {
			tdb->ecode = TDB_ERR_IO;
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
				 strerror(errno)));
			return -1;
		}
	}
#endif
	return 0;
}
/*
  work out how much space the linearised recovery data will consume
*/
static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
{
	struct tdb_transaction_el *el;
	tdb_len_t recovery_size = 0;

	recovery_size = sizeof(u32);
	for (el=tdb->transaction->elements;el;el=el->next) {
		if (el->offset >= tdb->transaction->old_map_size) {
			continue;
		}
		recovery_size += 2*sizeof(tdb_off_t) + el->length;
	}

	return recovery_size;
}
/*
  allocate the recovery area, or use an existing recovery area if it is
  large enough
*/
static int tdb_recovery_allocate(struct tdb_context *tdb,
				 tdb_len_t *recovery_size,
				 tdb_off_t *recovery_offset,
				 tdb_len_t *recovery_max_size)
{
	struct list_struct rec;
	const struct tdb_methods *methods = tdb->transaction->io_methods;
	tdb_off_t recovery_head;

	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
		return -1;
	}

	rec.rec_len = 0;

	if (recovery_head != 0 &&
	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
		return -1;
	}

	*recovery_size = tdb_recovery_size(tdb);

	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
		/* it fits in the existing area */
		*recovery_max_size = rec.rec_len;
		*recovery_offset = recovery_head;
		return 0;
	}

	/* we need to free up the old recovery area, then allocate a
	   new one at the end of the file. Note that we cannot use
	   tdb_allocate() to allocate the new one as that might return
	   us an area that is being currently used (as of the start of
	   the transaction) */
	if (recovery_head != 0) {
		if (tdb_free(tdb, recovery_head, &rec) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
			return -1;
		}
	}

	/* the tdb_free() call might have increased the recovery size */
	*recovery_size = tdb_recovery_size(tdb);

	/* round up to a multiple of page size */
	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
	*recovery_offset = tdb->map_size;
	recovery_head = *recovery_offset;

	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
				     (tdb->map_size - tdb->transaction->old_map_size) +
				     sizeof(rec) + *recovery_max_size) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
		return -1;
	}

	/* remap the file (if using mmap) */
	methods->tdb_oob(tdb, tdb->map_size + 1, 1);

	/* we have to reset the old map size so that we don't try to expand the file
	   again in the transaction commit, which would destroy the recovery area */
	tdb->transaction->old_map_size = tdb->map_size;

	/* write the recovery header offset and sync - we can sync without a race here
	   as the magic ptr in the recovery record has not been set */
	CONVERT(recovery_head);
	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
			       &recovery_head, sizeof(tdb_off_t)) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
		return -1;
	}

	return 0;
}
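/*
  Layout note (added for clarity): transaction_setup_recovery() below
  linearises the recovery data as a normal tdb record header
  (list_struct) whose fields are reused - data_len holds the recovery
  data size, rec_len the allocated area size and key_len the old file
  size - followed, for each transaction element that touches the old
  file area, by a 4 byte offset, a 4 byte length and the original
  bytes read from the database, and finally a 4 byte tailer.
*/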
/*
  setup the recovery data that will be used on a crash during commit
*/
static int transaction_setup_recovery(struct tdb_context *tdb,
				      tdb_off_t *magic_offset)
{
	struct tdb_transaction_el *el;
	tdb_len_t recovery_size;
	unsigned char *data, *p;
	const struct tdb_methods *methods = tdb->transaction->io_methods;
	struct list_struct *rec;
	tdb_off_t recovery_offset, recovery_max_size;
	tdb_off_t old_map_size = tdb->transaction->old_map_size;
	u32 magic, tailer;

	/*
	  check that the recovery area has enough space
	*/
	if (tdb_recovery_allocate(tdb, &recovery_size,
				  &recovery_offset, &recovery_max_size) == -1) {
		return -1;
	}

	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
	if (data == NULL) {
		tdb->ecode = TDB_ERR_OOM;
		return -1;
	}

	rec = (struct list_struct *)data;
	memset(rec, 0, sizeof(*rec));

	rec->magic = 0;
	rec->data_len = recovery_size;
	rec->rec_len = recovery_max_size;
	rec->key_len = old_map_size;
	CONVERT(rec);

	/* build the recovery data into a single blob to allow us to do a single
	   large write, which should be more efficient */
	p = data + sizeof(*rec);
	for (el=tdb->transaction->elements;el;el=el->next) {
		if (el->offset >= old_map_size) {
			continue;
		}
		if (el->offset + el->length > tdb->transaction->old_map_size) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
			free(data);
			tdb->ecode = TDB_ERR_CORRUPT;
			return -1;
		}
		memcpy(p, &el->offset, 4);
		memcpy(p+4, &el->length, 4);
		if (DOCONV()) {
			tdb_convert(p, 8);
		}
		/* the recovery area contains the old data, not the
		   new data, so we have to call the original tdb_read
		   method to get it */
		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
			free(data);
			tdb->ecode = TDB_ERR_IO;
			return -1;
		}
		p += 8 + el->length;
	}

	/* and the tailer */
	tailer = sizeof(*rec) + recovery_max_size;
	memcpy(p, &tailer, 4);
	CONVERT(p);

	/* write the recovery data to the recovery area */
	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
		free(data);
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	/* as we don't have ordered writes, we have to sync the recovery
	   data before we update the magic to indicate that the recovery
	   data is present */
	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
		free(data);
		return -1;
	}

	free(data);

	magic = TDB_RECOVERY_MAGIC;
	CONVERT(magic);

	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);

	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	/* ensure the recovery magic marker is on disk */
	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
		return -1;
	}

	return 0;
}
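/*
  Commit ordering note (added for clarity): unless TDB_NOSYNC is set,
  the commit path relies on four transaction_sync() points to stay
  crash safe - the recovery data is synced, then the recovery magic
  is written and synced (both in transaction_setup_recovery() above),
  then the new data is written and synced, and finally the magic is
  cleared and synced (both in tdb_transaction_commit() below). This
  matches the "4 fsync/msync calls" mentioned in the design notes at
  the top of the file.
*/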
/*
  commit the current transaction
*/
int tdb_transaction_commit(struct tdb_context *tdb)
{
	const struct tdb_methods *methods;
	tdb_off_t magic_offset = 0;
	u32 zero = 0;

	if (tdb->transaction == NULL) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
		return -1;
	}

	if (tdb->transaction->transaction_error) {
		tdb->ecode = TDB_ERR_IO;
		tdb_transaction_cancel(tdb);
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
		return -1;
	}

	if (tdb->transaction->nesting != 0) {
		tdb->transaction->nesting--;
		return 0;
	}

	/* check for a null transaction */
	if (tdb->transaction->elements == NULL) {
		tdb_transaction_cancel(tdb);
		return 0;
	}

	methods = tdb->transaction->io_methods;

	/* if there are any locks pending then the caller has not
	   nested their locks properly, so fail the transaction */
	if (tdb->num_locks || tdb->global_lock.count) {
		tdb->ecode = TDB_ERR_LOCK;
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
		tdb_transaction_cancel(tdb);
		return -1;
	}

	/* upgrade the main transaction lock region to a write lock */
	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
		tdb->ecode = TDB_ERR_LOCK;
		tdb_transaction_cancel(tdb);
		return -1;
	}

	/* get the global lock - this prevents new users attaching to the database
	   during the commit */
	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
		tdb->ecode = TDB_ERR_LOCK;
		tdb_transaction_cancel(tdb);
		return -1;
	}

	if (!(tdb->flags & TDB_NOSYNC)) {
		/* write the recovery data to the end of the file */
		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
			tdb_transaction_cancel(tdb);
			return -1;
		}
	}

	/* expand the file to the new size if needed */
	if (tdb->map_size != tdb->transaction->old_map_size) {
		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
					     tdb->map_size -
					     tdb->transaction->old_map_size) == -1) {
			tdb->ecode = TDB_ERR_IO;
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
			tdb_transaction_cancel(tdb);
			return -1;
		}
		tdb->map_size = tdb->transaction->old_map_size;
		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
	}

	/* perform all the writes */
	while (tdb->transaction->elements) {
		struct tdb_transaction_el *el = tdb->transaction->elements;

		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));

			/* we've overwritten part of the data and
			   possibly expanded the file, so we need to
			   run the crash recovery code */
			tdb->methods = methods;
			tdb_transaction_recover(tdb);

			tdb_transaction_cancel(tdb);
			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);

			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
			return -1;
		}
		tdb->transaction->elements = el->next;
		free(el->data);
		free(el);
	}

	if (!(tdb->flags & TDB_NOSYNC)) {
		/* ensure the new data is on disk */
		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
			return -1;
		}

		/* remove the recovery marker */
		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
			return -1;
		}

		/* ensure the recovery marker has been removed on disk */
		if (transaction_sync(tdb, magic_offset, 4) == -1) {
			return -1;
		}
	}

	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);

	/*
	  TODO: maybe write to some dummy hdr field, or write to magic
	  offset without mmap, before the last sync, instead of the
	  utime() call
	*/

	/* on some systems (like Linux 2.6.x) changes via mmap/msync
	   don't change the mtime of the file, this means the file may
	   not be backed up (as tdb rounding to block sizes means that
	   file size changes are quite rare too). The following forces
	   mtime changes when a transaction completes */
#ifdef HAVE_UTIME
	utime(tdb->name, NULL);
#endif

	/* use a transaction cancel to free memory and remove the
	   transaction locks */
	tdb_transaction_cancel(tdb);
	return 0;
}
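/*
  Note (added for clarity): as described in the design notes above,
  tdb_transaction_recover() below is normally invoked while opening
  the database, with the global lock held, so that a crashed commit
  is rolled back before any new user sees the file. It is also called
  from tdb_transaction_commit() above if a write fails part way
  through the commit.
*/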
/*
  recover from an aborted transaction. Must be called with exclusive
  database write access already established (including the global
  lock to prevent new processes attaching)
*/
int tdb_transaction_recover(struct tdb_context *tdb)
{
	tdb_off_t recovery_head, recovery_eof;
	unsigned char *data, *p;
	u32 zero = 0;
	struct list_struct rec;

	/* find the recovery area */
	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	if (recovery_head == 0) {
		/* we have never allocated a recovery record */
		return 0;
	}

	/* read the recovery record */
	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
				   sizeof(rec), DOCONV()) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	if (rec.magic != TDB_RECOVERY_MAGIC) {
		/* there is no valid recovery data */
		return 0;
	}

	if (tdb->read_only) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
		tdb->ecode = TDB_ERR_CORRUPT;
		return -1;
	}

	recovery_eof = rec.key_len;

	data = (unsigned char *)malloc(rec.data_len);
	if (data == NULL) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
		tdb->ecode = TDB_ERR_OOM;
		return -1;
	}

	/* read the full recovery data */
	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
				   rec.data_len, 0) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	/* recover the file data */
	p = data;
	while (p+8 < data + rec.data_len) {
		u32 ofs, len;
		if (DOCONV()) {
			tdb_convert(p, 8);
		}
		memcpy(&ofs, p, 4);
		memcpy(&len, p+4, 4);

		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
			free(data);
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
			tdb->ecode = TDB_ERR_IO;
			return -1;
		}
		p += 8 + len;
	}

	free(data);

	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	/* if the recovery area is after the recovered eof then remove it */
	if (recovery_eof <= recovery_head) {
		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
			tdb->ecode = TDB_ERR_IO;
			return -1;
		}
	}

	/* remove the recovery magic */
	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
			  &zero) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	/* reduce the file size to the old size */
	tdb_munmap(tdb);
	if (ftruncate(tdb->fd, recovery_eof) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}
	tdb->map_size = recovery_eof;
	tdb_mmap(tdb);

	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
		tdb->ecode = TDB_ERR_IO;
		return -1;
	}

	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
		 recovery_eof));

	/* all done */
	return 0;
}