Some doc path fixes from Anders
[pkg-k5-afs_openafs.git] / src / ubik / disk.c
blob926c825e07a67d4471fe7d4ceb3fcb4e354b7237
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
#include <afsconfig.h>
#include <afs/param.h>

#include <roken.h>
#include <afs/opr.h>

#include <errno.h>

#ifdef AFS_PTHREAD_ENV
# include <opr/lock.h>
#else
# include <opr/lockstub.h>
#endif

#define UBIK_INTERNALS
#include "ubik.h"
#include "ubik_int.h"
26 #define PHSIZE 128
27 static struct buffer {
28 struct ubik_dbase *dbase; /*!< dbase within which the buffer resides */
29 afs_int32 file; /*!< Unique cache key */
30 afs_int32 page; /*!< page number */
31 struct buffer *lru_next;
32 struct buffer *lru_prev;
33 struct buffer *hashNext; /*!< next dude in hash table */
34 char *data; /*!< ptr to the data */
35 char lockers; /*!< usage ref count */
36 char dirty; /*!< is buffer modified */
37 char hashIndex; /*!< back ptr to hash table */
38 } *Buffers;
40 #define pHash(page) ((page) & (PHSIZE-1))
42 afs_int32 ubik_nBuffers = NBUFFERS;
43 static struct buffer *phTable[PHSIZE]; /*!< page hash table */
44 static struct buffer *LruBuffer;
45 static int nbuffers;
46 static int calls = 0, ios = 0, lastb = 0;
47 static char *BufferData;
48 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
49 afs_int32 apage);
50 #define BADFID 0xffffffff
52 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
54 static struct ubik_trunc *freeTruncList = 0;
56 /*!
57 * \brief Remove a transaction from the database's active transaction list. Don't free it.
59 static int
60 unthread(struct ubik_trans *atrans)
62 struct ubik_trans **lt, *tt;
63 lt = &atrans->dbase->activeTrans;
64 for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
65 if (tt == atrans) {
66 /* found it */
67 *lt = tt->next;
68 return 0;
71 return 2; /* no entry */
74 /*!
75 * \brief some debugging assistance
77 void
78 udisk_Debug(struct ubik_debug *aparm)
80 struct buffer *tb;
81 int i;
83 memcpy(&aparm->localVersion, &ubik_dbase->version,
84 sizeof(struct ubik_version));
85 aparm->lockedPages = 0;
86 aparm->writeLockedPages = 0;
87 tb = Buffers;
88 for (i = 0; i < nbuffers; i++, tb++) {
89 if (tb->lockers) {
90 aparm->lockedPages++;
91 if (tb->dirty)
92 aparm->writeLockedPages++;
97 /*!
98 * \brief Write an opcode to the log.
100 * log format is defined here, and implicitly in recovery.c
102 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
103 * are in logged in network standard byte order, in case we want to move logs
104 * from machine-to-machine someday.
106 * Begin transaction: opcode \n
107 * Commit transaction: opcode, version (8 bytes) \n
108 * Truncate file: opcode, file number, length \n
109 * Abort transaction: opcode \n
110 * Write data: opcode, file, position, length, <length> data bytes \n
113 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
115 struct ubik_stat ustat;
116 afs_int32 code;
118 /* figure out where to write */
119 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
120 if (code < 0)
121 return code;
123 /* setup data and do write */
124 aopcode = htonl(aopcode);
125 code =
126 (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
127 sizeof(afs_int32));
128 if (code != sizeof(afs_int32))
129 return UIOERROR;
131 /* optionally sync data */
132 if (async)
133 code = (*adbase->sync) (adbase, LOGFILE);
134 else
135 code = 0;
136 return code;
140 * \brief Log a commit, never syncing.
143 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
145 afs_int32 code;
146 afs_int32 data[3];
147 struct ubik_stat ustat;
149 /* figure out where to write */
150 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
151 if (code)
152 return code;
154 /* setup data */
155 data[0] = htonl(LOGEND);
156 data[1] = htonl(aversion->epoch);
157 data[2] = htonl(aversion->counter);
159 /* do write */
160 code =
161 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
162 3 * sizeof(afs_int32));
163 if (code != 3 * sizeof(afs_int32))
164 return UIOERROR;
166 /* finally sync the log */
167 code = (*adbase->sync) (adbase, LOGFILE);
168 return code;
172 * \brief Log a truncate operation, never syncing.
175 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
176 afs_int32 alength)
178 afs_int32 code;
179 afs_int32 data[3];
180 struct ubik_stat ustat;
182 /* figure out where to write */
183 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
184 if (code < 0)
185 return code;
187 /* setup data */
188 data[0] = htonl(LOGTRUNCATE);
189 data[1] = htonl(afile);
190 data[2] = htonl(alength);
192 /* do write */
193 code =
194 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
195 3 * sizeof(afs_int32));
196 if (code != 3 * sizeof(afs_int32))
197 return UIOERROR;
198 return 0;
202 * \brief Write some data to the log, never syncing.
205 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
206 afs_int32 apos, afs_int32 alen)
208 struct ubik_stat ustat;
209 afs_int32 code;
210 afs_int32 data[4];
211 afs_int32 lpos;
213 /* find end of log */
214 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
215 lpos = ustat.size;
216 if (code < 0)
217 return code;
219 /* setup header */
220 data[0] = htonl(LOGDATA);
221 data[1] = htonl(afile);
222 data[2] = htonl(apos);
223 data[3] = htonl(alen);
225 /* write header */
226 code =
227 (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
228 if (code != 4 * sizeof(afs_int32))
229 return UIOERROR;
230 lpos += 4 * sizeof(afs_int32);
232 /* write data */
233 code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
234 if (code != alen)
235 return UIOERROR;
236 return 0;
240 udisk_Init(int abuffers)
242 /* Initialize the venus buffer system. */
243 int i;
244 struct buffer *tb;
245 Buffers = calloc(abuffers, sizeof(struct buffer));
246 BufferData = malloc(abuffers * UBIK_PAGESIZE);
247 nbuffers = abuffers;
248 for (i = 0; i < PHSIZE; i++)
249 phTable[i] = 0;
250 for (i = 0; i < abuffers; i++) {
251 /* Fill in each buffer with an empty indication. */
252 tb = &Buffers[i];
253 tb->lru_next = &(Buffers[i + 1]);
254 tb->lru_prev = &(Buffers[i - 1]);
255 tb->data = &BufferData[UBIK_PAGESIZE * i];
256 tb->file = BADFID;
258 Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
259 Buffers[abuffers - 1].lru_next = &(Buffers[0]);
260 LruBuffer = &(Buffers[0]);
261 return 0;
265 * \brief Take a buffer and mark it as the least recently used buffer.
267 static void
268 Dlru(struct buffer *abuf)
270 if (LruBuffer == abuf)
271 return;
273 /* Unthread from where it is in the list */
274 abuf->lru_next->lru_prev = abuf->lru_prev;
275 abuf->lru_prev->lru_next = abuf->lru_next;
277 /* Thread onto beginning of LRU list */
278 abuf->lru_next = LruBuffer;
279 abuf->lru_prev = LruBuffer->lru_prev;
281 LruBuffer->lru_prev->lru_next = abuf;
282 LruBuffer->lru_prev = abuf;
283 LruBuffer = abuf;
287 * \brief Take a buffer and mark it as the most recently used buffer.
289 static void
290 Dmru(struct buffer *abuf)
292 if (LruBuffer == abuf) {
293 LruBuffer = LruBuffer->lru_next;
294 return;
297 /* Unthread from where it is in the list */
298 abuf->lru_next->lru_prev = abuf->lru_prev;
299 abuf->lru_prev->lru_next = abuf->lru_next;
301 /* Thread onto end of LRU list - making it the MRU buffer */
302 abuf->lru_next = LruBuffer;
303 abuf->lru_prev = LruBuffer->lru_prev;
304 LruBuffer->lru_prev->lru_next = abuf;
305 LruBuffer->lru_prev = abuf;
308 static_inline int
309 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
310 struct ubik_trans *atrans)
312 if (buf->page != page) {
313 return 0;
315 if (buf->file != fid) {
316 return 0;
318 if (atrans->type == UBIK_READTRANS && buf->dirty) {
319 /* if 'buf' is dirty, it has uncommitted changes; we do not want to
320 * see uncommitted changes if we are a read transaction, so skip over
321 * it. */
322 return 0;
324 if (buf->dbase != atrans->dbase) {
325 return 0;
327 return 1;
331 * \brief Get a pointer to a particular buffer.
333 static char *
334 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
336 /* Read a page from the disk. */
337 struct buffer *tb, *lastbuffer, *found_tb = NULL;
338 afs_int32 code;
339 struct ubik_dbase *dbase = atrans->dbase;
341 calls++;
342 lastbuffer = LruBuffer->lru_prev;
344 /* Skip for write transactions for a clean page - this may not be the right page to use */
345 if (MatchBuffer(lastbuffer, page, fid, atrans)
346 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
347 tb = lastbuffer;
348 tb->lockers++;
349 lastb++;
350 return tb->data;
352 for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
353 if (MatchBuffer(tb, page, fid, atrans)) {
354 if (tb->dirty || atrans->type == UBIK_READTRANS) {
355 found_tb = tb;
356 break;
358 /* Remember this clean page - we might use it */
359 found_tb = tb;
362 /* For a write transaction, use a matching clean page if no dirty one was found */
363 if (found_tb) {
364 Dmru(found_tb);
365 found_tb->lockers++;
366 return found_tb->data;
369 /* can't find it */
370 tb = newslot(dbase, fid, page);
371 if (!tb)
372 return 0;
373 memset(tb->data, 0, UBIK_PAGESIZE);
375 tb->lockers++;
376 code =
377 (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
378 UBIK_PAGESIZE);
379 if (code < 0) {
380 tb->file = BADFID;
381 Dlru(tb);
382 tb->lockers--;
383 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
384 return 0;
386 ios++;
388 /* Note that findslot sets the page field in the buffer equal to
389 * what it is searching for.
391 return tb->data;
395 * \brief Zap truncated pages.
397 static int
398 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
400 afs_int32 maxPage;
401 struct buffer *tb;
402 int i;
403 struct ubik_dbase *dbase = atrans->dbase;
405 maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
406 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
407 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
408 tb->file = BADFID;
409 Dlru(tb);
412 return 0;
416 * \brief Allocate a truncation entry.
418 * We allocate special entries representing truncations, rather than
419 * performing them immediately, so that we can abort a transaction easily by simply purging
420 * the in-core memory buffers and discarding these truncation entries.
422 static struct ubik_trunc *
423 GetTrunc(void)
425 struct ubik_trunc *tt;
426 if (!freeTruncList) {
427 freeTruncList = malloc(sizeof(struct ubik_trunc));
428 freeTruncList->next = (struct ubik_trunc *)0;
430 tt = freeTruncList;
431 freeTruncList = tt->next;
432 return tt;
436 * \brief Free a truncation entry.
438 static int
439 PutTrunc(struct ubik_trunc *at)
441 at->next = freeTruncList;
442 freeTruncList = at;
443 return 0;
447 * \brief Find a truncation entry for a file, if any.
449 static struct ubik_trunc *
450 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
452 struct ubik_trunc *tt;
453 for (tt = atrans->activeTruncs; tt; tt = tt->next) {
454 if (tt->file == afile)
455 return tt;
457 return (struct ubik_trunc *)0;
461 * \brief Do truncates associated with \p atrans, and free them.
463 static int
464 DoTruncs(struct ubik_trans *atrans)
466 struct ubik_trunc *tt, *nt;
467 int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
468 afs_int32 rcode = 0, code;
470 tproc = atrans->dbase->truncate;
471 for (tt = atrans->activeTruncs; tt; tt = nt) {
472 nt = tt->next;
473 DTrunc(atrans, tt->file, tt->length); /* zap pages from buffer cache */
474 code = (*tproc) (atrans->dbase, tt->file, tt->length);
475 if (code)
476 rcode = code;
477 PutTrunc(tt);
479 /* don't unthread, because we do the entire list's worth here */
480 atrans->activeTruncs = (struct ubik_trunc *)0;
481 return (rcode);
485 * \brief Mark an \p fid as invalid.
488 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
490 struct buffer *tb;
491 int i;
493 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
494 if (tb->file == afid) {
495 tb->file = BADFID;
496 Dlru(tb);
499 return 0;
503 * \brief Move this page into the correct hash bucket.
505 static int
506 FixupBucket(struct buffer *ap)
508 struct buffer **lp, *tp;
509 int i;
510 /* first try to get it out of its current hash bucket, in which it might not be */
511 i = ap->hashIndex;
512 lp = &phTable[i];
513 for (tp = *lp; tp; tp = tp->hashNext) {
514 if (tp == ap) {
515 *lp = tp->hashNext;
516 break;
518 lp = &tp->hashNext;
520 /* now figure the new hash bucket */
521 i = pHash(ap->page);
522 ap->hashIndex = i; /* remember where we are for deletion */
523 ap->hashNext = phTable[i]; /* add us to the list */
524 phTable[i] = ap;
525 return 0;
529 * \brief Create a new slot for a particular dbase page.
531 static struct buffer *
532 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
534 /* Find a usable buffer slot */
535 afs_int32 i;
536 struct buffer *pp, *tp;
538 pp = 0; /* last pure */
539 for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
540 if (!tp->lockers && !tp->dirty) {
541 pp = tp;
542 break;
546 if (pp == 0) {
547 /* There are no unlocked buffers that don't need to be written to the disk. */
548 ubik_print
549 ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
550 return NULL;
553 /* Now fill in the header. */
554 pp->dbase = adbase;
555 pp->file = afid;
556 pp->page = apage;
558 FixupBucket(pp); /* move to the right hash bucket */
559 Dmru(pp);
560 return pp;
564 * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
566 static void
567 DRelease(char *ap, int flag)
569 int index;
570 struct buffer *bp;
572 if (!ap)
573 return;
574 index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
575 bp = &(Buffers[index]);
576 bp->lockers--;
577 if (flag)
578 bp->dirty = 1;
579 return;
583 * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
584 * by DSync()).
586 * \note Note interaction with DSync(): you call this thing first,
587 * writing the buffers to the disk. Then you call DSync() to sync all the
588 * files that were written, and to clear the dirty bits. You should
589 * always call DFlush/DSync as a pair.
591 static int
592 DFlush(struct ubik_trans *atrans)
594 int i;
595 afs_int32 code;
596 struct buffer *tb;
597 struct ubik_dbase *adbase = atrans->dbase;
599 tb = Buffers;
600 for (i = 0; i < nbuffers; i++, tb++) {
601 if (tb->dirty) {
602 code = tb->page * UBIK_PAGESIZE; /* offset within file */
603 code =
604 (*adbase->write) (adbase, tb->file, tb->data, code,
605 UBIK_PAGESIZE);
606 if (code != UBIK_PAGESIZE)
607 return UIOERROR;
610 return 0;
614 * \brief Flush all modified buffers.
616 static int
617 DAbort(struct ubik_trans *atrans)
619 int i;
620 struct buffer *tb;
622 tb = Buffers;
623 for (i = 0; i < nbuffers; i++, tb++) {
624 if (tb->dirty) {
625 tb->dirty = 0;
626 tb->file = BADFID;
627 Dlru(tb);
630 return 0;
634 * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
635 * can appear if a read transaction reads a page that is dirty, then that
636 * dirty page is synced. The read transaction will skip over the dirty page,
637 * and create a new buffer, and when the dirty page is synced, it will be
638 * identical (except for contents) to the read-transaction buffer.
640 static void
641 DedupBuffer(struct buffer *abuf)
643 struct buffer *tb;
644 for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
645 if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
646 && tb->dbase == abuf->dbase) {
648 tb->file = BADFID;
649 Dlru(tb);
655 * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
657 static int
658 DSync(struct ubik_trans *atrans)
660 int i;
661 afs_int32 code;
662 struct buffer *tb;
663 afs_int32 file;
664 afs_int32 rCode;
665 struct ubik_dbase *adbase = atrans->dbase;
667 rCode = 0;
668 while (1) {
669 file = BADFID;
670 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
671 if (tb->dirty == 1) {
672 if (file == BADFID)
673 file = tb->file;
674 if (file != BADFID && tb->file == file) {
675 tb->dirty = 0;
676 DedupBuffer(tb);
680 if (file == BADFID)
681 break;
682 /* otherwise we have a file to sync */
683 code = (*adbase->sync) (adbase, file);
684 if (code)
685 rCode = code;
687 return rCode;
691 * \brief Same as DRead(), only do not even try to read the page.
693 static char *
694 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
696 struct buffer *tb;
697 struct ubik_dbase *dbase = atrans->dbase;
699 if ((tb = newslot(dbase, fid, page)) == 0)
700 return NULL;
701 tb->lockers++;
702 memset(tb->data, 0, UBIK_PAGESIZE);
703 return tb->data;
707 * \brief Read data from database.
710 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
711 afs_int32 apos, afs_int32 alen)
713 char *bp;
714 afs_int32 offset, len, totalLen;
716 if (atrans->flags & TRDONE)
717 return UDONE;
718 totalLen = 0;
719 while (alen > 0) {
720 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
721 if (!bp)
722 return UEOF;
723 /* otherwise, min of remaining bytes and end of buffer to user mode */
724 offset = apos & (UBIK_PAGESIZE - 1);
725 len = UBIK_PAGESIZE - offset;
726 if (len > alen)
727 len = alen;
728 memcpy(abuffer, bp + offset, len);
729 abuffer = (char *)abuffer + len;
730 apos += len;
731 alen -= len;
732 totalLen += len;
733 DRelease(bp, 0);
735 return 0;
739 * \brief Truncate file.
742 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
744 afs_int32 code;
745 struct ubik_trunc *tt;
747 if (atrans->flags & TRDONE)
748 return UDONE;
749 if (atrans->type != UBIK_WRITETRANS)
750 return UBADTYPE;
752 /* write a truncate log record */
753 code = udisk_LogTruncate(atrans->dbase, afile, alength);
755 /* don't truncate until commit time */
756 tt = FindTrunc(atrans, afile);
757 if (!tt) {
758 /* this file not truncated yet */
759 tt = GetTrunc();
760 tt->next = atrans->activeTruncs;
761 atrans->activeTruncs = tt;
762 tt->file = afile;
763 tt->length = alength;
764 } else {
765 /* already truncated to a certain length */
766 if (tt->length > alength)
767 tt->length = alength;
769 return code;
773 * \brief Write data to database, using logs.
776 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
777 afs_int32 apos, afs_int32 alen)
779 char *bp;
780 afs_int32 offset, len, totalLen;
781 struct ubik_trunc *tt;
782 afs_int32 code;
784 if (atrans->flags & TRDONE)
785 return UDONE;
786 if (atrans->type != UBIK_WRITETRANS)
787 return UBADTYPE;
789 /* first write the data to the log */
790 code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
791 if (code)
792 return code;
794 /* expand any truncations of this file */
795 tt = FindTrunc(atrans, afile);
796 if (tt) {
797 if (tt->length < apos + alen) {
798 tt->length = apos + alen;
802 /* now update vm */
803 totalLen = 0;
804 while (alen > 0) {
805 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
806 if (!bp) {
807 bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
808 if (!bp)
809 return UIOERROR;
810 memset(bp, 0, UBIK_PAGESIZE);
812 /* otherwise, min of remaining bytes and end of buffer to user mode */
813 offset = apos & (UBIK_PAGESIZE - 1);
814 len = UBIK_PAGESIZE - offset;
815 if (len > alen)
816 len = alen;
817 memcpy(bp + offset, abuffer, len);
818 abuffer = (char *)abuffer + len;
819 apos += len;
820 alen -= len;
821 totalLen += len;
822 DRelease(bp, 1); /* buffer modified */
824 return 0;
828 * \brief Begin a new local transaction.
831 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
833 afs_int32 code;
834 struct ubik_trans *tt;
836 *atrans = NULL;
837 if (atype == UBIK_WRITETRANS) {
838 if (adbase->flags & DBWRITING)
839 return USYNC;
840 code = udisk_LogOpcode(adbase, LOGNEW, 0);
841 if (code)
842 return code;
844 tt = calloc(1, sizeof(struct ubik_trans));
845 tt->dbase = adbase;
846 tt->next = adbase->activeTrans;
847 adbase->activeTrans = tt;
848 tt->type = atype;
849 if (atype == UBIK_READTRANS)
850 adbase->readers++;
851 else if (atype == UBIK_WRITETRANS) {
852 UBIK_VERSION_LOCK;
853 adbase->flags |= DBWRITING;
854 UBIK_VERSION_UNLOCK;
856 *atrans = tt;
857 return 0;
861 * \brief Commit transaction.
864 udisk_commit(struct ubik_trans *atrans)
866 struct ubik_dbase *dbase;
867 afs_int32 code = 0;
868 struct ubik_version oldversion, newversion;
870 if (atrans->flags & TRDONE)
871 return (UTWOENDS);
873 if (atrans->type == UBIK_WRITETRANS) {
874 dbase = atrans->dbase;
876 /* On the first write to the database. We update the versions */
877 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
878 UBIK_VERSION_LOCK;
879 oldversion = dbase->version;
880 newversion.epoch = version_globals.ubik_epochTime;
881 newversion.counter = 1;
883 code = (*dbase->setlabel) (dbase, 0, &newversion);
884 if (code) {
885 UBIK_VERSION_UNLOCK;
886 return code;
889 dbase->version = newversion;
890 UBIK_VERSION_UNLOCK;
892 urecovery_state |= UBIK_RECLABELDB;
894 /* Ignore the error here. If the call fails, the site is
895 * marked down and when we detect it is up again, we will
896 * send the entire database to it.
898 ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
899 &oldversion, &newversion);
902 UBIK_VERSION_LOCK;
903 dbase->version.counter++; /* bump commit count */
904 #ifdef AFS_PTHREAD_ENV
905 opr_cv_broadcast(&dbase->version_cond);
906 #else
907 LWP_NoYieldSignal(&dbase->version);
908 #endif
909 code = udisk_LogEnd(dbase, &dbase->version);
910 if (code) {
911 dbase->version.counter--;
912 UBIK_VERSION_UNLOCK;
913 return code;
915 UBIK_VERSION_UNLOCK;
917 /* If we fail anytime after this, then panic and let the
918 * recovery replay the log.
920 code = DFlush(atrans); /* write dirty pages to respective files */
921 if (code)
922 panic("Writing Ubik DB modifications\n");
923 code = DSync(atrans); /* sync the files and mark pages not dirty */
924 if (code)
925 panic("Synchronizing Ubik DB modifications\n");
927 code = DoTruncs(atrans); /* Perform requested truncations */
928 if (code)
929 panic("Truncating Ubik DB\n");
931 /* label the committed dbase */
932 code = (*dbase->setlabel) (dbase, 0, &dbase->version);
933 if (code)
934 panic("Truncating Ubik DB\n");
936 code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
937 if (code)
938 panic("Truncating Ubik logfile\n");
942 /* When the transaction is marked done, it also means the logfile
943 * has been truncated.
945 atrans->flags |= TRDONE;
946 return code;
950 * \brief Abort transaction.
953 udisk_abort(struct ubik_trans *atrans)
955 struct ubik_dbase *dbase;
956 afs_int32 code;
958 if (atrans->flags & TRDONE)
959 return UTWOENDS;
961 /* Check if we are the write trans before logging abort, lest we
962 * abort a good write trans in progress.
963 * We don't really care if the LOGABORT gets to the log because we
964 * truncate the log next. If the truncate fails, we panic; for
965 * otherwise, the log entries remain. On restart, replay of the log
966 * will do nothing because the abort is there or no LogEnd opcode.
968 dbase = atrans->dbase;
969 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
970 udisk_LogOpcode(dbase, LOGABORT, 1);
971 code = (*dbase->truncate) (dbase, LOGFILE, 0);
972 if (code)
973 panic("Truncating Ubik logfile during an abort\n");
974 DAbort(atrans); /* remove all dirty pages */
977 /* When the transaction is marked done, it also means the logfile
978 * has been truncated.
980 atrans->flags |= (TRABORT | TRDONE);
981 return 0;
985 * \brief Destroy a transaction after it has been committed or aborted.
987 * If it hasn't committed before you call this routine, we'll abort the
988 * transaction for you.
991 udisk_end(struct ubik_trans *atrans)
993 struct ubik_dbase *dbase;
995 if (!(atrans->flags & TRDONE))
996 udisk_abort(atrans);
997 dbase = atrans->dbase;
999 ulock_relLock(atrans);
1000 unthread(atrans);
1002 /* check if we are the write trans before unsetting the DBWRITING bit, else
1003 * we could be unsetting someone else's bit.
1005 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1006 UBIK_VERSION_LOCK;
1007 dbase->flags &= ~DBWRITING;
1008 UBIK_VERSION_UNLOCK;
1009 } else {
1010 dbase->readers--;
1012 if (atrans->iovec_info.iovec_wrt_val)
1013 free(atrans->iovec_info.iovec_wrt_val);
1014 if (atrans->iovec_data.iovec_buf_val)
1015 free(atrans->iovec_data.iovec_buf_val);
1016 free(atrans);
1018 /* Wakeup any writers waiting in BeginTrans() */
1019 #ifdef AFS_PTHREAD_ENV
1020 opr_cv_broadcast(&dbase->flags_cond);
1021 #else
1022 LWP_NoYieldSignal(&dbase->flags);
1023 #endif
1024 return 0;