Avoid repeated call to root_info->get_free_list()
[xapian.git] / xapian-core / backends / glass / glass_table.cc
blob5d5b60d23bff37d9649e850bcb5303eb1f44d1fe
1 /* glass_table.cc: Btree implementation
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2002 Ananova Ltd
5 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015 Olly Betts
6 * Copyright 2008 Lemur Consulting Ltd
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU General Public License as
10 * published by the Free Software Foundation; either version 2 of the
11 * License, or (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
21 * USA
24 #include <config.h>
26 #include "glass_table.h"
28 #include <xapian/error.h>
30 #include "safeerrno.h"
32 #include "omassert.h"
33 #include "posixy_wrapper.h"
34 #include "str.h"
35 #include "stringutils.h" // For STRINGIZE().
37 #include <sys/types.h>
39 #include <cstring> /* for memmove */
40 #include <climits> /* for CHAR_BIT */
42 #include "glass_freelist.h"
43 #include "glass_changes.h"
44 #include "glass_cursor.h"
45 #include "glass_defs.h"
46 #include "glass_version.h"
48 #include "debuglog.h"
49 #include "errno_to_string.h"
50 #include "filetests.h"
51 #include "io_utils.h"
52 #include "pack.h"
53 #include "unaligned.h"
55 #include <algorithm> // for std::min()
56 #include <string>
58 #include "xapian/constants.h"
60 using namespace Glass;
61 using namespace std;
63 // Only try to compress tags longer than this many bytes.
64 const size_t COMPRESS_MIN = 4;
66 //#define BTREE_DEBUG_FULL 1
67 #undef BTREE_DEBUG_FULL
69 #ifdef BTREE_DEBUG_FULL
70 /*------debugging aids from here--------*/
72 static void print_key(const byte * p, int c, int j);
73 static void print_tag(const byte * p, int c, int j);
76 static void report_cursor(int N, Btree * B, Glass::Cursor * C)
78 int i;
79 printf("%d)\n", N);
80 for (i = 0; i <= B->level; i++)
81 printf("p=%d, c=%d, n=[%d], rewrite=%d\n",
82 C[i].p, C[i].c, C[i].n, C[i].rewrite);
86 /*------to here--------*/
87 #endif /* BTREE_DEBUG_FULL */
89 static inline byte *zeroed_new(size_t size)
91 byte *temp = new byte[size];
92 memset(temp, 0, size);
93 return temp;
96 /* A B-tree consists of a file with extension GLASS_TABLE_EXTENSION divided
97 into a sequence of equal sized blocks, numbered 0, 1, 2 ... some of which
98 are free, some in use. Those in use are arranged in a tree. Lists of
99 currently unused blocks are stored in a freelist which is itself stored in
100 unused blocks.
102 Some "root info" is needed to locate a particular revision of the B-tree
103 - this is stored in the "version file" (we store this data for all tables
104 at a particular revision together).
106 Each block, b, has a structure like this:
108 R L M T D o1 o2 o3 ... oN <gap> [item] .. [item] .. [item] ...
109 <---------- D ----------> <-M->
111 And then,
113 R = REVISION(b) is the revision number the B-tree had when the block was
114 written into the DB file.
115 L = GET_LEVEL(b) is the level of the block, which is the number of levels
116 towards the root of the B-tree structure. So leaf blocks
117 have level 0 and the one root block has the highest level
118 equal to the number of levels in the B-tree. For blocks
119 storing the freelist, the level is set to 254.
120 M = MAX_FREE(b) is the size of the gap between the end of the directory and
121 the first item of data. (It is not necessarily the maximum
122 size among the bits of space that are free, but I can't
123 think of a better name.)
124 T = TOTAL_FREE(b)is the total amount of free space left in b.
125 D = DIR_END(b) gives the offset to the end of the directory.
127 o1, o2 ... oN are a directory of offsets to the N items held in the block.
128 The items are key-tag pairs, and as they occur in the directory are ordered
129 by the keys.
131 An item has this form:
133 I K key x C tag
134 <--K-->
135 <------I------>
137 A long tag presented through the API is split up into C tags small enough to
138 be accommodated in the blocks of the B-tree. The key is extended to include
139 a counter, x, which runs from 1 to C. The key is preceded by a length, K,
140 and the whole item with a length, I, as depicted above.
142 Here are the corresponding definitions:
146 /** Flip to sequential addition block-splitting after this number of observed
147 * sequential additions (in negated form). */
148 #define SEQ_START_POINT (-10)
150 /* Note use of the limits.h values:
151 UCHAR_MAX = 255, an unsigned with all bits set, and
152 CHAR_BIT = 8, the number of bits per byte
154 BYTE_PAIR_RANGE below is the smallest +ve number that can't be held in two
155 bytes -- 64K effectively.
158 #define BYTE_PAIR_RANGE (1 << 2 * CHAR_BIT)
160 /// read_block(n, p) reads block n of the DB file to address p.
161 void
162 GlassTable::read_block(uint4 n, byte * p) const
164 // Log the value of p, not the contents of the block it points to...
165 LOGCALL_VOID(DB, "GlassTable::read_block", n | (void*)p);
166 if (rare(handle == -2))
167 GlassTable::throw_database_closed();
168 AssertRel(n,<,free_list.get_first_unused_block());
170 io_read_block(handle, reinterpret_cast<char *>(p), block_size, n);
172 if (GET_LEVEL(p) != LEVEL_FREELIST) {
173 int dir_end = DIR_END(p);
174 if (rare(dir_end < DIR_START || unsigned(dir_end) > block_size)) {
175 string msg("dir_end invalid in block ");
176 msg += str(n);
177 throw Xapian::DatabaseCorruptError(msg);
182 /** write_block(n, p, appending) writes block n in the DB file from address p.
184 * If appending is true (not specified it defaults to false), then this
185 * indicates that we've added data to a block in space which was previously
186 * unused, and are writing the block back in place - we use this to add
187 * free list entries (the information about where the freelist data for a
188 * revision begins and ends is stored in the "iamglass" file). We don't
189 * currently use this flag for much, but it signifies that we don't risk
190 * invalidating any existing revisions, which may be useful information.
192 void
193 GlassTable::write_block(uint4 n, const byte * p, bool appending) const
195 LOGCALL_VOID(DB, "GlassTable::write_block", n | p | appending);
196 Assert(writable);
197 /* Check that n is in range. */
198 AssertRel(n,<,free_list.get_first_unused_block());
200 /* don't write to non-free */
201 // FIXME: We can no longer check this easily.
202 // AssertParanoid(free_list.block_free_at_start(block_size, n));
204 /* write revision is okay */
205 AssertEqParanoid(REVISION(p), revision_number + 1);
207 if (appending) {
208 // In the case of the freelist, new entries can safely be written into
209 // the space at the end of the latest freelist block, as that's unused
210 // by previous revisions. It is also safe to write into a block which
211 // wasn't used in the previous revision.
213 // It's only when we start a new freelist block that we need to worry
214 // about invalidating old revisions.
215 } else if (flags & Xapian::DB_DANGEROUS) {
216 // FIXME: We should somehow flag this to prevent readers opening the
217 // database. Removing "iamglass" or setting a flag in it doesn't deal
218 // with any existing readers though. Perhaps we want to have readers
219 // read lock and try to take an exclusive lock here?
222 io_write_block(handle, p, block_size, n);
224 if (!changes_obj) return;
226 unsigned char v;
227 // FIXME: track table_type in this class?
228 if (strcmp(tablename, "position") == 0) {
229 v = int(Glass::POSITION);
230 } else if (strcmp(tablename, "postlist") == 0) {
231 v = int(Glass::POSTLIST);
232 } else if (strcmp(tablename, "docdata") == 0) {
233 v = int(Glass::DOCDATA);
234 } else if (strcmp(tablename, "spelling") == 0) {
235 v = int(Glass::SPELLING);
236 } else if (strcmp(tablename, "synonym") == 0) {
237 v = int(Glass::SYNONYM);
238 } else if (strcmp(tablename, "termlist") == 0) {
239 v = int(Glass::TERMLIST);
240 } else {
241 return; // FIXME
244 if (block_size == 2048) {
245 v |= 0 << 3;
246 } else if (block_size == 4096) {
247 v |= 1 << 3;
248 } else if (block_size == 8192) {
249 v |= 2 << 3;
250 } else if (block_size == 16384) {
251 v |= 3 << 3;
252 } else if (block_size == 32768) {
253 v |= 4 << 3;
254 } else if (block_size == 65536) {
255 v |= 5 << 3;
256 } else {
257 return; // FIXME
260 string buf;
261 buf += char(v);
262 // Write the block number to the file
263 pack_uint(buf, n);
265 changes_obj->write_block(buf);
266 changes_obj->write_block(reinterpret_cast<const char *>(p), block_size);
269 /* A note on cursors:
271 Each B-tree level has a corresponding array element C[j] in a
272 cursor, C. C[0] is the leaf (or data) level, and C[B->level] is the
273 root block level. Within a level j,
275 C[j].p addresses the block
276 C[j].c is the offset into the directory entry in the block
277 C[j].n is the number of the block at C[j].p
279 A look up in the B-tree causes navigation of the blocks starting
280 from the root. In each block, p, we find an offset, c, to an item
281 which gives the number, n, of the block for the next level. This
282 leads to an array of values p,c,n which are held inside the cursor.
284 Any Btree object B has a built-in cursor, at B->C. But other cursors may
285 be created. If BC is a created cursor, BC->C is the cursor in the
286 sense given above, and BC->B is the handle for the B-tree again.
290 void
291 GlassTable::set_overwritten() const
293 LOGCALL_VOID(DB, "GlassTable::set_overwritten", NO_ARGS);
294 // If we're writable, there shouldn't be another writer who could cause
295 // overwritten to be flagged, so that's a DatabaseCorruptError.
296 if (writable)
297 throw Xapian::DatabaseCorruptError("Db block overwritten - are there multiple writers?");
298 throw Xapian::DatabaseModifiedError("The revision being read has been discarded - you should call Xapian::Database::reopen() and retry the operation");
301 /* block_to_cursor(C, j, n) puts block n into position C[j] of cursor
302 C, writing the block currently at C[j] back to disk if necessary.
303 Note that
305 C[j].rewrite
307 is true iff C[j].n is different from block n in file DB. If it is
308 false no rewriting is necessary.
311 void
312 GlassTable::block_to_cursor(Glass::Cursor * C_, int j, uint4 n) const
314 LOGCALL_VOID(DB, "GlassTable::block_to_cursor", (void*)C_ | j | n);
315 if (n == C_[j].get_n()) return;
317 if (writable && C_[j].rewrite) {
318 Assert(C == C_);
319 write_block(C_[j].get_n(), C_[j].get_p());
320 C_[j].rewrite = false;
323 // Check if the block is in the built-in cursor (potentially in
324 // modified form).
325 const byte * p;
326 if (n == C[j].get_n()) {
327 p = C_[j].clone(C[j]);
328 } else {
329 byte * q = C_[j].init(block_size);
330 read_block(n, q);
331 p = q;
332 C_[j].set_n(n);
335 if (j < level) {
336 /* unsigned comparison */
337 if (rare(REVISION(p) > REVISION(C_[j + 1].get_p()))) {
338 set_overwritten();
339 return;
343 if (rare(j != GET_LEVEL(p))) {
344 string msg = "Expected block ";
345 msg += str(n);
346 msg += " to be level ";
347 msg += str(j);
348 msg += ", not ";
349 msg += str(GET_LEVEL(p));
350 throw Xapian::DatabaseCorruptError(msg);
354 /** Btree::alter(); is called when the B-tree is to be altered.
356 It causes new blocks to be forced for the current set of blocks in
357 the cursor.
359 The point is that if a block at level 0 is to be altered it may get
360 a new number. Then the pointer to this block from level 1 will need
361 changing. So the block at level 1 needs altering and may get a new
362 block number. Then the pointer to this block from level 2 will need
363 changing ... and so on back to the root.
365 The clever bit here is spotting the cases when we can make an early
366 exit from this process. If C[j].rewrite is true, C[j+k].rewrite
367 will be true for k = 1,2 ... We have been through all this before,
368 and there is no need to do it again. If C[j].n was free at the
369 start of the transaction, we can copy it back to the same place
370 without violating the integrity of the B-tree. We don't then need a
371 new n and can return. The corresponding C[j].rewrite may be true or
372 false in that case.
375 void
376 GlassTable::alter()
378 LOGCALL_VOID(DB, "GlassTable::alter", NO_ARGS);
379 Assert(writable);
380 if (flags & Xapian::DB_DANGEROUS) {
381 C[0].rewrite = true;
382 return;
384 int j = 0;
385 while (true) {
386 if (C[j].rewrite) return; /* all new, so return */
387 C[j].rewrite = true;
389 glass_revision_number_t rev = REVISION(C[j].get_p());
390 if (rev == revision_number + 1) {
391 return;
393 Assert(rev < revision_number + 1);
394 uint4 n = C[j].get_n();
395 free_list.mark_block_unused(this, block_size, n);
396 SET_REVISION(C[j].get_modifiable_p(block_size), revision_number + 1);
397 n = free_list.get_block(this, block_size);
398 C[j].set_n(n);
400 if (j == level) return;
401 j++;
402 Item_wr(C[j].get_modifiable_p(block_size), C[j].c).set_block_given_by(n);
406 /** find_in_block(p, key, leaf, c) searches for the key in the block at p.
408 leaf is true for a data block, and false for an index block (when the
409 first key is dummy and never needs to be tested). What we get is the
410 directory entry to the last key <= the key being searched for.
412 The lookup is by binary chop, with i and j set to the left and
413 right ends of the search area. In sequential addition, c will often
414 be the answer, so we test the keys round c and move i and j towards
415 c if possible.
419 GlassTable::find_in_block(const byte * p, Key key, bool leaf, int c)
421 LOGCALL_STATIC(DB, int, "GlassTable::find_in_block", (const void*)p | (const void *)key.get_address() | leaf | c);
422 // c should be odd (either -1, or an even offset from DIR_START).
423 Assert((c & 1) == 1);
424 int i = DIR_START;
425 if (leaf) i -= D2;
426 if (c != -1) {
427 AssertRel(i,<=,c);
429 int j = DIR_END(p);
431 if (c != -1) {
432 if (c < j && i < c && Item(p, c).key() <= key)
433 i = c;
434 c += D2;
435 if (c < j && i < c && key < Item(p, c).key())
436 j = c;
439 while (j - i > D2) {
440 int k = i + ((j - i)/(D2 * 2))*D2; /* mid way */
441 if (key < Item(p, k).key()) j = k; else i = k;
443 if (leaf) {
444 AssertRel(DIR_START - D2,<=,i);
445 } else {
446 AssertRel(DIR_START,<=,i);
448 AssertRel(i,<,DIR_END(p));
449 RETURN(i);
452 /** find(C_) searches for the key of B->kt in the B-tree.
454 Result is true if found, false otherwise. When false, the B_tree
455 cursor is positioned at the last key in the B-tree <= the search
456 key. Goes to first (null) item in B-tree when key length == 0.
459 bool
460 GlassTable::find(Glass::Cursor * C_) const
462 LOGCALL(DB, bool, "GlassTable::find", (void*)C_);
463 // Note: the parameter is needed when we're called by GlassCursor
464 const byte * p;
465 int c;
466 Key key = kt.key();
467 for (int j = level; j > 0; --j) {
468 p = C_[j].get_p();
469 c = find_in_block(p, key, false, C_[j].c);
470 #ifdef BTREE_DEBUG_FULL
471 printf("Block in GlassTable:find - code position 1");
472 report_block_full(j, C_[j].get_n(), p);
473 #endif /* BTREE_DEBUG_FULL */
474 C_[j].c = c;
475 block_to_cursor(C_, j - 1, Item(p, c).block_given_by());
477 p = C_[0].get_p();
478 c = find_in_block(p, key, true, C_[0].c);
479 #ifdef BTREE_DEBUG_FULL
480 printf("Block in GlassTable:find - code position 2");
481 report_block_full(0, C_[0].get_n(), p);
482 #endif /* BTREE_DEBUG_FULL */
483 C_[0].c = c;
484 if (c < DIR_START) RETURN(false);
485 RETURN(Item(p, c).key() == key);
488 /** compact(p) compact the block at p by shuffling all the items up to the end.
490 MAX_FREE(p) is then maximized, and is equal to TOTAL_FREE(p).
493 void
494 GlassTable::compact(byte * p)
496 LOGCALL_VOID(DB, "GlassTable::compact", (void*)p);
497 Assert(writable);
498 int e = block_size;
499 byte * b = buffer;
500 int dir_end = DIR_END(p);
501 for (int c = DIR_START; c < dir_end; c += D2) {
502 Item item(p, c);
503 int l = item.size();
504 e -= l;
505 memmove(b + e, item.get_address(), l);
506 setD(p, c, e); /* reform in b */
508 memmove(p + e, b + e, block_size - e); /* copy back */
509 e -= dir_end;
510 SET_TOTAL_FREE(p, e);
511 SET_MAX_FREE(p, e);
514 /** Btree needs to gain a new level to insert more items: so split root block
515 * and construct a new one.
517 void
518 GlassTable::split_root(uint4 split_n)
520 LOGCALL_VOID(DB, "GlassTable::split_root", split_n);
521 /* gain a level */
522 ++level;
524 /* check level overflow - this isn't something that should ever happen
525 * but deserves more than an Assert()... */
526 if (level == BTREE_CURSOR_LEVELS) {
527 throw Xapian::DatabaseCorruptError("Btree has grown impossibly large (" STRINGIZE(BTREE_CURSOR_LEVELS) " levels)");
530 byte * q = C[level].init(block_size);
531 memset(q, 0, block_size);
532 C[level].c = DIR_START;
533 C[level].set_n(free_list.get_block(this, block_size));
534 C[level].rewrite = true;
535 SET_REVISION(q, revision_number + 1);
536 SET_LEVEL(q, level);
537 SET_DIR_END(q, DIR_START);
538 compact(q); /* to reset TOTAL_FREE, MAX_FREE */
540 /* form a null key in b with a pointer to the old root */
541 byte b[10]; /* 7 is exact */
542 Item_wr item(b);
543 item.form_null_key(split_n);
544 add_item(item, level);
547 /** enter_key(j, prevkey, newkey) is called after a block split.
549 It enters in the block at level C[j] a separating key for the block
550 at level C[j - 1]. The key itself is newkey. prevkey is the
551 preceding key, and at level 1 newkey can be trimmed down to the
552 first point of difference to prevkey for entry in C[j].
554 This code looks longer than it really is. If j exceeds the number
555 of B-tree levels the root block has split and we have to construct
556 a new one, but this is a rare event.
558 The key is constructed in b, with block number C[j - 1].n as tag,
559 and this is added in with add_item. add_item may itself cause a
560 block split, with a further call to enter_key. Hence the recursion.
562 void
563 GlassTable::enter_key(int j, Key prevkey, Key newkey)
565 LOGCALL_VOID(DB, "GlassTable::enter_key", j | Literal("prevkey") | Literal("newkey"));
566 Assert(writable);
567 Assert(prevkey < newkey);
568 AssertRel(j,>=,1);
570 uint4 blocknumber = C[j - 1].get_n();
572 // FIXME update to use Key
573 // Keys are truncated here: but don't truncate the count at the end away.
574 const int newkey_len = newkey.length();
575 AssertRel(newkey_len,>,0);
576 int i;
578 if (j == 1) {
579 // Truncate the key to the minimal key which differs from prevkey,
580 // the preceding key in the block.
581 i = 0;
582 const int min_len = min(newkey_len, prevkey.length());
583 while (i < min_len && prevkey[i] == newkey[i]) {
584 i++;
587 // Want one byte of difference.
588 if (i < newkey_len) i++;
589 } else {
590 /* Can't truncate between branch levels, since the separated keys
591 * are in at the leaf level, and truncating again will change the
592 * branch point.
594 i = newkey_len;
597 byte b[UCHAR_MAX + 6];
598 Item_wr item(b);
599 Assert(i <= 256 - I2 - C2);
600 Assert(i <= (int)sizeof(b) - I2 - C2 - 4);
601 item.set_key_and_block(newkey, i, blocknumber);
603 // When j > 1 we can make the first key of block p null. This is probably
604 // worthwhile as it trades a small amount of CPU and RAM use for a small
605 // saving in disk use. Other redundant keys will still creep in though.
606 if (j > 1) {
607 byte * p = C[j - 1].get_modifiable_p(block_size);
608 uint4 n = getint4(newkey.get_address(), newkey_len + K1 + C2);
609 int new_total_free = TOTAL_FREE(p) + newkey_len + C2;
610 // FIXME: incredibly icky going from key to item like this...
611 Item_wr(const_cast<byte*>(newkey.get_address()) - I2).form_null_key(n);
612 SET_TOTAL_FREE(p, new_total_free);
615 // The split block gets inserted into the parent after the pointer to the
616 // current child.
617 AssertEq(C[j].c, find_in_block(C[j].get_p(), item.key(), false, C[j].c));
618 C[j].c += D2;
619 C[j].rewrite = true; /* a subtle point: this *is* required. */
620 add_item(item, j);
623 /** mid_point(p) finds the directory entry in c that determines the
624 approximate mid point of the data in the block at p.
628 GlassTable::mid_point(byte * p)
630 LOGCALL(DB, int, "GlassTable::mid_point", (void*)p);
631 int n = 0;
632 int dir_end = DIR_END(p);
633 int size = block_size - TOTAL_FREE(p) - dir_end;
634 for (int c = DIR_START; c < dir_end; c += D2) {
635 int l = Item(p, c).size();
636 n += 2 * l;
637 if (n >= size) {
638 if (l < n - size) RETURN(c);
639 RETURN(c + D2);
643 /* This shouldn't happen, as the sum of the item sizes should be the same
644 * as the value calculated in size, so assert but return a sane value just
645 * in case. */
646 Assert(false);
647 RETURN(dir_end);
650 /** add_item_to_block(p, kt_, c) adds item kt_ to the block at p.
652 c is the offset in the directory that needs to be expanded to accommodate
653 the new entry for the item. We know before this is called that there is
654 enough contiguous room for the item in the block, so it's just a matter of
655 shuffling up any directory entries after where we're inserting and copying
656 in the item.
659 void
660 GlassTable::add_item_to_block(byte * p, Item_wr kt_, int c)
662 LOGCALL_VOID(DB, "GlassTable::add_item_to_block", (void*)p | Literal("kt_") | c);
663 Assert(writable);
664 int dir_end = DIR_END(p);
665 int kt_len = kt_.size();
666 int needed = kt_len + D2;
667 int new_total = TOTAL_FREE(p) - needed;
668 int new_max = MAX_FREE(p) - needed;
670 Assert(new_total >= 0);
672 AssertRel(MAX_FREE(p),>=,needed);
674 AssertRel(DIR_START,<=,c);
675 AssertRel(c,<=,dir_end);
677 memmove(p + c + D2, p + c, dir_end - c);
678 dir_end += D2;
679 SET_DIR_END(p, dir_end);
681 int o = dir_end + new_max;
682 setD(p, c, o);
683 memmove(p + o, kt_.get_address(), kt_len);
685 SET_MAX_FREE(p, new_max);
687 SET_TOTAL_FREE(p, new_total);
690 /** GlassTable::add_item(kt_, j) adds item kt_ to the block at cursor level C[j].
692 * If there is not enough room the block splits and the item is then
693 * added to the appropriate half.
695 void
696 GlassTable::add_item(Item_wr kt_, int j)
698 LOGCALL_VOID(DB, "GlassTable::add_item", Literal("kt_") | j);
699 Assert(writable);
700 byte * p = C[j].get_modifiable_p(block_size);
701 int c = C[j].c;
702 uint4 n;
704 int needed = kt_.size() + D2;
705 if (TOTAL_FREE(p) < needed) {
706 int m;
707 // Prepare to split p. After splitting, the block is in two halves, the
708 // lower half is split_p, the upper half p again. add_to_upper_half
709 // becomes true when the item gets added to p, false when it gets added
710 // to split_p.
712 if (seq_count < 0) {
713 // If we're not in sequential mode, we split at the mid point
714 // of the node.
715 m = mid_point(p);
716 } else {
717 // During sequential addition, split at the insert point
718 AssertRel(c,>=,DIR_START);
719 m = c;
722 uint4 split_n = C[j].get_n();
723 C[j].set_n(free_list.get_block(this, block_size));
725 memcpy(split_p, p, block_size); // replicate the whole block in split_p
726 SET_DIR_END(split_p, m);
727 compact(split_p); /* to reset TOTAL_FREE, MAX_FREE */
730 int residue = DIR_END(p) - m;
731 int new_dir_end = DIR_START + residue;
732 memmove(p + DIR_START, p + m, residue);
733 SET_DIR_END(p, new_dir_end);
736 compact(p); /* to reset TOTAL_FREE, MAX_FREE */
738 bool add_to_upper_half;
739 if (seq_count < 0) {
740 add_to_upper_half = (c >= m);
741 } else {
742 // And add item to lower half if split_p has room, otherwise upper
743 // half
744 add_to_upper_half = (TOTAL_FREE(split_p) < needed);
747 if (add_to_upper_half) {
748 c -= (m - DIR_START);
749 Assert(seq_count < 0 || c <= DIR_START + D2);
750 Assert(c >= DIR_START);
751 Assert(c <= DIR_END(p));
752 add_item_to_block(p, kt_, c);
753 n = C[j].get_n();
754 } else {
755 Assert(c >= DIR_START);
756 Assert(c <= DIR_END(split_p));
757 add_item_to_block(split_p, kt_, c);
758 n = split_n;
760 write_block(split_n, split_p);
762 // Check if we're splitting the root block.
763 if (j == level) split_root(split_n);
765 /* Enter a separating key at level j + 1 between */
766 /* the last key of block split_p, and the first key of block p */
767 enter_key(j + 1,
768 Item(split_p, DIR_END(split_p) - D2).key(),
769 Item(p, DIR_START).key());
770 } else {
771 AssertRel(TOTAL_FREE(p),>=,needed);
773 if (MAX_FREE(p) < needed) {
774 compact(p);
775 AssertRel(MAX_FREE(p),>=,needed);
778 add_item_to_block(p, kt_, c);
779 n = C[j].get_n();
781 if (j == 0) {
782 changed_n = n;
783 changed_c = c;
787 /** GlassTable::delete_item(j, repeatedly) is (almost) the converse of add_item.
789 * If repeatedly is true, the process repeats at the next level when a
790 * block has been completely emptied, freeing the block and taking out
791 * the pointer to it. Emptied root blocks are also removed, which
792 * reduces the number of levels in the B-tree.
794 void
795 GlassTable::delete_item(int j, bool repeatedly)
797 LOGCALL_VOID(DB, "GlassTable::delete_item", j | repeatedly);
798 Assert(writable);
799 byte * p = C[j].get_modifiable_p(block_size);
800 int c = C[j].c;
801 AssertRel(DIR_START,<=,c);
802 AssertRel(c,<,DIR_END(p));
803 int kt_len = Item(p, c).size(); /* size of the item to be deleted */
804 int dir_end = DIR_END(p) - D2; /* directory length will go down by 2 bytes */
806 memmove(p + c, p + c + D2, dir_end - c);
807 SET_DIR_END(p, dir_end);
808 SET_MAX_FREE(p, MAX_FREE(p) + D2);
809 SET_TOTAL_FREE(p, TOTAL_FREE(p) + kt_len + D2);
811 if (!repeatedly) return;
812 if (j < level) {
813 if (dir_end == DIR_START) {
814 free_list.mark_block_unused(this, block_size, C[j].get_n());
815 C[j].rewrite = false;
816 C[j].set_n(BLK_UNUSED);
817 C[j + 1].rewrite = true; /* *is* necessary */
818 delete_item(j + 1, true);
820 } else {
821 Assert(j == level);
822 while (dir_end == DIR_START + D2 && level > 0) {
823 /* single item in the root block, so lose a level */
824 uint4 new_root = Item(C[level].get_p(), DIR_START).block_given_by();
825 free_list.mark_block_unused(this, block_size, C[level].get_n());
826 C[level].destroy();
827 level--;
829 block_to_cursor(C, level, new_root);
831 dir_end = DIR_END(C[level].get_p()); /* prepare for the loop */
836 /* debugging aid:
837 static addcount = 0;
840 /** add_kt(found) adds the item (key-tag pair) at B->kt into the
841 B-tree, using cursor C.
843 found == find() is handed over as a parameter from Btree::add.
844 Btree::alter() prepares for the alteration to the B-tree. Then
845 there are a number of cases to consider:
847 If an item with the same key is in the B-tree (found is true),
848 the new kt replaces it.
850 If then kt is smaller, or the same size as, the item it replaces,
851 kt is put in the same place as the item it replaces, and the
852 TOTAL_FREE measure is reduced.
854 If kt is larger than the item it replaces it is put in the
855 MAX_FREE space if there is room, and the directory entry and
856 space counts are adjusted accordingly.
858 - But if there is not room we do it the long way: the old item is
859 deleted with delete_item and kt is added in with add_item.
861 If the key of kt is not in the B-tree (found is false), the new
862 kt is added in with add_item.
866 GlassTable::add_kt(bool found)
868 LOGCALL(DB, int, "GlassTable::add_kt", found);
869 Assert(writable);
870 int components = 0;
874 printf("%d) %s ", addcount++, (found ? "replacing" : "adding"));
875 print_bytes(kt[I2] - K1 - C2, kt + I2 + K1); putchar('\n');
878 alter();
880 if (found) { /* replacement */
881 seq_count = SEQ_START_POINT;
882 sequential = false;
884 byte * p = C[0].get_modifiable_p(block_size);
885 int c = C[0].c;
886 AssertRel(DIR_START,<=,c);
887 AssertRel(c,<,DIR_END(p));
888 Item item(p, c);
889 int kt_size = kt.size();
890 int needed = kt_size - item.size();
892 components = item.components_of();
894 if (needed <= 0) {
895 /* simple replacement */
896 memmove(const_cast<byte *>(item.get_address()),
897 kt.get_address(), kt_size);
898 SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
899 } else {
900 /* new item into the block's freespace */
901 int new_max = MAX_FREE(p) - kt_size;
902 if (new_max >= 0) {
903 int o = DIR_END(p) + new_max;
904 memmove(p + o, kt.get_address(), kt_size);
905 setD(p, c, o);
906 SET_MAX_FREE(p, new_max);
907 SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
908 } else {
909 /* do it the long way */
910 delete_item(0, false);
911 add_item(kt, 0);
914 } else {
915 /* addition */
916 if (changed_n == C[0].get_n() && changed_c == C[0].c) {
917 if (seq_count < 0) seq_count++;
918 } else {
919 seq_count = SEQ_START_POINT;
920 sequential = false;
922 C[0].c += D2;
923 add_item(kt, 0);
925 RETURN(components);
928 /* delete_kt() corresponds to add_kt(found), but there are only
929 two cases: if the key is not found nothing is done, and if it is
930 found the corresponding item is deleted with delete_item.
934 GlassTable::delete_kt()
936 LOGCALL(DB, int, "GlassTable::delete_kt", NO_ARGS);
937 Assert(writable);
939 bool found = find(C);
941 int components = 0;
942 seq_count = SEQ_START_POINT;
943 sequential = false;
947 printf("%d) %s ", addcount++, (found ? "deleting " : "ignoring "));
948 print_bytes(B->kt[I2] - K1 - C2, B->kt + I2 + K1); putchar('\n');
951 if (found) {
952 components = Item(C[0].get_p(), C[0].c).components_of();
953 alter();
954 delete_item(0, true);
956 RETURN(components);
959 /* GlassTable::form_key(key) treats address kt as an item holder and fills in
960 the key part:
962 (I) K key c (C tag)
964 The bracketed parts are left blank. The key is filled in with key_len bytes and
965 K set accordingly. c is set to 1.
968 void GlassTable::form_key(const string & key) const
970 LOGCALL_VOID(DB, "GlassTable::form_key", key);
971 kt.form_key(key);
974 /* GlassTable::add(key, tag) adds the key/tag item to the
975 B-tree, replacing any existing item with the same key.
977 For a long tag, we end up having to add m components, of the form
979 key 1 m tag1
980 key 2 m tag2
982 key m m tagm
984 and tag1+tag2+...+tagm are equal to tag. These in their turn may be replacing
985 n components of the form
987 key 1 n TAG1
988 key 2 n TAG2
990 key n n TAGn
992 and n may be greater than, equal to, or less than m. These cases are dealt
993 with in the code below. If m < n for example, we end up with a series of
994 deletions.
997 void
998 GlassTable::add(const string &key, string tag, bool already_compressed)
1000 LOGCALL_VOID(DB, "GlassTable::add", key | tag | already_compressed);
1001 Assert(writable);
1003 if (handle < 0) {
1004 if (handle == -2) {
1005 GlassTable::throw_database_closed();
1007 do_open_to_write();
1010 form_key(key);
1012 bool compressed = false;
1013 if (already_compressed) {
1014 compressed = true;
1015 } else if (compress_strategy != DONT_COMPRESS && tag.size() > COMPRESS_MIN) {
1016 static_assert(DONT_COMPRESS != Z_DEFAULT_STRATEGY,
1017 "DONT_COMPRESS clashes with zlib constant");
1018 static_assert(DONT_COMPRESS != Z_FILTERED,
1019 "DONT_COMPRESS clashes with zlib constant");
1020 static_assert(DONT_COMPRESS != Z_HUFFMAN_ONLY,
1021 "DONT_COMPRESS clashes with zlib constant");
1022 #ifdef Z_RLE
1023 static_assert(DONT_COMPRESS != Z_RLE,
1024 "DONT_COMPRESS clashes with zlib constant");
1025 #endif
1027 comp_stream.lazy_alloc_deflate_zstream();
1029 comp_stream.deflate_zstream->next_in = (Bytef *)const_cast<char *>(tag.data());
1030 comp_stream.deflate_zstream->avail_in = (uInt)tag.size();
1032 // If compressed size is >= tag.size(), we don't want to compress.
1033 unsigned long blk_len = tag.size() - 1;
1034 unsigned char * blk = new unsigned char[blk_len];
1035 comp_stream.deflate_zstream->next_out = blk;
1036 comp_stream.deflate_zstream->avail_out = (uInt)blk_len;
1038 int err = deflate(comp_stream.deflate_zstream, Z_FINISH);
1039 if (err == Z_STREAM_END) {
1040 // If deflate succeeded, then the output was at least one byte
1041 // smaller than the input.
1042 tag.assign(reinterpret_cast<const char *>(blk), comp_stream.deflate_zstream->total_out);
1043 compressed = true;
1044 } else {
1045 // Deflate failed - presumably the data wasn't compressible.
1048 delete [] blk;
1051 // sort of matching kt.append_chunk(), but setting the chunk
1052 const size_t cd = kt.key().length() + K1 + I2 + C2 + C2; // offset to the tag data
1053 const size_t L = max_item_size - cd; // largest amount of tag data for any chunk
1054 size_t first_L = L; // - amount for tag1
1055 bool found = find(C);
1056 if (!found) {
1057 const byte * p = C[0].get_p();
1058 size_t n = TOTAL_FREE(p) % (max_item_size + D2);
1059 if (n > D2 + cd) {
1060 n -= (D2 + cd);
1061 // if n >= last then fully filling this block won't produce
1062 // an extra item, so we might as well do this even if
1063 // full_compaction isn't active.
1065 // In the full_compaction case, it turns out we shouldn't always
1066 // try to fill every last byte. Doing so can actually increase the
1067 // total space required (I believe this effect is due to longer
1068 // dividing keys being required in the index blocks). Empirically,
1069 // n >= key.size() + K appears a good criterion for K ~= 34. This
1070 // seems to save about 0.2% in total database size over always
1071 // splitting the tag. It'll also give be slightly faster retrieval
1072 // as we can avoid reading an extra block occasionally.
1073 size_t last = tag.length() % L;
1074 if (n >= last || (full_compaction && n >= key.size() + 34))
1075 first_L = n;
1079 // a null tag must be added in of course
1080 int m = tag.empty() ? 1 : (tag.length() - first_L + L - 1) / L + 1;
1081 // there are m items to add
1082 /* FIXME: sort out this error higher up and turn this into
1083 * an assert.
1085 if (m >= BYTE_PAIR_RANGE)
1086 throw Xapian::UnimplementedError("Can't handle insanely large tags");
1088 int n = 0; // initialise to shut off warning
1089 // - and there will be n to delete
1090 int o = 0; // Offset into the tag
1091 size_t residue = tag.length(); // Bytes of the tag remaining to add in
1092 int replacement = false; // Has there been a replacement ?
1093 int i;
1094 kt.set_components_of(m);
1095 for (i = 1; i <= m; i++) {
1096 size_t l = (i == m ? residue : (i == 1 ? first_L : L));
1097 Assert(cd + l <= block_size);
1098 Assert(string::size_type(o + l) <= tag.length());
1099 kt.set_tag(cd, tag.data() + o, l, compressed);
1100 kt.set_component_of(i);
1102 o += l;
1103 residue -= l;
1105 if (i > 1) found = find(C);
1106 n = add_kt(found);
1107 if (n > 0) replacement = true;
1109 /* o == tag.length() here, and n may be zero */
1110 for (i = m + 1; i <= n; i++) {
1111 kt.set_component_of(i);
1112 delete_kt();
1114 if (!replacement) ++item_count;
1115 Btree_modified = true;
1116 if (cursor_created_since_last_modification) {
1117 cursor_created_since_last_modification = false;
1118 ++cursor_version;
1122 /* GlassTable::del(key) returns false if the key is not in the B-tree,
1123 otherwise deletes it and returns true.
1125 Again, this is parallel to GlassTable::add, but simpler in form.
1128 bool
1129 GlassTable::del(const string &key)
1131 LOGCALL(DB, bool, "GlassTable::del", key);
1132 Assert(writable);
1134 if (handle < 0) {
1135 if (handle == -2) {
1136 GlassTable::throw_database_closed();
1138 RETURN(false);
1141 // We can't delete a key which we is too long for us to store.
1142 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1144 if (key.empty()) RETURN(false);
1145 form_key(key);
1147 int n = delete_kt(); /* there are n items to delete */
1148 if (n <= 0) RETURN(false);
1150 for (int i = 2; i <= n; i++) {
1151 kt.set_component_of(i);
1152 delete_kt();
1155 item_count--;
1156 Btree_modified = true;
1157 if (cursor_created_since_last_modification) {
1158 cursor_created_since_last_modification = false;
1159 ++cursor_version;
1161 RETURN(true);
1164 bool
1165 GlassTable::readahead_key(const string &key) const
1167 LOGCALL(DB, bool, "GlassTable::readahead_key", key);
1168 Assert(!key.empty());
1170 // Two cases:
1172 // handle = -1: Lazy table which isn't yet open
1174 // handle = -2: Table has been closed. Since the readahead is just a
1175 // hint, we can safely ignore it for a closed table.
1176 if (handle < 0)
1177 RETURN(false);
1179 // If the table only has one level, there are no branch blocks to preread.
1180 if (level == 0)
1181 RETURN(false);
1183 form_key(key);
1184 Key ktkey = kt.key();
1186 // We'll only readahead the first level, since descending the B-tree would
1187 // require actual reads that would likely hurt performance more than help.
1188 const byte * p = C[level].get_p();
1189 int c = find_in_block(p, ktkey, false, C[level].c);
1190 uint4 n = Item(p, c).block_given_by();
1191 // Don't preread if it's the block we last preread or already in the
1192 // cursor.
1193 if (n != last_readahead && n != C[level - 1].get_n()) {
1194 last_readahead = n;
1195 if (!io_readahead_block(handle, block_size, n))
1196 RETURN(false);
1198 RETURN(true);
1201 bool
1202 GlassTable::get_exact_entry(const string &key, string & tag) const
1204 LOGCALL(DB, bool, "GlassTable::get_exact_entry", key | tag);
1205 Assert(!key.empty());
1207 if (handle < 0) {
1208 if (handle == -2) {
1209 GlassTable::throw_database_closed();
1211 RETURN(false);
1214 // An oversized key can't exist, so attempting to search for it should fail.
1215 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1217 form_key(key);
1218 if (!find(C)) RETURN(false);
1220 (void)read_tag(C, &tag, false);
1221 RETURN(true);
1224 bool
1225 GlassTable::key_exists(const string &key) const
1227 LOGCALL(DB, bool, "GlassTable::key_exists", key);
1228 Assert(!key.empty());
1230 // An oversized key can't exist, so attempting to search for it should fail.
1231 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1233 form_key(key);
1234 RETURN(find(C));
1237 bool
1238 GlassTable::read_tag(Glass::Cursor * C_, string *tag, bool keep_compressed) const
1240 LOGCALL(DB, bool, "GlassTable::read_tag", Literal("C_") | tag | keep_compressed);
1241 Item item(C_[0].get_p(), C_[0].c);
1243 /* n components to join */
1244 int n = item.components_of();
1246 tag->resize(0);
1247 // max_item_size also includes K1 + I2 + C2 + C2 bytes overhead and the key
1248 // (which is at least 1 byte long).
1249 if (n > 1) tag->reserve((max_item_size - (1 + K1 + I2 + C2 + C2)) * n);
1251 item.append_chunk(tag);
1252 bool compressed = item.get_compressed();
1254 for (int i = 2; i <= n; i++) {
1255 if (!next(C_, 0)) {
1256 throw Xapian::DatabaseCorruptError("Unexpected end of table when reading continuation of tag");
1258 (void)Item(C_[0].get_p(), C_[0].c).append_chunk(tag);
1260 // At this point the cursor is on the last item - calling next will move
1261 // it to the next key (GlassCursor::get_tag() relies on this).
1262 if (!compressed || keep_compressed) RETURN(compressed);
1264 // FIXME: Perhaps we should we decompress each chunk as we read it so we
1265 // don't need both the full compressed and uncompressed tags in memory
1266 // at once.
1268 string utag;
1269 // May not be enough for a compressed tag, but it's a reasonable guess.
1270 utag.reserve(tag->size() + tag->size() / 2);
1272 Bytef buf[8192];
1274 comp_stream.lazy_alloc_inflate_zstream();
1276 comp_stream.inflate_zstream->next_in = (Bytef*)const_cast<char *>(tag->data());
1277 comp_stream.inflate_zstream->avail_in = (uInt)tag->size();
1279 int err = Z_OK;
1280 while (err != Z_STREAM_END) {
1281 comp_stream.inflate_zstream->next_out = buf;
1282 comp_stream.inflate_zstream->avail_out = (uInt)sizeof(buf);
1283 err = inflate(comp_stream.inflate_zstream, Z_SYNC_FLUSH);
1284 if (err == Z_BUF_ERROR && comp_stream.inflate_zstream->avail_in == 0) {
1285 LOGLINE(DB, "Z_BUF_ERROR - faking checksum of " << comp_stream.inflate_zstream->adler);
1286 Bytef header2[4];
1287 setint4(header2, 0, comp_stream.inflate_zstream->adler);
1288 comp_stream.inflate_zstream->next_in = header2;
1289 comp_stream.inflate_zstream->avail_in = 4;
1290 err = inflate(comp_stream.inflate_zstream, Z_SYNC_FLUSH);
1291 if (err == Z_STREAM_END) break;
1294 if (err != Z_OK && err != Z_STREAM_END) {
1295 if (err == Z_MEM_ERROR) throw std::bad_alloc();
1296 string msg = "inflate failed";
1297 if (comp_stream.inflate_zstream->msg) {
1298 msg += " (";
1299 msg += comp_stream.inflate_zstream->msg;
1300 msg += ')';
1302 throw Xapian::DatabaseError(msg);
1305 utag.append(reinterpret_cast<const char *>(buf),
1306 comp_stream.inflate_zstream->next_out - buf);
1308 if (utag.size() != comp_stream.inflate_zstream->total_out) {
1309 string msg = "compressed tag didn't expand to the expected size: ";
1310 msg += str(utag.size());
1311 msg += " != ";
1312 // OpenBSD's zlib.h uses off_t instead of uLong for total_out.
1313 msg += str((size_t)comp_stream.inflate_zstream->total_out);
1314 throw Xapian::DatabaseCorruptError(msg);
1317 swap(*tag, utag);
1319 RETURN(false);
1322 void
1323 GlassTable::set_full_compaction(bool parity)
1325 LOGCALL_VOID(DB, "GlassTable::set_full_compaction", parity);
1326 Assert(writable);
1328 if (parity) seq_count = 0;
1329 full_compaction = parity;
1332 GlassCursor * GlassTable::cursor_get() const {
1333 LOGCALL(DB, GlassCursor *, "GlassTable::cursor_get", NO_ARGS);
1334 if (handle < 0) {
1335 if (handle == -2) {
1336 GlassTable::throw_database_closed();
1338 RETURN(NULL);
1340 // FIXME Ick - casting away const is nasty
1341 RETURN(new GlassCursor(const_cast<GlassTable *>(this)));
1344 /************ B-tree opening and closing ************/
1346 void
1347 GlassTable::basic_open(const RootInfo * root_info, glass_revision_number_t rev)
1349 LOGCALL_VOID(DB, "GlassTable::basic_open", root_info|rev);
1350 revision_number = rev;
1351 if (root_info) {
1352 root = root_info->get_root();
1353 level = root_info->get_level();
1354 item_count = root_info->get_num_entries();
1355 faked_root_block = root_info->get_root_is_fake();
1356 sequential = root_info->get_sequential_mode();
1357 const string & fl_serialised = root_info->get_free_list();
1358 if (!fl_serialised.empty()) {
1359 if (!free_list.unpack(fl_serialised))
1360 throw Xapian::DatabaseCorruptError("Bad freelist metadata");
1361 } else {
1362 free_list.reset();
1364 } else {
1365 root = 0;
1366 level = 0;
1367 item_count = 0;
1368 faked_root_block = true;
1369 sequential = true;
1370 free_list.reset();
1373 /* kt holds constructed items as well as keys */
1374 kt = Item_wr(zeroed_new(block_size));
1376 set_max_item_size(BLOCK_CAPACITY);
1378 for (int j = 0; j <= level; j++) {
1379 C[j].init(block_size);
1382 read_root();
1384 if (cursor_created_since_last_modification) {
1385 cursor_created_since_last_modification = false;
1386 ++cursor_version;
1390 void
1391 GlassTable::read_root()
1393 LOGCALL_VOID(DB, "GlassTable::read_root", NO_ARGS);
1394 if (faked_root_block) {
1395 /* root block for an unmodified database. */
1396 byte * p = C[0].init(block_size);
1397 Assert(p);
1399 /* clear block - shouldn't be necessary, but is a bit nicer,
1400 * and means that the same operations should always produce
1401 * the same database. */
1402 memset(p, 0, block_size);
1404 int o = block_size - I2 - K1 - C2 - C2;
1405 Item_wr(p + o).fake_root_item();
1407 setD(p, DIR_START, o); // its directory entry
1408 SET_DIR_END(p, DIR_START + D2);// the directory size
1410 o -= (DIR_START + D2);
1411 SET_MAX_FREE(p, o);
1412 SET_TOTAL_FREE(p, o);
1413 SET_LEVEL(p, 0);
1415 if (!writable) {
1416 /* reading - revision number doesn't matter as long as
1417 * it's not greater than the current one. */
1418 SET_REVISION(p, 0);
1419 C[0].set_n(0);
1420 } else {
1421 /* writing - */
1422 SET_REVISION(p, revision_number + 1);
1423 C[0].set_n(free_list.get_block(this, block_size));
1425 } else {
1426 /* using a root block stored on disk */
1427 block_to_cursor(C, level, root);
1429 if (REVISION(C[level].get_p()) > revision_number) set_overwritten();
1430 /* although this is unlikely */
1434 void
1435 GlassTable::do_open_to_write(const RootInfo * root_info,
1436 glass_revision_number_t rev)
1438 LOGCALL_VOID(DB, "GlassTable::do_open_to_write", root_info|rev);
1439 if (handle == -2) {
1440 GlassTable::throw_database_closed();
1442 handle = io_open_block_wr(name + GLASS_TABLE_EXTENSION,
1443 root_info == NULL);
1444 if (handle < 0) {
1445 // lazy doesn't make a lot of sense when we're creating a DB (which
1446 // is the case when root_info==NULL), but ENOENT with O_CREAT means a
1447 // parent directory doesn't exist.
1448 if (lazy && root_info && errno == ENOENT) {
1449 revision_number = rev;
1450 return;
1452 string message(!root_info ? "Couldn't create " : "Couldn't open ");
1453 message += name;
1454 message += "DB read/write: ";
1455 errno_to_string(errno, message);
1456 throw Xapian::DatabaseOpeningError(message);
1459 writable = true;
1460 basic_open(root_info, rev);
1462 split_p = new byte[block_size];
1464 buffer = zeroed_new(block_size);
1466 changed_n = 0;
1467 changed_c = DIR_START;
1468 seq_count = SEQ_START_POINT;
1471 GlassTable::GlassTable(const char * tablename_, const string & path_,
1472 bool readonly_, int compress_strategy_, bool lazy_)
1473 : tablename(tablename_),
1474 revision_number(0),
1475 item_count(0),
1476 block_size(0),
1477 faked_root_block(true),
1478 sequential(true),
1479 handle(-1),
1480 level(0),
1481 root(0),
1482 kt(0),
1483 buffer(0),
1484 free_list(),
1485 name(path_),
1486 seq_count(0),
1487 changed_n(0),
1488 changed_c(0),
1489 max_item_size(0),
1490 Btree_modified(false),
1491 full_compaction(false),
1492 writable(!readonly_),
1493 cursor_created_since_last_modification(false),
1494 cursor_version(0),
1495 changes_obj(NULL),
1496 split_p(0),
1497 compress_strategy(compress_strategy_),
1498 comp_stream(compress_strategy_),
1499 lazy(lazy_),
1500 last_readahead(BLK_UNUSED)
1502 LOGCALL_CTOR(DB, "GlassTable", tablename_ | path_ | readonly_ | compress_strategy_ | lazy_);
1505 bool
1506 GlassTable::exists() const {
1507 LOGCALL(DB, bool, "GlassTable::exists", NO_ARGS);
1508 return file_exists(name + GLASS_TABLE_EXTENSION);
1511 void
1512 GlassTable::create_and_open(int flags_, unsigned int block_size_)
1514 LOGCALL_VOID(DB, "GlassTable::create_and_open", flags_|block_size_);
1515 if (handle == -2) {
1516 GlassTable::throw_database_closed();
1518 Assert(writable);
1519 close();
1521 Assert(block_size_ >= 2048);
1522 Assert(block_size_ <= BYTE_PAIR_RANGE);
1523 // Must be a power of two.
1524 Assert((block_size_ & (block_size_ - 1)) == 0);
1526 flags = flags_;
1527 block_size = block_size_;
1529 if (lazy) {
1530 close();
1531 (void)io_unlink(name + GLASS_TABLE_EXTENSION);
1532 } else {
1533 // FIXME: it would be good to arrange that this works such that there's
1534 // always a valid table in place if you run create_and_open() on an
1535 // existing table.
1537 do_open_to_write();
1541 GlassTable::~GlassTable() {
1542 LOGCALL_DTOR(DB, "GlassTable");
1543 GlassTable::close();
1546 void GlassTable::close(bool permanent) {
1547 LOGCALL_VOID(DB, "GlassTable::close", permanent);
1549 if (handle >= 0) {
1550 // If an error occurs here, we just ignore it, since we're just
1551 // trying to free everything.
1552 (void)::close(handle);
1553 handle = -1;
1556 if (permanent) {
1557 handle = -2;
1558 // Don't delete the resources in the table, since they may
1559 // still be used to look up cached content.
1560 return;
1562 for (int j = level; j >= 0; j--) {
1563 C[j].destroy();
1565 delete [] split_p;
1566 split_p = 0;
1568 delete [] kt.get_address();
1569 kt = 0;
1570 delete [] buffer;
1571 buffer = 0;
1574 void
1575 GlassTable::flush_db()
1577 LOGCALL_VOID(DB, "GlassTable::flush_db", NO_ARGS);
1578 Assert(writable);
1579 if (handle < 0) {
1580 if (handle == -2) {
1581 GlassTable::throw_database_closed();
1583 return;
1586 for (int j = level; j >= 0; j--) {
1587 if (C[j].rewrite) {
1588 write_block(C[j].get_n(), C[j].get_p());
1592 if (Btree_modified) {
1593 faked_root_block = false;
1597 void
1598 GlassTable::commit(glass_revision_number_t revision, RootInfo * root_info)
1600 LOGCALL_VOID(DB, "GlassTable::commit", revision|root_info);
1601 Assert(writable);
1603 if (revision <= revision_number) {
1604 throw Xapian::DatabaseError("New revision too low");
1607 if (handle < 0) {
1608 if (handle == -2) {
1609 GlassTable::throw_database_closed();
1611 revision_number = revision;
1612 root_info->set_blocksize(block_size);
1613 root_info->set_level(0);
1614 root_info->set_num_entries(0);
1615 root_info->set_root_is_fake(true);
1616 root_info->set_sequential_mode(true);
1617 root_info->set_root(0);
1618 return;
1621 try {
1622 root = C[level].get_n();
1624 root_info->set_blocksize(block_size);
1625 root_info->set_level(level);
1626 root_info->set_num_entries(item_count);
1627 root_info->set_root_is_fake(faked_root_block);
1628 root_info->set_sequential_mode(sequential);
1629 root_info->set_root(root);
1631 Btree_modified = false;
1633 for (int i = 0; i < BTREE_CURSOR_LEVELS; ++i) {
1634 C[i].init(block_size);
1637 free_list.set_revision(revision);
1638 free_list.commit(this, block_size);
1640 // Save the freelist details into the root_info.
1641 string serialised;
1642 free_list.pack(serialised);
1643 root_info->set_free_list(serialised);
1645 revision_number = revision;
1647 read_root();
1649 changed_n = 0;
1650 changed_c = DIR_START;
1651 seq_count = SEQ_START_POINT;
1652 } catch (...) {
1653 GlassTable::close();
1654 throw;
1658 void
1659 GlassTable::cancel(const RootInfo & root_info, glass_revision_number_t rev)
1661 LOGCALL_VOID(DB, "GlassTable::cancel", root_info|rev);
1662 Assert(writable);
1664 if (handle < 0) {
1665 if (handle == -2) {
1666 GlassTable::throw_database_closed();
1668 return;
1671 // This causes problems: if (!Btree_modified) return;
1673 if (flags & Xapian::DB_DANGEROUS)
1674 throw Xapian::InvalidOperationError("cancel() not supported under Xapian::DB_DANGEROUS");
1676 revision_number = rev;
1677 block_size = root_info.get_blocksize();
1678 root = root_info.get_root();
1679 level = root_info.get_level();
1680 item_count = root_info.get_num_entries();
1681 faked_root_block = root_info.get_root_is_fake();
1682 sequential = root_info.get_sequential_mode();
1684 Btree_modified = false;
1686 for (int j = 0; j <= level; j++) {
1687 C[j].init(block_size);
1688 C[j].rewrite = false;
1690 read_root();
1692 changed_n = 0;
1693 changed_c = DIR_START;
1694 seq_count = SEQ_START_POINT;
1696 if (cursor_created_since_last_modification) {
1697 cursor_created_since_last_modification = false;
1698 ++cursor_version;
1702 /************ B-tree reading ************/
1704 void
1705 GlassTable::do_open_to_read(const RootInfo * root_info,
1706 glass_revision_number_t rev)
1708 LOGCALL(DB, bool, "GlassTable::do_open_to_read", root_info|rev);
1709 if (handle == -2) {
1710 GlassTable::throw_database_closed();
1712 handle = io_open_block_rd(name + GLASS_TABLE_EXTENSION);
1713 if (handle < 0) {
1714 if (lazy) {
1715 // This table is optional when reading!
1716 revision_number = rev;
1717 return;
1719 string message("Couldn't open ");
1720 message += name;
1721 message += "DB to read: ";
1722 errno_to_string(errno, message);
1723 throw Xapian::DatabaseOpeningError(message);
1726 basic_open(root_info, rev);
1728 read_root();
1731 void
1732 GlassTable::open(int flags_, const RootInfo & root_info,
1733 glass_revision_number_t rev)
1735 LOGCALL_VOID(DB, "GlassTable::open", flags_|root_info|rev);
1736 close();
1738 flags = flags_;
1739 block_size = root_info.get_blocksize();
1740 root = root_info.get_root();
1742 if (!writable) {
1743 do_open_to_read(&root_info, rev);
1744 return;
1747 do_open_to_write(&root_info, rev);
1750 bool
1751 GlassTable::prev_for_sequential(Glass::Cursor * C_, int /*dummy*/) const
1753 LOGCALL(DB, bool, "GlassTable::prev_for_sequential", Literal("C_") | Literal("/*dummy*/"));
1754 int c = C_[0].c;
1755 AssertRel(DIR_START,<=,c);
1756 AssertRel(c,<,DIR_END(C_[0].get_p()));
1757 if (c == DIR_START) {
1758 uint4 n = C_[0].get_n();
1759 const byte * p;
1760 while (true) {
1761 if (n == 0) RETURN(false);
1762 n--;
1763 if (n == C[0].get_n()) {
1764 // Block is a leaf block in the built-in cursor (potentially in
1765 // modified form if the table is writable).
1766 p = C_[0].clone(C[0]);
1767 } else {
1768 if (writable) {
1769 // Blocks in the built-in cursor may not have been written
1770 // to disk yet, so we have to check that the block number
1771 // isn't in the built-in cursor or we'll read an
1772 // uninitialised block (for which GET_LEVEL(p) will
1773 // probably return 0).
1774 int j;
1775 for (j = 1; j <= level; ++j) {
1776 if (n == C[j].get_n()) break;
1778 if (j <= level) continue;
1781 // Block isn't in the built-in cursor, so the form on disk
1782 // is valid, so read it to check if it's the next level 0
1783 // block.
1784 byte * q = C_[0].init(block_size);
1785 read_block(n, q);
1786 p = q;
1787 C_[0].set_n(n);
1789 if (REVISION(p) > revision_number + writable) {
1790 set_overwritten();
1791 RETURN(false);
1793 if (GET_LEVEL(p) == 0) break;
1795 c = DIR_END(p);
1796 AssertRel(DIR_START,<,c);
1798 c -= D2;
1799 C_[0].c = c;
1800 RETURN(true);
1803 bool
1804 GlassTable::next_for_sequential(Glass::Cursor * C_, int /*dummy*/) const
1806 LOGCALL(DB, bool, "GlassTable::next_for_sequential", Literal("C_") | Literal("/*dummy*/"));
1807 const byte * p = C_[0].get_p();
1808 Assert(p);
1809 int c = C_[0].c;
1810 AssertRel(c,<,DIR_END(p));
1811 c += D2;
1812 Assert((unsigned)c < block_size);
1813 if (c == DIR_END(p)) {
1814 uint4 n = C_[0].get_n();
1815 while (true) {
1816 n++;
1817 if (n >= free_list.get_first_unused_block()) RETURN(false);
1818 if (writable) {
1819 if (n == C[0].get_n()) {
1820 // Block is a leaf block in the built-in cursor
1821 // (potentially in modified form).
1822 p = C_[0].clone(C[0]);
1823 } else {
1824 // Blocks in the built-in cursor may not have been written
1825 // to disk yet, so we have to check that the block number
1826 // isn't in the built-in cursor or we'll read an
1827 // uninitialised block (for which GET_LEVEL(p) will
1828 // probably return 0).
1829 int j;
1830 for (j = 1; j <= level; ++j) {
1831 if (n == C[j].get_n()) break;
1833 if (j <= level) continue;
1835 // Block isn't in the built-in cursor, so the form on disk
1836 // is valid, so read it to check if it's the next level 0
1837 // block.
1838 byte * q = C_[0].init(block_size);
1839 read_block(n, q);
1840 p = q;
1842 } else {
1843 byte * q = C_[0].init(block_size);
1844 read_block(n, q);
1845 p = q;
1847 if (REVISION(p) > revision_number + writable) {
1848 set_overwritten();
1849 RETURN(false);
1851 if (GET_LEVEL(p) == 0) break;
1853 c = DIR_START;
1854 C_[0].set_n(n);
1856 C_[0].c = c;
1857 RETURN(true);
1860 bool
1861 GlassTable::prev_default(Glass::Cursor * C_, int j) const
1863 LOGCALL(DB, bool, "GlassTable::prev_default", Literal("C_") | j);
1864 const byte * p = C_[j].get_p();
1865 int c = C_[j].c;
1866 AssertRel(DIR_START,<=,c);
1867 AssertRel(c,<,DIR_END(p));
1868 AssertRel((unsigned)DIR_END(p),<=,block_size);
1869 if (c == DIR_START) {
1870 if (j == level) RETURN(false);
1871 if (!prev_default(C_, j + 1)) RETURN(false);
1872 p = C_[j].get_p();
1873 c = DIR_END(p);
1874 AssertRel(DIR_START,<,c);
1876 c -= D2;
1877 C_[j].c = c;
1878 if (j > 0) {
1879 block_to_cursor(C_, j - 1, Item(p, c).block_given_by());
1881 RETURN(true);
1884 bool
1885 GlassTable::next_default(Glass::Cursor * C_, int j) const
1887 LOGCALL(DB, bool, "GlassTable::next_default", Literal("C_") | j);
1888 const byte * p = C_[j].get_p();
1889 int c = C_[j].c;
1890 AssertRel(c,<,DIR_END(p));
1891 AssertRel((unsigned)DIR_END(p),<=,block_size);
1892 c += D2;
1893 if (j > 0) {
1894 AssertRel(DIR_START,<,c);
1895 } else {
1896 AssertRel(DIR_START,<=,c);
1898 // Sometimes c can be DIR_END(p) + 2 here it appears...
1899 if (c >= DIR_END(p)) {
1900 if (j == level) RETURN(false);
1901 if (!next_default(C_, j + 1)) RETURN(false);
1902 p = C_[j].get_p();
1903 c = DIR_START;
1905 C_[j].c = c;
1906 if (j > 0) {
1907 block_to_cursor(C_, j - 1, Item(p, c).block_given_by());
1908 #ifdef BTREE_DEBUG_FULL
1909 printf("Block in GlassTable:next_default");
1910 report_block_full(j - 1, C_[j - 1].get_n(), C_[j - 1].get_p());
1911 #endif /* BTREE_DEBUG_FULL */
1913 RETURN(true);
1916 void
1917 GlassTable::throw_database_closed()
1919 throw Xapian::DatabaseError("Database has been closed");
1922 /** Compares this key with key2.
1924 The result is true if this key precedes key2. The comparison is for byte
1925 sequence collating order, taking lengths into account. So if the keys are
1926 made up of lower case ASCII letters we get alphabetical ordering.
1928 Now remember that items are added into the B-tree in fastest time
1929 when they are preordered by their keys. This is therefore the piece
1930 of code that needs to be followed to arrange for the preordering.
1932 This is complicated by the fact that keys have two parts - a value
1933 and then a count. We first compare the values, and only if they
1934 are equal do we compare the counts.
1937 bool Key::operator<(Key key2) const
1939 LOGCALL(DB, bool, "Key::operator<", static_cast<const void*>(key2.p));
1940 int key1_len = length();
1941 int key2_len = key2.length();
1942 if (key1_len == key2_len) {
1943 // The keys are the same length, so we can compare the counts
1944 // in the same operation since they're stored as 2 byte
1945 // bigendian numbers.
1946 RETURN(memcmp(p + K1, key2.p + K1, key1_len + C2) < 0);
1949 int k_smaller = (key2_len < key1_len ? key2_len : key1_len);
1951 // Compare the common part of the keys
1952 int diff = memcmp(p + K1, key2.p + K1, k_smaller);
1953 if (diff != 0) RETURN(diff < 0);
1955 // We dealt with the "same length" case above so we never need to check
1956 // the count here.
1957 RETURN(key1_len < key2_len);
1960 bool Key::operator==(Key key2) const
1962 LOGCALL(DB, bool, "Key::operator==", static_cast<const void*>(key2.p));
1963 int key1_len = length();
1964 if (key1_len != key2.length()) RETURN(false);
1965 // The keys are the same length, so we can compare the counts
1966 // in the same operation since they're stored as 2 byte
1967 // bigendian numbers.
1968 RETURN(memcmp(p + K1, key2.p + K1, key1_len + C2) == 0);