1 /* glass_table.cc: Btree implementation
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2002 Ananova Ltd
5 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
6 * Copyright 2008 Lemur Consulting Ltd
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU General Public License as
10 * published by the Free Software Foundation; either version 2 of the
11 * License, or (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
26 #include "glass_table.h"
28 #include <xapian/error.h>
30 #include "safeerrno.h"
33 #include "posixy_wrapper.h"
35 #include "stringutils.h" // For STRINGIZE().
37 #include <sys/types.h>
39 #include <cstring> /* for memmove */
40 #include <climits> /* for CHAR_BIT */
42 #include "glass_freelist.h"
43 #include "glass_changes.h"
44 #include "glass_cursor.h"
45 #include "glass_defs.h"
46 #include "glass_version.h"
49 #include "errno_to_string.h"
50 #include "filetests.h"
53 #include "wordaccess.h"
55 #include <algorithm> // for std::min()
58 #include "xapian/constants.h"
60 using namespace Glass
;
63 //#define BTREE_DEBUG_FULL 1
64 #undef BTREE_DEBUG_FULL
66 #ifdef BTREE_DEBUG_FULL
67 /*------debugging aids from here--------*/
69 static void print_key(const byte
* p
, int c
, int j
);
70 static void print_tag(const byte
* p
, int c
, int j
);
73 static void report_cursor(int N, Btree * B, Glass::Cursor * C)
77 for (i = 0; i <= B->level; i++)
78 printf("p=%d, c=%d, n=[%d], rewrite=%d\n",
79 C[i].p, C[i].c, C[i].n, C[i].rewrite);
83 /*------to here--------*/
84 #endif /* BTREE_DEBUG_FULL */
86 static inline byte
*zeroed_new(size_t size
)
88 byte
*temp
= new byte
[size
];
89 memset(temp
, 0, size
);
93 /* A B-tree consists of a file with extension GLASS_TABLE_EXTENSION divided
94 into a sequence of equal sized blocks, numbered 0, 1, 2 ... some of which
95 are free, some in use. Those in use are arranged in a tree. Lists of
96 currently unused blocks are stored in a freelist which is itself stored in
99 Some "root info" is needed to locate a particular revision of the B-tree
100 - this is stored in the "version file" (we store this data for all tables
101 at a particular revision together).
103 Each block, b, has a structure like this:
105 R L M T D o1 o2 o3 ... oN <gap> [item] .. [item] .. [item] ...
106 <---------- D ----------> <-M->
110 R = REVISION(b) is the revision number the B-tree had when the block was
111 written into the DB file.
112 L = GET_LEVEL(b) is the level of the block, which is the number of levels
113 towards the root of the B-tree structure. So leaf blocks
114 have level 0 and the one root block has the highest level
115 equal to the number of levels in the B-tree. For blocks
116 storing the freelist, the level is set to 254.
117 M = MAX_FREE(b) is the size of the gap between the end of the directory and
118 the first item of data. (It is not necessarily the maximum
119 size among the bits of space that are free, but I can't
120 think of a better name.)
121 T = TOTAL_FREE(b) is the total amount of free space left in b.
122 D = DIR_END(b) gives the offset to the end of the directory.
124 o1, o2 ... oN are a directory of offsets to the N items held in the block.
125 The items are key-tag pairs, and as they occur in the directory are ordered
128 An item has this form:
134 A long tag presented through the API is split up into C pieces small enough
135 to be accommodated in the blocks of the B-tree. The key is extended to
136 include a counter, x, which runs from 1 to C. The key is preceded by a
137 length, K, and the whole item with a length, I, as depicted above. The
138 upper bits of I encode a flag indicating if this item is compressed, and a
139 flag saying if this is the last piece of a tag (i.e. if x == C).
141 Here are the corresponding definitions:
/** Starting value for the sequential-addition counter.
 *
 *  Block splitting flips to the sequential-addition strategy after this many
 *  sequential additions have been observed.  The threshold is held in negated
 *  form: the counter is reset to this value and incremented while it is still
 *  negative.
 */
#define SEQ_START_POINT (-10)
/* This relies on the <climits> values:
     UCHAR_MAX = 255, an unsigned char with all bits set, and
     CHAR_BIT = 8, the number of bits per byte

   BYTE_PAIR_RANGE is the smallest positive number which can't be held in two
   bytes -- effectively 64K.
*/
#define BYTE_PAIR_RANGE (1 << (2 * CHAR_BIT))
159 /// read_block(n, p) reads block n of the DB file to address p.
161 GlassTable::read_block(uint4 n
, byte
* p
) const
163 // Log the value of p, not the contents of the block it points to...
164 LOGCALL_VOID(DB
, "GlassTable::read_block", n
| (void*)p
);
165 if (rare(handle
== -2))
166 GlassTable::throw_database_closed();
167 AssertRel(n
,<,free_list
.get_first_unused_block());
169 io_read_block(handle
, reinterpret_cast<char *>(p
), block_size
, n
, offset
);
171 if (GET_LEVEL(p
) != LEVEL_FREELIST
) {
172 int dir_end
= DIR_END(p
);
173 if (rare(dir_end
< DIR_START
|| unsigned(dir_end
) > block_size
)) {
174 string
msg("dir_end invalid in block ");
176 throw Xapian::DatabaseCorruptError(msg
);
181 /** write_block(n, p, appending) writes block n in the DB file from address p.
183 * If appending is true (not specified it defaults to false), then this
184 * indicates that we've added data to a block in space which was previously
185 * unused, and are writing the block back in place - we use this to add
186 * free list entries (the information about where the freelist data for a
187 * revision begins and ends is stored in the "iamglass" file). We don't
188 * currently use this flag for much, but it signifies that we don't risk
189 * invalidating any existing revisions, which may be useful information.
192 GlassTable::write_block(uint4 n
, const byte
* p
, bool appending
) const
194 LOGCALL_VOID(DB
, "GlassTable::write_block", n
| p
| appending
);
196 /* Check that n is in range. */
197 AssertRel(n
,<,free_list
.get_first_unused_block());
199 /* don't write to non-free */
200 // FIXME: We can no longer check this easily.
201 // AssertParanoid(free_list.block_free_at_start(block_size, n));
203 /* write revision is okay */
204 AssertEqParanoid(REVISION(p
), revision_number
+ 1);
207 // In the case of the freelist, new entries can safely be written into
208 // the space at the end of the latest freelist block, as that's unused
209 // by previous revisions. It is also safe to write into a block which
210 // wasn't used in the previous revision.
212 // It's only when we start a new freelist block that we need to worry
213 // about invalidating old revisions.
214 } else if (flags
& Xapian::DB_DANGEROUS
) {
215 // FIXME: We should somehow flag this to prevent readers opening the
216 // database. Removing "iamglass" or setting a flag in it doesn't deal
217 // with any existing readers though. Perhaps we want to have readers
218 // read lock and try to take an exclusive lock here?
221 const char * p_char
= reinterpret_cast<const char *>(p
);
222 io_write_block(handle
, p_char
, block_size
, n
, offset
);
224 if (!changes_obj
) return;
227 // FIXME: track table_type in this class?
228 if (strcmp(tablename
, "position") == 0) {
229 v
= int(Glass::POSITION
);
230 } else if (strcmp(tablename
, "postlist") == 0) {
231 v
= int(Glass::POSTLIST
);
232 } else if (strcmp(tablename
, "docdata") == 0) {
233 v
= int(Glass::DOCDATA
);
234 } else if (strcmp(tablename
, "spelling") == 0) {
235 v
= int(Glass::SPELLING
);
236 } else if (strcmp(tablename
, "synonym") == 0) {
237 v
= int(Glass::SYNONYM
);
238 } else if (strcmp(tablename
, "termlist") == 0) {
239 v
= int(Glass::TERMLIST
);
244 if (block_size
== 2048) {
246 } else if (block_size
== 4096) {
248 } else if (block_size
== 8192) {
250 } else if (block_size
== 16384) {
252 } else if (block_size
== 32768) {
254 } else if (block_size
== 65536) {
262 // Write the block number to the file
265 changes_obj
->write_block(buf
);
266 changes_obj
->write_block(reinterpret_cast<const char *>(p
), block_size
);
269 /* A note on cursors:
271 Each B-tree level has a corresponding array element C[j] in a
272 cursor, C. C[0] is the leaf (or data) level, and C[B->level] is the
273 root block level. Within a level j,
275 C[j].p addresses the block
276 C[j].c is the offset into the directory entry in the block
277 C[j].n is the number of the block at C[j].p
279 A look up in the B-tree causes navigation of the blocks starting
280 from the root. In each block, p, we find an offset, c, to an item
281 which gives the number, n, of the block for the next level. This
282 leads to an array of values p,c,n which are held inside the cursor.
284 Any Btree object B has a built-in cursor, at B->C. But other cursors may
285 be created. If BC is a created cursor, BC->C is the cursor in the
286 sense given above, and BC->B is the handle for the B-tree again.
291 GlassTable::set_overwritten() const
293 LOGCALL_VOID(DB
, "GlassTable::set_overwritten", NO_ARGS
);
294 // If we're writable, there shouldn't be another writer who could cause
295 // overwritten to be flagged, so that's a DatabaseCorruptError.
297 throw Xapian::DatabaseCorruptError("Db block overwritten - are there multiple writers?");
298 throw Xapian::DatabaseModifiedError("The revision being read has been discarded - you should call Xapian::Database::reopen() and retry the operation");
301 /* block_to_cursor(C, j, n) puts block n into position C[j] of cursor
302 C, writing the block currently at C[j] back to disk if necessary.
307 is true iff C[j].n is different from block n in file DB. If it is
308 false no rewriting is necessary.
312 GlassTable::block_to_cursor(Glass::Cursor
* C_
, int j
, uint4 n
) const
314 LOGCALL_VOID(DB
, "GlassTable::block_to_cursor", (void*)C_
| j
| n
);
315 if (n
== C_
[j
].get_n()) return;
317 if (writable
&& C_
[j
].rewrite
) {
319 write_block(C_
[j
].get_n(), C_
[j
].get_p());
320 C_
[j
].rewrite
= false;
323 // Check if the block is in the built-in cursor (potentially in
326 if (n
== C
[j
].get_n()) {
327 p
= C_
[j
].clone(C
[j
]);
329 byte
* q
= C_
[j
].init(block_size
);
336 /* unsigned comparison */
337 if (rare(REVISION(p
) > REVISION(C_
[j
+ 1].get_p()))) {
343 if (rare(j
!= GET_LEVEL(p
))) {
344 string msg
= "Expected block ";
346 msg
+= " to be level ";
349 msg
+= str(GET_LEVEL(p
));
350 throw Xapian::DatabaseCorruptError(msg
);
354 /** Btree::alter(); is called when the B-tree is to be altered.
356 It causes new blocks to be forced for the current set of blocks in
359 The point is that if a block at level 0 is to be altered it may get
360 a new number. Then the pointer to this block from level 1 will need
361 changing. So the block at level 1 needs altering and may get a new
362 block number. Then the pointer to this block from level 2 will need
363 changing ... and so on back to the root.
365 The clever bit here is spotting the cases when we can make an early
366 exit from this process. If C[j].rewrite is true, C[j+k].rewrite
367 will be true for k = 1,2 ... We have been through all this before,
368 and there is no need to do it again. If C[j].n was free at the
369 start of the transaction, we can copy it back to the same place
370 without violating the integrity of the B-tree. We don't then need a
371 new n and can return. The corresponding C[j].rewrite may be true or
378 LOGCALL_VOID(DB
, "GlassTable::alter", NO_ARGS
);
380 if (flags
& Xapian::DB_DANGEROUS
) {
386 if (C
[j
].rewrite
) return; /* all new, so return */
389 glass_revision_number_t rev
= REVISION(C
[j
].get_p());
390 if (rev
== revision_number
+ 1) {
393 Assert(rev
< revision_number
+ 1);
394 uint4 n
= C
[j
].get_n();
395 free_list
.mark_block_unused(this, block_size
, n
);
396 SET_REVISION(C
[j
].get_modifiable_p(block_size
), revision_number
+ 1);
397 n
= free_list
.get_block(this, block_size
);
400 if (j
== level
) return;
402 BItem_wr(C
[j
].get_modifiable_p(block_size
), C
[j
].c
).set_block_given_by(n
);
406 /** find_in_leaf(p, key, c, exact) searches for the key in the leaf block at p.
408 What we get is the directory entry to the last key <= the key being searched
411 The lookup is by binary chop, with i and j set to the left and
412 right ends of the search area. In sequential addition, c will often
413 be the answer, so we test the keys round c and move i and j towards
416 exact is set to true if the match was exact (otherwise exact is unchanged).
420 GlassTable::find_in_leaf(const byte
* p
, LeafItem item
, int c
, bool& exact
)
422 LOGCALL_STATIC(DB
, int, "GlassTable::find_in_leaf", (const void*)p
| (const void *)item
.get_address() | c
| Literal("bool&"));
423 // c should be odd (either -1, or an even offset from DIR_START).
424 Assert((c
& 1) == 1);
433 if (c
< j
&& i
< c
) {
434 int r
= compare(LeafItem(p
, c
), item
);
442 if (c
< j
&& i
< c
) {
443 int r
= compare(item
, LeafItem(p
, c
));
453 int k
= i
+ ((j
- i
) / (D2
* 2)) * D2
; /* mid way */
454 int r
= compare(item
, LeafItem(p
, k
));
465 AssertRel(DIR_START
- D2
,<=,i
);
466 AssertRel(i
,<,DIR_END(p
));
470 template<typename ITEM
> int
471 find_in_branch_(const byte
* p
, ITEM item
, int c
)
473 // c should be odd (either -1, or an even offset from DIR_START).
474 Assert((c
& 1) == 1);
482 if (c
< j
&& i
< c
) {
483 int r
= compare(BItem(p
, c
), item
);
484 if (r
== 0) return c
;
488 if (c
< j
&& i
< c
) {
489 int r
= compare(item
, BItem(p
, c
));
490 if (r
== 0) return c
;
496 int k
= i
+ ((j
- i
) / (D2
* 2)) * D2
; /* mid way */
497 int r
= compare(item
, BItem(p
, k
));
505 AssertRel(DIR_START
,<=,i
);
506 AssertRel(i
,<,DIR_END(p
));
511 GlassTable::find_in_branch(const byte
* p
, LeafItem item
, int c
)
513 LOGCALL_STATIC(DB
, int, "GlassTable::find_in_branch", (const void*)p
| (const void *)item
.get_address() | c
);
514 RETURN(find_in_branch_(p
, item
, c
));
518 GlassTable::find_in_branch(const byte
* p
, BItem item
, int c
)
520 LOGCALL_STATIC(DB
, int, "GlassTable::find_in_branch", (const void*)p
| (const void *)item
.get_address() | c
);
521 RETURN(find_in_branch_(p
, item
, c
));
524 /** find(C_) searches for the key of B->kt in the B-tree.
526 Result is true if found, false otherwise. When false, the B-tree
527 cursor is positioned at the last key in the B-tree <= the search
528 key. Goes to first (null) item in B-tree when key length == 0.
532 GlassTable::find(Glass::Cursor
* C_
) const
534 LOGCALL(DB
, bool, "GlassTable::find", (void*)C_
);
535 // Note: the parameter is needed when we're called by GlassCursor
538 for (int j
= level
; j
> 0; --j
) {
540 c
= find_in_branch(p
, kt
, C_
[j
].c
);
541 #ifdef BTREE_DEBUG_FULL
542 printf("Block in GlassTable:find - code position 1");
543 report_block_full(j
, C_
[j
].get_n(), p
);
544 #endif /* BTREE_DEBUG_FULL */
546 block_to_cursor(C_
, j
- 1, BItem(p
, c
).block_given_by());
550 c
= find_in_leaf(p
, kt
, C_
[0].c
, exact
);
551 #ifdef BTREE_DEBUG_FULL
552 printf("Block in GlassTable:find - code position 2");
553 report_block_full(0, C_
[0].get_n(), p
);
554 #endif /* BTREE_DEBUG_FULL */
559 /** compact(p) compacts the block at p by shuffling all the items up to the end.
561 MAX_FREE(p) is then maximized, and is equal to TOTAL_FREE(p).
565 GlassTable::compact(byte
* p
)
567 LOGCALL_VOID(DB
, "GlassTable::compact", (void*)p
);
572 int dir_end
= DIR_END(p
);
573 if (GET_LEVEL(p
) == 0) {
575 for (int c
= DIR_START
; c
< dir_end
; c
+= D2
) {
579 memcpy(b
+ e
, item
.get_address(), l
);
580 LeafItem_wr::setD(p
, c
, e
); /* reform in b */
584 for (int c
= DIR_START
; c
< dir_end
; c
+= D2
) {
588 memcpy(b
+ e
, item
.get_address(), l
);
589 BItem_wr::setD(p
, c
, e
); /* reform in b */
592 memcpy(p
+ e
, b
+ e
, block_size
- e
); /* copy back */
594 SET_TOTAL_FREE(p
, e
);
598 /** Btree needs to gain a new level to insert more items: so split root block
599 * and construct a new one.
602 GlassTable::split_root(uint4 split_n
)
604 LOGCALL_VOID(DB
, "GlassTable::split_root", split_n
);
608 /* check level overflow - this isn't something that should ever happen
609 * but deserves more than an Assert()... */
610 if (level
== BTREE_CURSOR_LEVELS
) {
611 throw Xapian::DatabaseCorruptError("Btree has grown impossibly large (" STRINGIZE(BTREE_CURSOR_LEVELS
) " levels)");
614 byte
* q
= C
[level
].init(block_size
);
615 memset(q
, 0, block_size
);
616 C
[level
].c
= DIR_START
;
617 C
[level
].set_n(free_list
.get_block(this, block_size
));
618 C
[level
].rewrite
= true;
619 SET_REVISION(q
, revision_number
+ 1);
621 SET_DIR_END(q
, DIR_START
);
622 compact(q
); /* to reset TOTAL_FREE, MAX_FREE */
624 /* form a null key in b with a pointer to the old root */
625 byte b
[10]; /* 7 is exact */
627 item
.form_null_key(split_n
);
628 add_branch_item(item
, level
);
631 /** enter_key_above_leaf(previtem, newitem) is called after a leaf block split.
633 It enters in the block at level C[1] a separating key for the block
634 at level C[0]. The key itself is newitem.key(). previtem is the
635 preceding item, and at level 1 newitem.key() can be trimmed down to the
636 first point of difference to previtem.key() for entry in C[j].
638 This code looks longer than it really is. If j exceeds the number
639 of B-tree levels the root block has split and we have to construct
640 a new one, but this is a rare event.
642 The key is constructed in b, with block number C[0].n as tag,
643 and this is added in with add_branch_item. add_branch_item may itself
644 cause a block split, with a further call to enter_key_above_branch. Hence the recursion.
647 GlassTable::enter_key_above_leaf(LeafItem previtem
, LeafItem newitem
)
649 LOGCALL_VOID(DB
, "GlassTable::enter_key_above_leaf", Literal("previtem") | Literal("newitem"));
651 Assert(compare(previtem
, newitem
) < 0);
653 Key prevkey
= previtem
.key();
654 Key newkey
= newitem
.key();
655 int new_comp
= newitem
.component_of();
657 uint4 blocknumber
= C
[0].get_n();
659 // FIXME update to use Key
660 // Keys are truncated here: but don't truncate the count at the end away.
661 const int newkey_len
= newkey
.length();
662 AssertRel(newkey_len
,>,0);
664 // Truncate the key to the minimal key which differs from prevkey,
665 // the preceding key in the block.
667 const int min_len
= min(newkey_len
, prevkey
.length());
668 while (i
< min_len
&& prevkey
[i
] == newkey
[i
]) {
672 // Want one byte of difference.
673 if (i
< newkey_len
) i
++;
675 // Enough space for a branch item with maximum length key.
676 byte b
[BYTES_PER_BLOCK_NUMBER
+ K1
+ 255 + X2
];
678 AssertRel(i
, <=, 255);
679 item
.set_truncated_key_and_block(newkey
, new_comp
, i
, blocknumber
);
681 // The split block gets inserted into the parent after the pointer to the
683 AssertEq(C
[1].c
, find_in_branch(C
[1].get_p(), item
, C
[1].c
));
685 C
[1].rewrite
= true; /* a subtle point: this *is* required. */
686 add_branch_item(item
, 1);
689 /** enter_key_above_branch(j, newkey) is called after a branch block split.
691 It enters in the block at level C[j] a separating key for the block
692 at level C[j - 1]. The key itself is newkey.
694 This code looks longer than it really is. If j exceeds the number
695 of B-tree levels the root block has split and we have to construct
696 a new one, but this is a rare event.
698 The key is constructed in b, with block number C[j - 1].n as tag,
699 and this is added in with add_branch_item. add_branch_item may itself
700 cause a block split, with a further call to enter_key_above_branch. Hence the recursion.
703 GlassTable::enter_key_above_branch(int j
, BItem newitem
)
705 LOGCALL_VOID(DB
, "GlassTable::enter_key_above_branch", j
| Literal("newitem"));
709 /* Can't truncate between branch levels, since the separated keys
710 * are in at the leaf level, and truncating again will change the
714 uint4 blocknumber
= C
[j
- 1].get_n();
716 // Enough space for a branch item with maximum length key.
717 byte b
[BYTES_PER_BLOCK_NUMBER
+ K1
+ 255 + X2
];
719 item
.set_key_and_block(newitem
.key(), blocknumber
);
721 // The split block gets inserted into the parent after the pointer to the
723 AssertEq(C
[j
].c
, find_in_branch(C
[j
].get_p(), item
, C
[j
].c
));
725 C
[j
].rewrite
= true; /* a subtle point: this *is* required. */
726 add_branch_item(item
, j
);
729 /** mid_point(p) finds the directory entry in c that determines the
730 approximate mid point of the data in the block at p.
734 GlassTable::mid_point(byte
* p
)
736 LOGCALL(DB
, int, "GlassTable::mid_point", (void*)p
);
738 int dir_end
= DIR_END(p
);
739 int size
= block_size
- TOTAL_FREE(p
) - dir_end
;
740 for (int c
= DIR_START
; c
< dir_end
; c
+= D2
) {
742 if (GET_LEVEL(p
) == 0) {
743 l
= LeafItem(p
, c
).size();
745 l
= BItem(p
, c
).size();
749 if (l
< n
- size
) RETURN(c
);
754 /* This shouldn't happen, as the sum of the item sizes should be the same
755 * as the value calculated in size, so assert but return a sane value just
761 /** add_item_to_leaf(p, kt_, c) adds item kt_ to the leaf block at p.
763 c is the offset in the directory that needs to be expanded to accommodate
764 the new entry for the item. We know before this is called that there is
765 enough contiguous room for the item in the block, so it's just a matter of
766 shuffling up any directory entries after where we're inserting and copying
771 GlassTable::add_item_to_leaf(byte
* p
, LeafItem kt_
, int c
)
773 LOGCALL_VOID(DB
, "GlassTable::add_item_to_leaf", (void*)p
| Literal("kt_") | c
);
775 int dir_end
= DIR_END(p
);
776 int kt_len
= kt_
.size();
777 int needed
= kt_len
+ D2
;
778 int new_total
= TOTAL_FREE(p
) - needed
;
779 int new_max
= MAX_FREE(p
) - needed
;
781 Assert(new_total
>= 0);
783 AssertRel(MAX_FREE(p
),>=,needed
);
785 AssertRel(DIR_START
,<=,c
);
786 AssertRel(c
,<=,dir_end
);
788 memmove(p
+ c
+ D2
, p
+ c
, dir_end
- c
);
790 SET_DIR_END(p
, dir_end
);
792 int o
= dir_end
+ new_max
;
793 LeafItem_wr::setD(p
, c
, o
);
794 memmove(p
+ o
, kt_
.get_address(), kt_len
);
796 SET_MAX_FREE(p
, new_max
);
798 SET_TOTAL_FREE(p
, new_total
);
801 /** add_item_to_branch(p, kt_, c) adds item kt_ to the branch block at p.
803 c is the offset in the directory that needs to be expanded to accommodate
804 the new entry for the item. We know before this is called that there is
805 enough contiguous room for the item in the block, so it's just a matter of
806 shuffling up any directory entries after where we're inserting and copying
811 GlassTable::add_item_to_branch(byte
* p
, BItem kt_
, int c
)
813 LOGCALL_VOID(DB
, "GlassTable::add_item_to_branch", (void*)p
| Literal("kt_") | c
);
815 int dir_end
= DIR_END(p
);
816 int kt_len
= kt_
.size();
817 int needed
= kt_len
+ D2
;
818 int new_total
= TOTAL_FREE(p
) - needed
;
819 int new_max
= MAX_FREE(p
) - needed
;
821 Assert(new_total
>= 0);
823 AssertRel(MAX_FREE(p
),>=,needed
);
825 AssertRel(DIR_START
,<=,c
);
826 AssertRel(c
,<=,dir_end
);
828 memmove(p
+ c
+ D2
, p
+ c
, dir_end
- c
);
830 SET_DIR_END(p
, dir_end
);
832 int o
= dir_end
+ new_max
;
833 BItem_wr::setD(p
, c
, o
);
834 memmove(p
+ o
, kt_
.get_address(), kt_len
);
836 SET_MAX_FREE(p
, new_max
);
838 SET_TOTAL_FREE(p
, new_total
);
841 /** GlassTable::add_leaf_item(kt_) adds item kt_ to the leaf block.
843 * If there is not enough room the block splits and the item is then
844 * added to the appropriate half.
847 GlassTable::add_leaf_item(LeafItem kt_
)
849 LOGCALL_VOID(DB
, "GlassTable::add_leaf_item", Literal("kt_"));
851 byte
* p
= C
[0].get_modifiable_p(block_size
);
855 int needed
= kt_
.size() + D2
;
856 if (TOTAL_FREE(p
) < needed
) {
858 // Prepare to split p. After splitting, the block is in two halves, the
859 // lower half is split_p, the upper half p again. add_to_upper_half
860 // becomes true when the item gets added to p, false when it gets added
864 // If we're not in sequential mode, we split at the mid point
868 // During sequential addition, split at the insert point
869 AssertRel(c
,>=,DIR_START
);
873 uint4 split_n
= C
[0].get_n();
874 C
[0].set_n(free_list
.get_block(this, block_size
));
876 memcpy(split_p
, p
, block_size
); // replicate the whole block in split_p
877 SET_DIR_END(split_p
, m
);
878 compact(split_p
); /* to reset TOTAL_FREE, MAX_FREE */
881 int residue
= DIR_END(p
) - m
;
882 int new_dir_end
= DIR_START
+ residue
;
883 memmove(p
+ DIR_START
, p
+ m
, residue
);
884 SET_DIR_END(p
, new_dir_end
);
887 compact(p
); /* to reset TOTAL_FREE, MAX_FREE */
889 bool add_to_upper_half
;
891 add_to_upper_half
= (c
>= m
);
893 // And add item to lower half if split_p has room, otherwise upper
895 add_to_upper_half
= (TOTAL_FREE(split_p
) < needed
);
898 if (add_to_upper_half
) {
899 c
-= (m
- DIR_START
);
900 Assert(seq_count
< 0 || c
<= DIR_START
+ D2
);
901 Assert(c
>= DIR_START
);
902 Assert(c
<= DIR_END(p
));
903 add_item_to_leaf(p
, kt_
, c
);
906 Assert(c
>= DIR_START
);
907 Assert(c
<= DIR_END(split_p
));
908 add_item_to_leaf(split_p
, kt_
, c
);
911 write_block(split_n
, split_p
);
913 // Check if we're splitting the root block.
914 if (0 == level
) split_root(split_n
);
916 /* Enter a separating key at level 1 between */
917 /* the last key of block split_p, and the first key of block p */
918 enter_key_above_leaf(LeafItem(split_p
, DIR_END(split_p
) - D2
),
919 LeafItem(p
, DIR_START
));
921 AssertRel(TOTAL_FREE(p
),>=,needed
);
923 if (MAX_FREE(p
) < needed
) {
925 AssertRel(MAX_FREE(p
),>=,needed
);
928 add_item_to_leaf(p
, kt_
, c
);
936 /** GlassTable::add_branch_item(kt_, j) adds item kt_ to the block at cursor level C[j].
938 * If there is not enough room the block splits and the item is then
939 * added to the appropriate half.
942 GlassTable::add_branch_item(BItem kt_
, int j
)
944 LOGCALL_VOID(DB
, "GlassTable::add_branch_item", Literal("kt_") | j
);
946 byte
* p
= C
[j
].get_modifiable_p(block_size
);
949 int needed
= kt_
.size() + D2
;
950 if (TOTAL_FREE(p
) < needed
) {
952 // Prepare to split p. After splitting, the block is in two halves, the
953 // lower half is split_p, the upper half p again. add_to_upper_half
954 // becomes true when the item gets added to p, false when it gets added
958 // If we're not in sequential mode, we split at the mid point
962 // During sequential addition, split at the insert point
963 AssertRel(c
,>=,DIR_START
);
967 uint4 split_n
= C
[j
].get_n();
968 C
[j
].set_n(free_list
.get_block(this, block_size
));
970 memcpy(split_p
, p
, block_size
); // replicate the whole block in split_p
971 SET_DIR_END(split_p
, m
);
972 compact(split_p
); /* to reset TOTAL_FREE, MAX_FREE */
975 int residue
= DIR_END(p
) - m
;
976 int new_dir_end
= DIR_START
+ residue
;
977 memmove(p
+ DIR_START
, p
+ m
, residue
);
978 SET_DIR_END(p
, new_dir_end
);
981 compact(p
); /* to reset TOTAL_FREE, MAX_FREE */
983 bool add_to_upper_half
;
985 add_to_upper_half
= (c
>= m
);
987 // And add item to lower half if split_p has room, otherwise upper
989 add_to_upper_half
= (TOTAL_FREE(split_p
) < needed
);
992 if (add_to_upper_half
) {
993 c
-= (m
- DIR_START
);
994 Assert(seq_count
< 0 || c
<= DIR_START
+ D2
);
995 Assert(c
>= DIR_START
);
996 Assert(c
<= DIR_END(p
));
997 add_item_to_branch(p
, kt_
, c
);
999 Assert(c
>= DIR_START
);
1000 Assert(c
<= DIR_END(split_p
));
1001 add_item_to_branch(split_p
, kt_
, c
);
1003 write_block(split_n
, split_p
);
1005 // Check if we're splitting the root block.
1006 if (j
== level
) split_root(split_n
);
1008 /* Enter a separating key at level j + 1 between */
1009 /* the last key of block split_p, and the first key of block p */
1010 enter_key_above_branch(j
+ 1, BItem(p
, DIR_START
));
1012 // In branch levels, we can make the first key of block p null and
1013 // save a bit of disk space. Other redundant keys will still creep
1015 BItem_wr
item(p
, DIR_START
);
1016 int new_total_free
= TOTAL_FREE(p
) + item
.key().length();
1017 item
.form_null_key(item
.block_given_by());
1018 SET_TOTAL_FREE(p
, new_total_free
);
1020 AssertRel(TOTAL_FREE(p
),>=,needed
);
1022 if (MAX_FREE(p
) < needed
) {
1024 AssertRel(MAX_FREE(p
),>=,needed
);
1027 add_item_to_branch(p
, kt_
, c
);
1031 /** GlassTable::delete_leaf_item(repeatedly) is (almost) the converse of add_leaf_item.
1033 * If repeatedly is true, the process repeats at the next level when a
1034 * block has been completely emptied, freeing the block and taking out
1035 * the pointer to it. Emptied root blocks are also removed, which
1036 * reduces the number of levels in the B-tree.
1039 GlassTable::delete_leaf_item(bool repeatedly
)
1041 LOGCALL_VOID(DB
, "GlassTable::delete_leaf_item", repeatedly
);
1043 byte
* p
= C
[0].get_modifiable_p(block_size
);
1045 AssertRel(DIR_START
,<=,c
);
1046 AssertRel(c
,<,DIR_END(p
));
1047 int kt_len
= LeafItem(p
, c
).size(); /* size of the item to be deleted */
1048 int dir_end
= DIR_END(p
) - D2
; /* directory length will go down by 2 bytes */
1050 memmove(p
+ c
, p
+ c
+ D2
, dir_end
- c
);
1051 SET_DIR_END(p
, dir_end
);
1052 SET_MAX_FREE(p
, MAX_FREE(p
) + D2
);
1053 SET_TOTAL_FREE(p
, TOTAL_FREE(p
) + kt_len
+ D2
);
1055 if (!repeatedly
) return;
1057 if (dir_end
== DIR_START
) {
1058 free_list
.mark_block_unused(this, block_size
, C
[0].get_n());
1059 C
[0].rewrite
= false;
1060 C
[0].set_n(BLK_UNUSED
);
1061 C
[1].rewrite
= true; /* *is* necessary */
1062 delete_branch_item(1);
1067 /** GlassTable::delete_branch_item(j) is (almost) the converse of add_branch_item.
1069 * The process repeats at the next level when a block has been completely
1070 * emptied, freeing the block and taking out the pointer to it. Emptied root
1071 * blocks are also removed, which reduces the number of levels in the B-tree.
1074 GlassTable::delete_branch_item(int j
)
1076 LOGCALL_VOID(DB
, "GlassTable::delete_branch_item", j
);
1078 byte
* p
= C
[j
].get_modifiable_p(block_size
);
1080 AssertRel(DIR_START
,<=,c
);
1081 AssertRel(c
,<,DIR_END(p
));
1082 int kt_len
= BItem(p
, c
).size(); /* size of the item to be deleted */
1083 int dir_end
= DIR_END(p
) - D2
; /* directory length will go down by 2 bytes */
1085 memmove(p
+ c
, p
+ c
+ D2
, dir_end
- c
);
1086 SET_DIR_END(p
, dir_end
);
1087 SET_MAX_FREE(p
, MAX_FREE(p
) + D2
);
1088 SET_TOTAL_FREE(p
, TOTAL_FREE(p
) + kt_len
+ D2
);
1091 if (dir_end
== DIR_START
) {
1092 free_list
.mark_block_unused(this, block_size
, C
[j
].get_n());
1093 C
[j
].rewrite
= false;
1094 C
[j
].set_n(BLK_UNUSED
);
1095 C
[j
+ 1].rewrite
= true; /* *is* necessary */
1096 delete_branch_item(j
+ 1);
1100 while (dir_end
== DIR_START
+ D2
&& level
> 0) {
1101 /* single item in the root block, so lose a level */
1102 uint4 new_root
= BItem(C
[level
].get_p(), DIR_START
).block_given_by();
1103 free_list
.mark_block_unused(this, block_size
, C
[level
].get_n());
1107 block_to_cursor(C
, level
, new_root
);
1109 dir_end
= DIR_END(C
[level
].get_p()); /* prepare for the loop */
1115 static addcount = 0;
1118 /** add_kt(found) adds the item (key-tag pair) at B->kt into the
1119 B-tree, using cursor C.
1121 found == find() is handed over as a parameter from Btree::add.
1122 Btree::alter() prepares for the alteration to the B-tree. Then
1123 there are a number of cases to consider:
1125 If an item with the same key is in the B-tree (found is true),
1126 the new kt replaces it.
1128 If then kt is smaller, or the same size as, the item it replaces,
1129 kt is put in the same place as the item it replaces, and the
1130 TOTAL_FREE measure is reduced.
1132 If kt is larger than the item it replaces it is put in the
1133 MAX_FREE space if there is room, and the directory entry and
1134 space counts are adjusted accordingly.
1136 - But if there is not room we do it the long way: the old item is
1137 deleted with delete_leaf_item and kt is added in with add_leaf_item.
1139 If the key of kt is not in the B-tree (found is false), the new
1140 kt is added in with add_item.
1145 2 : replaced kt and it was the final one
// add_kt: store the item currently held in kt at the leaf position the
// built-in cursor C refers to.  `found` says whether an item with the same
// key is already present (replacement) or not.
// NOTE(review): this view of the body has gaps (the embedded line numbers
// are not contiguous), so the conditions that select between the branches
// below are not visible here - confirm against the full file.
1149 GlassTable::add_kt(bool found
)
1151 LOGCALL(DB
, int, "GlassTable::add_kt", found
);
// Trace output: bumps addcount and prints the key bytes (presumably only
// compiled in debug builds - the guard is not visible in this view).
1156 printf("%d) %s ", addcount++, (found ? "replacing" : "adding"));
1157 print_bytes(kt[I2], kt + I2 + K1); putchar('\n');
1163 if (found
) { /* replacement */
// A replacement resets seq_count to its starting value.
1164 seq_count
= SEQ_START_POINT
;
1167 byte
* p
= C
[0].get_modifiable_p(block_size
);
1169 AssertRel(DIR_START
,<=,c
);
1170 AssertRel(c
,<,DIR_END(p
));
1171 LeafItem
item(p
, c
);
1172 int kt_size
= kt
.size();
// needed = how many bytes larger kt is than the item it replaces.
1173 int needed
= kt_size
- item
.size();
// 2 if the replaced item was the last component of its tag, otherwise 1.
1175 result
= item
.last_component() ? 2 : 1;
1178 /* simple replacement */
// Overwrite the old item in place and adjust TOTAL_FREE by the size change.
1179 memmove(const_cast<byte
*>(item
.get_address()),
1180 kt
.get_address(), kt_size
);
1181 SET_TOTAL_FREE(p
, TOTAL_FREE(p
) - needed
);
1183 /* new item into the block's freespace */
// Copy kt to offset DIR_END(p) + new_max, repoint the directory entry at c
// to it, and shrink MAX_FREE by kt_size.
1184 int new_max
= MAX_FREE(p
) - kt_size
;
1186 int o
= DIR_END(p
) + new_max
;
1187 memmove(p
+ o
, kt
.get_address(), kt_size
);
1188 LeafItem_wr::setD(p
, c
, o
);
1189 SET_MAX_FREE(p
, new_max
);
1190 SET_TOTAL_FREE(p
, TOTAL_FREE(p
) - needed
);
1192 /* do it the long way */
1193 delete_leaf_item(false);
// seq_count bookkeeping: compares the last changed block/position with the
// cursor; seq_count is incremented while negative, and is set back to
// SEQ_START_POINT below (the branch structure is not fully visible here).
1199 if (changed_n
== C
[0].get_n() && changed_c
== C
[0].c
) {
1200 if (seq_count
< 0) seq_count
++;
1202 seq_count
= SEQ_START_POINT
;
1211 /* delete_kt() corresponds to add_kt(found), but there are only
1212 two cases: if the key is not found nothing is done, and if it is
1213 found the corresponding item is deleted with delete_leaf_item.
1216 0 : nothing to delete
1218 2 : deleted kt and it was the final one
// delete_kt: delete the item at the built-in cursor position via
// delete_leaf_item(true).  The visible code computes result as 2 when the
// item is the last component of its tag and 1 otherwise.
// NOTE(review): the lookup and the not-found path are not visible in this
// view (gaps in the embedded line numbering).
1222 GlassTable::delete_kt()
1224 LOGCALL(DB
, int, "GlassTable::delete_kt", NO_ARGS
);
// Any deletion resets seq_count to its starting value.
1227 seq_count
= SEQ_START_POINT
;
// Read last_component() before the item is removed from the block.
1233 int result
= LeafItem(C
[0].get_p(), C
[0].c
).last_component() ? 2 : 1;
1235 delete_leaf_item(true);
1240 /* GlassTable::form_key(key) treats address kt as an item holder and fills in
1245 The bracketed parts are left blank. The key is filled in with key_len bytes and
1246 K set accordingly. c is set to 1.
// form_key: prepare kt for the given key (see the description preceding this
// definition).
// NOTE(review): only the signature and the LOGCALL_VOID line are visible in
// this view; the statement that fills in kt is not shown here.
1249 void GlassTable::form_key(const string
& key
) const
1251 LOGCALL_VOID(DB
, "GlassTable::form_key", key
);
1255 /* GlassTable::add(key, tag) adds the key/tag item to the
1256 B-tree, replacing any existing item with the same key.
1258 For a long tag, we end up having to add m components, of the form
1265 and tag1+tag2+...+tagm are equal to tag. These in their turn may be replacing
1266 n components of the form
1273 and n may be greater than, equal to, or less than m. These cases are dealt
1274 with in the code below. If m < n for example, we end up with a series of
// add: store `tag` under `key`.  A tag too large for one item is split into
// m components which are added one at a time with add_kt(); components of a
// previously stored longer tag that are left over are then removed with
// delete_kt().
//   key                - key to store under
//   tag                - tag data (taken by value)
//   already_compressed - caller states the tag data is already compressed
// Throws Xapian::UnimplementedError when the tag would need
// BYTE_PAIR_RANGE or more components.
// NOTE(review): several guards and statements of this body are not visible
// in this view (gaps in the embedded line numbering).
1279 GlassTable::add(const string
&key
, string tag
, bool already_compressed
)
1281 LOGCALL_VOID(DB
, "GlassTable::add", key
| tag
| already_compressed
);
1286 GlassTable::throw_database_closed();
// Initialise a RootInfo from block_size and compress_min and open the
// table for writing (the condition guarding this is not visible here).
1289 root_info
.init(block_size
, compress_min
);
1290 do_open_to_write(&root_info
);
1295 const char* tag_data
= tag
.data();
1296 size_t tag_size
= tag
.size();
1298 bool compressed
= false;
1299 if (already_compressed
) {
// Only attempt compression when it is enabled (compress_min > 0) and the
// tag is larger than that threshold.
1301 } else if (compress_min
> 0 && tag_size
> compress_min
) {
1302 const char * res
= comp_stream
.compress(tag_data
, &tag_size
);
1309 // sort of matching kt.append_chunk(), but setting the chunk
1310 const size_t cd
= kt
.key().length() + K1
+ I2
+ X2
; // offset to the tag data
1311 const size_t L
= max_item_size
- cd
; // largest amount of tag data for any chunk
1312 size_t first_L
= L
+ X2
; // - amount for tag1 (we don't store X there)
1313 bool found
= find(C
);
1314 if (tag_size
<= first_L
) {
1315 // The whole tag clearly fits in one item, so no need to make this
1318 } else if (!found
) {
1319 const byte
* p
= C
[0].get_p();
// n = TOTAL_FREE of the current leaf block modulo the space one
// maximum-size item plus its directory entry would take.
1320 size_t n
= TOTAL_FREE(p
) % (max_item_size
+ D2
);
1323 // if n >= last then fully filling this block won't produce
1324 // an extra item, so we might as well do this even if
1325 // full_compaction isn't active.
1327 // In the full_compaction case, it turns out we shouldn't always
1328 // try to fill every last byte. Doing so can actually increase the
1329 // total space required (I believe this effect is due to longer
1330 // dividing keys being required in the index blocks). Empirically,
1331 // n >= key.size() + K appears a good criterion for K ~= 34. This
1332 // seems to save about 0.2% in total database size over always
1333 // splitting the tag. It'll also give slightly faster retrieval
1334 // as we can avoid reading an extra block occasionally.
1335 size_t last
= (tag_size
- X2
) % L
;
1336 if (n
>= last
|| (full_compaction
&& n
>= key
.size() + 34)) {
1337 // first_L < max_item_size + D2 - D2 - cd
1338 // Total size of first item = cd + first_L < max_item_size
1344 // There are m items to add.
// One first component holding up to first_L bytes, plus
// ceil((tag_size - first_L) / L) further components.
1345 int m
= (tag_size
- first_L
+ L
- 1) / L
+ 1;
1346 /* FIXME: sort out this error higher up and turn this into
1349 if (m
>= BYTE_PAIR_RANGE
)
1350 throw Xapian::UnimplementedError("Can't handle insanely large tags");
1352 size_t o
= 0; // Offset into the tag
1353 size_t residue
= tag_size
; // Bytes of the tag remaining to add in
1354 bool replacement
= false; // Has there been a replacement?
1355 bool components_to_del
= false; // Are there components to delete?
1357 for (i
= 1; i
<= m
; i
++) {
// l = number of tag bytes in component i: the last one takes whatever is
// left, the first may hold first_L, the others hold L.
1358 size_t l
= (i
== m
? residue
: (i
== 1 ? first_L
: L
));
// The first component uses an offset X2 smaller than the others.
1359 size_t this_cd
= (i
== 1 ? cd
- X2
: cd
);
1360 Assert(this_cd
+ l
<= block_size
);
1361 Assert(o
+ l
<= tag_size
);
1362 kt
.set_tag(this_cd
, tag_data
+ o
, l
, compressed
, i
, m
);
// find() was already called for the first component before the loop.
1367 if (i
> 1) found
= find(C
);
1368 int result
= add_kt(found
);
1369 if (result
) replacement
= true;
// result == 1: an item was replaced and it was not the last component of
// the old tag, so old components remain after this one.
1370 components_to_del
= (result
== 1);
1372 AssertEq(o
, tag_size
);
1373 if (components_to_del
) {
// Delete the following old components until delete_kt() stops returning 1.
1376 kt
.set_component_of(++i
);
1377 } while (delete_kt() == 1);
// Only a brand new key increases the entry count.
1379 if (!replacement
) ++item_count
;
1380 Btree_modified
= true;
1381 if (cursor_created_since_last_modification
) {
1382 cursor_created_since_last_modification
= false;
1387 /* GlassTable::del(key) returns false if the key is not in the B-tree,
1388 otherwise deletes it and returns true.
1390 Again, this is parallel to GlassTable::add, but simpler in form.
// del: delete the entry stored under `key`.
// Returns false (via RETURN) when the key is longer than
// GLASS_BTREE_MAX_KEY_LEN, when it is empty, or when delete_kt() reports 0.
// NOTE(review): parts of this body (including the loop over further
// components and the final return) are not visible in this view.
1394 GlassTable::del(const string
&key
)
1396 LOGCALL(DB
, bool, "GlassTable::del", key
);
1401 GlassTable::throw_database_closed();
1406 // We can't delete a key which is too long for us to store.
1407 if (key
.size() > GLASS_BTREE_MAX_KEY_LEN
) RETURN(false);
1409 if (key
.empty()) RETURN(false);
1412 int r
= delete_kt();
1413 if (r
== 0) RETURN(false);
// Advance kt to the next component number of the same key.
1416 kt
.set_component_of(++i
);
1421 Btree_modified
= true;
1422 if (cursor_created_since_last_modification
) {
1423 cursor_created_since_last_modification
= false;
// readahead_key: look `key` up in the root block held at C[level] and
// request a readahead of the child block it leads to via
// io_readahead_block().  `key` must not be empty (asserted).
// NOTE(review): the early-exit checks on handle and level described by the
// comments below, and the return statements, are not visible in this view.
1430 GlassTable::readahead_key(const string
&key
) const
1432 LOGCALL(DB
, bool, "GlassTable::readahead_key", key
);
1433 Assert(!key
.empty());
1437 // handle == -1: Lazy table in a multi-file database which isn't yet open.
1439 // handle == -2: Table has been closed. Since the readahead is just a
1440 // hint, we can safely ignore it for a closed table.
1442 // handle <= -3: Lazy table in a single-file database which isn't yet
1447 // If the table only has one level, there are no branch blocks to preread.
1453 // We'll only readahead the first level, since descending the B-tree would
1454 // require actual reads that would likely hurt performance more than help.
1455 const byte
* p
= C
[level
].get_p();
1456 int c
= find_in_branch(p
, kt
, C
[level
].c
);
// n = number of the child block referenced by the branch item found.
1457 uint4 n
= BItem(p
, c
).block_given_by();
1458 // Don't preread if it's the block we last preread or already in the
1460 if (n
!= last_readahead
&& n
!= C
[level
- 1].get_n()) {
1462 if (!io_readahead_block(handle
, block_size
, n
, offset
))
// get_exact_entry: look up `key` and, if present, read its tag into `tag`
// with read_tag(C, &tag, false).  `key` must not be empty (asserted).
// Returns false (via RETURN) for an oversized key or when find() fails.
// NOTE(review): the handle checks, key setup and final return are not
// visible in this view.
1469 GlassTable::get_exact_entry(const string
&key
, string
& tag
) const
1471 LOGCALL(DB
, bool, "GlassTable::get_exact_entry", key
| tag
);
1472 Assert(!key
.empty());
1476 GlassTable::throw_database_closed();
1481 // An oversized key can't exist, so attempting to search for it should fail.
1482 if (key
.size() > GLASS_BTREE_MAX_KEY_LEN
) RETURN(false);
1485 if (!find(C
)) RETURN(false);
// The return value of read_tag() is deliberately discarded here.
1487 (void)read_tag(C
, &tag
, false);
// key_exists: test whether `key` is present in the table.  `key` must not
// be empty (asserted); an oversized key yields false.
// NOTE(review): the actual lookup and final return are not visible in this
// view.
1492 GlassTable::key_exists(const string
&key
) const
1494 LOGCALL(DB
, bool, "GlassTable::key_exists", key
);
1495 Assert(!key
.empty());
1497 // An oversized key can't exist, so attempting to search for it should fail.
1498 if (key
.size() > GLASS_BTREE_MAX_KEY_LEN
) RETURN(false);
// read_tag: assemble the tag of the item at cursor C_ into *tag, reading
// its components one after another.
//   C_              - cursor positioned on the first component
//   tag             - output string
//   keep_compressed - when true a compressed tag is not decompressed
// Returns (via RETURN) true only when the tag is compressed and was kept
// compressed.
// Throws Xapian::DatabaseCorruptError on a chunk count mismatch during
// decompression or when the table ends before the last component.
// NOTE(review): the loop structure and several conditions are not visible
// in this view (gaps in the embedded line numbering).
1505 GlassTable::read_tag(Glass::Cursor
* C_
, string
*tag
, bool keep_compressed
) const
1507 LOGCALL(DB
, bool, "GlassTable::read_tag", Literal("C_") | tag
| keep_compressed
);
1512 bool compressed
= false;
1513 bool decompress
= false;
1515 LeafItem
item(C_
[0].get_p(), C_
[0].c
);
1518 compressed
= item
.get_compressed();
// Start a decompression stream only if the caller wants plain data.
1519 if (compressed
&& !keep_compressed
) {
1520 comp_stream
.decompress_start();
1524 bool last
= item
.last_component();
1526 // Decompress each chunk as we read it so we don't need both the
1527 // full compressed and uncompressed tags in memory at once.
1528 bool done
= item
.decompress_chunk(comp_stream
, *tag
);
1530 throw Xapian::DatabaseCorruptError(done
?
1531 "Too many chunks of compressed data" :
1532 "Too few chunks of compressed data");
// Path that appends the chunk to the tag as stored.
1535 item
.append_chunk(tag
);
1539 throw Xapian::DatabaseCorruptError("Unexpected end of table when reading continuation of tag");
1542 // At this point the cursor is on the last item - calling next will move
1543 // it to the next key (GlassCursor::read_tag() relies on this).
1545 RETURN(compressed
&& keep_compressed
);
// set_full_compaction: record `parity` in full_compaction; when switching
// it on, seq_count is also set to 0.
// NOTE(review): some lines of this body are not visible in this view.
1549 GlassTable::set_full_compaction(bool parity
)
1551 LOGCALL_VOID(DB
, "GlassTable::set_full_compaction", parity
);
1554 if (parity
) seq_count
= 0;
1555 full_compaction
= parity
;
// cursor_get: allocate a new GlassCursor over this table with `new` and
// hand the raw pointer back through RETURN.  The table is const here, so
// constness is cast away to construct the cursor.
// NOTE(review): the condition leading to throw_database_closed() is not
// visible in this view.
1558 GlassCursor
* GlassTable::cursor_get() const {
1559 LOGCALL(DB
, GlassCursor
*, "GlassTable::cursor_get", NO_ARGS
);
1562 GlassTable::throw_database_closed();
1566 // FIXME Ick - casting away const is nasty
1567 RETURN(new GlassCursor(const_cast<GlassTable
*>(this)));
1570 /************ B-tree opening and closing ************/
// basic_open: load the table state for revision `rev` from `root_info`
// (root block, level, entry count, fake-root and sequential flags, freelist
// and compress_min), allocate the kt item buffer and initialise the
// built-in cursor levels 0..level.
// Throws Xapian::DatabaseCorruptError when the serialised freelist cannot
// be unpacked.
// NOTE(review): some statements of this body are not visible in this view.
1573 GlassTable::basic_open(const RootInfo
* root_info
, glass_revision_number_t rev
)
1575 LOGCALL_VOID(DB
, "GlassTable::basic_open", root_info
|rev
);
1576 revision_number
= rev
;
1577 root
= root_info
->get_root();
1578 level
= root_info
->get_level();
1579 item_count
= root_info
->get_num_entries();
1580 faked_root_block
= root_info
->get_root_is_fake();
1581 sequential
= root_info
->get_sequential();
1582 const string
& fl_serialised
= root_info
->get_free_list();
// An empty serialised freelist is not passed to unpack().
1583 if (!fl_serialised
.empty()) {
1584 if (!free_list
.unpack(fl_serialised
))
1585 throw Xapian::DatabaseCorruptError("Bad freelist metadata");
1590 compress_min
= root_info
->get_compress_min();
1592 /* kt holds constructed items as well as keys */
1593 kt
= LeafItem_wr(zeroed_new(block_size
));
1595 set_max_item_size(BLOCK_CAPACITY
);
1597 for (int j
= 0; j
<= level
; j
++) {
1598 C
[j
].init(block_size
);
1603 if (cursor_created_since_last_modification
) {
1604 cursor_created_since_last_modification
= false;
// read_root: set up the root block in the built-in cursor.  With a faked
// root block an empty leaf block containing a single fake root item is
// built in memory; otherwise the root block is loaded with
// block_to_cursor(C, level, root).
// NOTE(review): the writable/read-only split inside the faked-root branch
// is only partly visible in this view.
1610 GlassTable::read_root()
1612 LOGCALL_VOID(DB
, "GlassTable::read_root", NO_ARGS
);
1613 if (faked_root_block
) {
1614 /* root block for an unmodified database. */
1615 byte
* p
= C
[0].init(block_size
);
1618 /* clear block - shouldn't be necessary, but is a bit nicer,
1619 * and means that the same operations should always produce
1620 * the same database. */
1621 memset(p
, 0, block_size
);
// The fake root item occupies the last I2 + K1 bytes of the block.
1623 int o
= block_size
- I2
- K1
;
1624 LeafItem_wr(p
+ o
).fake_root_item();
1626 LeafItem_wr::setD(p
, DIR_START
, o
); // its directory entry
1627 SET_DIR_END(p
, DIR_START
+ D2
);// the directory size
// What remains between the one-entry directory and the item is free space.
1629 o
-= (DIR_START
+ D2
);
1631 SET_TOTAL_FREE(p
, o
);
1635 /* reading - revision number doesn't matter as long as
1636 * it's not greater than the current one. */
// Path that stamps the block with the next revision and takes a block
// number for it from the freelist.
1641 SET_REVISION(p
, revision_number
+ 1);
1642 C
[0].set_n(free_list
.get_block(this, block_size
));
1645 /* using a root block stored on disk */
1646 block_to_cursor(C
, level
, root
);
// A root block newer than our revision is reported via set_overwritten().
1648 if (REVISION(C
[level
].get_p()) > revision_number
) set_overwritten();
1649 /* although this is unlikely */
// do_open_to_write: open the table for writing at revision `rev` using
// `root_info`, then allocate the split_p and buffer work areas and reset
// the change-tracking state.
// Throws Xapian::DatabaseOpeningError with an errno-based message when the
// file cannot be opened or created.
// NOTE(review): the conditions around the handle manipulation and several
// statements are not visible in this view.
1654 GlassTable::do_open_to_write(const RootInfo
* root_info
,
1655 glass_revision_number_t rev
)
1657 LOGCALL_VOID(DB
, "GlassTable::do_open_to_write", root_info
|rev
);
1659 GlassTable::throw_database_closed();
1662 // Single file database.
1663 handle
= -3 - handle
;
// Multi-file case: the second argument asks for creation when rev == 0.
1665 handle
= io_open_block_wr(name
+ GLASS_TABLE_EXTENSION
, (rev
== 0));
1667 // lazy doesn't make a lot of sense when we're creating a DB (which
1668 // is the case when rev==0), but ENOENT with O_CREAT means a parent
1669 // directory doesn't exist.
1670 if (lazy
&& rev
&& errno
== ENOENT
) {
1671 revision_number
= rev
;
1674 string
message((rev
== 0) ? "Couldn't create " : "Couldn't open ");
1676 message
+= GLASS_TABLE_EXTENSION
" read/write: ";
1677 errno_to_string(errno
, message
);
1678 throw Xapian::DatabaseOpeningError(message
);
1683 basic_open(root_info
, rev
);
1685 split_p
= new byte
[block_size
];
1687 buffer
= zeroed_new(block_size
);
1690 changed_c
= DIR_START
;
1691 seq_count
= SEQ_START_POINT
;
// Constructor for a table identified by a name and a path.
//   tablename_ - table name, stored in tablename
//   path_      - path of the table (its use is not visible in this view)
//   readonly_  - writable is initialised to !readonly_
//   lazy_      - lazy flag (its use is not visible in this view)
// NOTE(review): only part of the member initialiser list is visible here.
1694 GlassTable::GlassTable(const char * tablename_
, const string
& path_
,
1695 bool readonly_
, bool lazy_
)
1696 : tablename(tablename_
),
1700 faked_root_block(true),
1713 Btree_modified(false),
1714 full_compaction(false),
1715 writable(!readonly_
),
1716 cursor_created_since_last_modification(false),
1721 comp_stream(Z_DEFAULT_STRATEGY
),
1723 last_readahead(BLK_UNUSED
),
1726 LOGCALL_CTOR(DB
, "GlassTable", tablename_
| path_
| readonly_
| lazy_
);
// Constructor for a table that lives inside an already open file.
//   tablename_ - table name, stored in tablename
//   fd         - file descriptor (its use is not visible in this view)
//   offset_    - offset within that file (its use is not visible here)
//   readonly_  - writable is initialised to !readonly_
//   lazy_      - lazy flag (its use is not visible in this view)
// NOTE(review): only part of the member initialiser list is visible here.
1729 GlassTable::GlassTable(const char * tablename_
, int fd
, off_t offset_
,
1730 bool readonly_
, bool lazy_
)
1731 : tablename(tablename_
),
1735 faked_root_block(true),
1748 Btree_modified(false),
1749 full_compaction(false),
1750 writable(!readonly_
),
1751 cursor_created_since_last_modification(false),
1756 comp_stream(Z_DEFAULT_STRATEGY
),
1758 last_readahead(BLK_UNUSED
),
1761 LOGCALL_CTOR(DB
, "GlassTable", tablename_
| fd
| offset_
| readonly_
| lazy_
);
// exists: true for a single-file database, otherwise true when the table
// file (name + GLASS_TABLE_EXTENSION) exists.
// NOTE(review): this uses a plain return although LOGCALL declares a bool
// result; other functions in this file use RETURN() - confirm whether the
// result is meant to be logged.
1765 GlassTable::exists() const {
1766 LOGCALL(DB
, bool, "GlassTable::exists", NO_ARGS
);
1767 // We know a single-file database exists, since we have an fd open on it!
1768 return single_file() || file_exists(name
+ GLASS_TABLE_EXTENSION
);
// create_and_open: create a fresh table using the block size and
// compress_min from `root_info`, removing any existing table file first,
// then open it for writing.
// The block size is asserted to be a power of two between 2048 and
// BYTE_PAIR_RANGE inclusive.
// NOTE(review): the use of flags_ and some guards are not visible in this
// view.
1772 GlassTable::create_and_open(int flags_
, const RootInfo
& root_info
)
1774 LOGCALL_VOID(DB
, "GlassTable::create_and_open", flags_
|root_info
);
1776 GlassTable::throw_database_closed();
1781 unsigned int block_size_
= root_info
.get_blocksize();
1782 Assert(block_size_
>= 2048);
1783 Assert(block_size_
<= BYTE_PAIR_RANGE
);
1784 // Must be a power of two.
1785 Assert((block_size_
& (block_size_
- 1)) == 0);
1788 block_size
= block_size_
;
// The result of the unlink is deliberately ignored.
1792 (void)io_unlink(name
+ GLASS_TABLE_EXTENSION
);
1793 compress_min
= root_info
.get_compress_min();
1795 // FIXME: it would be good to arrange that this works such that there's
1796 // always a valid table in place if you run create_and_open() on an
1799 do_open_to_write(&root_info
);
// Destructor: releases the table by calling GlassTable::close() with an
// explicitly qualified (non-virtual) call.
1803 GlassTable::~GlassTable() {
1804 LOGCALL_DTOR(DB
, "GlassTable");
1805 GlassTable::close();
// close: release the file handle and the buffers held by the table.
//   permanent - passed by the caller; its effect is not visible in this view
// NOTE(review): most of the control flow of this body is not visible here
// (gaps in the embedded line numbering).
1808 void GlassTable::close(bool permanent
) {
1809 LOGCALL_VOID(DB
, "GlassTable::close", permanent
);
// Single-file case: the handle is remapped with -3 - handle rather than
// being closed.
1812 if (single_file()) {
1813 handle
= -3 - handle
;
1815 // If an error occurs here, we just ignore it, since we're just
1816 // trying to free everything.
1817 (void)::close(handle
);
1824 // Don't delete the resources in the table, since they may
1825 // still be used to look up cached content.
1828 for (int j
= level
; j
>= 0; j
--) {
// kt owns its buffer as a raw array, so it is freed with delete [].
1834 delete [] kt
.get_address();
// flush_db: write blocks held in the built-in cursor out with
// write_block(), walking from the root level down to level 0, and clear
// faked_root_block once the B-tree has been modified.
// NOTE(review): the per-level condition deciding whether a block is written
// is not visible in this view.
1841 GlassTable::flush_db()
1843 LOGCALL_VOID(DB
, "GlassTable::flush_db", NO_ARGS
);
1847 GlassTable::throw_database_closed();
1852 for (int j
= level
; j
>= 0; j
--) {
1854 write_block(C
[j
].get_n(), C
[j
].get_p());
1858 if (Btree_modified
) {
1859 faked_root_block
= false;
// commit: record the table state for the new `revision` into *root_info and
// commit the freelist.
// Throws Xapian::DatabaseError when `revision` is not greater than the
// current revision_number.
// NOTE(review): the branch conditions and the error handling around the
// final close() call are not visible in this view.
1864 GlassTable::commit(glass_revision_number_t revision
, RootInfo
* root_info
)
1866 LOGCALL_VOID(DB
, "GlassTable::commit", revision
|root_info
);
1869 if (revision
<= revision_number
) {
1870 throw Xapian::DatabaseError("New revision too low");
1875 GlassTable::throw_database_closed();
// Path that records an empty table: level 0, no entries, fake root,
// sequential, root block 0.
1877 revision_number
= revision
;
1878 root_info
->set_blocksize(block_size
);
1879 root_info
->set_level(0);
1880 root_info
->set_num_entries(0);
1881 root_info
->set_root_is_fake(true);
1882 root_info
->set_sequential(true);
1883 root_info
->set_root(0);
// Path that records the live state; the root is the block at the top
// cursor level.
1888 root
= C
[level
].get_n();
1890 root_info
->set_blocksize(block_size
);
1891 root_info
->set_level(level
);
1892 root_info
->set_num_entries(item_count
);
1893 root_info
->set_root_is_fake(faked_root_block
);
1894 root_info
->set_sequential(sequential
);
1895 root_info
->set_root(root
);
1897 Btree_modified
= false;
// Re-initialise every cursor level, not just 0..level.
1899 for (int i
= 0; i
< BTREE_CURSOR_LEVELS
; ++i
) {
1900 C
[i
].init(block_size
);
1903 free_list
.set_revision(revision
);
1904 free_list
.commit(this, block_size
);
1906 // Save the freelist details into the root_info.
1908 free_list
.pack(serialised
);
1909 root_info
->set_free_list(serialised
);
1911 revision_number
= revision
;
1916 changed_c
= DIR_START
;
1917 seq_count
= SEQ_START_POINT
;
1919 GlassTable::close();
// cancel: discard uncommitted changes by reloading the table state for
// revision `rev` from `root_info` and re-initialising the built-in cursor
// levels 0..level.
// Throws Xapian::InvalidOperationError when flags contains
// Xapian::DB_DANGEROUS.
// NOTE(review): some guards and statements are not visible in this view.
1925 GlassTable::cancel(const RootInfo
& root_info
, glass_revision_number_t rev
)
1927 LOGCALL_VOID(DB
, "GlassTable::cancel", root_info
|rev
);
1932 GlassTable::throw_database_closed();
1937 // This causes problems: if (!Btree_modified) return;
1939 if (flags
& Xapian::DB_DANGEROUS
)
1940 throw Xapian::InvalidOperationError("cancel() not supported under Xapian::DB_DANGEROUS");
1942 revision_number
= rev
;
1943 block_size
= root_info
.get_blocksize();
1944 root
= root_info
.get_root();
1945 level
= root_info
.get_level();
1946 item_count
= root_info
.get_num_entries();
1947 faked_root_block
= root_info
.get_root_is_fake();
1948 sequential
= root_info
.get_sequential();
1950 Btree_modified
= false;
// Drop pending writes: each level is re-initialised and its rewrite flag
// cleared.
1952 for (int j
= 0; j
<= level
; j
++) {
1953 C
[j
].init(block_size
);
1954 C
[j
].rewrite
= false;
1959 changed_c
= DIR_START
;
1960 seq_count
= SEQ_START_POINT
;
1962 if (cursor_created_since_last_modification
) {
1963 cursor_created_since_last_modification
= false;
1968 /************ B-tree reading ************/
// do_open_to_read: open the table read-only at revision `rev` using
// `root_info`.
// Throws Xapian::DatabaseOpeningError with an errno-based message when the
// file cannot be opened.
// NOTE(review): the conditions (including the handling of an optional
// table) and the return statements are not visible in this view.
1971 GlassTable::do_open_to_read(const RootInfo
* root_info
,
1972 glass_revision_number_t rev
)
1974 LOGCALL(DB
, bool, "GlassTable::do_open_to_read", root_info
|rev
);
1976 GlassTable::throw_database_closed();
// Single-file case: the handle is remapped with -3 - handle.
1978 if (single_file()) {
1979 handle
= -3 - handle
;
1981 handle
= io_open_block_rd(name
+ GLASS_TABLE_EXTENSION
);
1984 // This table is optional when reading!
1985 revision_number
= rev
;
1988 string
message("Couldn't open ");
1990 message
+= GLASS_TABLE_EXTENSION
" to read: ";
1991 errno_to_string(errno
, message
);
1992 throw Xapian::DatabaseOpeningError(message
);
1996 basic_open(root_info
, rev
);
// open: open the table at revision `rev`, taking block size and root from
// `root_info`, and delegating to do_open_to_read() or do_open_to_write().
// NOTE(review): the condition choosing between the read and write paths and
// the use of flags_ are not visible in this view.
2002 GlassTable::open(int flags_
, const RootInfo
& root_info
,
2003 glass_revision_number_t rev
)
2005 LOGCALL_VOID(DB
, "GlassTable::open", flags_
|root_info
|rev
);
2009 block_size
= root_info
.get_blocksize();
2010 root
= root_info
.get_root();
2013 do_open_to_read(&root_info
, rev
);
2017 do_open_to_write(&root_info
, rev
);
// prev_for_sequential: move cursor C_ back to the previous leaf item.  When
// the cursor is at the first directory entry of its block, search for the
// previous level-0 block by block number.  The second parameter is unused.
// Returns false (via RETURN) when the block number reaches 0.
// NOTE(review): the loop structure, block reads and final return are not
// visible in this view (gaps in the embedded line numbering).
2021 GlassTable::prev_for_sequential(Glass::Cursor
* C_
, int /*dummy*/) const
2023 LOGCALL(DB
, bool, "GlassTable::prev_for_sequential", Literal("C_") | Literal("/*dummy*/"));
2025 AssertRel(DIR_START
,<=,c
);
2026 AssertRel(c
,<,DIR_END(C_
[0].get_p()));
2027 if (c
== DIR_START
) {
2028 uint4 n
= C_
[0].get_n();
2031 if (n
== 0) RETURN(false);
2033 if (n
== C
[0].get_n()) {
2034 // Block is a leaf block in the built-in cursor (potentially in
2035 // modified form if the table is writable).
2036 p
= C_
[0].clone(C
[0]);
2039 // Blocks in the built-in cursor may not have been written
2040 // to disk yet, so we have to check that the block number
2041 // isn't in the built-in cursor or we'll read an
2042 // uninitialised block (for which GET_LEVEL(p) will
2043 // probably return 0).
2045 for (j
= 1; j
<= level
; ++j
) {
2046 if (n
== C
[j
].get_n()) break;
// j <= level means the scan found n in the built-in cursor: skip it.
2048 if (j
<= level
) continue;
2051 // Block isn't in the built-in cursor, so the form on disk
2052 // is valid, so read it to check if it's the next level 0
2054 byte
* q
= C_
[0].init(block_size
);
// Blocks newer than the revision in use (one later is allowed when
// writable) are detected here.
2059 if (REVISION(p
) > revision_number
+ writable
) {
2063 if (GET_LEVEL(p
) == 0) break;
2066 AssertRel(DIR_START
,<,c
);
// next_for_sequential: advance cursor C_ to the next leaf item.  When the
// position reaches DIR_END of the current block, search for the next
// level-0 block by block number.  The second parameter is unused.
// Returns false (via RETURN) once the block number reaches the first unused
// block reported by the freelist.
// NOTE(review): the loop structure, block reads and final return are not
// visible in this view (gaps in the embedded line numbering).
2074 GlassTable::next_for_sequential(Glass::Cursor
* C_
, int /*dummy*/) const
2076 LOGCALL(DB
, bool, "GlassTable::next_for_sequential", Literal("C_") | Literal("/*dummy*/"));
2077 const byte
* p
= C_
[0].get_p();
2080 AssertRel(c
,<,DIR_END(p
));
2082 Assert((unsigned)c
< block_size
);
2083 if (c
== DIR_END(p
)) {
2084 uint4 n
= C_
[0].get_n();
2087 if (n
>= free_list
.get_first_unused_block()) RETURN(false);
2089 if (n
== C
[0].get_n()) {
2090 // Block is a leaf block in the built-in cursor
2091 // (potentially in modified form).
2092 p
= C_
[0].clone(C
[0]);
2094 // Blocks in the built-in cursor may not have been written
2095 // to disk yet, so we have to check that the block number
2096 // isn't in the built-in cursor or we'll read an
2097 // uninitialised block (for which GET_LEVEL(p) will
2098 // probably return 0).
2100 for (j
= 1; j
<= level
; ++j
) {
2101 if (n
== C
[j
].get_n()) break;
// j <= level means the scan found n in the built-in cursor: skip it.
2103 if (j
<= level
) continue;
2105 // Block isn't in the built-in cursor, so the form on disk
2106 // is valid, so read it to check if it's the next level 0
2108 byte
* q
= C_
[0].init(block_size
);
2113 byte
* q
= C_
[0].init(block_size
);
// Blocks newer than the revision in use (one later is allowed when
// writable) are detected here.
2117 if (REVISION(p
) > revision_number
+ writable
) {
2121 if (GET_LEVEL(p
) == 0) break;
// prev_default: move cursor C_ at level j to the previous item, recursing
// into level j + 1 when already at the first entry of the block, and
// reloading the child block at level j - 1 with block_to_cursor().
// Returns false (via RETURN) when already at the first entry of the top
// level, or when the recursive call fails.
// NOTE(review): the cursor position updates and final return are not
// visible in this view.
2131 GlassTable::prev_default(Glass::Cursor
* C_
, int j
) const
2133 LOGCALL(DB
, bool, "GlassTable::prev_default", Literal("C_") | j
);
2134 const byte
* p
= C_
[j
].get_p();
2136 AssertRel(DIR_START
,<=,c
);
2137 AssertRel(c
,<,DIR_END(p
));
2138 AssertRel((unsigned)DIR_END(p
),<=,block_size
);
2139 if (c
== DIR_START
) {
2140 if (j
== level
) RETURN(false);
2141 if (!prev_default(C_
, j
+ 1)) RETURN(false);
2144 AssertRel(DIR_START
,<,c
);
// Load the child block that the branch item at c refers to.
2149 block_to_cursor(C_
, j
- 1, BItem(p
, c
).block_given_by());
// next_default: move cursor C_ at level j to the next item, recursing into
// level j + 1 when the position runs past the end of the directory, and
// reloading the child block at level j - 1 with block_to_cursor().
// Returns false (via RETURN) when past the end at the top level, or when
// the recursive call fails.
// NOTE(review): the cursor position updates and final return are not
// visible in this view.
2155 GlassTable::next_default(Glass::Cursor
* C_
, int j
) const
2157 LOGCALL(DB
, bool, "GlassTable::next_default", Literal("C_") | j
);
2158 const byte
* p
= C_
[j
].get_p();
2160 AssertRel(c
,<,DIR_END(p
));
2161 AssertRel((unsigned)DIR_END(p
),<=,block_size
);
2164 AssertRel(DIR_START
,<,c
);
2166 AssertRel(DIR_START
,<=,c
);
2168 // Sometimes c can be DIR_END(p) + 2 here it appears...
// Hence the >= comparison rather than == below.
2169 if (c
>= DIR_END(p
)) {
2170 if (j
== level
) RETURN(false);
2171 if (!next_default(C_
, j
+ 1)) RETURN(false);
// Load the child block that the branch item at c refers to.
2177 block_to_cursor(C_
, j
- 1, BItem(p
, c
).block_given_by());
2178 #ifdef BTREE_DEBUG_FULL
2179 printf("Block in GlassTable:next_default");
2180 report_block_full(j
- 1, C_
[j
- 1].get_n(), C_
[j
- 1].get_p());
2181 #endif /* BTREE_DEBUG_FULL */
// throw_database_closed: always throws Xapian::DatabaseError with the
// message shown below; callers use it to report use of a closed table.
2187 GlassTable::throw_database_closed()
2189 throw Xapian::DatabaseError("Database has been closed");