1 /* glass_table.cc: Btree implementation
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2002 Ananova Ltd
5 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015 Olly Betts
6 * Copyright 2008 Lemur Consulting Ltd
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU General Public License as
10 * published by the Free Software Foundation; either version 2 of the
11 * License, or (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
26 #include "glass_table.h"
28 #include <xapian/error.h>
30 #include "safeerrno.h"
33 #include "posixy_wrapper.h"
35 #include "stringutils.h" // For STRINGIZE().
37 #include <sys/types.h>
39 #include <cstring> /* for memmove */
40 #include <climits> /* for CHAR_BIT */
42 #include "glass_freelist.h"
43 #include "glass_changes.h"
44 #include "glass_cursor.h"
45 #include "glass_defs.h"
46 #include "glass_version.h"
49 #include "errno_to_string.h"
50 #include "filetests.h"
53 #include "unaligned.h"
55 #include <algorithm> // for std::min()
58 #include "xapian/constants.h"
60 using namespace Glass
;
63 // Only try to compress tags longer than this many bytes.
64 const size_t COMPRESS_MIN
= 4;
66 //#define BTREE_DEBUG_FULL 1
67 #undef BTREE_DEBUG_FULL
69 #ifdef BTREE_DEBUG_FULL
70 /*------debugging aids from here--------*/
// Forward declarations of the debug printing helpers. This section is only
// compiled when BTREE_DEBUG_FULL is defined.
72 static void print_key(const byte
* p
, int c
, int j
);
73 static void print_tag(const byte
* p
, int c
, int j
);
// Debug helper: prints p, c, n and the rewrite flag of every cursor level
// from 0 up to and including B->level.
76 static void report_cursor(int N, Btree * B, Glass::Cursor * C)
80 for (i = 0; i <= B->level; i++)
81 printf("p=%d, c=%d, n=[%d], rewrite=%d\n",
82 C[i].p, C[i].c, C[i].n, C[i].rewrite);
86 /*------to here--------*/
87 #endif /* BTREE_DEBUG_FULL */
// Allocates a buffer of `size` bytes with new[] and fills it with zero bytes.
89 static inline byte
*zeroed_new(size_t size
)
91 byte
*temp
= new byte
[size
];
// Clear the whole of the freshly allocated buffer.
92 memset(temp
, 0, size
);
96 /* A B-tree consists of a file with extension GLASS_TABLE_EXTENSION divided
97 into a sequence of equal sized blocks, numbered 0, 1, 2 ... some of which
98 are free, some in use. Those in use are arranged in a tree. Lists of
99 currently unused blocks are stored in a freelist which is itself stored in
102 Some "root info" is needed to locate a particular revision of the B-tree
103 - this is stored in the "version file" (we store this data for all tables
104 at a particular revision together).
106 Each block, b, has a structure like this:
108 R L M T D o1 o2 o3 ... oN <gap> [item] .. [item] .. [item] ...
109 <---------- D ----------> <-M->
113 R = REVISION(b) is the revision number the B-tree had when the block was
114 written into the DB file.
115 L = GET_LEVEL(b) is the level of the block, which is the number of levels
116 towards the root of the B-tree structure. So leaf blocks
117 have level 0 and the one root block has the highest level
118 equal to the number of levels in the B-tree. For blocks
119 storing the freelist, the level is set to 254.
120 M = MAX_FREE(b) is the size of the gap between the end of the directory and
121 the first item of data. (It is not necessarily the maximum
122 size among the bits of space that are free, but I can't
123 think of a better name.)
124 T = TOTAL_FREE(b) is the total amount of free space left in b.
125 D = DIR_END(b) gives the offset to the end of the directory.
127 o1, o2 ... oN are a directory of offsets to the N items held in the block.
128 The items are key-tag pairs, and as they occur in the directory are ordered
131 An item has this form:
137 A long tag presented through the API is split up into C tags small enough to
138 be accommodated in the blocks of the B-tree. The key is extended to include
139 a counter, x, which runs from 1 to C. The key is preceded by a length, K,
140 and the whole item with a length, I, as depicted above.
142 Here are the corresponding definitions:
146 /** Flip to sequential addition block-splitting after this number of observed
147 * sequential additions (in negated form). */
148 #define SEQ_START_POINT (-10)
150 /* Note use of the limits.h values:
151 UCHAR_MAX = 255, an unsigned with all bits set, and
152 CHAR_BIT = 8, the number of bits per byte
154 BYTE_PAIR_RANGE below is the smallest +ve number that can't be held in two
155 bytes -- 64K effectively.
158 #define BYTE_PAIR_RANGE (1 << 2 * CHAR_BIT)
160 /// read_block(n, p) reads block n of the DB file to address p.
// After reading, the directory end of any block which is not a freelist block
// is sanity checked, and Xapian::DatabaseCorruptError is thrown if it is out
// of range.
162 GlassTable::read_block(uint4 n
, byte
* p
) const
164 // Log the value of p, not the contents of the block it points to...
165 LOGCALL_VOID(DB
, "GlassTable::read_block", n
| (void*)p
);
// A handle of -2 is reported through throw_database_closed().
166 if (rare(handle
== -2))
167 GlassTable::throw_database_closed();
// The block number must be below the first unused block known to the freelist.
168 AssertRel(n
,<,free_list
.get_first_unused_block());
// Read block n (block_size bytes) from the file handle into p.
170 io_read_block(handle
, reinterpret_cast<char *>(p
), block_size
, n
);
// Freelist blocks are exempt from the directory check below.
172 if (GET_LEVEL(p
) != LEVEL_FREELIST
) {
173 int dir_end
= DIR_END(p
);
// The directory end must lie between DIR_START and block_size inclusive.
174 if (rare(dir_end
< DIR_START
|| unsigned(dir_end
) > block_size
)) {
175 string
msg("dir_end invalid in block ");
177 throw Xapian::DatabaseCorruptError(msg
);
182 /** write_block(n, p, appending) writes block n in the DB file from address p.
184 * If appending is true (not specified it defaults to false), then this
185 * indicates that we've added data to a block in space which was previously
186 * unused, and are writing the block back in place - we use this to add
187 * free list entries (the information about where the freelist data for a
188 * revision begins and ends is stored in the "iamglass" file). We don't
189 * currently use this flag for much, but it signifies that we don't risk
190 * invalidating any existing revisions, which may be useful information.
// Writes block n from address p to the DB file with io_write_block() and,
// when changes_obj is set, also passes the block to changes_obj.
193 GlassTable::write_block(uint4 n
, const byte
* p
, bool appending
) const
195 LOGCALL_VOID(DB
, "GlassTable::write_block", n
| p
| appending
);
197 /* Check that n is in range. */
198 AssertRel(n
,<,free_list
.get_first_unused_block());
200 /* don't write to non-free */
201 // FIXME: We can no longer check this easily.
202 // AssertParanoid(free_list.block_free_at_start(block_size, n));
204 /* write revision is okay */
// The block must already carry the revision being written.
205 AssertEqParanoid(REVISION(p
), revision_number
+ 1);
208 // In the case of the freelist, new entries can safely be written into
209 // the space at the end of the latest freelist block, as that's unused
210 // by previous revisions. It is also safe to write into a block which
211 // wasn't used in the previous revision.
213 // It's only when we start a new freelist block that we need to worry
214 // about invalidating old revisions.
215 } else if (flags
& Xapian::DB_DANGEROUS
) {
216 // FIXME: We should somehow flag this to prevent readers opening the
217 // database. Removing "iamglass" or setting a flag in it doesn't deal
218 // with any existing readers though. Perhaps we want to have readers
219 // read lock and try to take an exclusive lock here?
// Write block_size bytes from p as block n of the file.
222 io_write_block(handle
, p
, block_size
, n
);
// Nothing more to do unless a changes object is attached.
224 if (!changes_obj
) return;
227 // FIXME: track table_type in this class?
// Map the table name to the matching Glass table type constant.
228 if (strcmp(tablename
, "position") == 0) {
229 v
= int(Glass::POSITION
);
230 } else if (strcmp(tablename
, "postlist") == 0) {
231 v
= int(Glass::POSTLIST
);
232 } else if (strcmp(tablename
, "docdata") == 0) {
233 v
= int(Glass::DOCDATA
);
234 } else if (strcmp(tablename
, "spelling") == 0) {
235 v
= int(Glass::SPELLING
);
236 } else if (strcmp(tablename
, "synonym") == 0) {
237 v
= int(Glass::SYNONYM
);
238 } else if (strcmp(tablename
, "termlist") == 0) {
239 v
= int(Glass::TERMLIST
);
// Distinguish each power-of-two block size from 2048 up to 65536.
244 if (block_size
== 2048) {
246 } else if (block_size
== 4096) {
248 } else if (block_size
== 8192) {
250 } else if (block_size
== 16384) {
252 } else if (block_size
== 32768) {
254 } else if (block_size
== 65536) {
262 // Write the block number to the file
265 changes_obj
->write_block(buf
);
// Then pass the block_size bytes of the block itself to changes_obj.
266 changes_obj
->write_block(reinterpret_cast<const char *>(p
), block_size
);
269 /* A note on cursors:
271 Each B-tree level has a corresponding array element C[j] in a
272 cursor, C. C[0] is the leaf (or data) level, and C[B->level] is the
273 root block level. Within a level j,
275 C[j].p addresses the block
276 C[j].c is the offset into the directory entry in the block
277 C[j].n is the number of the block at C[j].p
279 A look up in the B-tree causes navigation of the blocks starting
280 from the root. In each block, p, we find an offset, c, to an item
281 which gives the number, n, of the block for the next level. This
282 leads to an array of values p,c,n which are held inside the cursor.
284 Any Btree object B has a built-in cursor, at B->C. But other cursors may
285 be created. If BC is a created cursor, BC->C is the cursor in the
286 sense given above, and BC->B is the handle for the B-tree again.
// Reports that a block has been overwritten by throwing an exception.
291 GlassTable::set_overwritten() const
293 LOGCALL_VOID(DB
, "GlassTable::set_overwritten", NO_ARGS
);
294 // If we're writable, there shouldn't be another writer who could cause
295 // overwritten to be flagged, so that's a DatabaseCorruptError.
297 throw Xapian::DatabaseCorruptError("Db block overwritten - are there multiple writers?");
// DatabaseModifiedError tells the caller that the revision being read has
// been discarded; the message advises reopening and retrying.
298 throw Xapian::DatabaseModifiedError("The revision being read has been discarded - you should call Xapian::Database::reopen() and retry the operation");
301 /* block_to_cursor(C, j, n) puts block n into position C[j] of cursor
302 C, writing the block currently at C[j] back to disk if necessary.
307 is true iff C[j].n is different from block n in file DB. If it is
308 false no rewriting is necessary.
// Makes level j of cursor C_ refer to block n, first writing back the block
// currently held at that level if it is flagged for rewrite. Throws
// Xapian::DatabaseCorruptError if the level stored in the block is not j.
312 GlassTable::block_to_cursor(Glass::Cursor
* C_
, int j
, uint4 n
) const
314 LOGCALL_VOID(DB
, "GlassTable::block_to_cursor", (void*)C_
| j
| n
);
// Nothing to do if level j already holds block n.
315 if (n
== C_
[j
].get_n()) return;
// Flush the block currently at level j if the table is writable and the
// block is flagged for rewrite, then clear the flag.
317 if (writable
&& C_
[j
].rewrite
) {
319 write_block(C_
[j
].get_n(), C_
[j
].get_p());
320 C_
[j
].rewrite
= false;
323 // Check if the block is in the built-in cursor (potentially in
// Block n is held at the same level of the built-in cursor C, so clone that
// entry into C_[j].
326 if (n
== C
[j
].get_n()) {
327 p
= C_
[j
].clone(C
[j
]);
// Initialise level j of C_ for a block of block_size bytes; the pointer
// returned by init() is kept in q.
329 byte
* q
= C_
[j
].init(block_size
);
336 /* unsigned comparison */
// Detect a block whose revision is greater than that of the block held one
// level up (at j + 1).
337 if (rare(REVISION(p
) > REVISION(C_
[j
+ 1].get_p()))) {
// The level recorded in the block must match the cursor level it is loaded
// at; a mismatch is reported as corruption.
343 if (rare(j
!= GET_LEVEL(p
))) {
344 string msg
= "Expected block ";
346 msg
+= " to be level ";
349 msg
+= str(GET_LEVEL(p
));
350 throw Xapian::DatabaseCorruptError(msg
);
354 /** GlassTable::alter() is called when the B-tree is to be altered.
356 It causes new blocks to be forced for the current set of blocks in
359 The point is that if a block at level 0 is to be altered it may get
360 a new number. Then the pointer to this block from level 1 will need
361 changing. So the block at level 1 needs altering and may get a new
362 block number. Then the pointer to this block from level 2 will need
363 changing ... and so on back to the root.
365 The clever bit here is spotting the cases when we can make an early
366 exit from this process. If C[j].rewrite is true, C[j+k].rewrite
367 will be true for k = 1,2 ... We have been through all this before,
368 and there is no need to do it again. If C[j].n was free at the
369 start of the transaction, we can copy it back to the same place
370 without violating the integrity of the B-tree. We don't then need a
371 new n and can return. The corresponding C[j].rewrite may be true or
// Body of GlassTable::alter(): prepares blocks held in the built-in cursor C
// for modification in the revision being written (revision_number + 1).
378 LOGCALL_VOID(DB
, "GlassTable::alter", NO_ARGS
);
// The DB_DANGEROUS flag gets separate handling.
380 if (flags
& Xapian::DB_DANGEROUS
) {
386 if (C
[j
].rewrite
) return; /* all new, so return */
// Check whether the block already carries the revision being written.
389 glass_revision_number_t rev
= REVISION(C
[j
].get_p());
390 if (rev
== revision_number
+ 1) {
393 Assert(rev
< revision_number
+ 1);
// Hand the old block number back to the freelist, stamp the block with the
// new revision and fetch a block number from the freelist.
394 uint4 n
= C
[j
].get_n();
395 free_list
.mark_block_unused(this, block_size
, n
);
396 SET_REVISION(C
[j
].get_modifiable_p(block_size
), revision_number
+ 1);
397 n
= free_list
.get_block(this, block_size
);
// Return once j has reached the root level.
400 if (j
== level
) return;
// Record block number n in the item at directory position C[j].c.
402 Item_wr(C
[j
].get_modifiable_p(block_size
), C
[j
].c
).set_block_given_by(n
);
406 /** find_in_block(p, key, leaf, c) searches for the key in the block at p.
408 leaf is true for a data block, and false for an index block (when the
409 first key is dummy and never needs to be tested). What we get is the
410 directory entry to the last key <= the key being searched for.
412 The lookup is by binary chop, with i and j set to the left and
413 right ends of the search area. In sequential addition, c will often
414 be the answer, so we test the keys round c and move i and j towards
// Searches the directory of the block at p for key by binary chop between i
// and j. The incoming c is tested first as a hint for the position.
419 GlassTable::find_in_block(const byte
* p
, Key key
, bool leaf
, int c
)
421 LOGCALL_STATIC(DB
, int, "GlassTable::find_in_block", (const void*)p
| (const void *)key
.get_address() | leaf
| c
);
422 // c should be odd (either -1, or an even offset from DIR_START).
423 Assert((c
& 1) == 1);
// When c lies strictly between i and j, compare the key stored at c with the
// search key, first for "<=" ...
432 if (c
< j
&& i
< c
&& Item(p
, c
).key() <= key
)
// ... and then for the search key being smaller than the key at c.
435 if (c
< j
&& i
< c
&& key
< Item(p
, c
).key())
// Probe the directory entry midway between i and j (kept a multiple of D2
// away from i), then move j or i onto it depending on the comparison.
440 int k
= i
+ ((j
- i
)/(D2
* 2))*D2
; /* mid way */
441 if (key
< Item(p
, k
).key()) j
= k
; else i
= k
;
// Sanity checks on the final position i.
444 AssertRel(DIR_START
- D2
,<=,i
);
446 AssertRel(DIR_START
,<=,i
);
448 AssertRel(i
,<,DIR_END(p
));
452 /** find(C_) searches for the key of B->kt in the B-tree.
454 Result is true if found, false otherwise. When false, the B-tree
455 cursor is positioned at the last key in the B-tree <= the search
456 key. Goes to first (null) item in B-tree when key length == 0.
// Descends cursor C_ from the root level down to the leaf, calling
// find_in_block() at each level, and returns whether the leaf entry reached
// has a key equal to the search key.
460 GlassTable::find(Glass::Cursor
* C_
) const
462 LOGCALL(DB
, bool, "GlassTable::find", (void*)C_
);
463 // Note: the parameter is needed when we're called by GlassCursor
// Walk the branch levels, from the root level down to level 1.
467 for (int j
= level
; j
> 0; --j
) {
469 c
= find_in_block(p
, key
, false, C_
[j
].c
);
470 #ifdef BTREE_DEBUG_FULL
471 printf("Block in GlassTable:find - code position 1");
472 report_block_full(j
, C_
[j
].get_n(), p
);
473 #endif /* BTREE_DEBUG_FULL */
// Load the child block named by the chosen item into the next level down.
475 block_to_cursor(C_
, j
- 1, Item(p
, c
).block_given_by());
// Finally search the leaf block held at level 0.
478 c
= find_in_block(p
, key
, true, C_
[0].c
);
479 #ifdef BTREE_DEBUG_FULL
480 printf("Block in GlassTable:find - code position 2");
481 report_block_full(0, C_
[0].get_n(), p
);
482 #endif /* BTREE_DEBUG_FULL */
// A position before DIR_START is reported as not found; otherwise the result
// is whether the key at c equals the search key.
484 if (c
< DIR_START
) RETURN(false);
485 RETURN(Item(p
, c
).key() == key
);
488 /** compact(p) compacts the block at p by shuffling all the items up to the end.
490 MAX_FREE(p) is then maximized, and is equal to TOTAL_FREE(p).
// Repacks the items of the block at p via the scratch buffer b and updates
// the directory offsets and the free space count of the block.
494 GlassTable::compact(byte
* p
)
496 LOGCALL_VOID(DB
, "GlassTable::compact", (void*)p
);
500 int dir_end
= DIR_END(p
);
// Visit every directory entry of the block in order.
501 for (int c
= DIR_START
; c
< dir_end
; c
+= D2
) {
// Copy the item (l bytes) to offset e of the scratch buffer and point the
// directory entry c at that offset.
505 memmove(b
+ e
, item
.get_address(), l
);
506 setD(p
, c
, e
); /* reform in b */
// Copy everything from offset e to the end of the block back from b into p.
508 memmove(p
+ e
, b
+ e
, block_size
- e
); /* copy back */
510 SET_TOTAL_FREE(p
, e
);
514 /** Btree needs to gain a new level to insert more items: so split root block
515 * and construct a new one.
// Builds a new, empty root block at cursor level `level` and adds to it an
// item with a null key pointing at block split_n.
518 GlassTable::split_root(uint4 split_n
)
520 LOGCALL_VOID(DB
, "GlassTable::split_root", split_n
);
524 /* check level overflow - this isn't something that should ever happen
525 * but deserves more than an Assert()... */
526 if (level
== BTREE_CURSOR_LEVELS
) {
527 throw Xapian::DatabaseCorruptError("Btree has grown impossibly large (" STRINGIZE(BTREE_CURSOR_LEVELS
) " levels)");
// Set up a zero-filled block at the root cursor level, give it a block
// number from the freelist and flag it for rewrite.
530 byte
* q
= C
[level
].init(block_size
);
531 memset(q
, 0, block_size
);
532 C
[level
].c
= DIR_START
;
533 C
[level
].set_n(free_list
.get_block(this, block_size
));
534 C
[level
].rewrite
= true;
// Stamp the new block with the revision being written and give it an empty
// directory.
535 SET_REVISION(q
, revision_number
+ 1);
537 SET_DIR_END(q
, DIR_START
);
538 compact(q
); /* to reset TOTAL_FREE, MAX_FREE */
540 /* form a null key in b with a pointer to the old root */
541 byte b
[10]; /* 7 is exact */
543 item
.form_null_key(split_n
);
// Insert that item into the new root block.
544 add_item(item
, level
);
547 /** enter_key(j, prevkey, newkey) is called after a block split.
549 It enters in the block at level C[j] a separating key for the block
550 at level C[j - 1]. The key itself is newkey. prevkey is the
551 preceding key, and at level 1 newkey can be trimmed down to the
552 first point of difference to prevkey for entry in C[j].
554 This code looks longer than it really is. If j exceeds the number
555 of B-tree levels the root block has split and we have to construct
556 a new one, but this is a rare event.
558 The key is constructed in b, with block number C[j - 1].n as tag,
559 and this is added in with add_item. add_item may itself cause a
560 block split, with a further call to enter_key. Hence the recursion.
// Builds an item from newkey (truncated where permitted) with the block
// number of C[j - 1] as its tag, for entry into the block at level j.
563 GlassTable::enter_key(int j
, Key prevkey
, Key newkey
)
565 LOGCALL_VOID(DB
, "GlassTable::enter_key", j
| Literal("prevkey") | Literal("newkey"));
// The two keys must be strictly ordered.
567 Assert(prevkey
< newkey
);
// The block one level down is the one the new item will point at.
570 uint4 blocknumber
= C
[j
- 1].get_n();
572 // FIXME update to use Key
573 // Keys are truncated here: but don't truncate the count at the end away.
574 const int newkey_len
= newkey
.length();
575 AssertRel(newkey_len
,>,0);
579 // Truncate the key to the minimal key which differs from prevkey,
580 // the preceding key in the block.
// Scan over the common prefix of prevkey and newkey.
582 const int min_len
= min(newkey_len
, prevkey
.length());
583 while (i
< min_len
&& prevkey
[i
] == newkey
[i
]) {
587 // Want one byte of difference.
588 if (i
< newkey_len
) i
++;
590 /* Can't truncate between branch levels, since the separated keys
591 * are in at the leaf level, and truncating again will change the
// Buffer in which the item is formed.
597 byte b
[UCHAR_MAX
+ 6];
// The (possibly truncated) key length must fit in the buffer.
599 Assert(i
<= 256 - I2
- C2
);
600 Assert(i
<= (int)sizeof(b
) - I2
- C2
- 4);
601 item
.set_key_and_block(newkey
, i
, blocknumber
);
603 // When j > 1 we can make the first key of block p null. This is probably
604 // worthwhile as it trades a small amount of CPU and RAM use for a small
605 // saving in disk use. Other redundant keys will still creep in though.
// Replace the item that newkey belongs to with a null key item carrying the
// same block number n, and credit the bytes saved to the block's free space.
607 byte
* p
= C
[j
- 1].get_modifiable_p(block_size
);
608 uint4 n
= getint4(newkey
.get_address(), newkey_len
+ K1
+ C2
);
609 int new_total_free
= TOTAL_FREE(p
) + newkey_len
+ C2
;
610 // FIXME: incredibly icky going from key to item like this...
611 Item_wr(const_cast<byte
*>(newkey
.get_address()) - I2
).form_null_key(n
);
612 SET_TOTAL_FREE(p
, new_total_free
);
615 // The split block gets inserted into the parent after the pointer to the
// The cursor position at level j must already be where the new key belongs.
617 AssertEq(C
[j
].c
, find_in_block(C
[j
].get_p(), item
.key(), false, C
[j
].c
));
619 C
[j
].rewrite
= true; /* a subtle point: this *is* required. */
623 /** mid_point(p) finds the directory entry in c that determines the
624 approximate mid point of the data in the block at p.
// Scans the directory of the block at p and returns the entry c at which the
// size test below first succeeds.
628 GlassTable::mid_point(byte
* p
)
630 LOGCALL(DB
, int, "GlassTable::mid_point", (void*)p
);
632 int dir_end
= DIR_END(p
);
// Bytes occupied by items: the block size less the free space and the
// directory end offset.
633 int size
= block_size
- TOTAL_FREE(p
) - dir_end
;
// Visit the directory entries in order, looking at the size of each item.
634 for (int c
= DIR_START
; c
< dir_end
; c
+= D2
) {
635 int l
= Item(p
, c
).size();
638 if (l
< n
- size
) RETURN(c
);
643 /* This shouldn't happen, as the sum of the item sizes should be the same
644 * as the value calculated in size, so assert but return a sane value just
650 /** add_item_to_block(p, kt_, c) adds item kt_ to the block at p.
652 c is the offset in the directory that needs to be expanded to accommodate
653 the new entry for the item. We know before this is called that there is
654 enough contiguous room for the item in the block, so it's just a matter of
655 shuffling up any directory entries after where we're inserting and copying
// Inserts item kt_ into the block at p: opens a directory slot at offset c,
// copies the item into the gap after the directory and updates the free
// space counts of the block.
660 GlassTable::add_item_to_block(byte
* p
, Item_wr kt_
, int c
)
662 LOGCALL_VOID(DB
, "GlassTable::add_item_to_block", (void*)p
| Literal("kt_") | c
);
664 int dir_end
= DIR_END(p
);
665 int kt_len
= kt_
.size();
// Space required: the item itself plus one directory entry (D2 bytes).
666 int needed
= kt_len
+ D2
;
667 int new_total
= TOTAL_FREE(p
) - needed
;
668 int new_max
= MAX_FREE(p
) - needed
;
670 Assert(new_total
>= 0);
// The contiguous gap must be large enough to take the item.
672 AssertRel(MAX_FREE(p
),>=,needed
);
// c must be a position within the directory, or its end.
674 AssertRel(DIR_START
,<=,c
);
675 AssertRel(c
,<=,dir_end
);
// Shift the directory entries from c onwards up by D2 to open a slot at c.
677 memmove(p
+ c
+ D2
, p
+ c
, dir_end
- c
);
679 SET_DIR_END(p
, dir_end
);
// Offset o is where the item is copied to.
681 int o
= dir_end
+ new_max
;
683 memmove(p
+ o
, kt_
.get_address(), kt_len
);
685 SET_MAX_FREE(p
, new_max
);
687 SET_TOTAL_FREE(p
, new_total
);
690 /** GlassTable::add_item(kt_, j) adds item kt_ to the block at cursor level C[j].
692 * If there is not enough room the block splits and the item is then
693 * added to the appropriate half.
// Adds item kt_ to the block at cursor level j. When the block has too
// little free space it is split into split_p (lower half) and p (upper half)
// first, and the item goes into one of the two halves.
696 GlassTable::add_item(Item_wr kt_
, int j
)
698 LOGCALL_VOID(DB
, "GlassTable::add_item", Literal("kt_") | j
);
700 byte
* p
= C
[j
].get_modifiable_p(block_size
);
// Space required: the item plus one directory entry (D2 bytes).
704 int needed
= kt_
.size() + D2
;
// Not enough free space in total, so the block has to be split.
705 if (TOTAL_FREE(p
) < needed
) {
707 // Prepare to split p. After splitting, the block is in two halves, the
708 // lower half is split_p, the upper half p again. add_to_upper_half
709 // becomes true when the item gets added to p, false when it gets added
713 // If we're not in sequential mode, we split at the mid point
717 // During sequential addition, split at the insert point
718 AssertRel(c
,>=,DIR_START
);
// The lower half keeps the old block number; the cursor level gets a block
// number from the freelist.
722 uint4 split_n
= C
[j
].get_n();
723 C
[j
].set_n(free_list
.get_block(this, block_size
));
725 memcpy(split_p
, p
, block_size
); // replicate the whole block in split_p
// Cut the directory of the lower half at the split point m.
726 SET_DIR_END(split_p
, m
);
727 compact(split_p
); /* to reset TOTAL_FREE, MAX_FREE */
// Move the directory entries from m onwards to the start of the directory
// of p, which becomes the upper half.
730 int residue
= DIR_END(p
) - m
;
731 int new_dir_end
= DIR_START
+ residue
;
732 memmove(p
+ DIR_START
, p
+ m
, residue
);
733 SET_DIR_END(p
, new_dir_end
);
736 compact(p
); /* to reset TOTAL_FREE, MAX_FREE */
// Decide which half receives the new item.
738 bool add_to_upper_half
;
740 add_to_upper_half
= (c
>= m
);
742 // And add item to lower half if split_p has room, otherwise upper
744 add_to_upper_half
= (TOTAL_FREE(split_p
) < needed
);
747 if (add_to_upper_half
) {
// Rebase c onto the shortened directory of the upper half.
748 c
-= (m
- DIR_START
);
749 Assert(seq_count
< 0 || c
<= DIR_START
+ D2
);
750 Assert(c
>= DIR_START
);
751 Assert(c
<= DIR_END(p
));
752 add_item_to_block(p
, kt_
, c
);
// Lower half case: c is used as it stands.
755 Assert(c
>= DIR_START
);
756 Assert(c
<= DIR_END(split_p
));
757 add_item_to_block(split_p
, kt_
, c
);
// Write the lower half out under the original block number.
760 write_block(split_n
, split_p
);
762 // Check if we're splitting the root block.
763 if (j
== level
) split_root(split_n
);
765 /* Enter a separating key at level j + 1 between */
766 /* the last key of block split_p, and the first key of block p */
768 Item(split_p
, DIR_END(split_p
) - D2
).key(),
769 Item(p
, DIR_START
).key());
// Enough free space in total for the item.
771 AssertRel(TOTAL_FREE(p
),>=,needed
);
// The contiguous gap is checked separately, as it can be smaller than the
// total free space.
773 if (MAX_FREE(p
) < needed
) {
775 AssertRel(MAX_FREE(p
),>=,needed
);
778 add_item_to_block(p
, kt_
, c
);
787 /** GlassTable::delete_item(j, repeatedly) is (almost) the converse of add_item.
789 * If repeatedly is true, the process repeats at the next level when a
790 * block has been completely emptied, freeing the block and taking out
791 * the pointer to it. Emptied root blocks are also removed, which
792 * reduces the number of levels in the B-tree.
// Removes the item at directory position c from the block at cursor level j.
// With repeatedly set, a block left empty is released and its pointer is
// deleted from the level above, and a root holding a single item is dropped.
795 GlassTable::delete_item(int j
, bool repeatedly
)
797 LOGCALL_VOID(DB
, "GlassTable::delete_item", j
| repeatedly
);
799 byte
* p
= C
[j
].get_modifiable_p(block_size
);
// c must address an existing directory entry.
801 AssertRel(DIR_START
,<=,c
);
802 AssertRel(c
,<,DIR_END(p
));
803 int kt_len
= Item(p
, c
).size(); /* size of the item to be deleted */
804 int dir_end
= DIR_END(p
) - D2
; /* directory length will go down by 2 bytes */
// Close the directory slot at c by shifting the later entries down by D2.
806 memmove(p
+ c
, p
+ c
+ D2
, dir_end
- c
);
807 SET_DIR_END(p
, dir_end
);
// The gap after the directory grows by D2; the total free space grows by the
// item size plus D2.
808 SET_MAX_FREE(p
, MAX_FREE(p
) + D2
);
809 SET_TOTAL_FREE(p
, TOTAL_FREE(p
) + kt_len
+ D2
);
811 if (!repeatedly
) return;
// The block has no items left: release it and delete the pointer to it from
// the level above.
813 if (dir_end
== DIR_START
) {
814 free_list
.mark_block_unused(this, block_size
, C
[j
].get_n());
815 C
[j
].rewrite
= false;
816 C
[j
].set_n(BLK_UNUSED
);
817 C
[j
+ 1].rewrite
= true; /* *is* necessary */
818 delete_item(j
+ 1, true);
// Repeat while the block has exactly one directory entry and the tree has
// more than one level.
822 while (dir_end
== DIR_START
+ D2
&& level
> 0) {
823 /* single item in the root block, so lose a level */
824 uint4 new_root
= Item(C
[level
].get_p(), DIR_START
).block_given_by();
825 free_list
.mark_block_unused(this, block_size
, C
[level
].get_n());
// Load the block the single item pointed at into the root cursor level.
829 block_to_cursor(C
, level
, new_root
);
831 dir_end
= DIR_END(C
[level
].get_p()); /* prepare for the loop */
840 /** add_kt(found) adds the item (key-tag pair) at B->kt into the
841 B-tree, using cursor C.
843 found == find() is handed over as a parameter from Btree::add.
844 Btree::alter() prepares for the alteration to the B-tree. Then
845 there are a number of cases to consider:
847 If an item with the same key is in the B-tree (found is true),
848 the new kt replaces it.
850 If kt is then smaller than, or the same size as, the item it replaces,
851 kt is put in the same place as the item it replaces, and the
852 TOTAL_FREE measure is reduced.
854 If kt is larger than the item it replaces it is put in the
855 MAX_FREE space if there is room, and the directory entry and
856 space counts are adjusted accordingly.
858 - But if there is not room we do it the long way: the old item is
859 deleted with delete_item and kt is added in with add_item.
861 If the key of kt is not in the B-tree (found is false), the new
862 kt is added in with add_item.
// Adds the item kt to the B-tree. When found is true an existing item with
// the same key is replaced, either in place, in the free gap of the block,
// or by deleting it first.
866 GlassTable::add_kt(bool found
)
868 LOGCALL(DB
, int, "GlassTable::add_kt", found
);
874 printf("%d) %s ", addcount++, (found ? "replacing" : "adding"));
875 print_bytes(kt[I2] - K1 - C2, kt + I2 + K1); putchar('\n');
880 if (found
) { /* replacement */
// A replacement resets the sequential addition counter.
881 seq_count
= SEQ_START_POINT
;
884 byte
* p
= C
[0].get_modifiable_p(block_size
);
// c must address an existing directory entry of the leaf block.
886 AssertRel(DIR_START
,<=,c
);
887 AssertRel(c
,<,DIR_END(p
));
// needed is the size difference between the new item and the one replaced.
889 int kt_size
= kt
.size();
890 int needed
= kt_size
- item
.size();
// Note how many components the existing item says its tag has.
892 components
= item
.components_of();
895 /* simple replacement */
// Overwrite the old item where it is and adjust the total free space.
896 memmove(const_cast<byte
*>(item
.get_address()),
897 kt
.get_address(), kt_size
);
898 SET_TOTAL_FREE(p
, TOTAL_FREE(p
) - needed
);
900 /* new item into the block's freespace */
// Copy the new item to offset o in the gap and shrink both free space counts.
901 int new_max
= MAX_FREE(p
) - kt_size
;
903 int o
= DIR_END(p
) + new_max
;
904 memmove(p
+ o
, kt
.get_address(), kt_size
);
906 SET_MAX_FREE(p
, new_max
);
907 SET_TOTAL_FREE(p
, TOTAL_FREE(p
) - needed
);
909 /* do it the long way */
910 delete_item(0, false);
// Compare the leaf block number and directory position with changed_n and
// changed_c: on a match step seq_count towards zero, otherwise reset it.
916 if (changed_n
== C
[0].get_n() && changed_c
== C
[0].c
) {
917 if (seq_count
< 0) seq_count
++;
919 seq_count
= SEQ_START_POINT
;
928 /* delete_kt() corresponds to add_kt(found), but there are only
929 two cases: if the key is not found nothing is done, and if it is
930 found the corresponding item is deleted with delete_item.
// Looks up the key of kt and deletes the matching item from the leaf level.
934 GlassTable::delete_kt()
936 LOGCALL(DB
, int, "GlassTable::delete_kt", NO_ARGS
);
// Position the built-in cursor on the key.
939 bool found
= find(C
);
// Reset the sequential addition counter.
942 seq_count
= SEQ_START_POINT
;
947 printf("%d) %s ", addcount++, (found ? "deleting " : "ignoring "));
948 print_bytes(B->kt[I2] - K1 - C2, B->kt + I2 + K1); putchar('\n');
// Note how many components the item says its tag has, then delete the item,
// cascading to higher levels as required.
952 components
= Item(C
[0].get_p(), C
[0].c
).components_of();
954 delete_item(0, true);
959 /* GlassTable::form_key(key) treats address kt as an item holder and fills in
964 The bracketed parts are left blank. The key is filled in with key_len bytes and
965 K set accordingly. c is set to 1.
// Fills in the key part of the item holder kt from the given string.
968 void GlassTable::form_key(const string
& key
) const
970 LOGCALL_VOID(DB
, "GlassTable::form_key", key
);
974 /* GlassTable::add(key, tag) adds the key/tag item to the
975 B-tree, replacing any existing item with the same key.
977 For a long tag, we end up having to add m components, of the form
984 and tag1+tag2+...+tagm are equal to tag. These in their turn may be replacing
985 n components of the form
992 and n may be greater than, equal to, or less than m. These cases are dealt
993 with in the code below. If m < n for example, we end up with a series of
// Adds the key/tag pair to the B-tree, replacing any existing entry with the
// same key. The tag may be compressed with zlib first, and is split into m
// chunks which are added as separate items.
998 GlassTable::add(const string
&key
, string tag
, bool already_compressed
)
1000 LOGCALL_VOID(DB
, "GlassTable::add", key
| tag
| already_compressed
);
1005 GlassTable::throw_database_closed();
1012 bool compressed
= false;
1013 if (already_compressed
) {
// Only attempt compression when a strategy is configured and the tag is
// longer than COMPRESS_MIN bytes.
1015 } else if (compress_strategy
!= DONT_COMPRESS
&& tag
.size() > COMPRESS_MIN
) {
// Compile-time checks that DONT_COMPRESS differs from each zlib strategy
// constant listed.
1016 static_assert(DONT_COMPRESS
!= Z_DEFAULT_STRATEGY
,
1017 "DONT_COMPRESS clashes with zlib constant");
1018 static_assert(DONT_COMPRESS
!= Z_FILTERED
,
1019 "DONT_COMPRESS clashes with zlib constant");
1020 static_assert(DONT_COMPRESS
!= Z_HUFFMAN_ONLY
,
1021 "DONT_COMPRESS clashes with zlib constant");
1023 static_assert(DONT_COMPRESS
!= Z_RLE
,
1024 "DONT_COMPRESS clashes with zlib constant");
1027 comp_stream
.lazy_alloc_deflate_zstream();
// Feed the whole tag to the deflate stream as its input.
1029 comp_stream
.deflate_zstream
->next_in
= (Bytef
*)const_cast<char *>(tag
.data());
1030 comp_stream
.deflate_zstream
->avail_in
= (uInt
)tag
.size();
1032 // If compressed size is >= tag.size(), we don't want to compress.
// The output buffer is one byte shorter than the tag.
1033 unsigned long blk_len
= tag
.size() - 1;
1034 unsigned char * blk
= new unsigned char[blk_len
];
1035 comp_stream
.deflate_zstream
->next_out
= blk
;
1036 comp_stream
.deflate_zstream
->avail_out
= (uInt
)blk_len
;
// Compress in a single deflate() call with Z_FINISH.
1038 int err
= deflate(comp_stream
.deflate_zstream
, Z_FINISH
);
1039 if (err
== Z_STREAM_END
) {
1040 // If deflate succeeded, then the output was at least one byte
1041 // smaller than the input.
// Replace the tag with the total_out bytes of compressed data.
1042 tag
.assign(reinterpret_cast<const char *>(blk
), comp_stream
.deflate_zstream
->total_out
);
1045 // Deflate failed - presumably the data wasn't compressible.
1051 // sort of matching kt.append_chunk(), but setting the chunk
1052 const size_t cd
= kt
.key().length() + K1
+ I2
+ C2
+ C2
; // offset to the tag data
1053 const size_t L
= max_item_size
- cd
; // largest amount of tag data for any chunk
1054 size_t first_L
= L
; // - amount for tag1
// Position the built-in cursor on the key.
1055 bool found
= find(C
);
// n is the remainder of the leaf block's free space modulo the space taken
// by a maximum size item plus its directory entry.
1057 const byte
* p
= C
[0].get_p();
1058 size_t n
= TOTAL_FREE(p
) % (max_item_size
+ D2
);
1061 // if n >= last then fully filling this block won't produce
1062 // an extra item, so we might as well do this even if
1063 // full_compaction isn't active.
1065 // In the full_compaction case, it turns out we shouldn't always
1066 // try to fill every last byte. Doing so can actually increase the
1067 // total space required (I believe this effect is due to longer
1068 // dividing keys being required in the index blocks). Empirically,
1069 // n >= key.size() + K appears a good criterion for K ~= 34. This
1070 // seems to save about 0.2% in total database size over always
1071 // splitting the tag. It'll also give slightly faster retrieval
1072 // as we can avoid reading an extra block occasionally.
1073 size_t last
= tag
.length() % L
;
1074 if (n
>= last
|| (full_compaction
&& n
>= key
.size() + 34))
1079 // a null tag must be added in of course
// m is 1 for an empty tag; otherwise it is one chunk of first_L bytes plus
// the chunks of up to L bytes needed for the remainder.
1080 int m
= tag
.empty() ? 1 : (tag
.length() - first_L
+ L
- 1) / L
+ 1;
1081 // there are m items to add
1082 /* FIXME: sort out this error higher up and turn this into
// The chunk count must stay below BYTE_PAIR_RANGE.
1085 if (m
>= BYTE_PAIR_RANGE
)
1086 throw Xapian::UnimplementedError("Can't handle insanely large tags");
1088 int n
= 0; // initialise to shut off warning
1089 // - and there will be n to delete
1090 int o
= 0; // Offset into the tag
1091 size_t residue
= tag
.length(); // Bytes of the tag remaining to add in
1092 int replacement
= false; // Has there been a replacement ?
1094 kt
.set_components_of(m
);
// Process the m chunks in turn.
1095 for (i
= 1; i
<= m
; i
++) {
// The last chunk takes whatever remains, the first takes first_L bytes and
// every other chunk takes L bytes.
1096 size_t l
= (i
== m
? residue
: (i
== 1 ? first_L
: L
));
1097 Assert(cd
+ l
<= block_size
);
1098 Assert(string::size_type(o
+ l
) <= tag
.length());
1099 kt
.set_tag(cd
, tag
.data() + o
, l
, compressed
);
1100 kt
.set_component_of(i
);
// The first chunk was already looked up before the loop.
1105 if (i
> 1) found
= find(C
);
1107 if (n
> 0) replacement
= true;
1109 /* o == tag.length() here, and n may be zero */
// Visit the components numbered above m, up to n.
1110 for (i
= m
+ 1; i
<= n
; i
++) {
1111 kt
.set_component_of(i
);
// A key which replaced nothing counts as a new item.
1114 if (!replacement
) ++item_count
;
1115 Btree_modified
= true;
1116 if (cursor_created_since_last_modification
) {
1117 cursor_created_since_last_modification
= false;
1122 /* GlassTable::del(key) returns false if the key is not in the B-tree,
1123 otherwise deletes it and returns true.
1125 Again, this is parallel to GlassTable::add, but simpler in form.
// Deletes the entry with the given key. Returns false for an empty or
// oversized key, and when delete_kt() reports nothing to delete.
1129 GlassTable::del(const string
&key
)
1131 LOGCALL(DB
, bool, "GlassTable::del", key
);
1136 GlassTable::throw_database_closed();
1141 // We can't delete a key which is too long for us to store.
1142 if (key
.size() > GLASS_BTREE_MAX_KEY_LEN
) RETURN(false);
// An empty key is rejected too.
1144 if (key
.empty()) RETURN(false);
1147 int n
= delete_kt(); /* there are n items to delete */
1148 if (n
<= 0) RETURN(false);
// Handle the remaining components, numbered 2 to n.
1150 for (int i
= 2; i
<= n
; i
++) {
1151 kt
.set_component_of(i
);
// Record that the table has been modified.
1156 Btree_modified
= true;
1157 if (cursor_created_since_last_modification
) {
1158 cursor_created_since_last_modification
= false;
1165 GlassTable::readahead_key(const string
&key
) const
1167 LOGCALL(DB
, bool, "GlassTable::readahead_key", key
);
1168 Assert(!key
.empty());
1172 // handle = -1: Lazy table which isn't yet open
1174 // handle = -2: Table has been closed. Since the readahead is just a
1175 // hint, we can safely ignore it for a closed table.
1179 // If the table only has one level, there are no branch blocks to preread.
1184 Key ktkey
= kt
.key();
1186 // We'll only readahead the first level, since descending the B-tree would
1187 // require actual reads that would likely hurt performance more than help.
1188 const byte
* p
= C
[level
].get_p();
1189 int c
= find_in_block(p
, ktkey
, false, C
[level
].c
);
1190 uint4 n
= Item(p
, c
).block_given_by();
1191 // Don't preread if it's the block we last preread or already in the
1193 if (n
!= last_readahead
&& n
!= C
[level
- 1].get_n()) {
1195 if (!io_readahead_block(handle
, block_size
, n
))
1202 GlassTable::get_exact_entry(const string
&key
, string
& tag
) const
1204 LOGCALL(DB
, bool, "GlassTable::get_exact_entry", key
| tag
);
1205 Assert(!key
.empty());
1209 GlassTable::throw_database_closed();
1214 // An oversized key can't exist, so attempting to search for it should fail.
1215 if (key
.size() > GLASS_BTREE_MAX_KEY_LEN
) RETURN(false);
1218 if (!find(C
)) RETURN(false);
1220 (void)read_tag(C
, &tag
, false);
1225 GlassTable::key_exists(const string
&key
) const
1227 LOGCALL(DB
, bool, "GlassTable::key_exists", key
);
1228 Assert(!key
.empty());
1230 // An oversized key can't exist, so attempting to search for it should fail.
1231 if (key
.size() > GLASS_BTREE_MAX_KEY_LEN
) RETURN(false);
1238 GlassTable::read_tag(Glass::Cursor
* C_
, string
*tag
, bool keep_compressed
) const
1240 LOGCALL(DB
, bool, "GlassTable::read_tag", Literal("C_") | tag
| keep_compressed
);
1241 Item
item(C_
[0].get_p(), C_
[0].c
);
1243 /* n components to join */
1244 int n
= item
.components_of();
1247 // max_item_size also includes K1 + I2 + C2 + C2 bytes overhead and the key
1248 // (which is at least 1 byte long).
1249 if (n
> 1) tag
->reserve((max_item_size
- (1 + K1
+ I2
+ C2
+ C2
)) * n
);
1251 item
.append_chunk(tag
);
1252 bool compressed
= item
.get_compressed();
1254 for (int i
= 2; i
<= n
; i
++) {
1256 throw Xapian::DatabaseCorruptError("Unexpected end of table when reading continuation of tag");
1258 (void)Item(C_
[0].get_p(), C_
[0].c
).append_chunk(tag
);
1260 // At this point the cursor is on the last item - calling next will move
1261 // it to the next key (GlassCursor::get_tag() relies on this).
1262 if (!compressed
|| keep_compressed
) RETURN(compressed
);
1264 // FIXME: Perhaps we should decompress each chunk as we read it so we
1265 // don't need both the full compressed and uncompressed tags in memory
1269 // May not be enough for a compressed tag, but it's a reasonable guess.
1270 utag
.reserve(tag
->size() + tag
->size() / 2);
1274 comp_stream
.lazy_alloc_inflate_zstream();
1276 comp_stream
.inflate_zstream
->next_in
= (Bytef
*)const_cast<char *>(tag
->data());
1277 comp_stream
.inflate_zstream
->avail_in
= (uInt
)tag
->size();
1280 while (err
!= Z_STREAM_END
) {
1281 comp_stream
.inflate_zstream
->next_out
= buf
;
1282 comp_stream
.inflate_zstream
->avail_out
= (uInt
)sizeof(buf
);
1283 err
= inflate(comp_stream
.inflate_zstream
, Z_SYNC_FLUSH
);
1284 if (err
== Z_BUF_ERROR
&& comp_stream
.inflate_zstream
->avail_in
== 0) {
1285 LOGLINE(DB
, "Z_BUF_ERROR - faking checksum of " << comp_stream
.inflate_zstream
->adler
);
1287 setint4(header2
, 0, comp_stream
.inflate_zstream
->adler
);
1288 comp_stream
.inflate_zstream
->next_in
= header2
;
1289 comp_stream
.inflate_zstream
->avail_in
= 4;
1290 err
= inflate(comp_stream
.inflate_zstream
, Z_SYNC_FLUSH
);
1291 if (err
== Z_STREAM_END
) break;
1294 if (err
!= Z_OK
&& err
!= Z_STREAM_END
) {
1295 if (err
== Z_MEM_ERROR
) throw std::bad_alloc();
1296 string msg
= "inflate failed";
1297 if (comp_stream
.inflate_zstream
->msg
) {
1299 msg
+= comp_stream
.inflate_zstream
->msg
;
1302 throw Xapian::DatabaseError(msg
);
1305 utag
.append(reinterpret_cast<const char *>(buf
),
1306 comp_stream
.inflate_zstream
->next_out
- buf
);
1308 if (utag
.size() != comp_stream
.inflate_zstream
->total_out
) {
1309 string msg
= "compressed tag didn't expand to the expected size: ";
1310 msg
+= str(utag
.size());
1312 // OpenBSD's zlib.h uses off_t instead of uLong for total_out.
1313 msg
+= str((size_t)comp_stream
.inflate_zstream
->total_out
);
1314 throw Xapian::DatabaseCorruptError(msg
);
1323 GlassTable::set_full_compaction(bool parity
)
1325 LOGCALL_VOID(DB
, "GlassTable::set_full_compaction", parity
);
1328 if (parity
) seq_count
= 0;
1329 full_compaction
= parity
;
1332 GlassCursor
* GlassTable::cursor_get() const {
1333 LOGCALL(DB
, GlassCursor
*, "GlassTable::cursor_get", NO_ARGS
);
1336 GlassTable::throw_database_closed();
1340 // FIXME Ick - casting away const is nasty
1341 RETURN(new GlassCursor(const_cast<GlassTable
*>(this)));
1344 /************ B-tree opening and closing ************/
1347 GlassTable::basic_open(const RootInfo
* root_info
, glass_revision_number_t rev
)
1349 LOGCALL_VOID(DB
, "GlassTable::basic_open", root_info
|rev
);
1350 revision_number
= rev
;
1352 root
= root_info
->get_root();
1353 level
= root_info
->get_level();
1354 item_count
= root_info
->get_num_entries();
1355 faked_root_block
= root_info
->get_root_is_fake();
1356 sequential
= root_info
->get_sequential_mode();
1357 const string
& fl_serialised
= root_info
->get_free_list();
1358 if (!fl_serialised
.empty()) {
1359 if (!free_list
.unpack(fl_serialised
))
1360 throw Xapian::DatabaseCorruptError("Bad freelist metadata");
1368 faked_root_block
= true;
1373 /* kt holds constructed items as well as keys */
1374 kt
= Item_wr(zeroed_new(block_size
));
1376 set_max_item_size(BLOCK_CAPACITY
);
1378 for (int j
= 0; j
<= level
; j
++) {
1379 C
[j
].init(block_size
);
1384 if (cursor_created_since_last_modification
) {
1385 cursor_created_since_last_modification
= false;
1391 GlassTable::read_root()
1393 LOGCALL_VOID(DB
, "GlassTable::read_root", NO_ARGS
);
1394 if (faked_root_block
) {
1395 /* root block for an unmodified database. */
1396 byte
* p
= C
[0].init(block_size
);
1399 /* clear block - shouldn't be necessary, but is a bit nicer,
1400 * and means that the same operations should always produce
1401 * the same database. */
1402 memset(p
, 0, block_size
);
1404 int o
= block_size
- I2
- K1
- C2
- C2
;
1405 Item_wr(p
+ o
).fake_root_item();
1407 setD(p
, DIR_START
, o
); // its directory entry
1408 SET_DIR_END(p
, DIR_START
+ D2
);// the directory size
1410 o
-= (DIR_START
+ D2
);
1412 SET_TOTAL_FREE(p
, o
);
1416 /* reading - revision number doesn't matter as long as
1417 * it's not greater than the current one. */
1422 SET_REVISION(p
, revision_number
+ 1);
1423 C
[0].set_n(free_list
.get_block(this, block_size
));
1426 /* using a root block stored on disk */
1427 block_to_cursor(C
, level
, root
);
1429 if (REVISION(C
[level
].get_p()) > revision_number
) set_overwritten();
1430 /* although this is unlikely */
1435 GlassTable::do_open_to_write(const RootInfo
* root_info
,
1436 glass_revision_number_t rev
)
1438 LOGCALL_VOID(DB
, "GlassTable::do_open_to_write", root_info
|rev
);
1440 GlassTable::throw_database_closed();
1442 handle
= io_open_block_wr(name
+ GLASS_TABLE_EXTENSION
,
1445 // lazy doesn't make a lot of sense when we're creating a DB (which
1446 // is the case when root_info==NULL), but ENOENT with O_CREAT means a
1447 // parent directory doesn't exist.
1448 if (lazy
&& root_info
&& errno
== ENOENT
) {
1449 revision_number
= rev
;
1452 string
message(!root_info
? "Couldn't create " : "Couldn't open ");
1454 message
+= "DB read/write: ";
1455 errno_to_string(errno
, message
);
1456 throw Xapian::DatabaseOpeningError(message
);
1460 basic_open(root_info
, rev
);
1462 split_p
= new byte
[block_size
];
1464 buffer
= zeroed_new(block_size
);
1467 changed_c
= DIR_START
;
1468 seq_count
= SEQ_START_POINT
;
1471 GlassTable::GlassTable(const char * tablename_
, const string
& path_
,
1472 bool readonly_
, int compress_strategy_
, bool lazy_
)
1473 : tablename(tablename_
),
1477 faked_root_block(true),
1490 Btree_modified(false),
1491 full_compaction(false),
1492 writable(!readonly_
),
1493 cursor_created_since_last_modification(false),
1497 compress_strategy(compress_strategy_
),
1498 comp_stream(compress_strategy_
),
1500 last_readahead(BLK_UNUSED
)
1502 LOGCALL_CTOR(DB
, "GlassTable", tablename_
| path_
| readonly_
| compress_strategy_
| lazy_
);
1506 GlassTable::exists() const {
1507 LOGCALL(DB
, bool, "GlassTable::exists", NO_ARGS
);
1508 return file_exists(name
+ GLASS_TABLE_EXTENSION
);
1512 GlassTable::create_and_open(int flags_
, unsigned int block_size_
)
1514 LOGCALL_VOID(DB
, "GlassTable::create_and_open", flags_
|block_size_
);
1516 GlassTable::throw_database_closed();
1521 Assert(block_size_
>= 2048);
1522 Assert(block_size_
<= BYTE_PAIR_RANGE
);
1523 // Must be a power of two.
1524 Assert((block_size_
& (block_size_
- 1)) == 0);
1527 block_size
= block_size_
;
1531 (void)io_unlink(name
+ GLASS_TABLE_EXTENSION
);
1533 // FIXME: it would be good to arrange that this works such that there's
1534 // always a valid table in place if you run create_and_open() on an
1541 GlassTable::~GlassTable() {
1542 LOGCALL_DTOR(DB
, "GlassTable");
1543 GlassTable::close();
1546 void GlassTable::close(bool permanent
) {
1547 LOGCALL_VOID(DB
, "GlassTable::close", permanent
);
1550 // If an error occurs here, we just ignore it, since we're just
1551 // trying to free everything.
1552 (void)::close(handle
);
1558 // Don't delete the resources in the table, since they may
1559 // still be used to look up cached content.
1562 for (int j
= level
; j
>= 0; j
--) {
1568 delete [] kt
.get_address();
1575 GlassTable::flush_db()
1577 LOGCALL_VOID(DB
, "GlassTable::flush_db", NO_ARGS
);
1581 GlassTable::throw_database_closed();
1586 for (int j
= level
; j
>= 0; j
--) {
1588 write_block(C
[j
].get_n(), C
[j
].get_p());
1592 if (Btree_modified
) {
1593 faked_root_block
= false;
1598 GlassTable::commit(glass_revision_number_t revision
, RootInfo
* root_info
)
1600 LOGCALL_VOID(DB
, "GlassTable::commit", revision
|root_info
);
1603 if (revision
<= revision_number
) {
1604 throw Xapian::DatabaseError("New revision too low");
1609 GlassTable::throw_database_closed();
1611 revision_number
= revision
;
1612 root_info
->set_blocksize(block_size
);
1613 root_info
->set_level(0);
1614 root_info
->set_num_entries(0);
1615 root_info
->set_root_is_fake(true);
1616 root_info
->set_sequential_mode(true);
1617 root_info
->set_root(0);
1622 root
= C
[level
].get_n();
1624 root_info
->set_blocksize(block_size
);
1625 root_info
->set_level(level
);
1626 root_info
->set_num_entries(item_count
);
1627 root_info
->set_root_is_fake(faked_root_block
);
1628 root_info
->set_sequential_mode(sequential
);
1629 root_info
->set_root(root
);
1631 Btree_modified
= false;
1633 for (int i
= 0; i
< BTREE_CURSOR_LEVELS
; ++i
) {
1634 C
[i
].init(block_size
);
1637 free_list
.set_revision(revision
);
1638 free_list
.commit(this, block_size
);
1640 // Save the freelist details into the root_info.
1642 free_list
.pack(serialised
);
1643 root_info
->set_free_list(serialised
);
1645 revision_number
= revision
;
1650 changed_c
= DIR_START
;
1651 seq_count
= SEQ_START_POINT
;
1653 GlassTable::close();
1659 GlassTable::cancel(const RootInfo
& root_info
, glass_revision_number_t rev
)
1661 LOGCALL_VOID(DB
, "GlassTable::cancel", root_info
|rev
);
1666 GlassTable::throw_database_closed();
1671 // This causes problems: if (!Btree_modified) return;
1673 if (flags
& Xapian::DB_DANGEROUS
)
1674 throw Xapian::InvalidOperationError("cancel() not supported under Xapian::DB_DANGEROUS");
1676 revision_number
= rev
;
1677 block_size
= root_info
.get_blocksize();
1678 root
= root_info
.get_root();
1679 level
= root_info
.get_level();
1680 item_count
= root_info
.get_num_entries();
1681 faked_root_block
= root_info
.get_root_is_fake();
1682 sequential
= root_info
.get_sequential_mode();
1684 Btree_modified
= false;
1686 for (int j
= 0; j
<= level
; j
++) {
1687 C
[j
].init(block_size
);
1688 C
[j
].rewrite
= false;
1693 changed_c
= DIR_START
;
1694 seq_count
= SEQ_START_POINT
;
1696 if (cursor_created_since_last_modification
) {
1697 cursor_created_since_last_modification
= false;
1702 /************ B-tree reading ************/
1705 GlassTable::do_open_to_read(const RootInfo
* root_info
,
1706 glass_revision_number_t rev
)
1708 LOGCALL(DB
, bool, "GlassTable::do_open_to_read", root_info
|rev
);
1710 GlassTable::throw_database_closed();
1712 handle
= io_open_block_rd(name
+ GLASS_TABLE_EXTENSION
);
1715 // This table is optional when reading!
1716 revision_number
= rev
;
1719 string
message("Couldn't open ");
1721 message
+= "DB to read: ";
1722 errno_to_string(errno
, message
);
1723 throw Xapian::DatabaseOpeningError(message
);
1726 basic_open(root_info
, rev
);
1732 GlassTable::open(int flags_
, const RootInfo
& root_info
,
1733 glass_revision_number_t rev
)
1735 LOGCALL_VOID(DB
, "GlassTable::open", flags_
|root_info
|rev
);
1739 block_size
= root_info
.get_blocksize();
1740 root
= root_info
.get_root();
1743 do_open_to_read(&root_info
, rev
);
1747 do_open_to_write(&root_info
, rev
);
1751 GlassTable::prev_for_sequential(Glass::Cursor
* C_
, int /*dummy*/) const
1753 LOGCALL(DB
, bool, "GlassTable::prev_for_sequential", Literal("C_") | Literal("/*dummy*/"));
1755 AssertRel(DIR_START
,<=,c
);
1756 AssertRel(c
,<,DIR_END(C_
[0].get_p()));
1757 if (c
== DIR_START
) {
1758 uint4 n
= C_
[0].get_n();
1761 if (n
== 0) RETURN(false);
1763 if (n
== C
[0].get_n()) {
1764 // Block is a leaf block in the built-in cursor (potentially in
1765 // modified form if the table is writable).
1766 p
= C_
[0].clone(C
[0]);
1769 // Blocks in the built-in cursor may not have been written
1770 // to disk yet, so we have to check that the block number
1771 // isn't in the built-in cursor or we'll read an
1772 // uninitialised block (for which GET_LEVEL(p) will
1773 // probably return 0).
1775 for (j
= 1; j
<= level
; ++j
) {
1776 if (n
== C
[j
].get_n()) break;
1778 if (j
<= level
) continue;
1781 // Block isn't in the built-in cursor, so the form on disk
1782 // is valid, so read it to check if it's the next level 0
1784 byte
* q
= C_
[0].init(block_size
);
1789 if (REVISION(p
) > revision_number
+ writable
) {
1793 if (GET_LEVEL(p
) == 0) break;
1796 AssertRel(DIR_START
,<,c
);
1804 GlassTable::next_for_sequential(Glass::Cursor
* C_
, int /*dummy*/) const
1806 LOGCALL(DB
, bool, "GlassTable::next_for_sequential", Literal("C_") | Literal("/*dummy*/"));
1807 const byte
* p
= C_
[0].get_p();
1810 AssertRel(c
,<,DIR_END(p
));
1812 Assert((unsigned)c
< block_size
);
1813 if (c
== DIR_END(p
)) {
1814 uint4 n
= C_
[0].get_n();
1817 if (n
>= free_list
.get_first_unused_block()) RETURN(false);
1819 if (n
== C
[0].get_n()) {
1820 // Block is a leaf block in the built-in cursor
1821 // (potentially in modified form).
1822 p
= C_
[0].clone(C
[0]);
1824 // Blocks in the built-in cursor may not have been written
1825 // to disk yet, so we have to check that the block number
1826 // isn't in the built-in cursor or we'll read an
1827 // uninitialised block (for which GET_LEVEL(p) will
1828 // probably return 0).
1830 for (j
= 1; j
<= level
; ++j
) {
1831 if (n
== C
[j
].get_n()) break;
1833 if (j
<= level
) continue;
1835 // Block isn't in the built-in cursor, so the form on disk
1836 // is valid, so read it to check if it's the next level 0
1838 byte
* q
= C_
[0].init(block_size
);
1843 byte
* q
= C_
[0].init(block_size
);
1847 if (REVISION(p
) > revision_number
+ writable
) {
1851 if (GET_LEVEL(p
) == 0) break;
1861 GlassTable::prev_default(Glass::Cursor
* C_
, int j
) const
1863 LOGCALL(DB
, bool, "GlassTable::prev_default", Literal("C_") | j
);
1864 const byte
* p
= C_
[j
].get_p();
1866 AssertRel(DIR_START
,<=,c
);
1867 AssertRel(c
,<,DIR_END(p
));
1868 AssertRel((unsigned)DIR_END(p
),<=,block_size
);
1869 if (c
== DIR_START
) {
1870 if (j
== level
) RETURN(false);
1871 if (!prev_default(C_
, j
+ 1)) RETURN(false);
1874 AssertRel(DIR_START
,<,c
);
1879 block_to_cursor(C_
, j
- 1, Item(p
, c
).block_given_by());
1885 GlassTable::next_default(Glass::Cursor
* C_
, int j
) const
1887 LOGCALL(DB
, bool, "GlassTable::next_default", Literal("C_") | j
);
1888 const byte
* p
= C_
[j
].get_p();
1890 AssertRel(c
,<,DIR_END(p
));
1891 AssertRel((unsigned)DIR_END(p
),<=,block_size
);
1894 AssertRel(DIR_START
,<,c
);
1896 AssertRel(DIR_START
,<=,c
);
1898 // Sometimes c can be DIR_END(p) + 2 here it appears...
1899 if (c
>= DIR_END(p
)) {
1900 if (j
== level
) RETURN(false);
1901 if (!next_default(C_
, j
+ 1)) RETURN(false);
1907 block_to_cursor(C_
, j
- 1, Item(p
, c
).block_given_by());
1908 #ifdef BTREE_DEBUG_FULL
1909 printf("Block in GlassTable:next_default");
1910 report_block_full(j
- 1, C_
[j
- 1].get_n(), C_
[j
- 1].get_p());
1911 #endif /* BTREE_DEBUG_FULL */
1917 GlassTable::throw_database_closed()
1919 throw Xapian::DatabaseError("Database has been closed");
1922 /** Compares this key with key2.
1924 The result is true if this key precedes key2. The comparison is for byte
1925 sequence collating order, taking lengths into account. So if the keys are
1926 made up of lower case ASCII letters we get alphabetical ordering.
1928 Now remember that items are added into the B-tree in fastest time
1929 when they are preordered by their keys. This is therefore the piece
1930 of code that needs to be followed to arrange for the preordering.
1932 This is complicated by the fact that keys have two parts - a value
1933 and then a count. We first compare the values, and only if they
1934 are equal do we compare the counts.
1937 bool Key::operator<(Key key2
) const
1939 LOGCALL(DB
, bool, "Key::operator<", static_cast<const void*>(key2
.p
));
1940 int key1_len
= length();
1941 int key2_len
= key2
.length();
1942 if (key1_len
== key2_len
) {
1943 // The keys are the same length, so we can compare the counts
1944 // in the same operation since they're stored as 2 byte
1945 // bigendian numbers.
1946 RETURN(memcmp(p
+ K1
, key2
.p
+ K1
, key1_len
+ C2
) < 0);
1949 int k_smaller
= (key2_len
< key1_len
? key2_len
: key1_len
);
1951 // Compare the common part of the keys
1952 int diff
= memcmp(p
+ K1
, key2
.p
+ K1
, k_smaller
);
1953 if (diff
!= 0) RETURN(diff
< 0);
1955 // We dealt with the "same length" case above so we never need to check
1957 RETURN(key1_len
< key2_len
);
1960 bool Key::operator==(Key key2
) const
1962 LOGCALL(DB
, bool, "Key::operator==", static_cast<const void*>(key2
.p
));
1963 int key1_len
= length();
1964 if (key1_len
!= key2
.length()) RETURN(false);
1965 // The keys are the same length, so we can compare the counts
1966 // in the same operation since they're stored as 2 byte
1967 // bigendian numbers.
1968 RETURN(memcmp(p
+ K1
, key2
.p
+ K1
, key1_len
+ C2
) == 0);