s4:scripting/python: always treat the highwatermark as opaque (bug #9508)
[Samba/gebeck_regimport.git] / lib / ntdb / transaction.c
blob9608be43e8ffd2e59f45f5b56f8f31d6b95e08a5
1 /*
2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
7 Copyright (C) Rusty Russell 2010
9 ** NOTE! The following LGPL license applies to the ntdb
10 ** library. This does NOT imply that all of Samba is released
11 ** under the LGPL
13 This library is free software; you can redistribute it and/or
14 modify it under the terms of the GNU Lesser General Public
15 License as published by the Free Software Foundation; either
16 version 3 of the License, or (at your option) any later version.
18 This library is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 Lesser General Public License for more details.
23 You should have received a copy of the GNU Lesser General Public
24 License along with this library; if not, see <http://www.gnu.org/licenses/>.
27 #include "private.h"
28 #include <assert.h>
/* Release x through the context's free_fn (when x is non-NULL) and reset
   x to NULL.  x is evaluated more than once, so it must be a side-effect
   free lvalue.  Both macro parameters are parenthesised so that
   arbitrary expressions can be passed safely. */
#define SAFE_FREE(ntdb, x) do { if ((x) != NULL) {(ntdb)->free_fn((void *)(x), (ntdb)->alloc_data); (x)=NULL;} } while(0)
32 transaction design:
34 - only allow a single transaction at a time per database. This makes
35 using the transaction API simpler, as otherwise the caller would
36 have to cope with temporary failures in transactions that conflict
37 with other current transactions
39 - keep the transaction recovery information in the same file as the
40 database, using a special 'transaction recovery' record pointed at
41 by the header. This removes the need for extra journal files as
42 used by some other databases
44 - dynamically allocate the transaction recovery record, re-using it
45 for subsequent transactions. If a larger record is needed then
46 ntdb_free() the old record to place it on the normal ntdb freelist
47 before allocating the new record
49 - during transactions, keep a linked list of all writes that have
50 been performed by intercepting all ntdb_write() calls. The hooked
51 transaction versions of ntdb_read() and ntdb_write() check this
52 linked list and try to use the elements of the list in preference
53 to the real database.
55 - don't allow any locks to be held when a transaction starts,
56 otherwise we can end up with deadlock (plus lack of lock nesting
57 in POSIX locks would mean the lock is lost)
59 - if the caller gains a lock during the transaction but doesn't
60 release it then fail the commit
62 - allow for nested calls to ntdb_transaction_start(), re-using the
63 existing transaction record. If the inner transaction is canceled
64 then a subsequent commit will fail
66 - keep a mirrored copy of the ntdb hash chain heads to allow for the
67 fast hash heads scan on traverse, updating the mirrored copy in
68 the transaction version of ntdb_write
70 - allow callers to mix transaction and non-transaction use of ntdb,
71 although once a transaction is started then an exclusive lock is
72 gained until the transaction is committed or canceled
74 - the commit strategy involves first saving away all modified data
75 into a linearised buffer in the transaction recovery area, then
76 marking the transaction recovery area with a magic value to
77 indicate a valid recovery record. In total 4 fsync/msync calls are
78 needed per commit to prevent race conditions. It might be possible
79 to reduce this to 3 or even 2 with some more work.
81 - check for a valid recovery record on open of the ntdb, while the
82 open lock is held. Automatically recover from the transaction
83 recovery area if needed, then continue with the open as
84 usual. This allows for smooth crash recovery with no administrator
85 intervention.
87 - if NTDB_NOSYNC is passed to flags in ntdb_open then transactions are
88 still available, but no fsync/msync calls are made. This means we
89 still are safe against unexpected death during transaction commit,
90 but not against machine reboots.
94 hold the context of any current transaction
96 struct ntdb_transaction {
97 /* the original io methods - used to do IOs to the real db */
98 const struct ntdb_methods *io_methods;
100 /* the list of transaction blocks. When a block is first
101 written to, it gets created in this list */
102 uint8_t **blocks;
103 size_t num_blocks;
105 /* non-zero when an internal transaction error has
106 occurred. All write operations will then fail until the
107 transaction is ended */
108 int transaction_error;
110 /* when inside a transaction we need to keep track of any
111 nested ntdb_transaction_start() calls, as these are allowed,
112 but don't create a new transaction */
113 unsigned int nesting;
115 /* set when a prepare has already occurred */
116 bool prepared;
117 ntdb_off_t magic_offset;
119 /* old file size before transaction */
120 ntdb_len_t old_map_size;
124 read while in a transaction. We need to check first if the data is in our list
125 of transaction elements, then if not do a real read
127 static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t off,
128 void *buf, ntdb_len_t len)
130 size_t blk;
131 enum NTDB_ERROR ecode;
133 /* break it down into block sized ops */
134 while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
135 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
136 ecode = transaction_read(ntdb, off, buf, len2);
137 if (ecode != NTDB_SUCCESS) {
138 return ecode;
140 len -= len2;
141 off += len2;
142 buf = (void *)(len2 + (char *)buf);
145 if (len == 0) {
146 return NTDB_SUCCESS;
149 blk = off / NTDB_PGSIZE;
151 /* see if we have it in the block list */
152 if (ntdb->transaction->num_blocks <= blk ||
153 ntdb->transaction->blocks[blk] == NULL) {
154 /* nope, do a real read */
155 ecode = ntdb->transaction->io_methods->tread(ntdb, off, buf, len);
156 if (ecode != NTDB_SUCCESS) {
157 goto fail;
159 return 0;
162 /* now copy it out of this block */
163 memcpy(buf, ntdb->transaction->blocks[blk] + (off % NTDB_PGSIZE), len);
164 return NTDB_SUCCESS;
166 fail:
167 ntdb->transaction->transaction_error = 1;
168 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
169 "transaction_read: failed at off=%zu len=%zu",
170 (size_t)off, (size_t)len);
175 write while in a transaction
177 static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t off,
178 const void *buf, ntdb_len_t len)
180 size_t blk;
181 enum NTDB_ERROR ecode;
183 /* Only a commit is allowed on a prepared transaction */
184 if (ntdb->transaction->prepared) {
185 ecode = ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
186 "transaction_write: transaction already"
187 " prepared, write not allowed");
188 goto fail;
191 /* break it up into block sized chunks */
192 while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
193 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
194 ecode = transaction_write(ntdb, off, buf, len2);
195 if (ecode != NTDB_SUCCESS) {
196 return ecode;
198 len -= len2;
199 off += len2;
200 if (buf != NULL) {
201 buf = (const void *)(len2 + (const char *)buf);
205 if (len == 0) {
206 return NTDB_SUCCESS;
209 blk = off / NTDB_PGSIZE;
210 off = off % NTDB_PGSIZE;
212 if (ntdb->transaction->num_blocks <= blk) {
213 uint8_t **new_blocks;
214 /* expand the blocks array */
215 if (ntdb->transaction->blocks == NULL) {
216 new_blocks = (uint8_t **)ntdb->alloc_fn(ntdb,
217 (blk+1)*sizeof(uint8_t *), ntdb->alloc_data);
218 } else {
219 new_blocks = (uint8_t **)ntdb->expand_fn(
220 ntdb->transaction->blocks,
221 (blk+1)*sizeof(uint8_t *), ntdb->alloc_data);
223 if (new_blocks == NULL) {
224 ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
225 "transaction_write:"
226 " failed to allocate");
227 goto fail;
229 memset(&new_blocks[ntdb->transaction->num_blocks], 0,
230 (1+(blk - ntdb->transaction->num_blocks))*sizeof(uint8_t *));
231 ntdb->transaction->blocks = new_blocks;
232 ntdb->transaction->num_blocks = blk+1;
235 /* allocate and fill a block? */
236 if (ntdb->transaction->blocks[blk] == NULL) {
237 ntdb->transaction->blocks[blk] = (uint8_t *)
238 ntdb->alloc_fn(ntdb->transaction->blocks, NTDB_PGSIZE,
239 ntdb->alloc_data);
240 if (ntdb->transaction->blocks[blk] == NULL) {
241 ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
242 "transaction_write:"
243 " failed to allocate");
244 goto fail;
246 memset(ntdb->transaction->blocks[blk], 0, NTDB_PGSIZE);
247 if (ntdb->transaction->old_map_size > blk * NTDB_PGSIZE) {
248 ntdb_len_t len2 = NTDB_PGSIZE;
249 if (len2 + (blk * NTDB_PGSIZE) > ntdb->transaction->old_map_size) {
250 len2 = ntdb->transaction->old_map_size - (blk * NTDB_PGSIZE);
252 ecode = ntdb->transaction->io_methods->tread(ntdb,
253 blk * NTDB_PGSIZE,
254 ntdb->transaction->blocks[blk],
255 len2);
256 if (ecode != NTDB_SUCCESS) {
257 ecode = ntdb_logerr(ntdb, ecode,
258 NTDB_LOG_ERROR,
259 "transaction_write:"
260 " failed to"
261 " read old block: %s",
262 strerror(errno));
263 SAFE_FREE(ntdb, ntdb->transaction->blocks[blk]);
264 goto fail;
269 /* overwrite part of an existing block */
270 if (buf == NULL) {
271 memset(ntdb->transaction->blocks[blk] + off, 0, len);
272 } else {
273 memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
275 return NTDB_SUCCESS;
277 fail:
278 ntdb->transaction->transaction_error = 1;
279 return ecode;
284 write while in a transaction - this variant never expands the transaction blocks, it only
285 updates existing blocks. This means it cannot change the recovery size
287 static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off,
288 const void *buf, ntdb_len_t len)
290 size_t blk;
292 /* break it up into block sized chunks */
293 while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
294 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
295 transaction_write_existing(ntdb, off, buf, len2);
296 len -= len2;
297 off += len2;
298 if (buf != NULL) {
299 buf = (const void *)(len2 + (const char *)buf);
303 if (len == 0) {
304 return;
307 blk = off / NTDB_PGSIZE;
308 off = off % NTDB_PGSIZE;
310 if (ntdb->transaction->num_blocks <= blk ||
311 ntdb->transaction->blocks[blk] == NULL) {
312 return;
315 /* overwrite part of an existing block */
316 memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
321 out of bounds check during a transaction
323 static enum NTDB_ERROR transaction_oob(struct ntdb_context *ntdb,
324 ntdb_off_t off, ntdb_len_t len, bool probe)
326 if ((off + len >= off && off + len <= ntdb->file->map_size) || probe) {
327 return NTDB_SUCCESS;
330 ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
331 "ntdb_oob len %lld beyond transaction size %lld",
332 (long long)(off + len),
333 (long long)ntdb->file->map_size);
334 return NTDB_ERR_IO;
338 transaction version of ntdb_expand().
340 static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
341 ntdb_off_t addition)
343 enum NTDB_ERROR ecode;
345 assert((ntdb->file->map_size + addition) % NTDB_PGSIZE == 0);
347 /* add a write to the transaction elements, so subsequent
348 reads see the zero data */
349 ecode = transaction_write(ntdb, ntdb->file->map_size, NULL, addition);
350 if (ecode == NTDB_SUCCESS) {
351 ntdb->file->map_size += addition;
353 return ecode;
356 static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
357 size_t len, bool write_mode)
359 size_t blk = off / NTDB_PGSIZE, end_blk;
361 /* This is wrong for zero-length blocks, but will fail gracefully */
362 end_blk = (off + len - 1) / NTDB_PGSIZE;
364 /* Can only do direct if in single block and we've already copied. */
365 if (write_mode) {
366 ntdb->stats.transaction_write_direct++;
367 if (blk != end_blk
368 || blk >= ntdb->transaction->num_blocks
369 || ntdb->transaction->blocks[blk] == NULL) {
370 ntdb->stats.transaction_write_direct_fail++;
371 return NULL;
373 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
376 ntdb->stats.transaction_read_direct++;
377 /* Single which we have copied? */
378 if (blk == end_blk
379 && blk < ntdb->transaction->num_blocks
380 && ntdb->transaction->blocks[blk])
381 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
383 /* Otherwise must be all not copied. */
384 while (blk <= end_blk) {
385 if (blk >= ntdb->transaction->num_blocks)
386 break;
387 if (ntdb->transaction->blocks[blk]) {
388 ntdb->stats.transaction_read_direct_fail++;
389 return NULL;
391 blk++;
393 return ntdb->transaction->io_methods->direct(ntdb, off, len, false);
396 static ntdb_off_t transaction_read_off(struct ntdb_context *ntdb,
397 ntdb_off_t off)
399 ntdb_off_t ret;
400 enum NTDB_ERROR ecode;
402 ecode = transaction_read(ntdb, off, &ret, sizeof(ret));
403 ntdb_convert(ntdb, &ret, sizeof(ret));
404 if (ecode != NTDB_SUCCESS) {
405 return NTDB_ERR_TO_OFF(ecode);
407 return ret;
410 static enum NTDB_ERROR transaction_write_off(struct ntdb_context *ntdb,
411 ntdb_off_t off, ntdb_off_t val)
413 ntdb_convert(ntdb, &val, sizeof(val));
414 return transaction_write(ntdb, off, &val, sizeof(val));
417 static const struct ntdb_methods transaction_methods = {
418 transaction_read,
419 transaction_write,
420 transaction_oob,
421 transaction_expand_file,
422 transaction_direct,
423 transaction_read_off,
424 transaction_write_off,
428 sync to disk
430 static enum NTDB_ERROR transaction_sync(struct ntdb_context *ntdb,
431 ntdb_off_t offset, ntdb_len_t length)
433 if (ntdb->flags & NTDB_NOSYNC) {
434 return NTDB_SUCCESS;
437 if (fsync(ntdb->file->fd) != 0) {
438 return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
439 "ntdb_transaction: fsync failed: %s",
440 strerror(errno));
442 #ifdef MS_SYNC
443 if (ntdb->file->map_ptr) {
444 ntdb_off_t moffset = offset & ~(getpagesize()-1);
445 if (msync(moffset + (char *)ntdb->file->map_ptr,
446 length + (offset - moffset), MS_SYNC) != 0) {
447 return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
448 "ntdb_transaction: msync failed: %s",
449 strerror(errno));
452 #endif
453 return NTDB_SUCCESS;
457 static void _ntdb_transaction_cancel(struct ntdb_context *ntdb)
459 int i;
460 enum NTDB_ERROR ecode;
462 if (ntdb->transaction == NULL) {
463 ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
464 "ntdb_transaction_cancel: no transaction");
465 return;
468 if (ntdb->transaction->nesting != 0) {
469 ntdb->transaction->transaction_error = 1;
470 ntdb->transaction->nesting--;
471 return;
474 ntdb->file->map_size = ntdb->transaction->old_map_size;
476 /* free all the transaction blocks */
477 for (i=0;i<ntdb->transaction->num_blocks;i++) {
478 if (ntdb->transaction->blocks[i] != NULL) {
479 ntdb->free_fn(ntdb->transaction->blocks[i],
480 ntdb->alloc_data);
483 SAFE_FREE(ntdb, ntdb->transaction->blocks);
485 if (ntdb->transaction->magic_offset) {
486 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
487 uint64_t invalid = NTDB_RECOVERY_INVALID_MAGIC;
489 /* remove the recovery marker */
490 ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
491 &invalid, sizeof(invalid));
492 if (ecode == NTDB_SUCCESS)
493 ecode = transaction_sync(ntdb,
494 ntdb->transaction->magic_offset,
495 sizeof(invalid));
496 if (ecode != NTDB_SUCCESS) {
497 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
498 "ntdb_transaction_cancel: failed to remove"
499 " recovery magic");
503 if (ntdb->file->allrecord_lock.count)
504 ntdb_allrecord_unlock(ntdb, ntdb->file->allrecord_lock.ltype);
506 /* restore the normal io methods */
507 ntdb->io = ntdb->transaction->io_methods;
509 ntdb_transaction_unlock(ntdb, F_WRLCK);
511 if (ntdb_has_open_lock(ntdb))
512 ntdb_unlock_open(ntdb, F_WRLCK);
514 SAFE_FREE(ntdb, ntdb->transaction);
518 start a ntdb transaction. No token is returned, as only a single
519 transaction is allowed to be pending per ntdb_context
521 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_start(struct ntdb_context *ntdb)
523 enum NTDB_ERROR ecode;
525 ntdb->stats.transactions++;
526 /* some sanity checks */
527 if (ntdb->flags & NTDB_INTERNAL) {
528 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
529 "ntdb_transaction_start:"
530 " cannot start a transaction on an"
531 " internal ntdb");
534 if (ntdb->flags & NTDB_RDONLY) {
535 return ntdb_logerr(ntdb, NTDB_ERR_RDONLY, NTDB_LOG_USE_ERROR,
536 "ntdb_transaction_start:"
537 " cannot start a transaction on a"
538 " read-only ntdb");
541 /* cope with nested ntdb_transaction_start() calls */
542 if (ntdb->transaction != NULL) {
543 if (!(ntdb->flags & NTDB_ALLOW_NESTING)) {
544 return ntdb_logerr(ntdb, NTDB_ERR_IO,
545 NTDB_LOG_USE_ERROR,
546 "ntdb_transaction_start:"
547 " already inside transaction");
549 ntdb->transaction->nesting++;
550 ntdb->stats.transaction_nest++;
551 return 0;
554 if (ntdb_has_hash_locks(ntdb)) {
555 /* the caller must not have any locks when starting a
556 transaction as otherwise we'll be screwed by lack
557 of nested locks in POSIX */
558 return ntdb_logerr(ntdb, NTDB_ERR_LOCK,
559 NTDB_LOG_USE_ERROR,
560 "ntdb_transaction_start:"
561 " cannot start a transaction with locks"
562 " held");
565 ntdb->transaction = (struct ntdb_transaction *)
566 ntdb->alloc_fn(ntdb, sizeof(struct ntdb_transaction),
567 ntdb->alloc_data);
568 if (ntdb->transaction == NULL) {
569 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
570 "ntdb_transaction_start:"
571 " cannot allocate");
573 memset(ntdb->transaction, 0, sizeof(*ntdb->transaction));
575 /* get the transaction write lock. This is a blocking lock. As
576 discussed with Volker, there are a number of ways we could
577 make this async, which we will probably do in the future */
578 ecode = ntdb_transaction_lock(ntdb, F_WRLCK);
579 if (ecode != NTDB_SUCCESS) {
580 SAFE_FREE(ntdb, ntdb->transaction->blocks);
581 SAFE_FREE(ntdb, ntdb->transaction);
582 return ecode;
585 /* get a read lock over entire file. This is upgraded to a write
586 lock during the commit */
587 ecode = ntdb_allrecord_lock(ntdb, F_RDLCK, NTDB_LOCK_WAIT, true);
588 if (ecode != NTDB_SUCCESS) {
589 goto fail_allrecord_lock;
592 /* make sure we know about any file expansions already done by
593 anyone else */
594 ntdb_oob(ntdb, ntdb->file->map_size, 1, true);
595 ntdb->transaction->old_map_size = ntdb->file->map_size;
597 /* finally hook the io methods, replacing them with
598 transaction specific methods */
599 ntdb->transaction->io_methods = ntdb->io;
600 ntdb->io = &transaction_methods;
601 return NTDB_SUCCESS;
603 fail_allrecord_lock:
604 ntdb_transaction_unlock(ntdb, F_WRLCK);
605 SAFE_FREE(ntdb, ntdb->transaction->blocks);
606 SAFE_FREE(ntdb, ntdb->transaction);
607 return ecode;
612 cancel the current transaction
614 _PUBLIC_ void ntdb_transaction_cancel(struct ntdb_context *ntdb)
616 ntdb->stats.transaction_cancel++;
617 _ntdb_transaction_cancel(ntdb);
621 work out how much space the linearised recovery data will consume (worst case)
623 static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
625 ntdb_len_t recovery_size = 0;
626 int i;
628 recovery_size = 0;
629 for (i=0;i<ntdb->transaction->num_blocks;i++) {
630 if (i * NTDB_PGSIZE >= ntdb->transaction->old_map_size) {
631 break;
633 if (ntdb->transaction->blocks[i] == NULL) {
634 continue;
636 recovery_size += 2*sizeof(ntdb_off_t) + NTDB_PGSIZE;
639 return recovery_size;
642 static enum NTDB_ERROR ntdb_recovery_area(struct ntdb_context *ntdb,
643 const struct ntdb_methods *methods,
644 ntdb_off_t *recovery_offset,
645 struct ntdb_recovery_record *rec)
647 enum NTDB_ERROR ecode;
649 *recovery_offset = ntdb_read_off(ntdb,
650 offsetof(struct ntdb_header, recovery));
651 if (NTDB_OFF_IS_ERR(*recovery_offset)) {
652 return NTDB_OFF_TO_ERR(*recovery_offset);
655 if (*recovery_offset == 0) {
656 rec->max_len = 0;
657 return NTDB_SUCCESS;
660 ecode = methods->tread(ntdb, *recovery_offset, rec, sizeof(*rec));
661 if (ecode != NTDB_SUCCESS)
662 return ecode;
664 ntdb_convert(ntdb, rec, sizeof(*rec));
665 /* ignore invalid recovery regions: can happen in crash */
666 if (rec->magic != NTDB_RECOVERY_MAGIC &&
667 rec->magic != NTDB_RECOVERY_INVALID_MAGIC) {
668 *recovery_offset = 0;
669 rec->max_len = 0;
671 return NTDB_SUCCESS;
/*
  return the length of the common prefix of the two buffers, i.e. the
  index of the first byte that differs (or length if none does)
*/
static unsigned int same(const unsigned char *new,
			 const unsigned char *old,
			 unsigned int length)
{
	unsigned int matched = 0;

	while (matched < length && new[matched] == old[matched]) {
		matched++;
	}
	return matched;
}
/*
  scan for the end of a differing region. Returns the number of bytes
  before the first run of at least min_same identical bytes (or before
  such a run at the very end of the buffer), and stores the length of
  that run in *samelen. Shorter identical runs are treated as part of
  the differing region; if no qualifying run exists, *samelen is 0 and
  length is returned.
*/
static unsigned int different(const unsigned char *new,
			      const unsigned char *old,
			      unsigned int length,
			      unsigned int min_same,
			      unsigned int *samelen)
{
	unsigned int run = 0;	/* length of the current identical run */
	unsigned int pos;

	for (pos = 0; pos < length; pos++) {
		if (new[pos] == old[pos]) {
			run++;
			continue;
		}
		if (run >= min_same) {
			/* the run that just ended was long enough */
			*samelen = run;
			return pos - run;
		}
		run = 0;
	}

	/* a trailing run only counts if it is long enough too */
	if (run < min_same) {
		run = 0;
	}
	*samelen = run;
	return length - run;
}
712 /* Allocates recovery blob, without ntdb_recovery_record at head set up. */
713 static struct ntdb_recovery_record *alloc_recovery(struct ntdb_context *ntdb,
714 ntdb_len_t *len)
716 struct ntdb_recovery_record *rec;
717 size_t i;
718 enum NTDB_ERROR ecode;
719 unsigned char *p;
720 const struct ntdb_methods *old_methods = ntdb->io;
722 rec = ntdb->alloc_fn(ntdb, sizeof(*rec) + ntdb_recovery_size(ntdb),
723 ntdb->alloc_data);
724 if (!rec) {
725 ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
726 "transaction_setup_recovery:"
727 " cannot allocate");
728 return NTDB_ERR_PTR(NTDB_ERR_OOM);
731 /* We temporarily revert to the old I/O methods, so we can use
732 * ntdb_access_read */
733 ntdb->io = ntdb->transaction->io_methods;
735 /* build the recovery data into a single blob to allow us to do a single
736 large write, which should be more efficient */
737 p = (unsigned char *)(rec + 1);
738 for (i=0;i<ntdb->transaction->num_blocks;i++) {
739 ntdb_off_t offset;
740 ntdb_len_t length;
741 unsigned int off;
742 const unsigned char *buffer;
744 if (ntdb->transaction->blocks[i] == NULL) {
745 continue;
748 offset = i * NTDB_PGSIZE;
749 length = NTDB_PGSIZE;
750 if (offset >= ntdb->transaction->old_map_size) {
751 continue;
754 if (offset + length > ntdb->file->map_size) {
755 ecode = ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
756 "ntdb_transaction_setup_recovery:"
757 " transaction data over new region"
758 " boundary");
759 goto fail;
761 buffer = ntdb_access_read(ntdb, offset, length, false);
762 if (NTDB_PTR_IS_ERR(buffer)) {
763 ecode = NTDB_PTR_ERR(buffer);
764 goto fail;
767 /* Skip over anything the same at the start. */
768 off = same(ntdb->transaction->blocks[i], buffer, length);
769 offset += off;
771 while (off < length) {
772 ntdb_len_t len1;
773 unsigned int samelen;
775 len1 = different(ntdb->transaction->blocks[i] + off,
776 buffer + off, length - off,
777 sizeof(offset) + sizeof(len1) + 1,
778 &samelen);
780 memcpy(p, &offset, sizeof(offset));
781 memcpy(p + sizeof(offset), &len1, sizeof(len1));
782 ntdb_convert(ntdb, p, sizeof(offset) + sizeof(len1));
783 p += sizeof(offset) + sizeof(len1);
784 memcpy(p, buffer + off, len1);
785 p += len1;
786 off += len1 + samelen;
787 offset += len1 + samelen;
789 ntdb_access_release(ntdb, buffer);
792 *len = p - (unsigned char *)(rec + 1);
793 ntdb->io = old_methods;
794 return rec;
796 fail:
797 ntdb->free_fn(rec, ntdb->alloc_data);
798 ntdb->io = old_methods;
799 return NTDB_ERR_PTR(ecode);
802 static ntdb_off_t create_recovery_area(struct ntdb_context *ntdb,
803 ntdb_len_t rec_length,
804 struct ntdb_recovery_record *rec)
806 ntdb_off_t off, recovery_off;
807 ntdb_len_t addition;
808 enum NTDB_ERROR ecode;
809 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
811 /* round up to a multiple of page size. Overallocate, since each
812 * such allocation forces us to expand the file. */
813 rec->max_len = ntdb_expand_adjust(ntdb->file->map_size, rec_length);
815 /* Round up to a page. */
816 rec->max_len = ((sizeof(*rec) + rec->max_len + NTDB_PGSIZE-1)
817 & ~(NTDB_PGSIZE-1))
818 - sizeof(*rec);
820 off = ntdb->file->map_size;
822 /* Restore ->map_size before calling underlying expand_file.
823 Also so that we don't try to expand the file again in the
824 transaction commit, which would destroy the recovery
825 area */
826 addition = (ntdb->file->map_size - ntdb->transaction->old_map_size) +
827 sizeof(*rec) + rec->max_len;
828 ntdb->file->map_size = ntdb->transaction->old_map_size;
829 ntdb->stats.transaction_expand_file++;
830 ecode = methods->expand_file(ntdb, addition);
831 if (ecode != NTDB_SUCCESS) {
832 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
833 "ntdb_recovery_allocate:"
834 " failed to create recovery area");
835 return NTDB_ERR_TO_OFF(ecode);
838 /* we have to reset the old map size so that we don't try to
839 expand the file again in the transaction commit, which
840 would destroy the recovery area */
841 ntdb->transaction->old_map_size = ntdb->file->map_size;
843 /* write the recovery header offset and sync - we can sync without a race here
844 as the magic ptr in the recovery record has not been set */
845 recovery_off = off;
846 ntdb_convert(ntdb, &recovery_off, sizeof(recovery_off));
847 ecode = methods->twrite(ntdb, offsetof(struct ntdb_header, recovery),
848 &recovery_off, sizeof(ntdb_off_t));
849 if (ecode != NTDB_SUCCESS) {
850 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
851 "ntdb_recovery_allocate:"
852 " failed to write recovery head");
853 return NTDB_ERR_TO_OFF(ecode);
855 transaction_write_existing(ntdb, offsetof(struct ntdb_header, recovery),
856 &recovery_off,
857 sizeof(ntdb_off_t));
858 return off;
862 setup the recovery data that will be used on a crash during commit
864 static enum NTDB_ERROR transaction_setup_recovery(struct ntdb_context *ntdb)
866 ntdb_len_t recovery_size = 0;
867 ntdb_off_t recovery_off = 0;
868 ntdb_off_t old_map_size = ntdb->transaction->old_map_size;
869 struct ntdb_recovery_record *recovery;
870 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
871 uint64_t magic;
872 enum NTDB_ERROR ecode;
874 recovery = alloc_recovery(ntdb, &recovery_size);
875 if (NTDB_PTR_IS_ERR(recovery))
876 return NTDB_PTR_ERR(recovery);
878 ecode = ntdb_recovery_area(ntdb, methods, &recovery_off, recovery);
879 if (ecode) {
880 ntdb->free_fn(recovery, ntdb->alloc_data);
881 return ecode;
884 if (recovery->max_len < recovery_size) {
885 /* Not large enough. Free up old recovery area. */
886 if (recovery_off) {
887 ntdb->stats.frees++;
888 ecode = add_free_record(ntdb, recovery_off,
889 sizeof(*recovery)
890 + recovery->max_len,
891 NTDB_LOCK_WAIT, true);
892 ntdb->free_fn(recovery, ntdb->alloc_data);
893 if (ecode != NTDB_SUCCESS) {
894 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
895 "ntdb_recovery_allocate:"
896 " failed to free previous"
897 " recovery area");
900 /* Refresh recovery after add_free_record above. */
901 recovery = alloc_recovery(ntdb, &recovery_size);
902 if (NTDB_PTR_IS_ERR(recovery))
903 return NTDB_PTR_ERR(recovery);
906 recovery_off = create_recovery_area(ntdb, recovery_size,
907 recovery);
908 if (NTDB_OFF_IS_ERR(recovery_off)) {
909 ntdb->free_fn(recovery, ntdb->alloc_data);
910 return NTDB_OFF_TO_ERR(recovery_off);
914 /* Now we know size, convert rec header. */
915 recovery->magic = NTDB_RECOVERY_INVALID_MAGIC;
916 recovery->len = recovery_size;
917 recovery->eof = old_map_size;
918 ntdb_convert(ntdb, recovery, sizeof(*recovery));
920 /* write the recovery data to the recovery area */
921 ecode = methods->twrite(ntdb, recovery_off, recovery,
922 sizeof(*recovery) + recovery_size);
923 if (ecode != NTDB_SUCCESS) {
924 ntdb->free_fn(recovery, ntdb->alloc_data);
925 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
926 "ntdb_transaction_setup_recovery:"
927 " failed to write recovery data");
929 transaction_write_existing(ntdb, recovery_off, recovery, recovery_size);
931 ntdb->free_fn(recovery, ntdb->alloc_data);
933 /* as we don't have ordered writes, we have to sync the recovery
934 data before we update the magic to indicate that the recovery
935 data is present */
936 ecode = transaction_sync(ntdb, recovery_off, recovery_size);
937 if (ecode != NTDB_SUCCESS)
938 return ecode;
940 magic = NTDB_RECOVERY_MAGIC;
941 ntdb_convert(ntdb, &magic, sizeof(magic));
943 ntdb->transaction->magic_offset
944 = recovery_off + offsetof(struct ntdb_recovery_record, magic);
946 ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
947 &magic, sizeof(magic));
948 if (ecode != NTDB_SUCCESS) {
949 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
950 "ntdb_transaction_setup_recovery:"
951 " failed to write recovery magic");
953 transaction_write_existing(ntdb, ntdb->transaction->magic_offset,
954 &magic, sizeof(magic));
956 /* ensure the recovery magic marker is on disk */
957 return transaction_sync(ntdb, ntdb->transaction->magic_offset,
958 sizeof(magic));
961 static enum NTDB_ERROR _ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
963 const struct ntdb_methods *methods;
964 enum NTDB_ERROR ecode;
966 if (ntdb->transaction == NULL) {
967 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
968 "ntdb_transaction_prepare_commit:"
969 " no transaction");
972 if (ntdb->transaction->prepared) {
973 _ntdb_transaction_cancel(ntdb);
974 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
975 "ntdb_transaction_prepare_commit:"
976 " transaction already prepared");
979 if (ntdb->transaction->transaction_error) {
980 _ntdb_transaction_cancel(ntdb);
981 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
982 "ntdb_transaction_prepare_commit:"
983 " transaction error pending");
987 if (ntdb->transaction->nesting != 0) {
988 return NTDB_SUCCESS;
991 /* check for a null transaction */
992 if (ntdb->transaction->blocks == NULL) {
993 return NTDB_SUCCESS;
996 methods = ntdb->transaction->io_methods;
998 /* upgrade the main transaction lock region to a write lock */
999 ecode = ntdb_allrecord_upgrade(ntdb, NTDB_HASH_LOCK_START);
1000 if (ecode != NTDB_SUCCESS) {
1001 return ecode;
1004 /* get the open lock - this prevents new users attaching to the database
1005 during the commit */
1006 ecode = ntdb_lock_open(ntdb, F_WRLCK, NTDB_LOCK_WAIT|NTDB_LOCK_NOCHECK);
1007 if (ecode != NTDB_SUCCESS) {
1008 return ecode;
1011 /* Sets up ntdb->transaction->recovery and
1012 * ntdb->transaction->magic_offset. */
1013 ecode = transaction_setup_recovery(ntdb);
1014 if (ecode != NTDB_SUCCESS) {
1015 return ecode;
1018 ntdb->transaction->prepared = true;
1020 /* expand the file to the new size if needed */
1021 if (ntdb->file->map_size != ntdb->transaction->old_map_size) {
1022 ntdb_len_t add;
1024 add = ntdb->file->map_size - ntdb->transaction->old_map_size;
1025 /* Restore original map size for ntdb_expand_file */
1026 ntdb->file->map_size = ntdb->transaction->old_map_size;
1027 ecode = methods->expand_file(ntdb, add);
1028 if (ecode != NTDB_SUCCESS) {
1029 return ecode;
1033 /* Keep the open lock until the actual commit */
1034 return NTDB_SUCCESS;
1038 prepare to commit the current transaction
1040 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
1042 return _ntdb_transaction_prepare_commit(ntdb);
1046 commit the current transaction
1048 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_commit(struct ntdb_context *ntdb)
1050 const struct ntdb_methods *methods;
1051 int i;
1052 enum NTDB_ERROR ecode;
1054 if (ntdb->transaction == NULL) {
1055 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
1056 "ntdb_transaction_commit:"
1057 " no transaction");
1060 ntdb_trace(ntdb, "ntdb_transaction_commit");
1062 if (ntdb->transaction->nesting != 0) {
1063 ntdb->transaction->nesting--;
1064 return NTDB_SUCCESS;
1067 /* check for a null transaction */
1068 if (ntdb->transaction->blocks == NULL) {
1069 _ntdb_transaction_cancel(ntdb);
1070 return NTDB_SUCCESS;
1073 if (!ntdb->transaction->prepared) {
1074 ecode = _ntdb_transaction_prepare_commit(ntdb);
1075 if (ecode != NTDB_SUCCESS) {
1076 _ntdb_transaction_cancel(ntdb);
1077 return ecode;
1081 methods = ntdb->transaction->io_methods;
1083 /* perform all the writes */
1084 for (i=0;i<ntdb->transaction->num_blocks;i++) {
1085 ntdb_off_t offset;
1086 ntdb_len_t length;
1088 if (ntdb->transaction->blocks[i] == NULL) {
1089 continue;
1092 offset = i * NTDB_PGSIZE;
1093 length = NTDB_PGSIZE;
1095 ecode = methods->twrite(ntdb, offset,
1096 ntdb->transaction->blocks[i], length);
1097 if (ecode != NTDB_SUCCESS) {
1098 /* we've overwritten part of the data and
1099 possibly expanded the file, so we need to
1100 run the crash recovery code */
1101 ntdb->io = methods;
1102 ntdb_transaction_recover(ntdb);
1104 _ntdb_transaction_cancel(ntdb);
1106 return ecode;
1108 SAFE_FREE(ntdb, ntdb->transaction->blocks[i]);
1111 SAFE_FREE(ntdb, ntdb->transaction->blocks);
1112 ntdb->transaction->num_blocks = 0;
1114 /* ensure the new data is on disk */
1115 ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1116 if (ecode != NTDB_SUCCESS) {
1117 return ecode;
1121 TODO: maybe write to some dummy hdr field, or write to magic
1122 offset without mmap, before the last sync, instead of the
1123 utime() call
1126 /* on some systems (like Linux 2.6.x) changes via mmap/msync
1127 don't change the mtime of the file, this means the file may
1128 not be backed up (as ntdb rounding to block sizes means that
1129 file size changes are quite rare too). The following forces
1130 mtime changes when a transaction completes */
1131 #if HAVE_UTIME
1132 utime(ntdb->name, NULL);
1133 #endif
1135 /* use a transaction cancel to free memory and remove the
1136 transaction locks: it "restores" map_size, too. */
1137 ntdb->transaction->old_map_size = ntdb->file->map_size;
1138 _ntdb_transaction_cancel(ntdb);
1140 return NTDB_SUCCESS;
1145 recover from an aborted transaction. Must be called with exclusive
1146 database write access already established (including the open
1147 lock to prevent new processes attaching)
1149 enum NTDB_ERROR ntdb_transaction_recover(struct ntdb_context *ntdb)
1151 ntdb_off_t recovery_head, recovery_eof;
1152 unsigned char *data, *p;
1153 struct ntdb_recovery_record rec;
1154 enum NTDB_ERROR ecode;
1156 /* find the recovery area */
1157 recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1158 if (NTDB_OFF_IS_ERR(recovery_head)) {
1159 ecode = NTDB_OFF_TO_ERR(recovery_head);
1160 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1161 "ntdb_transaction_recover:"
1162 " failed to read recovery head");
1165 if (recovery_head == 0) {
1166 /* we have never allocated a recovery record */
1167 return NTDB_SUCCESS;
1170 /* read the recovery record */
1171 ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1172 if (ecode != NTDB_SUCCESS) {
1173 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1174 "ntdb_transaction_recover:"
1175 " failed to read recovery record");
1178 if (rec.magic != NTDB_RECOVERY_MAGIC) {
1179 /* there is no valid recovery data */
1180 return NTDB_SUCCESS;
1183 if (ntdb->flags & NTDB_RDONLY) {
1184 return ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
1185 "ntdb_transaction_recover:"
1186 " attempt to recover read only database");
1189 recovery_eof = rec.eof;
1191 data = (unsigned char *)ntdb->alloc_fn(ntdb, rec.len, ntdb->alloc_data);
1192 if (data == NULL) {
1193 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
1194 "ntdb_transaction_recover:"
1195 " failed to allocate recovery data");
1198 /* read the full recovery data */
1199 ecode = ntdb->io->tread(ntdb, recovery_head + sizeof(rec), data,
1200 rec.len);
1201 if (ecode != NTDB_SUCCESS) {
1202 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1203 "ntdb_transaction_recover:"
1204 " failed to read recovery data");
1207 /* recover the file data */
1208 p = data;
1209 while (p+sizeof(ntdb_off_t)+sizeof(ntdb_len_t) < data + rec.len) {
1210 ntdb_off_t ofs;
1211 ntdb_len_t len;
1212 ntdb_convert(ntdb, p, sizeof(ofs) + sizeof(len));
1213 memcpy(&ofs, p, sizeof(ofs));
1214 memcpy(&len, p + sizeof(ofs), sizeof(len));
1215 p += sizeof(ofs) + sizeof(len);
1217 ecode = ntdb->io->twrite(ntdb, ofs, p, len);
1218 if (ecode != NTDB_SUCCESS) {
1219 ntdb->free_fn(data, ntdb->alloc_data);
1220 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1221 "ntdb_transaction_recover:"
1222 " failed to recover %zu bytes"
1223 " at offset %zu",
1224 (size_t)len, (size_t)ofs);
1226 p += len;
1229 ntdb->free_fn(data, ntdb->alloc_data);
1231 ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1232 if (ecode != NTDB_SUCCESS) {
1233 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1234 "ntdb_transaction_recover:"
1235 " failed to sync recovery");
1238 /* if the recovery area is after the recovered eof then remove it */
1239 if (recovery_eof <= recovery_head) {
1240 ecode = ntdb_write_off(ntdb, offsetof(struct ntdb_header,
1241 recovery),
1243 if (ecode != NTDB_SUCCESS) {
1244 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1245 "ntdb_transaction_recover:"
1246 " failed to remove recovery head");
1250 /* remove the recovery magic */
1251 ecode = ntdb_write_off(ntdb,
1252 recovery_head
1253 + offsetof(struct ntdb_recovery_record, magic),
1254 NTDB_RECOVERY_INVALID_MAGIC);
1255 if (ecode != NTDB_SUCCESS) {
1256 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1257 "ntdb_transaction_recover:"
1258 " failed to remove recovery magic");
1261 ecode = transaction_sync(ntdb, 0, recovery_eof);
1262 if (ecode != NTDB_SUCCESS) {
1263 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1264 "ntdb_transaction_recover:"
1265 " failed to sync2 recovery");
1268 ntdb_logerr(ntdb, NTDB_SUCCESS, NTDB_LOG_WARNING,
1269 "ntdb_transaction_recover: recovered %zu byte database",
1270 (size_t)recovery_eof);
1272 /* all done */
1273 return NTDB_SUCCESS;
1276 ntdb_bool_err ntdb_needs_recovery(struct ntdb_context *ntdb)
1278 ntdb_off_t recovery_head;
1279 struct ntdb_recovery_record rec;
1280 enum NTDB_ERROR ecode;
1282 /* find the recovery area */
1283 recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1284 if (NTDB_OFF_IS_ERR(recovery_head)) {
1285 return recovery_head;
1288 if (recovery_head == 0) {
1289 /* we have never allocated a recovery record */
1290 return false;
1293 /* read the recovery record */
1294 ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1295 if (ecode != NTDB_SUCCESS) {
1296 return NTDB_ERR_TO_OFF(ecode);
1299 return (rec.magic == NTDB_RECOVERY_MAGIC);