[honey] Improve spelling table key encoding
[xapian.git] / xapian-core / backends / honey / honey_compact.cc
blob5495262b1d33b97e9555dc4f4e9eecdd8f84b4fe
1 /** @file honey_compact.cc
2 * @brief Compact a honey database, or merge and compact several.
3 */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2018 Olly Betts
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation; either version 2 of the
9 * License, or (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19 * USA
22 #include <config.h>
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
29 #include <algorithm>
30 #include <iostream>
31 #include <memory>
32 #include <queue>
33 #include <type_traits>
35 #include <cstdio>
36 #ifdef HAVE_SYS_UIO_H
37 # include <sys/uio.h>
38 #endif
40 #include "safeerrno.h"
42 #include "backends/flint_lock.h"
43 #include "compression_stream.h"
44 #include "honey_cursor.h"
45 #include "honey_database.h"
46 #include "honey_defs.h"
47 #include "honey_postlist_encodings.h"
48 #include "honey_table.h"
49 #include "honey_version.h"
50 #include "filetests.h"
51 #include "internaltypes.h"
52 #include "pack.h"
53 #include "backends/valuestats.h"
54 #include "wordaccess.h"
56 #include "../byte_length_strings.h"
57 #include "../prefix_compressed_strings.h"
59 #ifndef DISABLE_GPL_LIBXAPIAN
60 # include "../glass/glass_database.h"
61 # include "../glass/glass_table.h"
62 # include "../glass/glass_values.h"
63 #endif
65 using namespace std;
67 #ifndef DISABLE_GPL_LIBXAPIAN
68 [[noreturn]]
69 static void
70 throw_database_corrupt(const char* item, const char* pos)
72 string message;
73 if (pos != NULL) {
74 message = "Value overflow unpacking termlist: ";
75 } else {
76 message = "Out of data unpacking termlist: ";
78 message += item;
79 throw Xapian::DatabaseCorruptError(message);
namespace GlassCompact {

// Glass's special postlist-table keys all consist of a zero byte followed
// by a single marker byte which identifies the kind of entry.
static inline bool
key_has_marker(const std::string & key, char marker)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == marker;
}

static inline bool
is_user_metadata_key(const std::string & key)
{
    return key_has_marker(key, '\xc0');
}

static inline bool
is_valuestats_key(const std::string & key)
{
    return key_has_marker(key, '\xd0');
}

static inline bool
is_valuechunk_key(const std::string & key)
{
    return key_has_marker(key, '\xd8');
}

static inline bool
is_doclenchunk_key(const std::string & key)
{
    return key_has_marker(key, '\xe0');
}

}
110 inline static bool
111 termlist_key_is_values_used(const string& key)
113 const char* p = key.data();
114 const char* e = p + key.size();
115 Xapian::docid did;
116 if (unpack_uint_preserving_sort(&p, e, &did)) {
117 if (p == e)
118 return false;
119 if (e - p == 1 && *p == '\0')
120 return true;
122 throw Xapian::DatabaseCorruptError("termlist key format");
124 #endif
126 // Put all the helpers in a namespace to avoid symbols colliding with those of
127 // the same name in other flint-derived backends.
128 namespace HoneyCompact {
// Marker values for the different kinds of honey postlist-table entries.
enum {
    KEY_USER_METADATA = 0x00,
    KEY_VALUE_STATS = 0x01,
    KEY_VALUE_CHUNK = 0xd8,
    KEY_DOCLEN_CHUNK = 0xe0,
    KEY_POSTING_CHUNK = 0xff
};

/// Return one of the KEY_* constants, or a different value for an invalid key.
static inline int
key_type(const std::string& key)
{
    // Keys not starting with a zero byte are posting chunks.  (For an empty
    // key, std::string::operator[] at index size() yields '\0', so we fall
    // through to the length check below.)
    if (key[0] != '\0')
	return KEY_POSTING_CHUNK;

    if (key.size() <= 1)
	return -1;

    unsigned char code = key[1];
    // Second bytes 0x01 to 0x04 all map to KEY_VALUE_STATS.
    if (code >= 0x01 && code <= 0x04)
	return KEY_VALUE_STATS;

    // If code is 0xff then this correctly returns KEY_POSTING_CHUNK.
    return code;
}
// BufferedFile: a minimal buffered wrapper around a raw file descriptor,
// used while compacting.  It has two modes: write mode (the buffer fills
// from the front and is flushed with io_write()) and read mode (the buffer
// is kept right-aligned, so the next unread byte is at
// buf[sizeof(buf) - buf_end]).  buf_end counts buffered bytes in both
// modes; buf/buf_end are mutable so the read methods can be const.
157 class BufferedFile {
158 int fd = -1;
159 bool read_only = true;
160 mutable size_t buf_end = 0;
161 mutable char buf[4096];
163   public:
164 BufferedFile() { }
// NOTE(review): the destructor closes fd without flushing any buffered
// writes - presumably callers must call flush() first; confirm.
166 ~BufferedFile() {
167 if (fd >= 0) close(fd);
// Open path for reading or writing, closing any fd already held.
// Returns false on failure to open.
170 bool open(const std::string& path, bool read_only_) {
171 if (fd >= 0) close(fd);
172 read_only = read_only_;
173 if (read_only) {
174 // FIXME: add new io_open_stream_rd() etc?
175 fd = io_open_block_rd(path);
176 } else {
177 // FIXME: Always create anew for now...
178 fd = io_open_block_wr(path, true);
180 return fd >= 0;
// Logical position: in read mode buffered bytes have already been read
// from the kernel but not consumed (so subtract); in write mode they
// haven't reached the kernel yet (so add).
183 off_t get_pos() const {
184 return read_only ?
185 lseek(fd, 0, SEEK_CUR) - buf_end :
186 lseek(fd, 0, SEEK_CUR) + buf_end;
// True if nothing is buffered and the file is absent, unstattable or
// zero length.
189 bool empty() const {
190 if (buf_end) return false;
191 struct stat sbuf;
192 if (fd == -1 || fstat(fd, &sbuf) < 0) return true;
193 return (sbuf.st_size == 0);
// Append a single byte, flushing the buffer first if it is full.
196 void write(unsigned char ch) {
197 if (buf_end == sizeof(buf)) {
198 // writev()?
199 io_write(fd, buf, buf_end);
200 buf_end = 0;
202 buf[buf_end++] = ch;
// Append len bytes.  If they don't fit in the buffer, write buffer plus
// data out - via writev() where available (looping to cope with partial
// writes), otherwise with separate io_write() calls.
205 void write(const char* p, size_t len) {
206 if (buf_end + len <= sizeof(buf)) {
207 memcpy(buf + buf_end, p, len);
208 buf_end += len;
209 return;
212 #ifdef HAVE_WRITEV
213 while (true) {
214 struct iovec iov[2];
215 iov[0].iov_base = buf;
216 iov[0].iov_len = buf_end;
217 iov[1].iov_base = const_cast<char*>(p);
218 iov[1].iov_len = len;
219 ssize_t n_ = writev(fd, iov, 2);
220 if (n_ < 0) abort();
221 size_t n = n_;
222 if (n == buf_end + len) {
223 // Wrote everything.
224 buf_end = 0;
225 return;
227 if (n >= buf_end) {
228 // Wrote all of buf.
229 n -= buf_end;
230 p += n;
231 len -= n;
232 io_write(fd, p, len);
233 buf_end = 0;
234 return;
// Partial write which didn't even cover buf: drop what was written
// and retry with the remainder at the front of buf.
236 buf_end -= n;
237 memmove(buf, buf + n, buf_end);
239 #else
240 io_write(fd, buf, buf_end);
241 if (len >= sizeof(buf)) {
242 // If it's bigger than our buffer, just write it directly.
243 io_write(fd, p, len);
244 buf_end = 0;
245 return;
247 memcpy(buf, p, len);
248 buf_end = len;
249 #endif
// Return the next byte, or EOF at end of file.  On refill the data is
// moved to the END of buf so consumption can index from
// buf[sizeof(buf) - buf_end] upwards.
252 int read() const {
253 if (buf_end == 0) {
254 retry:
255 ssize_t r = ::read(fd, buf, sizeof(buf));
256 if (r == 0) return EOF;
257 if (r < 0) {
258 if (errno == EINTR) goto retry;
259 throw Xapian::DatabaseError("read failed", errno);
261 if (size_t(r) < sizeof(buf)) {
262 memmove(buf + sizeof(buf) - r, buf, r);
264 buf_end = r;
266 return static_cast<unsigned char>(buf[sizeof(buf) - buf_end--]);
// Read exactly len bytes into p, consuming buffered bytes first.
// Returns false if the underlying read comes up short.
269 bool read(char* p, size_t len) const {
270 if (len <= buf_end) {
271 memcpy(p, buf + sizeof(buf) - buf_end, len);
272 buf_end -= len;
273 return true;
275 memcpy(p, buf + sizeof(buf) - buf_end, buf_end);
276 p += buf_end;
277 len -= buf_end;
278 buf_end = 0;
279 // FIXME: refill buffer if len < sizeof(buf)
280 return ::read(fd, p, len) == ssize_t(len);
// Write out any buffered data (no-op in read mode).
283 void flush() {
284 if (!read_only && buf_end) {
285 io_write(fd, buf, buf_end);
286 buf_end = 0;
290 void sync() {
291 io_sync(fd);
// Switch to read mode and seek back to the start.  NOTE(review): does
// not flush pending writes - assumes flush() was already called.
294 void rewind() {
295 read_only = true;
296 lseek(fd, 0, SEEK_SET);
297 buf_end = 0;
// Cursor over a postlist table, specialised per source table type.
301 template<typename T> class PostlistCursor;
303 #ifndef DISABLE_GPL_LIBXAPIAN
304 // Convert glass to honey.
// Each entry the cursor stops on is presented already converted to honey
// form in key/tag; docids are renumbered by adding `offset`.  For initial
// chunks, tf/cf/firstdid/chunk_lastdid/first_wdf hold the decoded header
// fields (the header itself is stripped from tag).
305 template<>
306 class PostlistCursor<const GlassTable&> : private GlassCursor {
307 Xapian::docid offset;
309   public:
310 string key, tag;
311 Xapian::docid firstdid;
312 Xapian::docid chunk_lastdid;
313 Xapian::termcount tf, cf;
314 Xapian::termcount first_wdf;
316 PostlistCursor(const GlassTable *in, Xapian::docid offset_)
317 : GlassCursor(in), offset(offset_), firstdid(0)
319 rewind();
320 next();
323 bool next() {
324 if (!GlassCursor::next()) return false;
325 // We put all chunks into the non-initial chunk form here, then fix up
326 // the first chunk for each term in the merged database as we merge.
327 read_tag();
328 key = current_key;
329 tag = current_tag;
330 tf = cf = 0;
// User metadata: just remap the marker byte to the honey value.
331 if (GlassCompact::is_user_metadata_key(key)) {
332 key[1] = KEY_USER_METADATA;
333 return true;
335 if (GlassCompact::is_valuestats_key(key)) {
336 // Adjust key.
337 const char * p = key.data();
338 const char * end = p + key.length();
339 p += 2;
340 Xapian::valueno slot;
341 if (!unpack_uint_last(&p, end, &slot))
342 throw Xapian::DatabaseCorruptError("bad value stats key");
343 key = pack_honey_valuestats_key(slot);
344 return true;
// Value chunks: honey keys them by (slot, LAST docid in chunk), so scan
// the chunk to find its last docid and prepend the docid span to the tag.
346 if (GlassCompact::is_valuechunk_key(key)) {
347 const char * p = key.data();
348 const char * end = p + key.length();
349 p += 2;
350 Xapian::valueno slot;
351 if (!unpack_uint(&p, end, &slot))
352 throw Xapian::DatabaseCorruptError("bad value key");
353 Xapian::docid first_did;
354 if (!unpack_uint_preserving_sort(&p, end, &first_did))
355 throw Xapian::DatabaseCorruptError("bad value key");
356 first_did += offset;
358 Glass::ValueChunkReader reader(tag.data(), tag.size(), first_did);
359 Xapian::docid last_did = first_did;
360 while (reader.next(), !reader.at_end()) {
361 last_did = reader.get_docid();
364 key.assign("\0\xd8", 2);
365 pack_uint(key, slot);
366 pack_uint_preserving_sort(key, last_did);
368 // Add the docid delta across the chunk to the start of the tag.
369 string newtag;
370 pack_uint(newtag, last_did - first_did);
371 tag.insert(0, newtag);
373 return true;
// Doclen chunks: decode the glass (variable-width) doclens and re-encode
// them as fixed-width entries; docid gaps are filled with 0xff filler.
376 if (GlassCompact::is_doclenchunk_key(key)) {
377 const char * d = key.data();
378 const char * e = d + key.size();
379 d += 2;
381 if (d == e) {
382 // This is an initial chunk, so adjust tag header.
383 d = tag.data();
384 e = d + tag.size();
385 if (!unpack_uint(&d, e, &tf) ||
386 !unpack_uint(&d, e, &cf) ||
387 !unpack_uint(&d, e, &firstdid)) {
388 throw Xapian::DatabaseCorruptError("Bad postlist key");
390 ++firstdid;
391 tag.erase(0, d - tag.data());
392 } else {
393 // Not an initial chunk, so adjust key.
394 size_t tmp = d - key.data();
395 if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
396 throw Xapian::DatabaseCorruptError("Bad postlist key");
397 key.erase(tmp);
399 firstdid += offset;
401 d = tag.data();
402 e = d + tag.size();
404 // Convert doclen chunk to honey format.
405 string newtag;
407 // Skip the "last chunk" flag and increase_to_last.
408 if (d == e)
409 throw Xapian::DatabaseCorruptError("No last chunk flag in "
410 "glass docdata chunk");
411 ++d;
412 Xapian::docid increase_to_last;
413 if (!unpack_uint(&d, e, &increase_to_last))
414 throw Xapian::DatabaseCorruptError("Decoding last docid delta "
415 "in glass docdata chunk");
417 Xapian::termcount doclen_max = 0;
418 while (true) {
419 Xapian::termcount doclen;
420 if (!unpack_uint(&d, e, &doclen))
421 throw Xapian::DatabaseCorruptError("Decoding doclen in "
422 "glass docdata chunk");
423 if (doclen > doclen_max)
424 doclen_max = doclen;
425 unsigned char buf[4];
426 unaligned_write4(buf, doclen);
427 newtag.append(reinterpret_cast<char*>(buf), 4);
428 if (d == e)
429 break;
430 Xapian::docid gap_size;
431 if (!unpack_uint(&d, e, &gap_size))
432 throw Xapian::DatabaseCorruptError("Decoding docid "
433 "gap_size in glass "
434 "docdata chunk");
435 // FIXME: Split chunk if the gap_size is at all large.
436 newtag.append(4 * gap_size, '\xff');
439 Assert(!startswith(newtag, "\xff\xff\xff\xff"));
440 Assert(!endswith(newtag, "\xff\xff\xff\xff"));
// newtag holds one 4-byte slot per docid, so the chunk's last docid
// follows directly from its size.
442 chunk_lastdid = firstdid - 1 + newtag.size() / 4;
444 // Only encode document lengths using a whole number of bytes for
445 // now. We could allow arbitrary bit widths, but it complicates
446 // encoding and decoding so we should consider if the fairly small
447 // additional saving is worth it.
// The width prefix byte (8/16/24/32) records bits per entry.
448 if (doclen_max >= 0xffff) {
449 if (doclen_max >= 0xffffff) {
450 newtag.insert(0, 1, char(32));
451 swap(tag, newtag);
// NOTE(review): this branch is unreachable - any doclen_max >= 0xffffffff
// also satisfies the `>= 0xffffff` test above and is encoded as 32-bit
// there.  The overflow check should be the FIRST test in this chain.
452 } else if (doclen_max >= 0xffffffff) {
453 // FIXME: Handle these.
454 const char* m = "Document length values >= 0xffffffff not "
455 "currently handled";
456 throw Xapian::FeatureUnavailableError(m);
457 } else {
458 tag.assign(1, char(24));
459 for (size_t i = 1; i < newtag.size(); i += 4)
460 tag.append(newtag, i, 3);
462 } else {
463 if (doclen_max >= 0xff) {
464 tag.assign(1, char(16));
465 for (size_t i = 2; i < newtag.size(); i += 4)
466 tag.append(newtag, i, 2);
467 } else {
468 tag.assign(1, char(8));
469 for (size_t i = 3; i < newtag.size(); i += 4)
470 tag.append(newtag, i, 1);
474 return true;
477 // Adjust key if this is *NOT* an initial chunk.
478 // key is: pack_string_preserving_sort(key, tname)
479 // plus optionally: pack_uint_preserving_sort(key, did)
480 const char * d = key.data();
481 const char * e = d + key.size();
482 string tname;
483 if (!unpack_string_preserving_sort(&d, e, tname))
484 throw Xapian::DatabaseCorruptError("Bad postlist key");
486 if (d == e) {
487 // This is an initial chunk for a term, so adjust tag header.
488 d = tag.data();
489 e = d + tag.size();
490 if (!unpack_uint(&d, e, &tf) ||
491 !unpack_uint(&d, e, &cf) ||
492 !unpack_uint(&d, e, &firstdid)) {
493 throw Xapian::DatabaseCorruptError("Bad postlist key");
495 ++firstdid;
496 tag.erase(0, d - tag.data());
497 } else {
498 // Not an initial chunk, so adjust key.
499 size_t tmp = d - key.data();
500 if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
501 throw Xapian::DatabaseCorruptError("Bad postlist key");
// -1 here also removes the string terminator byte from the packed key.
502 key.erase(tmp - 1);
504 firstdid += offset;
506 d = tag.data();
507 e = d + tag.size();
509 // Convert posting chunk to honey format, but without any header.
510 string newtag;
512 // Skip the "last chunk" flag; decode increase_to_last.
513 if (d == e)
514 throw Xapian::DatabaseCorruptError("No last chunk flag in glass "
515 "posting chunk");
516 ++d;
517 Xapian::docid increase_to_last;
518 if (!unpack_uint(&d, e, &increase_to_last))
519 throw Xapian::DatabaseCorruptError("Decoding last docid delta in "
520 "glass posting chunk");
521 chunk_lastdid = firstdid + increase_to_last;
522 if (!unpack_uint(&d, e, &first_wdf))
523 throw Xapian::DatabaseCorruptError("Decoding first wdf in glass "
524 "posting chunk");
// Re-pack the remaining (delta, wdf) pairs in honey's encoding.
526 while (d != e) {
527 Xapian::docid delta;
528 if (!unpack_uint(&d, e, &delta))
529 throw Xapian::DatabaseCorruptError("Decoding docid delta in "
530 "glass posting chunk");
531 pack_uint(newtag, delta);
532 Xapian::termcount wdf;
533 if (!unpack_uint(&d, e, &wdf))
534 throw Xapian::DatabaseCorruptError("Decoding wdf in glass "
535 "posting chunk");
536 pack_uint(newtag, wdf);
539 swap(tag, newtag);
541 return true;
544 #endif
// Cursor over a honey postlist table (honey-to-honey compaction/merging).
// Entries stay in honey form; docids are renumbered by `offset`, and chunk
// headers are stripped from tag so the merger can rebuild them.
546 template<>
547 class PostlistCursor<const HoneyTable&> : private HoneyCursor {
548 Xapian::docid offset;
550   public:
551 string key, tag;
552 Xapian::docid firstdid;
553 Xapian::docid chunk_lastdid;
554 Xapian::termcount tf, cf;
555 Xapian::termcount first_wdf;
557 PostlistCursor(const HoneyTable *in, Xapian::docid offset_)
558 : HoneyCursor(in), offset(offset_), firstdid(0)
560 rewind();
561 next();
564 bool next() {
565 if (!HoneyCursor::next()) return false;
566 // We put all chunks into the non-initial chunk form here, then fix up
567 // the first chunk for each term in the merged database as we merge.
568 read_tag();
569 key = current_key;
570 tag = current_tag;
// Only tf is reset here; cf keeps the value decoded from the term's
// initial chunk, which is reused below to pick the delta-chunk header
// format for the term's later chunks (the key order guarantees the
// initial chunk is seen first).
571 tf = 0;
572 switch (key_type(key)) {
573 case KEY_USER_METADATA:
574 case KEY_VALUE_STATS:
575 return true;
// Value chunk: re-key with the docid renumbered by offset.
576 case KEY_VALUE_CHUNK: {
577 const char * p = key.data();
578 const char * end = p + key.length();
579 p += 2;
580 Xapian::valueno slot;
581 if (!unpack_uint(&p, end, &slot))
582 throw Xapian::DatabaseCorruptError("bad value key");
583 Xapian::docid did;
584 if (!unpack_uint_preserving_sort(&p, end, &did))
585 throw Xapian::DatabaseCorruptError("bad value key");
586 did += offset;
588 key.assign("\0\xd8", 2);
589 pack_uint(key, slot);
590 pack_uint_preserving_sort(key, did);
591 return true;
// Doclen chunk: the key encodes the first docid, with docid 1 encoded
// by omission; renumber it by offset, keeping that convention.
593 case KEY_DOCLEN_CHUNK: {
594 const char * p = key.data();
595 const char * end = p + key.length();
596 p += 2;
597 Xapian::docid did = 1;
598 if (p != end &&
599 (!unpack_uint_preserving_sort(&p, end, &did) || p != end)) {
600 throw Xapian::DatabaseCorruptError("Bad doclen key");
602 did += offset;
604 key.erase(2);
605 if (did != 1) {
606 pack_uint_preserving_sort(key, did);
608 return true;
610 case KEY_POSTING_CHUNK:
611 break;
612 default:
613 throw Xapian::DatabaseCorruptError("Bad postlist table key "
614 "type");
617 // Adjust key if this is *NOT* an initial chunk.
618 // key is: pack_string_preserving_sort(key, term)
619 // plus optionally: pack_uint_preserving_sort(key, did)
620 const char * d = key.data();
621 const char * e = d + key.size();
622 string term;
623 if (!unpack_string_preserving_sort(&d, e, term))
624 throw Xapian::DatabaseCorruptError("Bad postlist key");
626 if (d == e) {
627 // This is an initial chunk for a term, so remove tag header.
628 d = tag.data();
629 e = d + tag.size();
631 Xapian::docid lastdid;
632 if (!decode_initial_chunk_header(&d, e, tf, cf,
633 firstdid, lastdid, chunk_lastdid,
634 first_wdf)) {
635 throw Xapian::DatabaseCorruptError("Bad postlist initial "
636 "chunk header");
638 // Ignore lastdid - we'll need to recalculate it (at least when
639 // merging, and for simplicity we always do).
640 (void)lastdid;
641 tag.erase(0, d - tag.data());
642 } else {
643 // Not an initial chunk, so adjust key and remove tag header.
644 size_t tmp = d - key.data();
645 if (!unpack_uint_preserving_sort(&d, e, &chunk_lastdid) ||
646 d != e) {
647 throw Xapian::DatabaseCorruptError("Bad postlist key");
649 // -1 to remove the terminating zero byte too.
650 key.erase(tmp - 1);
652 d = tag.data();
653 e = d + tag.size();
// cf != 0 means the term's chunks store (docid delta, wdf) pairs; cf == 0
// means wdf values are all 0 and were omitted, so splice zeros back in
// to give the merger a uniform representation.
655 if (cf) {
656 if (!decode_delta_chunk_header(&d, e, chunk_lastdid, firstdid,
657 first_wdf)) {
658 throw Xapian::DatabaseCorruptError("Bad postlist delta "
659 "chunk header");
661 tag.erase(0, d - tag.data());
662 } else {
663 if (!decode_delta_chunk_header_bool(&d, e, chunk_lastdid,
664 firstdid)) {
665 throw Xapian::DatabaseCorruptError("Bad postlist delta "
666 "chunk header");
668 first_wdf = 0;
669 // Splice in a 0 wdf value after each docid delta.
670 string newtag;
671 for (size_t i = d - tag.data(); i < tag.size(); i += 4) {
672 newtag.append(tag, i, 4);
673 newtag.append(4, '\0');
675 tag = std::move(newtag);
678 firstdid += offset;
679 chunk_lastdid += offset;
680 return true;
684 template<>
685 class PostlistCursor<HoneyTable&> : public PostlistCursor<const HoneyTable&> {
686 public:
687 PostlistCursor(HoneyTable *in, Xapian::docid offset_)
688 : PostlistCursor<const HoneyTable&>(in, offset_) {}
template<typename T>
class PostlistCursorGt {
  public:
    /** Return true if and only if a's key is strictly greater than b's key.
     *
     *  Ties on the key are broken by comparing firstdid, so of two chunks
     *  with the same key the one with the lower first docid sorts first.
     */
    bool operator()(const T* a, const T* b) const {
	if (a->key != b->key)
	    return a->key > b->key;
	return a->firstdid > b->firstdid;
    }
};
703 static string
704 encode_valuestats(Xapian::doccount freq,
705 const string & lbound, const string & ubound)
707 string value;
708 pack_uint(value, freq);
709 pack_string(value, lbound);
710 // We don't store or count empty values, so neither of the bounds
711 // can be empty. So we can safely store an empty upper bound when
712 // the bounds are equal.
713 if (lbound != ubound) value += ubound;
714 return value;
// Merge the postlist tables in [b, e) into `out`, renumbering each table's
// docids by the corresponding entry of `offset`.  A priority queue of
// per-table cursors yields entries in key order, so the sections are
// processed in sequence: user metadata, value stats, value chunks, doclen
// chunks, and finally posting chunks.
717 // U : vector<HoneyTable*>::const_iterator
718 template<typename T, typename U> void
719 merge_postlists(Xapian::Compactor * compactor,
720 T * out, vector<Xapian::docid>::const_iterator offset,
721 U b, U e)
723 typedef decltype(**b) table_type; // E.g. HoneyTable
724 typedef PostlistCursor<table_type> cursor_type;
725 typedef PostlistCursorGt<cursor_type> gt_type;
726 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
727 for ( ; b != e; ++b, ++offset) {
728 auto in = *b;
729 if (in->empty()) {
730 // Skip empty tables.
731 continue;
734 pq.push(new cursor_type(in, *offset));
737 string last_key;
739 // Merge user metadata.
// Duplicate keys are collected into `tags` and resolved via the
// compactor callback when one is supplied, else first-wins.
740 vector<string> tags;
741 while (!pq.empty()) {
742 cursor_type * cur = pq.top();
743 const string& key = cur->key;
744 if (key_type(key) != KEY_USER_METADATA) break;
746 if (key != last_key) {
747 if (!tags.empty()) {
748 if (tags.size() > 1 && compactor) {
749 Assert(!last_key.empty());
750 // FIXME: It would be better to merge all duplicates
751 // for a key in one call, but currently we don't in
752 // multipass mode.
753 const string & resolved_tag =
754 compactor->resolve_duplicate_metadata(last_key,
755 tags.size(),
756 &tags[0]);
757 out->add(last_key, resolved_tag);
758 } else {
759 Assert(!last_key.empty());
760 out->add(last_key, tags[0]);
762 tags.resize(0);
764 last_key = key;
766 tags.push_back(cur->tag);
768 pq.pop();
769 if (cur->next()) {
770 pq.push(cur);
771 } else {
772 delete cur;
// Flush the final pending metadata key.
775 if (!tags.empty()) {
776 if (tags.size() > 1 && compactor) {
777 Assert(!last_key.empty());
778 const string & resolved_tag =
779 compactor->resolve_duplicate_metadata(last_key,
780 tags.size(),
781 &tags[0]);
782 out->add(last_key, resolved_tag);
783 } else {
784 Assert(!last_key.empty());
785 out->add(last_key, tags[0]);
791 // Merge valuestats.
// Stats for the same slot from different tables are combined: freqs sum,
// bounds widen.
792 Xapian::doccount freq = 0;
793 string lbound, ubound;
795 while (!pq.empty()) {
796 cursor_type * cur = pq.top();
797 const string& key = cur->key;
798 if (key_type(key) != KEY_VALUE_STATS) break;
799 if (key != last_key) {
800 // For the first valuestats key, last_key will be the previous
801 // key we wrote, which we don't want to overwrite. This is the
802 // only time that freq will be 0, so check that.
803 if (freq) {
804 out->add(last_key, encode_valuestats(freq, lbound, ubound));
805 freq = 0;
807 last_key = key;
810 const string & tag = cur->tag;
812 const char * pos = tag.data();
813 const char * end = pos + tag.size();
815 Xapian::doccount f;
816 string l, u;
817 if (!unpack_uint(&pos, end, &f)) {
818 if (*pos == 0)
819 throw Xapian::DatabaseCorruptError("Incomplete stats item "
820 "in value table");
821 throw Xapian::RangeError("Frequency statistic in value table "
822 "is too large");
824 if (!unpack_string(&pos, end, l)) {
825 if (*pos == 0)
826 throw Xapian::DatabaseCorruptError("Incomplete stats item "
827 "in value table");
828 throw Xapian::RangeError("Lower bound in value table is too "
829 "large");
// An absent upper bound means it equals the lower bound (see
// encode_valuestats()).
831 size_t len = end - pos;
832 if (len == 0) {
833 u = l;
834 } else {
835 u.assign(pos, len);
837 if (freq == 0) {
838 freq = f;
839 lbound = l;
840 ubound = u;
841 } else {
842 freq += f;
843 if (l < lbound) lbound = l;
844 if (u > ubound) ubound = u;
847 pq.pop();
848 if (cur->next()) {
849 pq.push(cur);
850 } else {
851 delete cur;
855 if (freq) {
856 out->add(last_key, encode_valuestats(freq, lbound, ubound));
860 // Merge valuestream chunks.
// Value chunks were already re-keyed by the cursors; just copy through.
861 while (!pq.empty()) {
862 cursor_type * cur = pq.top();
863 const string & key = cur->key;
864 if (key_type(key) != KEY_VALUE_CHUNK) break;
865 out->add(key, cur->tag);
866 pq.pop();
867 if (cur->next()) {
868 pq.push(cur);
869 } else {
870 delete cur;
874 // Merge doclen chunks.
// Honey doclen chunk keys end with the chunk's last docid, appended here.
875 while (!pq.empty()) {
876 cursor_type * cur = pq.top();
877 string & key = cur->key;
878 if (key_type(key) != KEY_DOCLEN_CHUNK) break;
879 pack_uint_preserving_sort(key, cur->chunk_lastdid);
880 out->add(key, cur->tag);
881 pq.pop();
882 if (cur->next()) {
883 pq.push(cur);
884 } else {
885 delete cur;
889 Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
// One buffered chunk of the current term's postlist; chunks are collected
// until the key changes, because the initial chunk's header needs the
// overall last docid.
891 struct HoneyPostListChunk {
892 Xapian::docid first, last;
893 Xapian::termcount first_wdf;
894 string data;
896 HoneyPostListChunk(Xapian::docid first_,
897 Xapian::docid last_,
898 Xapian::termcount first_wdf_,
899 string&& data_)
900 : first(first_),
901 last(last_),
902 first_wdf(first_wdf_),
903 data(data_) {}
905 vector<HoneyPostListChunk> tags;
906 while (true) {
907 cursor_type * cur = NULL;
908 if (!pq.empty()) {
909 cur = pq.top();
910 pq.pop();
912 if (cur) {
913 AssertEq(key_type(cur->key), KEY_POSTING_CHUNK);
// Key change (or input exhausted): emit the buffered term.
915 if (cur == NULL || cur->key != last_key) {
916 if (!tags.empty()) {
917 Xapian::termcount first_wdf = tags[0].first_wdf;
918 Xapian::docid chunk_lastdid = tags[0].last;
919 Xapian::docid last_did = tags.back().last;
921 string first_tag;
922 encode_initial_chunk_header(tf, cf, tags[0].first, last_did,
923 chunk_lastdid,
924 first_wdf, first_tag);
925 if (tf > 2) {
926 first_tag += tags[0].data;
928 out->add(last_key, first_tag);
930 // If tf == 2, the data could be split over two tags when
931 // merging, but we only output an initial tag in this case.
932 if (tf > 2 && tags.size() > 1) {
933 string term;
934 const char* p = last_key.data();
935 const char* end = p + last_key.size();
936 if (!unpack_string_preserving_sort(&p, end, term) ||
937 p != end) {
938 throw Xapian::DatabaseCorruptError("Bad postlist "
939 "chunk key");
942 auto i = tags.begin();
943 while (++i != tags.end()) {
944 last_did = i->last;
945 const string& key = pack_honey_postlist_key(term,
946 last_did);
947 string tag;
948 if (cf) {
949 encode_delta_chunk_header(i->first,
950 last_did,
951 i->first_wdf,
952 tag);
953 tag += i->data;
954 } else {
// cf == 0: wdfs are all zero, so emit the compact header and
// strip the zero wdf values the cursor spliced in.
955 encode_delta_chunk_header_bool(i->first,
956 last_did,
957 tag);
958 const char* pos = i->data.data();
959 const char* pos_end = pos + i->data.size();
960 while (pos != pos_end) {
961 Xapian::docid delta;
962 if (!unpack_uint(&pos, pos_end, &delta))
963 throw_database_corrupt("Decoding docid "
964 "delta", pos);
965 pack_uint(tag, delta);
966 Xapian::termcount wdf;
967 if (!unpack_uint(&pos, pos_end, &wdf))
968 throw_database_corrupt("Decoding wdf",
969 pos);
970 // Only copy over the docid deltas, not the
971 // wdfs.
972 (void)wdf;
976 out->add(key, tag);
980 tags.clear();
981 if (cur == NULL) break;
982 tf = cf = 0;
983 last_key = cur->key;
// cur->tf is only non-zero for a term's initial chunk, so each source
// table contributes its term statistics exactly once.
985 if (cur->tf) {
986 tf += cur->tf;
987 cf += cur->cf;
989 tags.push_back(HoneyPostListChunk(cur->firstdid,
990 cur->chunk_lastdid,
991 cur->first_wdf,
992 std::move(cur->tag)));
993 if (cur->next()) {
994 pq.push(cur);
995 } else {
996 delete cur;
1001 template<typename T> struct MergeCursor;
1003 #ifndef DISABLE_GPL_LIBXAPIAN
1004 template<>
1005 struct MergeCursor<const GlassTable&> : public GlassCursor {
1006 explicit MergeCursor(const GlassTable *in) : GlassCursor(in) {
1007 rewind();
1008 next();
1011 #endif
1013 template<>
1014 struct MergeCursor<const HoneyTable&> : public HoneyCursor {
1015 explicit MergeCursor(const HoneyTable *in) : HoneyCursor(in) {
1016 rewind();
1017 next();
template<typename T>
struct CursorGt {
    /// Return true if and only if a's key is strictly greater than b's key.
    ///
    /// A cursor which is after_end() compares greater than any cursor which
    /// still has a current entry.
    bool operator()(const T* a, const T* b) const {
	if (b->after_end()) return false;
	return a->after_end() || a->current_key > b->current_key;
    }
};
1031 #ifndef DISABLE_GPL_LIBXAPIAN
1032 // Convert glass to honey.
// Merge glass spelling tables into `out`, translating each key's prefix to
// the honey scheme (table below).  Entries sharing a key are combined:
// fragment lists (bookend/head/middle/tail) take the union of their words,
// word entries have their frequencies summed.
1033 static void
1034 merge_spellings(HoneyTable* out,
1035 vector<const GlassTable*>::const_iterator b,
1036 vector<const GlassTable*>::const_iterator e)
1038 typedef MergeCursor<const GlassTable&> cursor_type;
1039 typedef CursorGt<cursor_type> gt_type;
1040 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1041 for ( ; b != e; ++b) {
1042 auto in = *b;
1043 if (!in->empty()) {
1044 pq.push(new cursor_type(in));
1048 while (!pq.empty()) {
1049 cursor_type * cur = pq.top();
1050 pq.pop();
1052 // Glass vs honey spelling key prefix map:
1054 // Type Glass Honey
1056 // bookend Bbd \x00bd
1057 // head Hhe \x01he
1058 // middle Mddl \x02ddl
1059 // tail Til \x03il
1060 // word Wword word
1062 // In honey, if the word's first byte is <= '\x04', we add a prefix
1063 // of '\x04' (which is unlikely in real world use but ensures that
1064 // we can handle arbitrary strings).
1066 // The prefixes for honey are chosen such that we save a byte for the
1067 // key of all real-world words, and so that more first bytes are used
1068 // so that a top-level array index is more effective, especially for
1069 // random-access lookup of word entries (which we do to check the
1070 // frequency of potential spelling candidates).
1072 // The other prefixes are chosen such that we can do look up in key
1073 // order at search time, which is more efficient for a cursor which can
1074 // only efficiently move forwards.
1076 // Note that the key ordering is the same for glass and honey, which
1077 // makes translating during compaction simpler.
1078 string key = cur->current_key;
1079 switch (key[0]) {
1080 case 'B':
1081 key[0] = '\x00';
1082 break;
1083 case 'H':
1084 key[0] = '\x01';
1085 break;
1086 case 'M':
1087 key[0] = '\x02';
1088 break;
1089 case 'T':
1090 key[0] = '\x03';
1091 break;
1092 case 'W':
1093 if (static_cast<unsigned char>(key[1]) > 0x04)
1094 key.erase(0, 1);
1095 else
1096 key[0] = '\x04';
1097 break;
1098 default: {
1099 string m = "Bad spelling key prefix: ";
1100 m += static_cast<unsigned char>(key[0]);
1101 throw Xapian::DatabaseCorruptError(m);
// Unique key: no merging needed, so copy the tag through without
// decompressing it.
1105 if (pq.empty() || pq.top()->current_key > key) {
1106 // No need to merge the tags, just copy the (possibly compressed)
1107 // tag value.
1108 bool compressed = cur->read_tag(true);
1109 out->add(key, cur->current_tag, compressed);
1110 if (cur->next()) {
1111 pq.push(cur);
1112 } else {
1113 delete cur;
1115 continue;
1118 // Merge tag values with the same key:
1119 string tag;
// Keys below '\x04' are fragment entries (prefix-compressed word lists);
// '\x04' and above are word entries (packed frequency counts).
1120 if (static_cast<unsigned char>(key[0]) < 0x04) {
1121 // We just want the union of words, so copy over the first instance
1122 // and skip any identical ones.
1123 priority_queue<PrefixCompressedStringItor *,
1124 vector<PrefixCompressedStringItor *>,
1125 PrefixCompressedStringItorGt> pqtag;
1126 // Stick all the cursor_type pointers in a vector because their
1127 // current_tag members must remain valid while we're merging their
1128 // tags, but we need to call next() on them all afterwards.
1129 vector<cursor_type *> vec;
1130 vec.reserve(pq.size());
1132 while (true) {
1133 cur->read_tag();
1134 pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
1135 vec.push_back(cur);
1136 if (pq.empty() || pq.top()->current_key != key) break;
1137 cur = pq.top();
1138 pq.pop();
1141 PrefixCompressedStringWriter wr(tag);
1142 string lastword;
1143 while (!pqtag.empty()) {
1144 PrefixCompressedStringItor * it = pqtag.top();
1145 pqtag.pop();
1146 string word = **it;
1147 if (word != lastword) {
1148 lastword = word;
1149 wr.append(lastword);
1151 ++*it;
1152 if (!it->at_end()) {
1153 pqtag.push(it);
1154 } else {
1155 delete it;
1159 for (auto i : vec) {
1160 cur = i;
1161 if (cur->next()) {
1162 pq.push(cur);
1163 } else {
1164 delete cur;
1167 } else {
1168 // We want to sum the frequencies from tags for the same key.
1169 Xapian::termcount tot_freq = 0;
1170 while (true) {
1171 cur->read_tag();
1172 Xapian::termcount freq;
1173 const char * p = cur->current_tag.data();
1174 const char * end = p + cur->current_tag.size();
1175 if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
1176 throw_database_corrupt("Bad spelling word freq", p);
1178 tot_freq += freq;
1179 if (cur->next()) {
1180 pq.push(cur);
1181 } else {
1182 delete cur;
1184 if (pq.empty() || pq.top()->current_key != key) break;
1185 cur = pq.top();
1186 pq.pop();
1188 tag.resize(0);
1189 pack_uint_last(tag, tot_freq);
1191 out->add(key, tag);
1194 #endif
// Merge honey spelling tables into `out`.  Same merging rules as the glass
// variant above - fragment lists are unioned, word frequencies summed - but
// the keys are already in honey form so no prefix translation is needed.
1196 static void
1197 merge_spellings(HoneyTable* out,
1198 vector<const HoneyTable*>::const_iterator b,
1199 vector<const HoneyTable*>::const_iterator e)
1201 typedef MergeCursor<const HoneyTable&> cursor_type;
1202 typedef CursorGt<cursor_type> gt_type;
1203 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1204 for ( ; b != e; ++b) {
1205 auto in = *b;
1206 if (!in->empty()) {
1207 pq.push(new cursor_type(in));
1211 while (!pq.empty()) {
1212 cursor_type * cur = pq.top();
1213 pq.pop();
1215 string key = cur->current_key;
// Unique key: copy the tag through without decompressing.
1216 if (pq.empty() || pq.top()->current_key > key) {
1217 // No need to merge the tags, just copy the (possibly compressed)
1218 // tag value.
1219 bool compressed = cur->read_tag(true);
1220 out->add(key, cur->current_tag, compressed);
1221 if (cur->next()) {
1222 pq.push(cur);
1223 } else {
1224 delete cur;
1226 continue;
1229 // Merge tag values with the same key:
1230 string tag;
// Keys below '\x04' are fragment entries (prefix-compressed word lists);
// '\x04' and above are word entries (packed frequency counts).
1231 if (static_cast<unsigned char>(key[0]) < 0x04) {
1232 // We just want the union of words, so copy over the first instance
1233 // and skip any identical ones.
1234 priority_queue<PrefixCompressedStringItor *,
1235 vector<PrefixCompressedStringItor *>,
1236 PrefixCompressedStringItorGt> pqtag;
1237 // Stick all the cursor_type pointers in a vector because their
1238 // current_tag members must remain valid while we're merging their
1239 // tags, but we need to call next() on them all afterwards.
1240 vector<cursor_type *> vec;
1241 vec.reserve(pq.size());
1243 while (true) {
1244 cur->read_tag();
1245 pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
1246 vec.push_back(cur);
1247 if (pq.empty() || pq.top()->current_key != key) break;
1248 cur = pq.top();
1249 pq.pop();
1252 PrefixCompressedStringWriter wr(tag);
1253 string lastword;
1254 while (!pqtag.empty()) {
1255 PrefixCompressedStringItor * it = pqtag.top();
1256 pqtag.pop();
1257 string word = **it;
1258 if (word != lastword) {
1259 lastword = word;
1260 wr.append(lastword);
1262 ++*it;
1263 if (!it->at_end()) {
1264 pqtag.push(it);
1265 } else {
1266 delete it;
1270 for (auto i : vec) {
1271 cur = i;
1272 if (cur->next()) {
1273 pq.push(cur);
1274 } else {
1275 delete cur;
1278 } else {
1279 // We want to sum the frequencies from tags for the same key.
1280 Xapian::termcount tot_freq = 0;
1281 while (true) {
1282 cur->read_tag();
1283 Xapian::termcount freq;
1284 const char * p = cur->current_tag.data();
1285 const char * end = p + cur->current_tag.size();
1286 if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
1287 throw_database_corrupt("Bad spelling word freq", p);
1289 tot_freq += freq;
1290 if (cur->next()) {
1291 pq.push(cur);
1292 } else {
1293 delete cur;
1295 if (pq.empty() || pq.top()->current_key != key) break;
1296 cur = pq.top();
1297 pq.pop();
1299 tag.resize(0);
1300 pack_uint_last(tag, tot_freq);
1302 out->add(key, tag);
1306 // U : vector<HoneyTable*>::const_iterator
1307 template<typename T, typename U> void
1308 merge_synonyms(T* out, U b, U e)
1310 typedef decltype(**b) table_type; // E.g. HoneyTable
1311 typedef MergeCursor<table_type> cursor_type;
1312 typedef CursorGt<cursor_type> gt_type;
1313 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1314 for ( ; b != e; ++b) {
1315 auto in = *b;
1316 if (!in->empty()) {
1317 pq.push(new cursor_type(in));
1321 while (!pq.empty()) {
1322 cursor_type * cur = pq.top();
1323 pq.pop();
1325 string key = cur->current_key;
1326 if (pq.empty() || pq.top()->current_key > key) {
1327 // No need to merge the tags, just copy the (possibly compressed)
1328 // tag value.
1329 bool compressed = cur->read_tag(true);
1330 out->add(key, cur->current_tag, compressed);
1331 if (cur->next()) {
1332 pq.push(cur);
1333 } else {
1334 delete cur;
1336 continue;
1339 // Merge tag values with the same key:
1340 string tag;
1342 // We just want the union of words, so copy over the first instance
1343 // and skip any identical ones.
1344 priority_queue<ByteLengthPrefixedStringItor *,
1345 vector<ByteLengthPrefixedStringItor *>,
1346 ByteLengthPrefixedStringItorGt> pqtag;
1347 vector<cursor_type *> vec;
1349 while (true) {
1350 cur->read_tag();
1351 pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
1352 vec.push_back(cur);
1353 if (pq.empty() || pq.top()->current_key != key) break;
1354 cur = pq.top();
1355 pq.pop();
1358 string lastword;
1359 while (!pqtag.empty()) {
1360 ByteLengthPrefixedStringItor * it = pqtag.top();
1361 pqtag.pop();
1362 if (**it != lastword) {
1363 lastword = **it;
1364 tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
1365 tag += lastword;
1367 ++*it;
1368 if (!it->at_end()) {
1369 pqtag.push(it);
1370 } else {
1371 delete it;
1375 for (auto i : vec) {
1376 cur = i;
1377 if (cur->next()) {
1378 pq.push(cur);
1379 } else {
1380 delete cur;
1384 out->add(key, tag);
1388 template<typename T, typename U> void
1389 multimerge_postlists(Xapian::Compactor * compactor,
1390 T* out, const char * tmpdir,
1391 const vector<U*>& in,
1392 vector<Xapian::docid> off)
1394 if (in.size() <= 3) {
1395 merge_postlists(compactor, out, off.begin(), in.begin(), in.end());
1396 return;
1398 unsigned int c = 0;
1399 vector<HoneyTable *> tmp;
1400 tmp.reserve(in.size() / 2);
1402 vector<Xapian::docid> newoff;
1403 newoff.resize(in.size() / 2);
1404 for (unsigned int i = 0, j; i < in.size(); i = j) {
1405 j = i + 2;
1406 if (j == in.size() - 1) ++j;
1408 string dest = tmpdir;
1409 char buf[64];
1410 sprintf(buf, "/tmp%u_%u.", c, i / 2);
1411 dest += buf;
1413 HoneyTable * tmptab = new HoneyTable("postlist", dest, false);
1415 // Use maximum blocksize for temporary tables. And don't compress
1416 // entries in temporary tables, even if the final table would do
1417 // so. Any already compressed entries will get copied in
1418 // compressed form. (FIXME: HoneyTable has no blocksize)
1419 Honey::RootInfo root_info;
1420 root_info.init(65536, 0);
1421 const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
1422 tmptab->create_and_open(flags, root_info);
1424 merge_postlists(compactor, tmptab, off.begin() + i,
1425 in.begin() + i, in.begin() + j);
1426 tmp.push_back(tmptab);
1427 tmptab->flush_db();
1428 tmptab->commit(1, &root_info);
1429 AssertRel(root_info.get_blocksize(),==,65536);
1431 swap(off, newoff);
1432 ++c;
1435 while (tmp.size() > 3) {
1436 vector<HoneyTable *> tmpout;
1437 tmpout.reserve(tmp.size() / 2);
1438 vector<Xapian::docid> newoff;
1439 newoff.resize(tmp.size() / 2);
1440 for (unsigned int i = 0, j; i < tmp.size(); i = j) {
1441 j = i + 2;
1442 if (j == tmp.size() - 1) ++j;
1444 string dest = tmpdir;
1445 char buf[64];
1446 sprintf(buf, "/tmp%u_%u.", c, i / 2);
1447 dest += buf;
1449 HoneyTable * tmptab = new HoneyTable("postlist", dest, false);
1451 // Use maximum blocksize for temporary tables. And don't compress
1452 // entries in temporary tables, even if the final table would do
1453 // so. Any already compressed entries will get copied in
1454 // compressed form. (FIXME: HoneyTable has no blocksize)
1455 Honey::RootInfo root_info;
1456 root_info.init(65536, 0);
1457 const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
1458 tmptab->create_and_open(flags, root_info);
1460 merge_postlists(compactor, tmptab, off.begin() + i,
1461 tmp.begin() + i, tmp.begin() + j);
1462 if (c > 0) {
1463 for (unsigned int k = i; k < j; ++k) {
1464 // FIXME: unlink(tmp[k]->get_path().c_str());
1465 delete tmp[k];
1466 tmp[k] = NULL;
1469 tmpout.push_back(tmptab);
1470 tmptab->flush_db();
1471 tmptab->commit(1, &root_info);
1472 AssertRel(root_info.get_blocksize(),==,65536);
1474 swap(tmp, tmpout);
1475 swap(off, newoff);
1476 ++c;
1478 merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
1479 if (c > 0) {
1480 for (size_t k = 0; k < tmp.size(); ++k) {
1481 // FIXME: unlink(tmp[k]->get_path().c_str());
1482 delete tmp[k];
1483 tmp[k] = NULL;
/// Cursor for merging position tables; specialised for each input backend.
template<typename T> class PositionCursor;

#ifndef DISABLE_GPL_LIBXAPIAN
/** Position-table cursor over a glass input.
 *
 *  Presents each entry's key re-packed with the docid adjusted by this
 *  input's merge offset, so entries from different inputs interleave in
 *  the right order.
 */
template<>
class PositionCursor<const GlassTable&> : private GlassCursor {
    /// Docid offset to apply to keys from this input.
    Xapian::docid offset;

  public:
    /// Current key, re-packed as (term, did + offset).
    string key;
    // Note: initialised to 0 by the constructor and not updated here.
    Xapian::docid firstdid;

    PositionCursor(const GlassTable *in, Xapian::docid offset_)
	: GlassCursor(in), offset(offset_), firstdid(0) {
	rewind();
	next();
    }

    /** Advance to the next entry and rebuild @a key.
     *
     *  @return false once the end of the table is reached.
     *  @exception Xapian::DatabaseCorruptError if a key doesn't decode as
     *             a packed (term, docid) pair.
     */
    bool next() {
	if (!GlassCursor::next()) return false;
	read_tag();
	const char * d = current_key.data();
	const char * e = d + current_key.size();
	string term;
	Xapian::docid did;
	if (!unpack_string_preserving_sort(&d, e, term) ||
	    !unpack_uint_preserving_sort(&d, e, &did) ||
	    d != e) {
	    throw Xapian::DatabaseCorruptError("Bad position key");
	}

	key.resize(0);
	pack_string_preserving_sort(key, term);
	pack_uint_preserving_sort(key, did + offset);
	return true;
    }

    /// The positional data for the current entry, copied through unchanged.
    const string & get_tag() const {
	return current_tag;
    }
};
#endif
/** Position-table cursor over a honey input.
 *
 *  Mirrors the glass specialisation above: each entry's key is re-packed
 *  with the docid adjusted by this input's merge offset.
 */
template<>
class PositionCursor<const HoneyTable&> : private HoneyCursor {
    /// Docid offset to apply to keys from this input.
    Xapian::docid offset;

  public:
    /// Current key, re-packed as (term, did + offset).
    string key;
    // Note: initialised to 0 by the constructor and not updated here.
    Xapian::docid firstdid;

    PositionCursor(const HoneyTable *in, Xapian::docid offset_)
	: HoneyCursor(in), offset(offset_), firstdid(0) {
	rewind();
	next();
    }

    /** Advance to the next entry and rebuild @a key.
     *
     *  @return false once the end of the table is reached.
     *  @exception Xapian::DatabaseCorruptError if a key doesn't decode as
     *             a packed (term, docid) pair.
     */
    bool next() {
	if (!HoneyCursor::next()) return false;
	read_tag();
	const char * d = current_key.data();
	const char * e = d + current_key.size();
	string term;
	Xapian::docid did;
	if (!unpack_string_preserving_sort(&d, e, term) ||
	    !unpack_uint_preserving_sort(&d, e, &did) ||
	    d != e) {
	    throw Xapian::DatabaseCorruptError("Bad position key");
	}

	key.resize(0);
	pack_string_preserving_sort(key, term);
	pack_uint_preserving_sort(key, did + offset);
	return true;
    }

    /// The positional data for the current entry, copied through unchanged.
    const string & get_tag() const {
	return current_tag;
    }
};
template<typename T>
class PositionCursorGt {
  public:
    /** Comparator for priority_queue of PositionCursor pointers.
     *
     *  Returns true if and only if a's key is strictly greater than b's,
     *  which makes the queue surface the smallest key first.
     */
    bool operator()(const T* a, const T* b) const {
	return b->key < a->key;
    }
};
1578 template<typename T, typename U> void
1579 merge_positions(T* out, const vector<U*> & inputs,
1580 const vector<Xapian::docid> & offset)
1582 typedef decltype(*inputs[0]) table_type; // E.g. HoneyTable
1583 typedef PositionCursor<table_type> cursor_type;
1584 typedef PositionCursorGt<cursor_type> gt_type;
1585 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1586 for (size_t i = 0; i < inputs.size(); ++i) {
1587 auto in = inputs[i];
1588 if (in->empty()) {
1589 // Skip empty tables.
1590 continue;
1593 pq.push(new cursor_type(in, offset[i]));
1596 while (!pq.empty()) {
1597 cursor_type * cur = pq.top();
1598 pq.pop();
1599 out->add(cur->key, cur->get_tag());
1600 if (cur->next()) {
1601 pq.push(cur);
1602 } else {
1603 delete cur;
1608 template<typename T, typename U> void
1609 merge_docid_keyed(T *out, const vector<U*> & inputs,
1610 const vector<Xapian::docid> & offset,
1611 int = 0)
1613 for (size_t i = 0; i < inputs.size(); ++i) {
1614 Xapian::docid off = offset[i];
1616 auto in = inputs[i];
1617 if (in->empty()) continue;
1619 HoneyCursor cur(in);
1620 cur.rewind();
1622 string key;
1623 while (cur.next()) {
1624 // Adjust the key if this isn't the first database.
1625 if (off) {
1626 Xapian::docid did;
1627 const char * d = cur.current_key.data();
1628 const char * e = d + cur.current_key.size();
1629 if (!unpack_uint_preserving_sort(&d, e, &did)) {
1630 string msg = "Bad key in ";
1631 msg += inputs[i]->get_path();
1632 throw Xapian::DatabaseCorruptError(msg);
1634 did += off;
1635 key.resize(0);
1636 pack_uint_preserving_sort(key, did);
1637 if (d != e) {
1638 // Copy over anything extra in the key (e.g. the zero byte
1639 // at the end of "used value slots" in the termlist table).
1640 key.append(d, e - d);
1642 } else {
1643 key = cur.current_key;
1645 bool compressed = cur.read_tag(true);
1646 out->add(key, cur.current_tag, compressed);
#ifndef DISABLE_GPL_LIBXAPIAN
/** Merge docid-keyed tables from glass inputs into a honey output.
 *
 *  Each key starts with a packed docid which is adjusted by the input's
 *  offset.  For most tables the tag is then copied as-is, but for the
 *  termlist table (table_type == Honey::TERMLIST) the glass tag is
 *  re-encoded into the honey termlist format, folding the following
 *  "used value slots" entry (if any) into the termlist entry itself.
 */
template<typename T> void
merge_docid_keyed(T *out, const vector<const GlassTable*> & inputs,
		  const vector<Xapian::docid> & offset,
		  int table_type = 0)
{
    for (size_t i = 0; i < inputs.size(); ++i) {
	Xapian::docid off = offset[i];

	auto in = inputs[i];
	if (in->empty()) continue;

	GlassCursor cur(in);
	cur.rewind();

	string key;
	while (cur.next()) {
next_without_next:
	    // Adjust the key if this isn't the first database.
	    if (off) {
		Xapian::docid did;
		const char * d = cur.current_key.data();
		const char * e = d + cur.current_key.size();
		if (!unpack_uint_preserving_sort(&d, e, &did)) {
		    string msg = "Bad key in ";
		    msg += inputs[i]->get_path();
		    throw Xapian::DatabaseCorruptError(msg);
		}
		did += off;
		key.resize(0);
		pack_uint_preserving_sort(key, did);
		if (d != e) {
		    // Copy over anything extra in the key (e.g. the zero byte
		    // at the end of "used value slots" in the termlist table).
		    key.append(d, e - d);
		}
	    } else {
		key = cur.current_key;
	    }
	    if (table_type == Honey::TERMLIST) {
		if (termlist_key_is_values_used(key)) {
		    // A values-used entry is only valid directly after the
		    // termlist entry it belongs to, where it gets consumed
		    // below.
		    throw Xapian::DatabaseCorruptError("value slots used "
						       "entry without a "
						       "corresponding "
						       "termlist entry");
		}
		cur.read_tag();
		string tag = cur.current_tag;

		bool next_result = cur.next();
		bool next_already_done = true;
		unsigned bitmap_slots_used = 0;
		string encoded_slots_used;
		// If the next entry is this document's "used value slots"
		// entry, decode it now so the slot data can be folded into
		// the new termlist tag.
		if (next_result &&
		    termlist_key_is_values_used(cur.current_key)) {
		    next_already_done = false;
		    cur.read_tag();
		    const string& valtag = cur.current_tag;

		    const char* p = valtag.data();
		    const char* end = p + valtag.size();

		    Xapian::VecCOW<Xapian::termpos> slots;

		    Xapian::valueno first_slot;
		    if (!unpack_uint(&p, end, &first_slot)) {
			throw_database_corrupt("Value slot encoding corrupt",
					       p);
		    }
		    slots.push_back(first_slot);
		    Xapian::valueno last_slot = first_slot;
		    // Subsequent slots are stored as gaps (delta - 1), so
		    // the decoded sequence is strictly increasing.
		    while (p != end) {
			Xapian::valueno slot;
			if (!unpack_uint(&p, end, &slot)) {
			    throw_database_corrupt("Value slot encoding "
						   "corrupt", p);
			}
			slot += last_slot + 1;
			last_slot = slot;

			slots.push_back(slot);
		    }

		    if (slots.back() <= 6) {
			// Encode as a bitmap if only slots in the range 0-6
			// are used.
			for (auto slot : slots) {
			    bitmap_slots_used |= 1 << slot;
			}
		    } else {
			string enc;
			pack_uint(enc, last_slot);
			if (slots.size() > 1) {
			    // Multiple slots: interpolative-code the list
			    // after the packed highest slot number.
			    BitWriter slots_used(enc);
			    slots_used.encode(first_slot, last_slot);
			    slots_used.encode(slots.size() - 2,
					      last_slot - first_slot);
			    slots_used.encode_interpolative(slots, 0,
							    slots.size() - 1);
			    encoded_slots_used = slots_used.freeze();
			} else {
			    encoded_slots_used = std::move(enc);
			}
		    }
		}

		const char* pos = tag.data();
		const char* end = pos + tag.size();

		string newtag;
		if (encoded_slots_used.empty()) {
		    // Bitmap case: a single byte, top bit clear (only bits
		    // 0-6 can be set above).
		    newtag += char(bitmap_slots_used);
		} else {
		    // Otherwise a length byte with the top bit set; a bare
		    // 0x80 flags a pack_uint()-encoded length following for
		    // sizes >= 0x80.
		    auto size = encoded_slots_used.size();
		    if (size < 0x80) {
			newtag += char(0x80 | size);
		    } else {
			newtag += '\x80';
			pack_uint(newtag, size);
		    }
		    newtag += encoded_slots_used;
		}

		if (pos != end) {
		    Xapian::termcount doclen;
		    if (!unpack_uint(&pos, end, &doclen)) {
			throw_database_corrupt("doclen", pos);
		    }
		    Xapian::termcount termlist_size;
		    if (!unpack_uint(&pos, end, &termlist_size)) {
			throw_database_corrupt("termlist length", pos);
		    }
		    // Written as (number of terms - 1), then the doclen.
		    pack_uint(newtag, termlist_size - 1);
		    pack_uint(newtag, doclen);

		    string current_term;
		    while (pos != end) {
			Xapian::termcount current_wdf = 0;

			if (!current_term.empty()) {
			    size_t reuse = static_cast<unsigned char>(*pos++);
			    newtag += char(reuse);

			    // NOTE(review): a reuse byte exceeding the
			    // current term length appears to also pack in a
			    // count of entries whose wdf is implicit (not
			    // stored) - confirm against the glass termlist
			    // encoding.
			    if (reuse > current_term.size()) {
				current_wdf = reuse / (current_term.size() + 1);
				reuse = reuse % (current_term.size() + 1);
			    }
			    current_term.resize(reuse);

			    if (pos == end)
				throw_database_corrupt("term", NULL);
			}

			size_t append = static_cast<unsigned char>(*pos++);
			if (size_t(end - pos) < append)
			    throw_database_corrupt("term", NULL);

			current_term.append(pos, append);
			pos += append;

			if (current_wdf) {
			    // wdf came from the reuse byte above - nothing
			    // is read from the input or written out here.
			    --current_wdf;
			} else {
			    if (!unpack_uint(&pos, end, &current_wdf)) {
				throw_database_corrupt("wdf", pos);
			    }
			    pack_uint(newtag, current_wdf);
			}

			newtag += char(append);
			newtag.append(current_term.end() - append,
				      current_term.end());
		    }
		}
		if (!newtag.empty())
		    out->add(key, newtag);
		if (!next_result) break;
		// cur was already advanced by the cur.next() above.  If that
		// entry wasn't a values-used one it still needs processing,
		// so jump back without advancing the cursor again.
		if (next_already_done) goto next_without_next;
	    } else {
		bool compressed = cur.read_tag(true);
		out->add(key, cur.current_tag, compressed);
	    }
	}
    }
}
#endif
1840 using namespace HoneyCompact;
1842 void
1843 HoneyDatabase::compact(Xapian::Compactor* compactor,
1844 const char* destdir,
1845 int fd,
1846 int source_backend,
1847 const vector<const Xapian::Database::Internal*>& sources,
1848 const vector<Xapian::docid>& offset,
1849 size_t block_size,
1850 Xapian::Compactor::compaction_level compaction,
1851 unsigned flags,
1852 Xapian::docid last_docid)
1854 struct table_list {
1855 // The "base name" of the table.
1856 char name[9];
1857 // The type.
1858 Honey::table_type type;
1859 // Create tables after position lazily.
1860 bool lazy;
1863 static const table_list tables[] = {
1864 // name type lazy
1865 { "postlist", Honey::POSTLIST, false },
1866 { "docdata", Honey::DOCDATA, true },
1867 { "termlist", Honey::TERMLIST, false },
1868 { "position", Honey::POSITION, true },
1869 { "spelling", Honey::SPELLING, true },
1870 { "synonym", Honey::SYNONYM, true }
1872 const table_list * tables_end = tables +
1873 (sizeof(tables) / sizeof(tables[0]));
1875 const int FLAGS = Xapian::DB_DANGEROUS;
1877 bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
1878 bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
1879 if (single_file) {
1880 // FIXME: Support this combination - we need to put temporary files
1881 // somewhere.
1882 multipass = false;
1885 if (single_file) {
1886 for (size_t i = 0; i != sources.size(); ++i) {
1887 bool has_uncommitted_changes;
1888 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1889 auto db = static_cast<const GlassDatabase*>(sources[i]);
1890 has_uncommitted_changes = db->has_uncommitted_changes();
1891 } else {
1892 auto db = static_cast<const HoneyDatabase*>(sources[i]);
1893 has_uncommitted_changes = db->has_uncommitted_changes();
1895 if (has_uncommitted_changes) {
1896 const char * m =
1897 "Can't compact from a WritableDatabase with uncommitted "
1898 "changes - either call commit() first, or create a new "
1899 "Database object from the filename on disk";
1900 throw Xapian::InvalidOperationError(m);
1905 if (block_size < HONEY_MIN_BLOCKSIZE ||
1906 block_size > HONEY_MAX_BLOCKSIZE ||
1907 (block_size & (block_size - 1)) != 0) {
1908 block_size = HONEY_DEFAULT_BLOCKSIZE;
1911 FlintLock lock(destdir ? destdir : "");
1912 if (!single_file) {
1913 string explanation;
1914 FlintLock::reason why = lock.lock(true, false, explanation);
1915 if (why != FlintLock::SUCCESS) {
1916 lock.throw_databaselockerror(why, destdir, explanation);
1920 unique_ptr<HoneyVersion> version_file_out;
1921 if (single_file) {
1922 if (destdir) {
1923 fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
1924 if (fd < 0) {
1925 throw Xapian::DatabaseCreateError("open() failed", errno);
1928 version_file_out.reset(new HoneyVersion(fd));
1929 } else {
1930 fd = -1;
1931 version_file_out.reset(new HoneyVersion(destdir));
1934 version_file_out->create(block_size);
1935 for (size_t i = 0; i != sources.size(); ++i) {
1936 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1937 #ifdef DISABLE_GPL_LIBXAPIAN
1938 Assert(false);
1939 #else
1940 auto db = static_cast<const GlassDatabase*>(sources[i]);
1941 auto& v_in = db->version_file;
1942 auto& v_out = version_file_out;
1943 v_out->merge_stats(v_in.get_doccount(),
1944 v_in.get_doclength_lower_bound(),
1945 v_in.get_doclength_upper_bound(),
1946 v_in.get_wdf_upper_bound(),
1947 v_in.get_total_doclen(),
1948 v_in.get_spelling_wordfreq_upper_bound());
1949 #endif
1950 } else {
1951 auto db = static_cast<const HoneyDatabase*>(sources[i]);
1952 version_file_out->merge_stats(db->version_file);
1956 string fl_serialised;
1957 #if 0
1958 if (single_file) {
1959 HoneyFreeList fl;
1960 fl.set_first_unused_block(1); // FIXME: Assumption?
1961 fl.pack(fl_serialised);
1963 #endif
1965 // Set to true if stat() failed (which can happen if the files are > 2GB
1966 // and off_t is 32 bit) or one of the totals overflowed.
1967 bool bad_totals = false;
1968 off_t in_total = 0, out_total = 0;
1970 // FIXME: sort out indentation.
1971 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1972 #ifdef DISABLE_GPL_LIBXAPIAN
1973 throw Xapian::FeatureUnavailableError("Glass backend disabled");
1974 #else
1975 vector<HoneyTable *> tabs;
1976 tabs.reserve(tables_end - tables);
1977 off_t prev_size = block_size;
1978 for (const table_list * t = tables; t < tables_end; ++t) {
1979 // The postlist table requires an N-way merge, adjusting the
1980 // headers of various blocks. The spelling and synonym tables also
1981 // need special handling. The other tables have keys sorted in
1982 // docid order, so we can merge them by simply copying all the keys
1983 // from each source table in turn.
1984 if (compactor)
1985 compactor->set_status(t->name, string());
1987 string dest;
1988 if (!single_file) {
1989 dest = destdir;
1990 dest += '/';
1991 dest += t->name;
1992 dest += '.';
1995 bool output_will_exist = !t->lazy;
1997 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
1998 // on certain systems).
1999 bool bad_stat = false;
2001 // We can't currently report input sizes if there's a single file DB
2002 // amongst the inputs.
2003 bool single_file_in = false;
2005 off_t in_size = 0;
2007 vector<const GlassTable*> inputs;
2008 inputs.reserve(sources.size());
2009 size_t inputs_present = 0;
2010 for (auto src : sources) {
2011 auto db = static_cast<const GlassDatabase*>(src);
2012 const GlassTable * table;
2013 switch (t->type) {
2014 case Honey::POSTLIST:
2015 table = &(db->postlist_table);
2016 break;
2017 case Honey::DOCDATA:
2018 table = &(db->docdata_table);
2019 break;
2020 case Honey::TERMLIST:
2021 table = &(db->termlist_table);
2022 break;
2023 case Honey::POSITION:
2024 table = &(db->position_table);
2025 break;
2026 case Honey::SPELLING:
2027 table = &(db->spelling_table);
2028 break;
2029 case Honey::SYNONYM:
2030 table = &(db->synonym_table);
2031 break;
2032 default:
2033 Assert(false);
2034 return;
2037 if (db->single_file()) {
2038 if (t->lazy && table->empty()) {
2039 // Essentially doesn't exist.
2040 } else {
2041 // FIXME: Find actual size somehow?
2042 // in_size += table->size() / 1024;
2043 single_file_in = true;
2044 bad_totals = true;
2045 output_will_exist = true;
2046 ++inputs_present;
2048 } else {
2049 off_t db_size = file_size(table->get_path());
2050 if (errno == 0) {
2051 // FIXME: check overflow and set bad_totals
2052 in_total += db_size;
2053 in_size += db_size / 1024;
2054 output_will_exist = true;
2055 ++inputs_present;
2056 } else if (errno != ENOENT) {
2057 // We get ENOENT for an optional table.
2058 bad_totals = bad_stat = true;
2059 output_will_exist = true;
2060 ++inputs_present;
2063 inputs.push_back(table);
2066 // If any inputs lack a termlist table, suppress it in the output.
2067 if (t->type == Honey::TERMLIST && inputs_present != sources.size()) {
2068 if (inputs_present != 0) {
2069 if (compactor) {
2070 string m = str(inputs_present);
2071 m += " of ";
2072 m += str(sources.size());
2073 m += " inputs present, so suppressing output";
2074 compactor->set_status(t->name, m);
2076 continue;
2078 output_will_exist = false;
2081 if (!output_will_exist) {
2082 if (compactor)
2083 compactor->set_status(t->name, "doesn't exist");
2084 continue;
2087 HoneyTable * out;
2088 off_t table_start_offset = -1;
2089 if (single_file) {
2090 if (t == tables) {
2091 // Start first table HONEY_VERSION_MAX_SIZE bytes in to allow
2092 // space for version file. It's tricky to exactly know the
2093 // size of the version file beforehand.
2094 table_start_offset = HONEY_VERSION_MAX_SIZE;
2095 if (lseek(fd, table_start_offset, SEEK_SET) < 0)
2096 throw Xapian::DatabaseError("lseek() failed", errno);
2097 } else {
2098 table_start_offset = lseek(fd, 0, SEEK_CUR);
2100 out = new HoneyTable(t->name, fd, version_file_out->get_offset(),
2101 false, false);
2102 } else {
2103 out = new HoneyTable(t->name, dest, false, t->lazy);
2105 tabs.push_back(out);
2106 Honey::RootInfo * root_info = version_file_out->root_to_set(t->type);
2107 if (single_file) {
2108 root_info->set_free_list(fl_serialised);
2109 root_info->set_offset(table_start_offset);
2110 out->open(FLAGS,
2111 version_file_out->get_root(t->type),
2112 version_file_out->get_revision());
2113 } else {
2114 out->create_and_open(FLAGS, *root_info);
2117 out->set_full_compaction(compaction != compactor->STANDARD);
2118 if (compaction == compactor->FULLER) out->set_max_item_size(1);
2120 switch (t->type) {
2121 case Honey::POSTLIST: {
2122 if (multipass && inputs.size() > 3) {
2123 multimerge_postlists(compactor, out, destdir,
2124 inputs, offset);
2125 } else {
2126 merge_postlists(compactor, out, offset.begin(),
2127 inputs.begin(), inputs.end());
2129 break;
2131 case Honey::SPELLING:
2132 merge_spellings(out, inputs.cbegin(), inputs.cend());
2133 break;
2134 case Honey::SYNONYM:
2135 merge_synonyms(out, inputs.begin(), inputs.end());
2136 break;
2137 case Honey::POSITION:
2138 merge_positions(out, inputs, offset);
2139 break;
2140 default:
2141 // DocData, Termlist
2142 merge_docid_keyed(out, inputs, offset, t->type);
2143 break;
2146 // Commit as revision 1.
2147 out->flush_db();
2148 out->commit(1, root_info);
2149 out->sync();
2150 if (single_file) fl_serialised = root_info->get_free_list();
2152 off_t out_size = 0;
2153 if (!bad_stat && !single_file_in) {
2154 off_t db_size;
2155 if (single_file) {
2156 db_size = file_size(fd);
2157 } else {
2158 db_size = file_size(dest + HONEY_TABLE_EXTENSION);
2160 if (errno == 0) {
2161 if (single_file) {
2162 off_t old_prev_size = max(prev_size, off_t(block_size));
2163 prev_size = db_size;
2164 db_size -= old_prev_size;
2166 // FIXME: check overflow and set bad_totals
2167 out_total += db_size;
2168 out_size = db_size / 1024;
2169 } else if (errno != ENOENT) {
2170 bad_totals = bad_stat = true;
2173 if (bad_stat) {
2174 if (compactor)
2175 compactor->set_status(t->name,
2176 "Done (couldn't stat all the DB files)");
2177 } else if (single_file_in) {
2178 if (compactor)
2179 compactor->set_status(t->name,
2180 "Done (table sizes unknown for single "
2181 "file DB input)");
2182 } else {
2183 string status;
2184 if (out_size == in_size) {
2185 status = "Size unchanged (";
2186 } else {
2187 off_t delta;
2188 if (out_size < in_size) {
2189 delta = in_size - out_size;
2190 status = "Reduced by ";
2191 } else {
2192 delta = out_size - in_size;
2193 status = "INCREASED by ";
2195 if (in_size) {
2196 status += str(100 * delta / in_size);
2197 status += "% ";
2199 status += str(delta);
2200 status += "K (";
2201 status += str(in_size);
2202 status += "K -> ";
2204 status += str(out_size);
2205 status += "K)";
2206 if (compactor)
2207 compactor->set_status(t->name, status);
2211 // If compacting to a single file output and all the tables are empty, pad
2212 // the output so that it isn't mistaken for a stub database when we try to
2213 // open it. For this it needs to be a multiple of 2KB in size.
2214 if (single_file && prev_size < off_t(block_size)) {
2215 #ifdef HAVE_FTRUNCATE
2216 if (ftruncate(fd, block_size) < 0) {
2217 throw Xapian::DatabaseError("Failed to set size of output "
2218 "database", errno);
2220 #else
2221 const off_t off = block_size - 1;
2222 if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
2223 throw Xapian::DatabaseError("Failed to set size of output "
2224 "database", errno);
2226 #endif
2229 if (single_file) {
2230 if (lseek(fd, version_file_out->get_offset(), SEEK_SET) == -1) {
2231 throw Xapian::DatabaseError("lseek() failed", errno);
2234 version_file_out->set_last_docid(last_docid);
2235 string tmpfile = version_file_out->write(1, FLAGS);
2236 if (single_file) {
2237 off_t version_file_size = lseek(fd, 0, SEEK_CUR);
2238 if (version_file_size < 0) {
2239 throw Xapian::DatabaseError("lseek() failed", errno);
2241 if (version_file_size > HONEY_VERSION_MAX_SIZE) {
2242 throw Xapian::DatabaseError("Didn't allow enough space for "
2243 "version file data");
2246 for (unsigned j = 0; j != tabs.size(); ++j) {
2247 tabs[j]->sync();
2249 // Commit with revision 1.
2250 version_file_out->sync(tmpfile, 1, FLAGS);
2251 for (unsigned j = 0; j != tabs.size(); ++j) {
2252 delete tabs[j];
2254 #endif
2255 } else {
2256 vector<HoneyTable *> tabs;
2257 tabs.reserve(tables_end - tables);
2258 off_t prev_size = block_size;
2259 for (const table_list * t = tables; t < tables_end; ++t) {
2260 // The postlist table requires an N-way merge, adjusting the
2261 // headers of various blocks. The spelling and synonym tables also
2262 // need special handling. The other tables have keys sorted in
2263 // docid order, so we can merge them by simply copying all the keys
2264 // from each source table in turn.
2265 if (compactor)
2266 compactor->set_status(t->name, string());
2268 string dest;
2269 if (!single_file) {
2270 dest = destdir;
2271 dest += '/';
2272 dest += t->name;
2273 dest += '.';
2276 bool output_will_exist = !t->lazy;
2278 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
2279 // on certain systems).
2280 bool bad_stat = false;
2282 // We can't currently report input sizes if there's a single file DB
2283 // amongst the inputs.
2284 bool single_file_in = false;
2286 off_t in_size = 0;
2288 vector<const HoneyTable*> inputs;
2289 inputs.reserve(sources.size());
2290 size_t inputs_present = 0;
2291 for (auto src : sources) {
2292 auto db = static_cast<const HoneyDatabase*>(src);
2293 const HoneyTable * table;
2294 switch (t->type) {
2295 case Honey::POSTLIST:
2296 table = &(db->postlist_table);
2297 break;
2298 case Honey::DOCDATA:
2299 table = &(db->docdata_table);
2300 break;
2301 case Honey::TERMLIST:
2302 table = &(db->termlist_table);
2303 break;
2304 case Honey::POSITION:
2305 table = &(db->position_table);
2306 break;
2307 case Honey::SPELLING:
2308 table = &(db->spelling_table);
2309 break;
2310 case Honey::SYNONYM:
2311 table = &(db->synonym_table);
2312 break;
2313 default:
2314 Assert(false);
2315 return;
2318 if (db->single_file()) {
2319 if (t->lazy && table->empty()) {
2320 // Essentially doesn't exist.
2321 } else {
2322 // FIXME: Find actual size somehow?
2323 // in_size += table->size() / 1024;
2324 single_file_in = true;
2325 bad_totals = true;
2326 output_will_exist = true;
2327 ++inputs_present;
2329 } else {
2330 off_t db_size = file_size(table->get_path());
2331 if (errno == 0) {
2332 // FIXME: check overflow and set bad_totals
2333 in_total += db_size;
2334 in_size += db_size / 1024;
2335 output_will_exist = true;
2336 ++inputs_present;
2337 } else if (errno != ENOENT) {
2338 // We get ENOENT for an optional table.
2339 bad_totals = bad_stat = true;
2340 output_will_exist = true;
2341 ++inputs_present;
2344 inputs.push_back(table);
2347 // If any inputs lack a termlist table, suppress it in the output.
2348 if (t->type == Honey::TERMLIST && inputs_present != sources.size()) {
2349 if (inputs_present != 0) {
2350 if (compactor) {
2351 string m = str(inputs_present);
2352 m += " of ";
2353 m += str(sources.size());
2354 m += " inputs present, so suppressing output";
2355 compactor->set_status(t->name, m);
2357 continue;
2359 output_will_exist = false;
2362 if (!output_will_exist) {
2363 if (compactor)
2364 compactor->set_status(t->name, "doesn't exist");
2365 continue;
2368 HoneyTable * out;
2369 off_t table_start_offset = -1;
2370 if (single_file) {
2371 if (t == tables) {
2372 // Start first table HONEY_VERSION_MAX_SIZE bytes in to allow
2373 // space for version file. It's tricky to exactly know the
2374 // size of the version file beforehand.
2375 table_start_offset = HONEY_VERSION_MAX_SIZE;
2376 if (lseek(fd, table_start_offset, SEEK_SET) < 0)
2377 throw Xapian::DatabaseError("lseek() failed", errno);
2378 } else {
2379 table_start_offset = lseek(fd, 0, SEEK_CUR);
2381 out = new HoneyTable(t->name, fd, version_file_out->get_offset(),
2382 false, false);
2383 } else {
2384 out = new HoneyTable(t->name, dest, false, t->lazy);
2386 tabs.push_back(out);
2387 Honey::RootInfo * root_info = version_file_out->root_to_set(t->type);
2388 if (single_file) {
2389 root_info->set_free_list(fl_serialised);
2390 root_info->set_offset(table_start_offset);
2391 out->open(FLAGS,
2392 version_file_out->get_root(t->type),
2393 version_file_out->get_revision());
2394 } else {
2395 out->create_and_open(FLAGS, *root_info);
2398 out->set_full_compaction(compaction != compactor->STANDARD);
2399 if (compaction == compactor->FULLER) out->set_max_item_size(1);
2401 switch (t->type) {
2402 case Honey::POSTLIST: {
2403 if (multipass && inputs.size() > 3) {
2404 multimerge_postlists(compactor, out, destdir,
2405 inputs, offset);
2406 } else {
2407 merge_postlists(compactor, out, offset.begin(),
2408 inputs.begin(), inputs.end());
2410 break;
2412 case Honey::SPELLING:
2413 merge_spellings(out, inputs.begin(), inputs.end());
2414 break;
2415 case Honey::SYNONYM:
2416 merge_synonyms(out, inputs.begin(), inputs.end());
2417 break;
2418 case Honey::POSITION:
2419 merge_positions(out, inputs, offset);
2420 break;
2421 default:
2422 // DocData, Termlist
2423 merge_docid_keyed(out, inputs, offset);
2424 break;
2427 // Commit as revision 1.
2428 out->flush_db();
2429 out->commit(1, root_info);
2430 out->sync();
2431 if (single_file) fl_serialised = root_info->get_free_list();
2433 off_t out_size = 0;
2434 if (!bad_stat && !single_file_in) {
2435 off_t db_size;
2436 if (single_file) {
2437 db_size = file_size(fd);
2438 } else {
2439 db_size = file_size(dest + HONEY_TABLE_EXTENSION);
2441 if (errno == 0) {
2442 if (single_file) {
2443 off_t old_prev_size = max(prev_size, off_t(block_size));
2444 prev_size = db_size;
2445 db_size -= old_prev_size;
2447 // FIXME: check overflow and set bad_totals
2448 out_total += db_size;
2449 out_size = db_size / 1024;
2450 } else if (errno != ENOENT) {
2451 bad_totals = bad_stat = true;
2454 if (bad_stat) {
2455 if (compactor)
2456 compactor->set_status(t->name,
2457 "Done (couldn't stat all the DB files)");
2458 } else if (single_file_in) {
2459 if (compactor)
2460 compactor->set_status(t->name,
2461 "Done (table sizes unknown for single "
2462 "file DB input)");
2463 } else {
2464 string status;
2465 if (out_size == in_size) {
2466 status = "Size unchanged (";
2467 } else {
2468 off_t delta;
2469 if (out_size < in_size) {
2470 delta = in_size - out_size;
2471 status = "Reduced by ";
2472 } else {
2473 delta = out_size - in_size;
2474 status = "INCREASED by ";
2476 if (in_size) {
2477 status += str(100 * delta / in_size);
2478 status += "% ";
2480 status += str(delta);
2481 status += "K (";
2482 status += str(in_size);
2483 status += "K -> ";
2485 status += str(out_size);
2486 status += "K)";
2487 if (compactor)
2488 compactor->set_status(t->name, status);
2492 // If compacting to a single file output and all the tables are empty, pad
2493 // the output so that it isn't mistaken for a stub database when we try to
2494 // open it. For this it needs to be a multiple of 2KB in size.
2495 if (single_file && prev_size < off_t(block_size)) {
2496 #ifdef HAVE_FTRUNCATE
2497 if (ftruncate(fd, block_size) < 0) {
2498 throw Xapian::DatabaseError("Failed to set size of output "
2499 "database", errno);
2501 #else
2502 const off_t off = block_size - 1;
2503 if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
2504 throw Xapian::DatabaseError("Failed to set size of output "
2505 "database", errno);
2507 #endif
2510 if (single_file) {
2511 if (lseek(fd, version_file_out->get_offset(), SEEK_SET) == -1) {
2512 throw Xapian::DatabaseError("lseek() failed", errno);
2515 version_file_out->set_last_docid(last_docid);
2516 string tmpfile = version_file_out->write(1, FLAGS);
2517 for (unsigned j = 0; j != tabs.size(); ++j) {
2518 tabs[j]->sync();
2520 // Commit with revision 1.
2521 version_file_out->sync(tmpfile, 1, FLAGS);
2522 for (unsigned j = 0; j != tabs.size(); ++j) {
2523 delete tabs[j];
2527 if (!single_file) lock.release();
2529 if (!bad_totals && compactor) {
2530 string status;
2531 in_total /= 1024;
2532 out_total /= 1024;
2533 if (out_total == in_total) {
2534 status = "Size unchanged (";
2535 } else {
2536 off_t delta;
2537 if (out_total < in_total) {
2538 delta = in_total - out_total;
2539 status = "Reduced by ";
2540 } else {
2541 delta = out_total - in_total;
2542 status = "INCREASED by ";
2544 if (in_total) {
2545 status += str(100 * delta / in_total);
2546 status += "% ";
2548 status += str(delta);
2549 status += "K (";
2550 status += str(in_total);
2551 status += "K -> ";
2553 status += str(out_total);
2554 status += "K)";
2555 compactor->set_status("Total", status);