Rename misleading internal sequential_mode references
[xapian.git] / xapian-core / backends / glass / glass_table.cc
blobc313e4954a6ec49a01bf31de0e0e92119ed892a3
1 /* glass_table.cc: Btree implementation
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2002 Ananova Ltd
5 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
6 * Copyright 2008 Lemur Consulting Ltd
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU General Public License as
10 * published by the Free Software Foundation; either version 2 of the
11 * License, or (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
21 * USA
24 #include <config.h>
26 #include "glass_table.h"
28 #include <xapian/error.h>
30 #include "safeerrno.h"
32 #include "omassert.h"
33 #include "posixy_wrapper.h"
34 #include "str.h"
35 #include "stringutils.h" // For STRINGIZE().
37 #include <sys/types.h>
39 #include <cstring> /* for memmove */
40 #include <climits> /* for CHAR_BIT */
42 #include "glass_freelist.h"
43 #include "glass_changes.h"
44 #include "glass_cursor.h"
45 #include "glass_defs.h"
46 #include "glass_version.h"
48 #include "debuglog.h"
49 #include "errno_to_string.h"
50 #include "filetests.h"
51 #include "io_utils.h"
52 #include "pack.h"
53 #include "wordaccess.h"
55 #include <algorithm> // for std::min()
56 #include <string>
58 #include "xapian/constants.h"
60 using namespace Glass;
61 using namespace std;
63 //#define BTREE_DEBUG_FULL 1
64 #undef BTREE_DEBUG_FULL
66 #ifdef BTREE_DEBUG_FULL
67 /*------debugging aids from here--------*/
69 static void print_key(const byte * p, int c, int j);
70 static void print_tag(const byte * p, int c, int j);
73 static void report_cursor(int N, Btree * B, Glass::Cursor * C)
75 int i;
76 printf("%d)\n", N);
77 for (i = 0; i <= B->level; i++)
78 printf("p=%d, c=%d, n=[%d], rewrite=%d\n",
79 C[i].p, C[i].c, C[i].n, C[i].rewrite);
83 /*------to here--------*/
84 #endif /* BTREE_DEBUG_FULL */
86 static inline byte *zeroed_new(size_t size)
88 byte *temp = new byte[size];
89 memset(temp, 0, size);
90 return temp;
93 /* A B-tree consists of a file with extension GLASS_TABLE_EXTENSION divided
94 into a sequence of equal sized blocks, numbered 0, 1, 2 ... some of which
95 are free, some in use. Those in use are arranged in a tree. Lists of
96 currently unused blocks are stored in a freelist which is itself stored in
97 unused blocks.
99 Some "root info" is needed to locate a particular revision of the B-tree
100 - this is stored in the "version file" (we store this data for all tables
101 at a particular revision together).
103 Each block, b, has a structure like this:
105 R L M T D o1 o2 o3 ... oN <gap> [item] .. [item] .. [item] ...
106 <---------- D ----------> <-M->
108 And then,
110 R = REVISION(b) is the revision number the B-tree had when the block was
111 written into the DB file.
112 L = GET_LEVEL(b) is the level of the block, which is the number of levels
113 towards the root of the B-tree structure. So leaf blocks
114 have level 0 and the one root block has the highest level
115 equal to the number of levels in the B-tree. For blocks
116 storing the freelist, the level is set to 254.
117 M = MAX_FREE(b) is the size of the gap between the end of the directory and
118 the first item of data. (It is not necessarily the maximum
119 size among the bits of space that are free, but I can't
120 think of a better name.)
121 T = TOTAL_FREE(b)is the total amount of free space left in b.
122 D = DIR_END(b) gives the offset to the end of the directory.
124 o1, o2 ... oN are a directory of offsets to the N items held in the block.
125 The items are key-tag pairs, and as they occur in the directory are ordered
126 by the keys.
128 An item has this form:
130 I K key X tag
131 ←K→
132 <------I---->
134 A long tag presented through the API is split up into C pieces small enough
135 to be accommodated in the blocks of the B-tree. The key is extended to
136 include a counter, x, which runs from 1 to C. The key is preceded by a
137 length, K, and the whole item with a length, I, as depicted above. The
138 upper bits of I encode a flag indicating if this item is compressed, and a
139 flag saying if this is the last piece of a tag (i.e. if x == C).
141 Here are the corresponding definitions:
145 /** Flip to sequential addition block-splitting after this number of observed
146 * sequential additions (in negated form). */
147 #define SEQ_START_POINT (-10)
149 /* Note use of the limits.h values:
150 UCHAR_MAX = 255, an unsigned with all bits set, and
151 CHAR_BIT = 8, the number of bits per byte
153 BYTE_PAIR_RANGE below is the smallest +ve number that can't be held in two
154 bytes -- 64K effectively.
157 #define BYTE_PAIR_RANGE (1 << 2 * CHAR_BIT)
159 /// read_block(n, p) reads block n of the DB file to address p.
160 void
161 GlassTable::read_block(uint4 n, byte * p) const
163 // Log the value of p, not the contents of the block it points to...
164 LOGCALL_VOID(DB, "GlassTable::read_block", n | (void*)p);
165 if (rare(handle == -2))
166 GlassTable::throw_database_closed();
167 AssertRel(n,<,free_list.get_first_unused_block());
169 io_read_block(handle, reinterpret_cast<char *>(p), block_size, n, offset);
171 if (GET_LEVEL(p) != LEVEL_FREELIST) {
172 int dir_end = DIR_END(p);
173 if (rare(dir_end < DIR_START || unsigned(dir_end) > block_size)) {
174 string msg("dir_end invalid in block ");
175 msg += str(n);
176 throw Xapian::DatabaseCorruptError(msg);
181 /** write_block(n, p, appending) writes block n in the DB file from address p.
183 * If appending is true (not specified it defaults to false), then this
184 * indicates that we've added data to a block in space which was previously
185 * unused, and are writing the block back in place - we use this to add
186 * free list entries (the information about where the freelist data for a
187 * revision begins and ends is stored in the "iamglass" file). We don't
188 * currently use this flag for much, but it signifies that we don't risk
189 * invalidating any existing revisions, which may be useful information.
191 void
192 GlassTable::write_block(uint4 n, const byte * p, bool appending) const
194 LOGCALL_VOID(DB, "GlassTable::write_block", n | p | appending);
195 Assert(writable);
196 /* Check that n is in range. */
197 AssertRel(n,<,free_list.get_first_unused_block());
199 /* don't write to non-free */
200 // FIXME: We can no longer check this easily.
201 // AssertParanoid(free_list.block_free_at_start(block_size, n));
203 /* write revision is okay */
204 AssertEqParanoid(REVISION(p), revision_number + 1);
206 if (appending) {
207 // In the case of the freelist, new entries can safely be written into
208 // the space at the end of the latest freelist block, as that's unused
209 // by previous revisions. It is also safe to write into a block which
210 // wasn't used in the previous revision.
212 // It's only when we start a new freelist block that we need to worry
213 // about invalidating old revisions.
214 } else if (flags & Xapian::DB_DANGEROUS) {
215 // FIXME: We should somehow flag this to prevent readers opening the
216 // database. Removing "iamglass" or setting a flag in it doesn't deal
217 // with any existing readers though. Perhaps we want to have readers
218 // read lock and try to take an exclusive lock here?
221 const char * p_char = reinterpret_cast<const char *>(p);
222 io_write_block(handle, p_char, block_size, n, offset);
224 if (!changes_obj) return;
226 unsigned char v;
227 // FIXME: track table_type in this class?
228 if (strcmp(tablename, "position") == 0) {
229 v = int(Glass::POSITION);
230 } else if (strcmp(tablename, "postlist") == 0) {
231 v = int(Glass::POSTLIST);
232 } else if (strcmp(tablename, "docdata") == 0) {
233 v = int(Glass::DOCDATA);
234 } else if (strcmp(tablename, "spelling") == 0) {
235 v = int(Glass::SPELLING);
236 } else if (strcmp(tablename, "synonym") == 0) {
237 v = int(Glass::SYNONYM);
238 } else if (strcmp(tablename, "termlist") == 0) {
239 v = int(Glass::TERMLIST);
240 } else {
241 return; // FIXME
244 if (block_size == 2048) {
245 v |= 0 << 3;
246 } else if (block_size == 4096) {
247 v |= 1 << 3;
248 } else if (block_size == 8192) {
249 v |= 2 << 3;
250 } else if (block_size == 16384) {
251 v |= 3 << 3;
252 } else if (block_size == 32768) {
253 v |= 4 << 3;
254 } else if (block_size == 65536) {
255 v |= 5 << 3;
256 } else {
257 return; // FIXME
260 string buf;
261 buf += char(v);
262 // Write the block number to the file
263 pack_uint(buf, n);
265 changes_obj->write_block(buf);
266 changes_obj->write_block(reinterpret_cast<const char *>(p), block_size);
269 /* A note on cursors:
271 Each B-tree level has a corresponding array element C[j] in a
272 cursor, C. C[0] is the leaf (or data) level, and C[B->level] is the
273 root block level. Within a level j,
275 C[j].p addresses the block
276 C[j].c is the offset into the directory entry in the block
277 C[j].n is the number of the block at C[j].p
279 A look up in the B-tree causes navigation of the blocks starting
280 from the root. In each block, p, we find an offset, c, to an item
281 which gives the number, n, of the block for the next level. This
282 leads to an array of values p,c,n which are held inside the cursor.
284 Any Btree object B has a built-in cursor, at B->C. But other cursors may
285 be created. If BC is a created cursor, BC->C is the cursor in the
286 sense given above, and BC->B is the handle for the B-tree again.
290 void
291 GlassTable::set_overwritten() const
293 LOGCALL_VOID(DB, "GlassTable::set_overwritten", NO_ARGS);
294 // If we're writable, there shouldn't be another writer who could cause
295 // overwritten to be flagged, so that's a DatabaseCorruptError.
296 if (writable)
297 throw Xapian::DatabaseCorruptError("Db block overwritten - are there multiple writers?");
298 throw Xapian::DatabaseModifiedError("The revision being read has been discarded - you should call Xapian::Database::reopen() and retry the operation");
301 /* block_to_cursor(C, j, n) puts block n into position C[j] of cursor
302 C, writing the block currently at C[j] back to disk if necessary.
303 Note that
305 C[j].rewrite
307 is true iff C[j].n is different from block n in file DB. If it is
308 false no rewriting is necessary.
311 void
312 GlassTable::block_to_cursor(Glass::Cursor * C_, int j, uint4 n) const
314 LOGCALL_VOID(DB, "GlassTable::block_to_cursor", (void*)C_ | j | n);
315 if (n == C_[j].get_n()) return;
317 if (writable && C_[j].rewrite) {
318 Assert(C == C_);
319 write_block(C_[j].get_n(), C_[j].get_p());
320 C_[j].rewrite = false;
323 // Check if the block is in the built-in cursor (potentially in
324 // modified form).
325 const byte * p;
326 if (n == C[j].get_n()) {
327 p = C_[j].clone(C[j]);
328 } else {
329 byte * q = C_[j].init(block_size);
330 read_block(n, q);
331 p = q;
332 C_[j].set_n(n);
335 if (j < level) {
336 /* unsigned comparison */
337 if (rare(REVISION(p) > REVISION(C_[j + 1].get_p()))) {
338 set_overwritten();
339 return;
343 if (rare(j != GET_LEVEL(p))) {
344 string msg = "Expected block ";
345 msg += str(n);
346 msg += " to be level ";
347 msg += str(j);
348 msg += ", not ";
349 msg += str(GET_LEVEL(p));
350 throw Xapian::DatabaseCorruptError(msg);
354 /** Btree::alter(); is called when the B-tree is to be altered.
356 It causes new blocks to be forced for the current set of blocks in
357 the cursor.
359 The point is that if a block at level 0 is to be altered it may get
360 a new number. Then the pointer to this block from level 1 will need
361 changing. So the block at level 1 needs altering and may get a new
362 block number. Then the pointer to this block from level 2 will need
363 changing ... and so on back to the root.
365 The clever bit here is spotting the cases when we can make an early
366 exit from this process. If C[j].rewrite is true, C[j+k].rewrite
367 will be true for k = 1,2 ... We have been through all this before,
368 and there is no need to do it again. If C[j].n was free at the
369 start of the transaction, we can copy it back to the same place
370 without violating the integrity of the B-tree. We don't then need a
371 new n and can return. The corresponding C[j].rewrite may be true or
372 false in that case.
375 void
376 GlassTable::alter()
378 LOGCALL_VOID(DB, "GlassTable::alter", NO_ARGS);
379 Assert(writable);
380 if (flags & Xapian::DB_DANGEROUS) {
381 C[0].rewrite = true;
382 return;
384 int j = 0;
385 while (true) {
386 if (C[j].rewrite) return; /* all new, so return */
387 C[j].rewrite = true;
389 glass_revision_number_t rev = REVISION(C[j].get_p());
390 if (rev == revision_number + 1) {
391 return;
393 Assert(rev < revision_number + 1);
394 uint4 n = C[j].get_n();
395 free_list.mark_block_unused(this, block_size, n);
396 SET_REVISION(C[j].get_modifiable_p(block_size), revision_number + 1);
397 n = free_list.get_block(this, block_size);
398 C[j].set_n(n);
400 if (j == level) return;
401 j++;
402 BItem_wr(C[j].get_modifiable_p(block_size), C[j].c).set_block_given_by(n);
406 /** find_in_leaf(p, key, c, exact) searches for the key in the leaf block at p.
408 What we get is the directory entry to the last key <= the key being searched
409 for.
411 The lookup is by binary chop, with i and j set to the left and
412 right ends of the search area. In sequential addition, c will often
413 be the answer, so we test the keys round c and move i and j towards
414 c if possible.
416 exact is set to true if the match was exact (otherwise exact is unchanged).
420 GlassTable::find_in_leaf(const byte * p, LeafItem item, int c, bool& exact)
422 LOGCALL_STATIC(DB, int, "GlassTable::find_in_leaf", (const void*)p | (const void *)item.get_address() | c | Literal("bool&"));
423 // c should be odd (either -1, or an even offset from DIR_START).
424 Assert((c & 1) == 1);
425 int i = DIR_START;
426 i -= D2;
427 if (c != -1) {
428 AssertRel(i,<=,c);
430 int j = DIR_END(p);
432 if (c != -1) {
433 if (c < j && i < c) {
434 int r = compare(LeafItem(p, c), item);
435 if (r == 0) {
436 exact = true;
437 return c;
439 if (r < 0) i = c;
441 c += D2;
442 if (c < j && i < c) {
443 int r = compare(item, LeafItem(p, c));
444 if (r == 0) {
445 exact = true;
446 return c;
448 if (r < 0) j = c;
452 while (j - i > D2) {
453 int k = i + ((j - i) / (D2 * 2)) * D2; /* mid way */
454 int r = compare(item, LeafItem(p, k));
455 if (r < 0) {
456 j = k;
457 } else {
458 i = k;
459 if (r == 0) {
460 exact = true;
461 break;
465 AssertRel(DIR_START - D2,<=,i);
466 AssertRel(i,<,DIR_END(p));
467 RETURN(i);
470 template<typename ITEM> int
471 find_in_branch_(const byte * p, ITEM item, int c)
473 // c should be odd (either -1, or an even offset from DIR_START).
474 Assert((c & 1) == 1);
475 int i = DIR_START;
476 if (c != -1) {
477 AssertRel(i,<=,c);
479 int j = DIR_END(p);
481 if (c != -1) {
482 if (c < j && i < c) {
483 int r = compare(BItem(p, c), item);
484 if (r == 0) return c;
485 if (r < 0) i = c;
487 c += D2;
488 if (c < j && i < c) {
489 int r = compare(item, BItem(p, c));
490 if (r == 0) return c;
491 if (r < 0) j = c;
495 while (j - i > D2) {
496 int k = i + ((j - i) / (D2 * 2)) * D2; /* mid way */
497 int r = compare(item, BItem(p, k));
498 if (r < 0) {
499 j = k;
500 } else {
501 i = k;
502 if (r == 0) break;
505 AssertRel(DIR_START,<=,i);
506 AssertRel(i,<,DIR_END(p));
507 return i;
511 GlassTable::find_in_branch(const byte * p, LeafItem item, int c)
513 LOGCALL_STATIC(DB, int, "GlassTable::find_in_branch", (const void*)p | (const void *)item.get_address() | c);
514 RETURN(find_in_branch_(p, item, c));
518 GlassTable::find_in_branch(const byte * p, BItem item, int c)
520 LOGCALL_STATIC(DB, int, "GlassTable::find_in_branch", (const void*)p | (const void *)item.get_address() | c);
521 RETURN(find_in_branch_(p, item, c));
524 /** find(C_) searches for the key of B->kt in the B-tree.
526 Result is true if found, false otherwise. When false, the B_tree
527 cursor is positioned at the last key in the B-tree <= the search
528 key. Goes to first (null) item in B-tree when key length == 0.
531 bool
532 GlassTable::find(Glass::Cursor * C_) const
534 LOGCALL(DB, bool, "GlassTable::find", (void*)C_);
535 // Note: the parameter is needed when we're called by GlassCursor
536 const byte * p;
537 int c;
538 for (int j = level; j > 0; --j) {
539 p = C_[j].get_p();
540 c = find_in_branch(p, kt, C_[j].c);
541 #ifdef BTREE_DEBUG_FULL
542 printf("Block in GlassTable:find - code position 1");
543 report_block_full(j, C_[j].get_n(), p);
544 #endif /* BTREE_DEBUG_FULL */
545 C_[j].c = c;
546 block_to_cursor(C_, j - 1, BItem(p, c).block_given_by());
548 p = C_[0].get_p();
549 bool exact = false;
550 c = find_in_leaf(p, kt, C_[0].c, exact);
551 #ifdef BTREE_DEBUG_FULL
552 printf("Block in GlassTable:find - code position 2");
553 report_block_full(0, C_[0].get_n(), p);
554 #endif /* BTREE_DEBUG_FULL */
555 C_[0].c = c;
556 RETURN(exact);
559 /** compact(p) compact the block at p by shuffling all the items up to the end.
561 MAX_FREE(p) is then maximized, and is equal to TOTAL_FREE(p).
564 void
565 GlassTable::compact(byte * p)
567 LOGCALL_VOID(DB, "GlassTable::compact", (void*)p);
568 Assert(p != buffer);
569 Assert(writable);
570 int e = block_size;
571 byte * b = buffer;
572 int dir_end = DIR_END(p);
573 if (GET_LEVEL(p) == 0) {
574 // Leaf.
575 for (int c = DIR_START; c < dir_end; c += D2) {
576 LeafItem item(p, c);
577 int l = item.size();
578 e -= l;
579 memcpy(b + e, item.get_address(), l);
580 LeafItem_wr::setD(p, c, e); /* reform in b */
582 } else {
583 // Branch.
584 for (int c = DIR_START; c < dir_end; c += D2) {
585 BItem item(p, c);
586 int l = item.size();
587 e -= l;
588 memcpy(b + e, item.get_address(), l);
589 BItem_wr::setD(p, c, e); /* reform in b */
592 memcpy(p + e, b + e, block_size - e); /* copy back */
593 e -= dir_end;
594 SET_TOTAL_FREE(p, e);
595 SET_MAX_FREE(p, e);
598 /** Btree needs to gain a new level to insert more items: so split root block
599 * and construct a new one.
601 void
602 GlassTable::split_root(uint4 split_n)
604 LOGCALL_VOID(DB, "GlassTable::split_root", split_n);
605 /* gain a level */
606 ++level;
608 /* check level overflow - this isn't something that should ever happen
609 * but deserves more than an Assert()... */
610 if (level == BTREE_CURSOR_LEVELS) {
611 throw Xapian::DatabaseCorruptError("Btree has grown impossibly large (" STRINGIZE(BTREE_CURSOR_LEVELS) " levels)");
614 byte * q = C[level].init(block_size);
615 memset(q, 0, block_size);
616 C[level].c = DIR_START;
617 C[level].set_n(free_list.get_block(this, block_size));
618 C[level].rewrite = true;
619 SET_REVISION(q, revision_number + 1);
620 SET_LEVEL(q, level);
621 SET_DIR_END(q, DIR_START);
622 compact(q); /* to reset TOTAL_FREE, MAX_FREE */
624 /* form a null key in b with a pointer to the old root */
625 byte b[10]; /* 7 is exact */
626 BItem_wr item(b);
627 item.form_null_key(split_n);
628 add_branch_item(item, level);
631 /** enter_key_above_leaf(previtem, newitem) is called after a leaf block split.
633 It enters in the block at level C[1] a separating key for the block
634 at level C[0]. The key itself is newitem.key(). previtem is the
635 preceding item, and at level 1 newitem.key() can be trimmed down to the
636 first point of difference to previtem.key() for entry in C[j].
638 This code looks longer than it really is. If j exceeds the number
639 of B-tree levels the root block has split and we have to construct
640 a new one, but this is a rare event.
642 The key is constructed in b, with block number C[0].n as tag,
643 and this is added in with add_item. add_item may itself cause a
644 block split, with a further call to enter_key. Hence the recursion.
646 void
647 GlassTable::enter_key_above_leaf(LeafItem previtem, LeafItem newitem)
649 LOGCALL_VOID(DB, "GlassTable::enter_key_above_leaf", Literal("previtem") | Literal("newitem"));
650 Assert(writable);
651 Assert(compare(previtem, newitem) < 0);
653 Key prevkey = previtem.key();
654 Key newkey = newitem.key();
655 int new_comp = newitem.component_of();
657 uint4 blocknumber = C[0].get_n();
659 // FIXME update to use Key
660 // Keys are truncated here: but don't truncate the count at the end away.
661 const int newkey_len = newkey.length();
662 AssertRel(newkey_len,>,0);
664 // Truncate the key to the minimal key which differs from prevkey,
665 // the preceding key in the block.
666 int i = 0;
667 const int min_len = min(newkey_len, prevkey.length());
668 while (i < min_len && prevkey[i] == newkey[i]) {
669 i++;
672 // Want one byte of difference.
673 if (i < newkey_len) i++;
675 // Enough space for a branch item with maximum length key.
676 byte b[BYTES_PER_BLOCK_NUMBER + K1 + 255 + X2];
677 BItem_wr item(b);
678 AssertRel(i, <=, 255);
679 item.set_truncated_key_and_block(newkey, new_comp, i, blocknumber);
681 // The split block gets inserted into the parent after the pointer to the
682 // current child.
683 AssertEq(C[1].c, find_in_branch(C[1].get_p(), item, C[1].c));
684 C[1].c += D2;
685 C[1].rewrite = true; /* a subtle point: this *is* required. */
686 add_branch_item(item, 1);
689 /** enter_key_above_branch(j, newkey) is called after a branch block split.
691 It enters in the block at level C[j] a separating key for the block
692 at level C[j - 1]. The key itself is newkey.
694 This code looks longer than it really is. If j exceeds the number
695 of B-tree levels the root block has split and we have to construct
696 a new one, but this is a rare event.
698 The key is constructed in b, with block number C[j - 1].n as tag,
699 and this is added in with add_item. add_item may itself cause a
700 block split, with a further call to enter_key. Hence the recursion.
702 void
703 GlassTable::enter_key_above_branch(int j, BItem newitem)
705 LOGCALL_VOID(DB, "GlassTable::enter_key_above_branch", j | Literal("newitem"));
706 Assert(writable);
707 AssertRel(j,>,1);
709 /* Can't truncate between branch levels, since the separated keys
710 * are in at the leaf level, and truncating again will change the
711 * branch point.
714 uint4 blocknumber = C[j - 1].get_n();
716 // Enough space for a branch item with maximum length key.
717 byte b[BYTES_PER_BLOCK_NUMBER + K1 + 255 + X2];
718 BItem_wr item(b);
719 item.set_key_and_block(newitem.key(), blocknumber);
721 // The split block gets inserted into the parent after the pointer to the
722 // current child.
723 AssertEq(C[j].c, find_in_branch(C[j].get_p(), item, C[j].c));
724 C[j].c += D2;
725 C[j].rewrite = true; /* a subtle point: this *is* required. */
726 add_branch_item(item, j);
729 /** mid_point(p) finds the directory entry in c that determines the
730 approximate mid point of the data in the block at p.
734 GlassTable::mid_point(byte * p)
736 LOGCALL(DB, int, "GlassTable::mid_point", (void*)p);
737 int n = 0;
738 int dir_end = DIR_END(p);
739 int size = block_size - TOTAL_FREE(p) - dir_end;
740 for (int c = DIR_START; c < dir_end; c += D2) {
741 int l;
742 if (GET_LEVEL(p) == 0) {
743 l = LeafItem(p, c).size();
744 } else {
745 l = BItem(p, c).size();
747 n += 2 * l;
748 if (n >= size) {
749 if (l < n - size) RETURN(c);
750 RETURN(c + D2);
754 /* This shouldn't happen, as the sum of the item sizes should be the same
755 * as the value calculated in size, so assert but return a sane value just
756 * in case. */
757 Assert(false);
758 RETURN(dir_end);
761 /** add_item_to_leaf(p, kt_, c) adds item kt_ to the leaf block at p.
763 c is the offset in the directory that needs to be expanded to accommodate
764 the new entry for the item. We know before this is called that there is
765 enough contiguous room for the item in the block, so it's just a matter of
766 shuffling up any directory entries after where we're inserting and copying
767 in the item.
770 void
771 GlassTable::add_item_to_leaf(byte * p, LeafItem kt_, int c)
773 LOGCALL_VOID(DB, "GlassTable::add_item_to_leaf", (void*)p | Literal("kt_") | c);
774 Assert(writable);
775 int dir_end = DIR_END(p);
776 int kt_len = kt_.size();
777 int needed = kt_len + D2;
778 int new_total = TOTAL_FREE(p) - needed;
779 int new_max = MAX_FREE(p) - needed;
781 Assert(new_total >= 0);
783 AssertRel(MAX_FREE(p),>=,needed);
785 AssertRel(DIR_START,<=,c);
786 AssertRel(c,<=,dir_end);
788 memmove(p + c + D2, p + c, dir_end - c);
789 dir_end += D2;
790 SET_DIR_END(p, dir_end);
792 int o = dir_end + new_max;
793 LeafItem_wr::setD(p, c, o);
794 memmove(p + o, kt_.get_address(), kt_len);
796 SET_MAX_FREE(p, new_max);
798 SET_TOTAL_FREE(p, new_total);
801 /** add_item_to_branch(p, kt_, c) adds item kt_ to the branch block at p.
803 c is the offset in the directory that needs to be expanded to accommodate
804 the new entry for the item. We know before this is called that there is
805 enough contiguous room for the item in the block, so it's just a matter of
806 shuffling up any directory entries after where we're inserting and copying
807 in the item.
810 void
811 GlassTable::add_item_to_branch(byte * p, BItem kt_, int c)
813 LOGCALL_VOID(DB, "GlassTable::add_item_to_branch", (void*)p | Literal("kt_") | c);
814 Assert(writable);
815 int dir_end = DIR_END(p);
816 int kt_len = kt_.size();
817 int needed = kt_len + D2;
818 int new_total = TOTAL_FREE(p) - needed;
819 int new_max = MAX_FREE(p) - needed;
821 Assert(new_total >= 0);
823 AssertRel(MAX_FREE(p),>=,needed);
825 AssertRel(DIR_START,<=,c);
826 AssertRel(c,<=,dir_end);
828 memmove(p + c + D2, p + c, dir_end - c);
829 dir_end += D2;
830 SET_DIR_END(p, dir_end);
832 int o = dir_end + new_max;
833 BItem_wr::setD(p, c, o);
834 memmove(p + o, kt_.get_address(), kt_len);
836 SET_MAX_FREE(p, new_max);
838 SET_TOTAL_FREE(p, new_total);
841 /** GlassTable::add_leaf_item(kt_) adds item kt_ to the leaf block.
843 * If there is not enough room the block splits and the item is then
844 * added to the appropriate half.
846 void
847 GlassTable::add_leaf_item(LeafItem kt_)
849 LOGCALL_VOID(DB, "GlassTable::add_leaf_item", Literal("kt_"));
850 Assert(writable);
851 byte * p = C[0].get_modifiable_p(block_size);
852 int c = C[0].c;
853 uint4 n;
855 int needed = kt_.size() + D2;
856 if (TOTAL_FREE(p) < needed) {
857 int m;
858 // Prepare to split p. After splitting, the block is in two halves, the
859 // lower half is split_p, the upper half p again. add_to_upper_half
860 // becomes true when the item gets added to p, false when it gets added
861 // to split_p.
863 if (seq_count < 0) {
864 // If we're not in sequential mode, we split at the mid point
865 // of the node.
866 m = mid_point(p);
867 } else {
868 // During sequential addition, split at the insert point
869 AssertRel(c,>=,DIR_START);
870 m = c;
873 uint4 split_n = C[0].get_n();
874 C[0].set_n(free_list.get_block(this, block_size));
876 memcpy(split_p, p, block_size); // replicate the whole block in split_p
877 SET_DIR_END(split_p, m);
878 compact(split_p); /* to reset TOTAL_FREE, MAX_FREE */
881 int residue = DIR_END(p) - m;
882 int new_dir_end = DIR_START + residue;
883 memmove(p + DIR_START, p + m, residue);
884 SET_DIR_END(p, new_dir_end);
887 compact(p); /* to reset TOTAL_FREE, MAX_FREE */
889 bool add_to_upper_half;
890 if (seq_count < 0) {
891 add_to_upper_half = (c >= m);
892 } else {
893 // And add item to lower half if split_p has room, otherwise upper
894 // half
895 add_to_upper_half = (TOTAL_FREE(split_p) < needed);
898 if (add_to_upper_half) {
899 c -= (m - DIR_START);
900 Assert(seq_count < 0 || c <= DIR_START + D2);
901 Assert(c >= DIR_START);
902 Assert(c <= DIR_END(p));
903 add_item_to_leaf(p, kt_, c);
904 n = C[0].get_n();
905 } else {
906 Assert(c >= DIR_START);
907 Assert(c <= DIR_END(split_p));
908 add_item_to_leaf(split_p, kt_, c);
909 n = split_n;
911 write_block(split_n, split_p);
913 // Check if we're splitting the root block.
914 if (0 == level) split_root(split_n);
916 /* Enter a separating key at level 1 between */
917 /* the last key of block split_p, and the first key of block p */
918 enter_key_above_leaf(LeafItem(split_p, DIR_END(split_p) - D2),
919 LeafItem(p, DIR_START));
920 } else {
921 AssertRel(TOTAL_FREE(p),>=,needed);
923 if (MAX_FREE(p) < needed) {
924 compact(p);
925 AssertRel(MAX_FREE(p),>=,needed);
928 add_item_to_leaf(p, kt_, c);
929 n = C[0].get_n();
932 changed_n = n;
933 changed_c = c;
936 /** GlassTable::add_item(kt_, j) adds item kt_ to the block at cursor level C[j].
938 * If there is not enough room the block splits and the item is then
939 * added to the appropriate half.
941 void
942 GlassTable::add_branch_item(BItem kt_, int j)
944 LOGCALL_VOID(DB, "GlassTable::add_branch_item", Literal("kt_") | j);
945 Assert(writable);
946 byte * p = C[j].get_modifiable_p(block_size);
947 int c = C[j].c;
949 int needed = kt_.size() + D2;
950 if (TOTAL_FREE(p) < needed) {
951 int m;
952 // Prepare to split p. After splitting, the block is in two halves, the
953 // lower half is split_p, the upper half p again. add_to_upper_half
954 // becomes true when the item gets added to p, false when it gets added
955 // to split_p.
957 if (seq_count < 0) {
958 // If we're not in sequential mode, we split at the mid point
959 // of the node.
960 m = mid_point(p);
961 } else {
962 // During sequential addition, split at the insert point
963 AssertRel(c,>=,DIR_START);
964 m = c;
967 uint4 split_n = C[j].get_n();
968 C[j].set_n(free_list.get_block(this, block_size));
970 memcpy(split_p, p, block_size); // replicate the whole block in split_p
971 SET_DIR_END(split_p, m);
972 compact(split_p); /* to reset TOTAL_FREE, MAX_FREE */
975 int residue = DIR_END(p) - m;
976 int new_dir_end = DIR_START + residue;
977 memmove(p + DIR_START, p + m, residue);
978 SET_DIR_END(p, new_dir_end);
981 compact(p); /* to reset TOTAL_FREE, MAX_FREE */
983 bool add_to_upper_half;
984 if (seq_count < 0) {
985 add_to_upper_half = (c >= m);
986 } else {
987 // And add item to lower half if split_p has room, otherwise upper
988 // half
989 add_to_upper_half = (TOTAL_FREE(split_p) < needed);
992 if (add_to_upper_half) {
993 c -= (m - DIR_START);
994 Assert(seq_count < 0 || c <= DIR_START + D2);
995 Assert(c >= DIR_START);
996 Assert(c <= DIR_END(p));
997 add_item_to_branch(p, kt_, c);
998 } else {
999 Assert(c >= DIR_START);
1000 Assert(c <= DIR_END(split_p));
1001 add_item_to_branch(split_p, kt_, c);
1003 write_block(split_n, split_p);
1005 // Check if we're splitting the root block.
1006 if (j == level) split_root(split_n);
1008 /* Enter a separating key at level j + 1 between */
1009 /* the last key of block split_p, and the first key of block p */
1010 enter_key_above_branch(j + 1, BItem(p, DIR_START));
1012 // In branch levels, we can make the first key of block p null and
1013 // save a bit of disk space. Other redundant keys will still creep
1014 // in though.
1015 BItem_wr item(p, DIR_START);
1016 int new_total_free = TOTAL_FREE(p) + item.key().length();
1017 item.form_null_key(item.block_given_by());
1018 SET_TOTAL_FREE(p, new_total_free);
1019 } else {
1020 AssertRel(TOTAL_FREE(p),>=,needed);
1022 if (MAX_FREE(p) < needed) {
1023 compact(p);
1024 AssertRel(MAX_FREE(p),>=,needed);
1027 add_item_to_branch(p, kt_, c);
1031 /** GlassTable::delete_leaf_item(repeatedly) is (almost) the converse of add_leaf_item.
1033 * If repeatedly is true, the process repeats at the next level when a
1034 * block has been completely emptied, freeing the block and taking out
1035 * the pointer to it. Emptied root blocks are also removed, which
1036 * reduces the number of levels in the B-tree.
1038 void
1039 GlassTable::delete_leaf_item(bool repeatedly)
1041 LOGCALL_VOID(DB, "GlassTable::delete_leaf_item", repeatedly);
1042 Assert(writable);
1043 byte * p = C[0].get_modifiable_p(block_size);
1044 int c = C[0].c;
1045 AssertRel(DIR_START,<=,c);
1046 AssertRel(c,<,DIR_END(p));
1047 int kt_len = LeafItem(p, c).size(); /* size of the item to be deleted */
1048 int dir_end = DIR_END(p) - D2; /* directory length will go down by 2 bytes */
1050 memmove(p + c, p + c + D2, dir_end - c);
1051 SET_DIR_END(p, dir_end);
1052 SET_MAX_FREE(p, MAX_FREE(p) + D2);
1053 SET_TOTAL_FREE(p, TOTAL_FREE(p) + kt_len + D2);
1055 if (!repeatedly) return;
1056 if (0 < level) {
1057 if (dir_end == DIR_START) {
1058 free_list.mark_block_unused(this, block_size, C[0].get_n());
1059 C[0].rewrite = false;
1060 C[0].set_n(BLK_UNUSED);
1061 C[1].rewrite = true; /* *is* necessary */
1062 delete_branch_item(1);
1067 /** GlassTable::delete_branch_item(j, repeatedly) is (almost) the converse of add_branch_item.
1069 * The process repeats at the next level when a block has been completely
1070 * emptied, freeing the block and taking out the pointer to it. Emptied root
1071 * blocks are also removed, which reduces the number of levels in the B-tree.
1073 void
1074 GlassTable::delete_branch_item(int j)
1076 LOGCALL_VOID(DB, "GlassTable::delete_branch_item", j);
1077 Assert(writable);
1078 byte * p = C[j].get_modifiable_p(block_size);
1079 int c = C[j].c;
1080 AssertRel(DIR_START,<=,c);
1081 AssertRel(c,<,DIR_END(p));
1082 int kt_len = BItem(p, c).size(); /* size of the item to be deleted */
1083 int dir_end = DIR_END(p) - D2; /* directory length will go down by 2 bytes */
1085 memmove(p + c, p + c + D2, dir_end - c);
1086 SET_DIR_END(p, dir_end);
1087 SET_MAX_FREE(p, MAX_FREE(p) + D2);
1088 SET_TOTAL_FREE(p, TOTAL_FREE(p) + kt_len + D2);
1090 if (j < level) {
1091 if (dir_end == DIR_START) {
1092 free_list.mark_block_unused(this, block_size, C[j].get_n());
1093 C[j].rewrite = false;
1094 C[j].set_n(BLK_UNUSED);
1095 C[j + 1].rewrite = true; /* *is* necessary */
1096 delete_branch_item(j + 1);
1098 } else {
1099 Assert(j == level);
1100 while (dir_end == DIR_START + D2 && level > 0) {
1101 /* single item in the root block, so lose a level */
1102 uint4 new_root = BItem(C[level].get_p(), DIR_START).block_given_by();
1103 free_list.mark_block_unused(this, block_size, C[level].get_n());
1104 C[level].destroy();
1105 level--;
1107 block_to_cursor(C, level, new_root);
1109 dir_end = DIR_END(C[level].get_p()); /* prepare for the loop */
1114 /* debugging aid:
1115 static addcount = 0;
1118 /** add_kt(found) adds the item (key-tag pair) at B->kt into the
1119 B-tree, using cursor C.
1121 found == find() is handed over as a parameter from Btree::add.
1122 Btree::alter() prepares for the alteration to the B-tree. Then
1123 there are a number of cases to consider:
1125 If an item with the same key is in the B-tree (found is true),
1126 the new kt replaces it.
1128 If then kt is smaller, or the same size as, the item it replaces,
1129 kt is put in the same place as the item it replaces, and the
1130 TOTAL_FREE measure is reduced.
1132 If kt is larger than the item it replaces it is put in the
1133 MAX_FREE space if there is room, and the directory entry and
1134 space counts are adjusted accordingly.
1136 - But if there is not room we do it the long way: the old item is
1137 deleted with delete_leaf_item and kt is added in with add_item.
1139 If the key of kt is not in the B-tree (found is false), the new
1140 kt is added in with add_item.
1142 Returns:
1143 0 : added kt
1144 1 : replaced kt
1145 2 : replaced kt and it was the final one
1149 GlassTable::add_kt(bool found)
1151 LOGCALL(DB, int, "GlassTable::add_kt", found);
1152 Assert(writable);
1156 printf("%d) %s ", addcount++, (found ? "replacing" : "adding"));
1157 print_bytes(kt[I2], kt + I2 + K1); putchar('\n');
1160 alter();
1162 int result = 0;
1163 if (found) { /* replacement */
1164 seq_count = SEQ_START_POINT;
1165 sequential = false;
1167 byte * p = C[0].get_modifiable_p(block_size);
1168 int c = C[0].c;
1169 AssertRel(DIR_START,<=,c);
1170 AssertRel(c,<,DIR_END(p));
1171 LeafItem item(p, c);
1172 int kt_size = kt.size();
1173 int needed = kt_size - item.size();
1175 result = item.last_component() ? 2 : 1;
1177 if (needed <= 0) {
1178 /* simple replacement */
1179 memmove(const_cast<byte *>(item.get_address()),
1180 kt.get_address(), kt_size);
1181 SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
1182 } else {
1183 /* new item into the block's freespace */
1184 int new_max = MAX_FREE(p) - kt_size;
1185 if (new_max >= 0) {
1186 int o = DIR_END(p) + new_max;
1187 memmove(p + o, kt.get_address(), kt_size);
1188 LeafItem_wr::setD(p, c, o);
1189 SET_MAX_FREE(p, new_max);
1190 SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
1191 } else {
1192 /* do it the long way */
1193 delete_leaf_item(false);
1194 add_leaf_item(kt);
1197 } else {
1198 /* addition */
1199 if (changed_n == C[0].get_n() && changed_c == C[0].c) {
1200 if (seq_count < 0) seq_count++;
1201 } else {
1202 seq_count = SEQ_START_POINT;
1203 sequential = false;
1205 C[0].c += D2;
1206 add_leaf_item(kt);
1208 RETURN(result);
1211 /* delete_kt() corresponds to add_kt(found), but there are only
1212 two cases: if the key is not found nothing is done, and if it is
1213 found the corresponding item is deleted with delete_leaf_item.
1215 Returns:
1216 0 : nothing to delete
1217 1 : deleted kt
1218 2 : deleted kt and it was the final one
1222 GlassTable::delete_kt()
1224 LOGCALL(DB, int, "GlassTable::delete_kt", NO_ARGS);
1225 Assert(writable);
1227 seq_count = SEQ_START_POINT;
1228 sequential = false;
1230 if (!find(C))
1231 return 0;
1233 int result = LeafItem(C[0].get_p(), C[0].c).last_component() ? 2 : 1;
1234 alter();
1235 delete_leaf_item(true);
1237 RETURN(result);
1240 /* GlassTable::form_key(key) treats address kt as an item holder and fills in
1241 the key part:
1243 (I) K key c (C tag)
1245 The bracketed parts are left blank. The key is filled in with key_len bytes and
1246 K set accordingly. c is set to 1.
1249 void GlassTable::form_key(const string & key) const
1251 LOGCALL_VOID(DB, "GlassTable::form_key", key);
1252 kt.form_key(key);
1255 /* GlassTable::add(key, tag) adds the key/tag item to the
1256 B-tree, replacing any existing item with the same key.
1258 For a long tag, we end up having to add m components, of the form
1260 key 1 m tag1
1261 key 2 m tag2
1263 key m m tagm
1265 and tag1+tag2+...+tagm are equal to tag. These in their turn may be replacing
1266 n components of the form
1268 key 1 n TAG1
1269 key 2 n TAG2
1271 key n n TAGn
1273 and n may be greater than, equal to, or less than m. These cases are dealt
1274 with in the code below. If m < n for example, we end up with a series of
1275 deletions.
1278 void
1279 GlassTable::add(const string &key, string tag, bool already_compressed)
1281 LOGCALL_VOID(DB, "GlassTable::add", key | tag | already_compressed);
1282 Assert(writable);
1284 if (handle < 0) {
1285 if (handle == -2) {
1286 GlassTable::throw_database_closed();
1288 RootInfo root_info;
1289 root_info.init(block_size, compress_min);
1290 do_open_to_write(&root_info);
1293 form_key(key);
1295 const char* tag_data = tag.data();
1296 size_t tag_size = tag.size();
1298 bool compressed = false;
1299 if (already_compressed) {
1300 compressed = true;
1301 } else if (compress_min > 0 && tag_size > compress_min) {
1302 const char * res = comp_stream.compress(tag_data, &tag_size);
1303 if (res) {
1304 compressed = true;
1305 tag_data = res;
1309 // sort of matching kt.append_chunk(), but setting the chunk
1310 const size_t cd = kt.key().length() + K1 + I2 + X2; // offset to the tag data
1311 const size_t L = max_item_size - cd; // largest amount of tag data for any chunk
1312 size_t first_L = L + X2; // - amount for tag1 (we don't store X there)
1313 bool found = find(C);
1314 if (tag_size <= first_L) {
1315 // The whole tag clearly fits in one item, so no need to make this
1316 // complicated.
1317 first_L = tag_size;
1318 } else if (!found) {
1319 const byte * p = C[0].get_p();
1320 size_t n = TOTAL_FREE(p) % (max_item_size + D2);
1321 if (n > D2 + cd) {
1322 n -= (D2 + cd);
1323 // if n >= last then fully filling this block won't produce
1324 // an extra item, so we might as well do this even if
1325 // full_compaction isn't active.
1327 // In the full_compaction case, it turns out we shouldn't always
1328 // try to fill every last byte. Doing so can actually increase the
1329 // total space required (I believe this effect is due to longer
1330 // dividing keys being required in the index blocks). Empirically,
1331 // n >= key.size() + K appears a good criterion for K ~= 34. This
1332 // seems to save about 0.2% in total database size over always
1333 // splitting the tag. It'll also give be slightly faster retrieval
1334 // as we can avoid reading an extra block occasionally.
1335 size_t last = (tag_size - X2) % L;
1336 if (n >= last || (full_compaction && n >= key.size() + 34)) {
1337 // first_L < max_item_size + D2 - D2 - cd
1338 // Total size of first item = cd + first_L < max_item_size
1339 first_L = n + X2;
1344 // There are m items to add.
1345 int m = (tag_size - first_L + L - 1) / L + 1;
1346 /* FIXME: sort out this error higher up and turn this into
1347 * an assert.
1349 if (m >= BYTE_PAIR_RANGE)
1350 throw Xapian::UnimplementedError("Can't handle insanely large tags");
1352 size_t o = 0; // Offset into the tag
1353 size_t residue = tag_size; // Bytes of the tag remaining to add in
1354 bool replacement = false; // Has there been a replacement?
1355 bool components_to_del = false; // Are there components to delete?
1356 int i;
1357 for (i = 1; i <= m; i++) {
1358 size_t l = (i == m ? residue : (i == 1 ? first_L : L));
1359 size_t this_cd = (i == 1 ? cd - X2 : cd);
1360 Assert(this_cd + l <= block_size);
1361 Assert(o + l <= tag_size);
1362 kt.set_tag(this_cd, tag_data + o, l, compressed, i, m);
1364 o += l;
1365 residue -= l;
1367 if (i > 1) found = find(C);
1368 int result = add_kt(found);
1369 if (result) replacement = true;
1370 components_to_del = (result == 1);
1372 AssertEq(o, tag_size);
1373 if (components_to_del) {
1374 i = m;
1375 do {
1376 kt.set_component_of(++i);
1377 } while (delete_kt() == 1);
1379 if (!replacement) ++item_count;
1380 Btree_modified = true;
1381 if (cursor_created_since_last_modification) {
1382 cursor_created_since_last_modification = false;
1383 ++cursor_version;
1387 /* GlassTable::del(key) returns false if the key is not in the B-tree,
1388 otherwise deletes it and returns true.
1390 Again, this is parallel to GlassTable::add, but simpler in form.
1393 bool
1394 GlassTable::del(const string &key)
1396 LOGCALL(DB, bool, "GlassTable::del", key);
1397 Assert(writable);
1399 if (handle < 0) {
1400 if (handle == -2) {
1401 GlassTable::throw_database_closed();
1403 RETURN(false);
1406 // We can't delete a key which we is too long for us to store.
1407 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1409 if (key.empty()) RETURN(false);
1410 form_key(key);
1412 int r = delete_kt();
1413 if (r == 0) RETURN(false);
1414 int i = 1;
1415 while (r == 1) {
1416 kt.set_component_of(++i);
1417 r = delete_kt();
1420 item_count--;
1421 Btree_modified = true;
1422 if (cursor_created_since_last_modification) {
1423 cursor_created_since_last_modification = false;
1424 ++cursor_version;
1426 RETURN(true);
1429 bool
1430 GlassTable::readahead_key(const string &key) const
1432 LOGCALL(DB, bool, "GlassTable::readahead_key", key);
1433 Assert(!key.empty());
1435 // Three cases:
1437 // handle == -1: Lazy table in a multi-file database which isn't yet open.
1439 // handle == -2: Table has been closed. Since the readahead is just a
1440 // hint, we can safely ignore it for a closed table.
1442 // handle <= -3: Lazy table in a single-file database which isn't yet
1443 // open.
1444 if (handle < 0)
1445 RETURN(false);
1447 // If the table only has one level, there are no branch blocks to preread.
1448 if (level == 0)
1449 RETURN(false);
1451 form_key(key);
1453 // We'll only readahead the first level, since descending the B-tree would
1454 // require actual reads that would likely hurt performance more than help.
1455 const byte * p = C[level].get_p();
1456 int c = find_in_branch(p, kt, C[level].c);
1457 uint4 n = BItem(p, c).block_given_by();
1458 // Don't preread if it's the block we last preread or already in the
1459 // cursor.
1460 if (n != last_readahead && n != C[level - 1].get_n()) {
1461 last_readahead = n;
1462 if (!io_readahead_block(handle, block_size, n, offset))
1463 RETURN(false);
1465 RETURN(true);
1468 bool
1469 GlassTable::get_exact_entry(const string &key, string & tag) const
1471 LOGCALL(DB, bool, "GlassTable::get_exact_entry", key | tag);
1472 Assert(!key.empty());
1474 if (handle < 0) {
1475 if (handle == -2) {
1476 GlassTable::throw_database_closed();
1478 RETURN(false);
1481 // An oversized key can't exist, so attempting to search for it should fail.
1482 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1484 form_key(key);
1485 if (!find(C)) RETURN(false);
1487 (void)read_tag(C, &tag, false);
1488 RETURN(true);
1491 bool
1492 GlassTable::key_exists(const string &key) const
1494 LOGCALL(DB, bool, "GlassTable::key_exists", key);
1495 Assert(!key.empty());
1497 // An oversized key can't exist, so attempting to search for it should fail.
1498 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1500 form_key(key);
1501 RETURN(find(C));
1504 bool
1505 GlassTable::read_tag(Glass::Cursor * C_, string *tag, bool keep_compressed) const
1507 LOGCALL(DB, bool, "GlassTable::read_tag", Literal("C_") | tag | keep_compressed);
1509 tag->resize(0);
1511 bool first = true;
1512 bool compressed = false;
1513 bool decompress = false;
1514 while (true) {
1515 LeafItem item(C_[0].get_p(), C_[0].c);
1516 if (first) {
1517 first = false;
1518 compressed = item.get_compressed();
1519 if (compressed && !keep_compressed) {
1520 comp_stream.decompress_start();
1521 decompress = true;
1524 bool last = item.last_component();
1525 if (decompress) {
1526 // Decompress each chunk as we read it so we don't need both the
1527 // full compressed and uncompressed tags in memory at once.
1528 bool done = item.decompress_chunk(comp_stream, *tag);
1529 if (done != last) {
1530 throw Xapian::DatabaseCorruptError(done ?
1531 "Too many chunks of compressed data" :
1532 "Too few chunks of compressed data");
1534 } else {
1535 item.append_chunk(tag);
1537 if (last) break;
1538 if (!next(C_, 0)) {
1539 throw Xapian::DatabaseCorruptError("Unexpected end of table when reading continuation of tag");
1542 // At this point the cursor is on the last item - calling next will move
1543 // it to the next key (GlassCursor::read_tag() relies on this).
1545 RETURN(compressed && keep_compressed);
1548 void
1549 GlassTable::set_full_compaction(bool parity)
1551 LOGCALL_VOID(DB, "GlassTable::set_full_compaction", parity);
1552 Assert(writable);
1554 if (parity) seq_count = 0;
1555 full_compaction = parity;
1558 GlassCursor * GlassTable::cursor_get() const {
1559 LOGCALL(DB, GlassCursor *, "GlassTable::cursor_get", NO_ARGS);
1560 if (handle < 0) {
1561 if (handle == -2) {
1562 GlassTable::throw_database_closed();
1564 RETURN(NULL);
1566 // FIXME Ick - casting away const is nasty
1567 RETURN(new GlassCursor(const_cast<GlassTable *>(this)));
1570 /************ B-tree opening and closing ************/
1572 void
1573 GlassTable::basic_open(const RootInfo * root_info, glass_revision_number_t rev)
1575 LOGCALL_VOID(DB, "GlassTable::basic_open", root_info|rev);
1576 revision_number = rev;
1577 root = root_info->get_root();
1578 level = root_info->get_level();
1579 item_count = root_info->get_num_entries();
1580 faked_root_block = root_info->get_root_is_fake();
1581 sequential = root_info->get_sequential();
1582 const string & fl_serialised = root_info->get_free_list();
1583 if (!fl_serialised.empty()) {
1584 if (!free_list.unpack(fl_serialised))
1585 throw Xapian::DatabaseCorruptError("Bad freelist metadata");
1586 } else {
1587 free_list.reset();
1590 compress_min = root_info->get_compress_min();
1592 /* kt holds constructed items as well as keys */
1593 kt = LeafItem_wr(zeroed_new(block_size));
1595 set_max_item_size(BLOCK_CAPACITY);
1597 for (int j = 0; j <= level; j++) {
1598 C[j].init(block_size);
1601 read_root();
1603 if (cursor_created_since_last_modification) {
1604 cursor_created_since_last_modification = false;
1605 ++cursor_version;
1609 void
1610 GlassTable::read_root()
1612 LOGCALL_VOID(DB, "GlassTable::read_root", NO_ARGS);
1613 if (faked_root_block) {
1614 /* root block for an unmodified database. */
1615 byte * p = C[0].init(block_size);
1616 Assert(p);
1618 /* clear block - shouldn't be necessary, but is a bit nicer,
1619 * and means that the same operations should always produce
1620 * the same database. */
1621 memset(p, 0, block_size);
1623 int o = block_size - I2 - K1;
1624 LeafItem_wr(p + o).fake_root_item();
1626 LeafItem_wr::setD(p, DIR_START, o); // its directory entry
1627 SET_DIR_END(p, DIR_START + D2);// the directory size
1629 o -= (DIR_START + D2);
1630 SET_MAX_FREE(p, o);
1631 SET_TOTAL_FREE(p, o);
1632 SET_LEVEL(p, 0);
1634 if (!writable) {
1635 /* reading - revision number doesn't matter as long as
1636 * it's not greater than the current one. */
1637 SET_REVISION(p, 0);
1638 C[0].set_n(0);
1639 } else {
1640 /* writing - */
1641 SET_REVISION(p, revision_number + 1);
1642 C[0].set_n(free_list.get_block(this, block_size));
1644 } else {
1645 /* using a root block stored on disk */
1646 block_to_cursor(C, level, root);
1648 if (REVISION(C[level].get_p()) > revision_number) set_overwritten();
1649 /* although this is unlikely */
1653 void
1654 GlassTable::do_open_to_write(const RootInfo * root_info,
1655 glass_revision_number_t rev)
1657 LOGCALL_VOID(DB, "GlassTable::do_open_to_write", root_info|rev);
1658 if (handle == -2) {
1659 GlassTable::throw_database_closed();
1661 if (handle <= -2) {
1662 // Single file database.
1663 handle = -3 - handle;
1664 } else {
1665 handle = io_open_block_wr(name + GLASS_TABLE_EXTENSION, (rev == 0));
1666 if (handle < 0) {
1667 // lazy doesn't make a lot of sense when we're creating a DB (which
1668 // is the case when rev==0), but ENOENT with O_CREAT means a parent
1669 // directory doesn't exist.
1670 if (lazy && rev && errno == ENOENT) {
1671 revision_number = rev;
1672 return;
1674 string message((rev == 0) ? "Couldn't create " : "Couldn't open ");
1675 message += name;
1676 message += GLASS_TABLE_EXTENSION" read/write: ";
1677 errno_to_string(errno, message);
1678 throw Xapian::DatabaseOpeningError(message);
1682 writable = true;
1683 basic_open(root_info, rev);
1685 split_p = new byte[block_size];
1687 buffer = zeroed_new(block_size);
1689 changed_n = 0;
1690 changed_c = DIR_START;
1691 seq_count = SEQ_START_POINT;
1694 GlassTable::GlassTable(const char * tablename_, const string & path_,
1695 bool readonly_, bool lazy_)
1696 : tablename(tablename_),
1697 revision_number(0),
1698 item_count(0),
1699 block_size(0),
1700 faked_root_block(true),
1701 sequential(true),
1702 handle(-1),
1703 level(0),
1704 root(0),
1705 kt(0),
1706 buffer(0),
1707 free_list(),
1708 name(path_),
1709 seq_count(0),
1710 changed_n(0),
1711 changed_c(0),
1712 max_item_size(0),
1713 Btree_modified(false),
1714 full_compaction(false),
1715 writable(!readonly_),
1716 cursor_created_since_last_modification(false),
1717 cursor_version(0),
1718 changes_obj(NULL),
1719 split_p(0),
1720 compress_min(0),
1721 comp_stream(Z_DEFAULT_STRATEGY),
1722 lazy(lazy_),
1723 last_readahead(BLK_UNUSED),
1724 offset(0)
1726 LOGCALL_CTOR(DB, "GlassTable", tablename_ | path_ | readonly_ | lazy_);
1729 GlassTable::GlassTable(const char * tablename_, int fd, off_t offset_,
1730 bool readonly_, bool lazy_)
1731 : tablename(tablename_),
1732 revision_number(0),
1733 item_count(0),
1734 block_size(0),
1735 faked_root_block(true),
1736 sequential(true),
1737 handle(-3 - fd),
1738 level(0),
1739 root(0),
1740 kt(0),
1741 buffer(0),
1742 free_list(),
1743 name(),
1744 seq_count(0),
1745 changed_n(0),
1746 changed_c(0),
1747 max_item_size(0),
1748 Btree_modified(false),
1749 full_compaction(false),
1750 writable(!readonly_),
1751 cursor_created_since_last_modification(false),
1752 cursor_version(0),
1753 changes_obj(NULL),
1754 split_p(0),
1755 compress_min(0),
1756 comp_stream(Z_DEFAULT_STRATEGY),
1757 lazy(lazy_),
1758 last_readahead(BLK_UNUSED),
1759 offset(offset_)
1761 LOGCALL_CTOR(DB, "GlassTable", tablename_ | fd | offset_ | readonly_ | lazy_);
1764 bool
1765 GlassTable::exists() const {
1766 LOGCALL(DB, bool, "GlassTable::exists", NO_ARGS);
1767 // We know a single-file database exists, since we have an fd open on it!
1768 return single_file() || file_exists(name + GLASS_TABLE_EXTENSION);
1771 void
1772 GlassTable::create_and_open(int flags_, const RootInfo & root_info)
1774 LOGCALL_VOID(DB, "GlassTable::create_and_open", flags_|root_info);
1775 if (handle == -2) {
1776 GlassTable::throw_database_closed();
1778 Assert(writable);
1779 close();
1781 unsigned int block_size_ = root_info.get_blocksize();
1782 Assert(block_size_ >= 2048);
1783 Assert(block_size_ <= BYTE_PAIR_RANGE);
1784 // Must be a power of two.
1785 Assert((block_size_ & (block_size_ - 1)) == 0);
1787 flags = flags_;
1788 block_size = block_size_;
1790 if (lazy) {
1791 close();
1792 (void)io_unlink(name + GLASS_TABLE_EXTENSION);
1793 compress_min = root_info.get_compress_min();
1794 } else {
1795 // FIXME: it would be good to arrange that this works such that there's
1796 // always a valid table in place if you run create_and_open() on an
1797 // existing table.
1799 do_open_to_write(&root_info);
1803 GlassTable::~GlassTable() {
1804 LOGCALL_DTOR(DB, "GlassTable");
1805 GlassTable::close();
1808 void GlassTable::close(bool permanent) {
1809 LOGCALL_VOID(DB, "GlassTable::close", permanent);
1811 if (handle >= 0) {
1812 if (single_file()) {
1813 handle = -3 - handle;
1814 } else {
1815 // If an error occurs here, we just ignore it, since we're just
1816 // trying to free everything.
1817 (void)::close(handle);
1818 handle = -1;
1822 if (permanent) {
1823 handle = -2;
1824 // Don't delete the resources in the table, since they may
1825 // still be used to look up cached content.
1826 return;
1828 for (int j = level; j >= 0; j--) {
1829 C[j].destroy();
1831 delete [] split_p;
1832 split_p = 0;
1834 delete [] kt.get_address();
1835 kt = 0;
1836 delete [] buffer;
1837 buffer = 0;
1840 void
1841 GlassTable::flush_db()
1843 LOGCALL_VOID(DB, "GlassTable::flush_db", NO_ARGS);
1844 Assert(writable);
1845 if (handle < 0) {
1846 if (handle == -2) {
1847 GlassTable::throw_database_closed();
1849 return;
1852 for (int j = level; j >= 0; j--) {
1853 if (C[j].rewrite) {
1854 write_block(C[j].get_n(), C[j].get_p());
1858 if (Btree_modified) {
1859 faked_root_block = false;
1863 void
1864 GlassTable::commit(glass_revision_number_t revision, RootInfo * root_info)
1866 LOGCALL_VOID(DB, "GlassTable::commit", revision|root_info);
1867 Assert(writable);
1869 if (revision <= revision_number) {
1870 throw Xapian::DatabaseError("New revision too low");
1873 if (handle < 0) {
1874 if (handle == -2) {
1875 GlassTable::throw_database_closed();
1877 revision_number = revision;
1878 root_info->set_blocksize(block_size);
1879 root_info->set_level(0);
1880 root_info->set_num_entries(0);
1881 root_info->set_root_is_fake(true);
1882 root_info->set_sequential(true);
1883 root_info->set_root(0);
1884 return;
1887 try {
1888 root = C[level].get_n();
1890 root_info->set_blocksize(block_size);
1891 root_info->set_level(level);
1892 root_info->set_num_entries(item_count);
1893 root_info->set_root_is_fake(faked_root_block);
1894 root_info->set_sequential(sequential);
1895 root_info->set_root(root);
1897 Btree_modified = false;
1899 for (int i = 0; i < BTREE_CURSOR_LEVELS; ++i) {
1900 C[i].init(block_size);
1903 free_list.set_revision(revision);
1904 free_list.commit(this, block_size);
1906 // Save the freelist details into the root_info.
1907 string serialised;
1908 free_list.pack(serialised);
1909 root_info->set_free_list(serialised);
1911 revision_number = revision;
1913 read_root();
1915 changed_n = 0;
1916 changed_c = DIR_START;
1917 seq_count = SEQ_START_POINT;
1918 } catch (...) {
1919 GlassTable::close();
1920 throw;
1924 void
1925 GlassTable::cancel(const RootInfo & root_info, glass_revision_number_t rev)
1927 LOGCALL_VOID(DB, "GlassTable::cancel", root_info|rev);
1928 Assert(writable);
1930 if (handle < 0) {
1931 if (handle == -2) {
1932 GlassTable::throw_database_closed();
1934 return;
1937 // This causes problems: if (!Btree_modified) return;
1939 if (flags & Xapian::DB_DANGEROUS)
1940 throw Xapian::InvalidOperationError("cancel() not supported under Xapian::DB_DANGEROUS");
1942 revision_number = rev;
1943 block_size = root_info.get_blocksize();
1944 root = root_info.get_root();
1945 level = root_info.get_level();
1946 item_count = root_info.get_num_entries();
1947 faked_root_block = root_info.get_root_is_fake();
1948 sequential = root_info.get_sequential();
1950 Btree_modified = false;
1952 for (int j = 0; j <= level; j++) {
1953 C[j].init(block_size);
1954 C[j].rewrite = false;
1956 read_root();
1958 changed_n = 0;
1959 changed_c = DIR_START;
1960 seq_count = SEQ_START_POINT;
1962 if (cursor_created_since_last_modification) {
1963 cursor_created_since_last_modification = false;
1964 ++cursor_version;
1968 /************ B-tree reading ************/
1970 void
1971 GlassTable::do_open_to_read(const RootInfo * root_info,
1972 glass_revision_number_t rev)
1974 LOGCALL(DB, bool, "GlassTable::do_open_to_read", root_info|rev);
1975 if (handle == -2) {
1976 GlassTable::throw_database_closed();
1978 if (single_file()) {
1979 handle = -3 - handle;
1980 } else {
1981 handle = io_open_block_rd(name + GLASS_TABLE_EXTENSION);
1982 if (handle < 0) {
1983 if (lazy) {
1984 // This table is optional when reading!
1985 revision_number = rev;
1986 return;
1988 string message("Couldn't open ");
1989 message += name;
1990 message += GLASS_TABLE_EXTENSION" to read: ";
1991 errno_to_string(errno, message);
1992 throw Xapian::DatabaseOpeningError(message);
1996 basic_open(root_info, rev);
1998 read_root();
2001 void
2002 GlassTable::open(int flags_, const RootInfo & root_info,
2003 glass_revision_number_t rev)
2005 LOGCALL_VOID(DB, "GlassTable::open", flags_|root_info|rev);
2006 close();
2008 flags = flags_;
2009 block_size = root_info.get_blocksize();
2010 root = root_info.get_root();
2012 if (!writable) {
2013 do_open_to_read(&root_info, rev);
2014 return;
2017 do_open_to_write(&root_info, rev);
2020 bool
2021 GlassTable::prev_for_sequential(Glass::Cursor * C_, int /*dummy*/) const
2023 LOGCALL(DB, bool, "GlassTable::prev_for_sequential", Literal("C_") | Literal("/*dummy*/"));
2024 int c = C_[0].c;
2025 AssertRel(DIR_START,<=,c);
2026 AssertRel(c,<,DIR_END(C_[0].get_p()));
2027 if (c == DIR_START) {
2028 uint4 n = C_[0].get_n();
2029 const byte * p;
2030 while (true) {
2031 if (n == 0) RETURN(false);
2032 n--;
2033 if (n == C[0].get_n()) {
2034 // Block is a leaf block in the built-in cursor (potentially in
2035 // modified form if the table is writable).
2036 p = C_[0].clone(C[0]);
2037 } else {
2038 if (writable) {
2039 // Blocks in the built-in cursor may not have been written
2040 // to disk yet, so we have to check that the block number
2041 // isn't in the built-in cursor or we'll read an
2042 // uninitialised block (for which GET_LEVEL(p) will
2043 // probably return 0).
2044 int j;
2045 for (j = 1; j <= level; ++j) {
2046 if (n == C[j].get_n()) break;
2048 if (j <= level) continue;
2051 // Block isn't in the built-in cursor, so the form on disk
2052 // is valid, so read it to check if it's the next level 0
2053 // block.
2054 byte * q = C_[0].init(block_size);
2055 read_block(n, q);
2056 p = q;
2057 C_[0].set_n(n);
2059 if (REVISION(p) > revision_number + writable) {
2060 set_overwritten();
2061 RETURN(false);
2063 if (GET_LEVEL(p) == 0) break;
2065 c = DIR_END(p);
2066 AssertRel(DIR_START,<,c);
2068 c -= D2;
2069 C_[0].c = c;
2070 RETURN(true);
2073 bool
2074 GlassTable::next_for_sequential(Glass::Cursor * C_, int /*dummy*/) const
2076 LOGCALL(DB, bool, "GlassTable::next_for_sequential", Literal("C_") | Literal("/*dummy*/"));
2077 const byte * p = C_[0].get_p();
2078 Assert(p);
2079 int c = C_[0].c;
2080 AssertRel(c,<,DIR_END(p));
2081 c += D2;
2082 Assert((unsigned)c < block_size);
2083 if (c == DIR_END(p)) {
2084 uint4 n = C_[0].get_n();
2085 while (true) {
2086 n++;
2087 if (n >= free_list.get_first_unused_block()) RETURN(false);
2088 if (writable) {
2089 if (n == C[0].get_n()) {
2090 // Block is a leaf block in the built-in cursor
2091 // (potentially in modified form).
2092 p = C_[0].clone(C[0]);
2093 } else {
2094 // Blocks in the built-in cursor may not have been written
2095 // to disk yet, so we have to check that the block number
2096 // isn't in the built-in cursor or we'll read an
2097 // uninitialised block (for which GET_LEVEL(p) will
2098 // probably return 0).
2099 int j;
2100 for (j = 1; j <= level; ++j) {
2101 if (n == C[j].get_n()) break;
2103 if (j <= level) continue;
2105 // Block isn't in the built-in cursor, so the form on disk
2106 // is valid, so read it to check if it's the next level 0
2107 // block.
2108 byte * q = C_[0].init(block_size);
2109 read_block(n, q);
2110 p = q;
2112 } else {
2113 byte * q = C_[0].init(block_size);
2114 read_block(n, q);
2115 p = q;
2117 if (REVISION(p) > revision_number + writable) {
2118 set_overwritten();
2119 RETURN(false);
2121 if (GET_LEVEL(p) == 0) break;
2123 c = DIR_START;
2124 C_[0].set_n(n);
2126 C_[0].c = c;
2127 RETURN(true);
2130 bool
2131 GlassTable::prev_default(Glass::Cursor * C_, int j) const
2133 LOGCALL(DB, bool, "GlassTable::prev_default", Literal("C_") | j);
2134 const byte * p = C_[j].get_p();
2135 int c = C_[j].c;
2136 AssertRel(DIR_START,<=,c);
2137 AssertRel(c,<,DIR_END(p));
2138 AssertRel((unsigned)DIR_END(p),<=,block_size);
2139 if (c == DIR_START) {
2140 if (j == level) RETURN(false);
2141 if (!prev_default(C_, j + 1)) RETURN(false);
2142 p = C_[j].get_p();
2143 c = DIR_END(p);
2144 AssertRel(DIR_START,<,c);
2146 c -= D2;
2147 C_[j].c = c;
2148 if (j > 0) {
2149 block_to_cursor(C_, j - 1, BItem(p, c).block_given_by());
2151 RETURN(true);
2154 bool
2155 GlassTable::next_default(Glass::Cursor * C_, int j) const
2157 LOGCALL(DB, bool, "GlassTable::next_default", Literal("C_") | j);
2158 const byte * p = C_[j].get_p();
2159 int c = C_[j].c;
2160 AssertRel(c,<,DIR_END(p));
2161 AssertRel((unsigned)DIR_END(p),<=,block_size);
2162 c += D2;
2163 if (j > 0) {
2164 AssertRel(DIR_START,<,c);
2165 } else {
2166 AssertRel(DIR_START,<=,c);
2168 // Sometimes c can be DIR_END(p) + 2 here it appears...
2169 if (c >= DIR_END(p)) {
2170 if (j == level) RETURN(false);
2171 if (!next_default(C_, j + 1)) RETURN(false);
2172 p = C_[j].get_p();
2173 c = DIR_START;
2175 C_[j].c = c;
2176 if (j > 0) {
2177 block_to_cursor(C_, j - 1, BItem(p, c).block_given_by());
2178 #ifdef BTREE_DEBUG_FULL
2179 printf("Block in GlassTable:next_default");
2180 report_block_full(j - 1, C_[j - 1].get_n(), C_[j - 1].get_p());
2181 #endif /* BTREE_DEBUG_FULL */
2183 RETURN(true);
2186 void
2187 GlassTable::throw_database_closed()
2189 throw Xapian::DatabaseError("Database has been closed");