[honey] Don't include <iostream> by default
[xapian.git] / xapian-core / backends / honey / honey_compact.cc
blob8f975c6a6338349ad35d1af3e41982fbddbf7b6f
1 /** @file honey_compact.cc
2 * @brief Compact a honey database, or merge and compact several.
3 */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2018 Olly Betts
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation; either version 2 of the
9 * License, or (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19 * USA
22 #include <config.h>
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
29 #include <algorithm>
30 #include <memory>
31 #include <queue>
32 #include <type_traits>
34 #include <cstdio>
35 #ifdef HAVE_SYS_UIO_H
36 # include <sys/uio.h>
37 #endif
39 #include "safeerrno.h"
41 #include "backends/flint_lock.h"
42 #include "compression_stream.h"
43 #include "honey_cursor.h"
44 #include "honey_database.h"
45 #include "honey_defs.h"
46 #include "honey_postlist_encodings.h"
47 #include "honey_table.h"
48 #include "honey_version.h"
49 #include "filetests.h"
50 #include "internaltypes.h"
51 #include "pack.h"
52 #include "backends/valuestats.h"
53 #include "wordaccess.h"
55 #include "../byte_length_strings.h"
56 #include "../prefix_compressed_strings.h"
58 #ifndef DISABLE_GPL_LIBXAPIAN
59 # include "../glass/glass_database.h"
60 # include "../glass/glass_table.h"
61 # include "../glass/glass_values.h"
62 #endif
64 using namespace std;
66 #ifndef DISABLE_GPL_LIBXAPIAN
67 [[noreturn]]
68 static void
69 throw_database_corrupt(const char* item, const char* pos)
71 string message;
72 if (pos != NULL) {
73 message = "Value overflow unpacking termlist: ";
74 } else {
75 message = "Out of data unpacking termlist: ";
77 message += item;
78 throw Xapian::DatabaseCorruptError(message);
81 namespace GlassCompact {
/// True if @a key is a glass user-metadata key (prefix "\0\xc0").
static inline bool
is_user_metadata_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xc0';
}
/// True if @a key is a glass value-statistics key (prefix "\0\xd0").
static inline bool
is_valuestats_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xd0';
}
/// True if @a key is a glass value-chunk key (prefix "\0\xd8").
static inline bool
is_valuechunk_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xd8';
}
/// True if @a key is a glass doclen-chunk key (prefix "\0\xe0").
static inline bool
is_doclenchunk_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xe0';
}
/** Check whether a glass termlist key has the "values used" suffix.
 *
 *  A termlist key is pack_uint_preserving_sort(did), optionally followed
 *  by a single '\0' byte which marks the "slots used" entry for that
 *  document.  Returns false for a plain termlist key, true for the
 *  values-used form, and throws DatabaseCorruptError for anything else.
 */
inline static bool
termlist_key_is_values_used(const string& key)
{
    const char* p = key.data();
    const char* e = p + key.size();
    Xapian::docid did;
    if (unpack_uint_preserving_sort(&p, e, &did)) {
	// No trailing data: plain termlist entry.
	if (p == e)
	    return false;
	// Exactly one trailing '\0' byte: values-used entry.
	if (e - p == 1 && *p == '\0')
	    return true;
    }
    throw Xapian::DatabaseCorruptError("termlist key format");
}
123 #endif
125 // Put all the helpers in a namespace to avoid symbols colliding with those of
126 // the same name in other flint-derived backends.
127 namespace HoneyCompact {
/// Key-type tag values used in the honey postlist table.
enum {
    KEY_USER_METADATA = 0x00,
    KEY_VALUE_STATS = 0x01,
    KEY_VALUE_CHUNK = 0xd8,
    KEY_DOCLEN_CHUNK = 0xe0,
    KEY_POSTING_CHUNK = 0xff
};

/// Return one of the KEY_* constants, or a different value for an invalid key.
static inline int
key_type(const std::string& key)
{
    // Posting chunk keys don't start with a zero byte.
    if (key[0] != '\0')
	return KEY_POSTING_CHUNK;

    // A lone zero byte isn't a valid key.
    if (key.size() <= 1)
	return -1;

    unsigned char code = key[1];
    // Value stats keys use second bytes 0x01 to 0x04.
    if (code >= 0x01 && code <= 0x04)
	return KEY_VALUE_STATS;

    // A code of 0xff correctly maps to KEY_POSTING_CHUNK.
    return code;
}
156 class BufferedFile {
157 int fd = -1;
158 bool read_only = true;
159 mutable size_t buf_end = 0;
160 mutable char buf[4096];
162 public:
163 BufferedFile() { }
165 ~BufferedFile() {
166 if (fd >= 0) close(fd);
169 bool open(const std::string& path, bool read_only_) {
170 if (fd >= 0) close(fd);
171 read_only = read_only_;
172 if (read_only) {
173 // FIXME: add new io_open_stream_rd() etc?
174 fd = io_open_block_rd(path);
175 } else {
176 // FIXME: Always create anew for now...
177 fd = io_open_block_wr(path, true);
179 return fd >= 0;
182 off_t get_pos() const {
183 return read_only ?
184 lseek(fd, 0, SEEK_CUR) - buf_end :
185 lseek(fd, 0, SEEK_CUR) + buf_end;
188 bool empty() const {
189 if (buf_end) return false;
190 struct stat sbuf;
191 if (fd == -1 || fstat(fd, &sbuf) < 0) return true;
192 return (sbuf.st_size == 0);
195 void write(unsigned char ch) {
196 if (buf_end == sizeof(buf)) {
197 // writev()?
198 io_write(fd, buf, buf_end);
199 buf_end = 0;
201 buf[buf_end++] = ch;
204 void write(const char* p, size_t len) {
205 if (buf_end + len <= sizeof(buf)) {
206 memcpy(buf + buf_end, p, len);
207 buf_end += len;
208 return;
211 #ifdef HAVE_WRITEV
212 while (true) {
213 struct iovec iov[2];
214 iov[0].iov_base = buf;
215 iov[0].iov_len = buf_end;
216 iov[1].iov_base = const_cast<char*>(p);
217 iov[1].iov_len = len;
218 ssize_t n_ = writev(fd, iov, 2);
219 if (n_ < 0) abort();
220 size_t n = n_;
221 if (n == buf_end + len) {
222 // Wrote everything.
223 buf_end = 0;
224 return;
226 if (n >= buf_end) {
227 // Wrote all of buf.
228 n -= buf_end;
229 p += n;
230 len -= n;
231 io_write(fd, p, len);
232 buf_end = 0;
233 return;
235 buf_end -= n;
236 memmove(buf, buf + n, buf_end);
238 #else
239 io_write(fd, buf, buf_end);
240 if (len >= sizeof(buf)) {
241 // If it's bigger than our buffer, just write it directly.
242 io_write(fd, p, len);
243 buf_end = 0;
244 return;
246 memcpy(buf, p, len);
247 buf_end = len;
248 #endif
251 int read() const {
252 if (buf_end == 0) {
253 retry:
254 ssize_t r = ::read(fd, buf, sizeof(buf));
255 if (r == 0) return EOF;
256 if (r < 0) {
257 if (errno == EINTR) goto retry;
258 throw Xapian::DatabaseError("read failed", errno);
260 if (size_t(r) < sizeof(buf)) {
261 memmove(buf + sizeof(buf) - r, buf, r);
263 buf_end = r;
265 return static_cast<unsigned char>(buf[sizeof(buf) - buf_end--]);
268 bool read(char* p, size_t len) const {
269 if (len <= buf_end) {
270 memcpy(p, buf + sizeof(buf) - buf_end, len);
271 buf_end -= len;
272 return true;
274 memcpy(p, buf + sizeof(buf) - buf_end, buf_end);
275 p += buf_end;
276 len -= buf_end;
277 buf_end = 0;
278 // FIXME: refill buffer if len < sizeof(buf)
279 return ::read(fd, p, len) == ssize_t(len);
282 void flush() {
283 if (!read_only && buf_end) {
284 io_write(fd, buf, buf_end);
285 buf_end = 0;
289 void sync() {
290 io_sync(fd);
293 void rewind() {
294 read_only = true;
295 lseek(fd, 0, SEEK_SET);
296 buf_end = 0;
300 template<typename T> class PostlistCursor;
302 #ifndef DISABLE_GPL_LIBXAPIAN
303 // Convert glass to honey.
304 template<>
305 class PostlistCursor<const GlassTable&> : private GlassCursor {
306 Xapian::docid offset;
308 public:
309 string key, tag;
310 Xapian::docid firstdid;
311 Xapian::docid chunk_lastdid;
312 Xapian::termcount tf, cf;
313 Xapian::termcount first_wdf;
314 Xapian::termcount wdf_max;
316 PostlistCursor(const GlassTable *in, Xapian::docid offset_)
317 : GlassCursor(in), offset(offset_), firstdid(0)
319 rewind();
320 next();
323 bool next() {
324 if (!GlassCursor::next()) return false;
325 // We put all chunks into the non-initial chunk form here, then fix up
326 // the first chunk for each term in the merged database as we merge.
327 read_tag();
328 key = current_key;
329 tag = current_tag;
330 tf = cf = 0;
331 if (GlassCompact::is_user_metadata_key(key)) {
332 key[1] = KEY_USER_METADATA;
333 return true;
335 if (GlassCompact::is_valuestats_key(key)) {
336 // Adjust key.
337 const char * p = key.data();
338 const char * end = p + key.length();
339 p += 2;
340 Xapian::valueno slot;
341 if (!unpack_uint_last(&p, end, &slot))
342 throw Xapian::DatabaseCorruptError("bad value stats key");
343 key = pack_honey_valuestats_key(slot);
344 return true;
346 if (GlassCompact::is_valuechunk_key(key)) {
347 const char * p = key.data();
348 const char * end = p + key.length();
349 p += 2;
350 Xapian::valueno slot;
351 if (!unpack_uint(&p, end, &slot))
352 throw Xapian::DatabaseCorruptError("bad value key");
353 Xapian::docid first_did;
354 if (!unpack_uint_preserving_sort(&p, end, &first_did))
355 throw Xapian::DatabaseCorruptError("bad value key");
356 first_did += offset;
358 Glass::ValueChunkReader reader(tag.data(), tag.size(), first_did);
359 Xapian::docid last_did = first_did;
360 while (reader.next(), !reader.at_end()) {
361 last_did = reader.get_docid();
364 key.assign("\0\xd8", 2);
365 pack_uint(key, slot);
366 pack_uint_preserving_sort(key, last_did);
368 // Add the docid delta across the chunk to the start of the tag.
369 string newtag;
370 pack_uint(newtag, last_did - first_did);
371 tag.insert(0, newtag);
373 return true;
376 if (GlassCompact::is_doclenchunk_key(key)) {
377 const char * d = key.data();
378 const char * e = d + key.size();
379 d += 2;
381 if (d == e) {
382 // This is an initial chunk, so adjust tag header.
383 d = tag.data();
384 e = d + tag.size();
385 if (!unpack_uint(&d, e, &tf) ||
386 !unpack_uint(&d, e, &cf) ||
387 !unpack_uint(&d, e, &firstdid)) {
388 throw Xapian::DatabaseCorruptError("Bad postlist key");
390 ++firstdid;
391 tag.erase(0, d - tag.data());
392 } else {
393 // Not an initial chunk, so adjust key.
394 size_t tmp = d - key.data();
395 if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
396 throw Xapian::DatabaseCorruptError("Bad postlist key");
397 key.erase(tmp);
399 firstdid += offset;
401 d = tag.data();
402 e = d + tag.size();
404 // Convert doclen chunk to honey format.
405 string newtag;
407 // Skip the "last chunk" flag and increase_to_last.
408 if (d == e)
409 throw Xapian::DatabaseCorruptError("No last chunk flag in "
410 "glass docdata chunk");
411 ++d;
412 Xapian::docid increase_to_last;
413 if (!unpack_uint(&d, e, &increase_to_last))
414 throw Xapian::DatabaseCorruptError("Decoding last docid delta "
415 "in glass docdata chunk");
417 Xapian::termcount doclen_max = 0;
418 while (true) {
419 Xapian::termcount doclen;
420 if (!unpack_uint(&d, e, &doclen))
421 throw Xapian::DatabaseCorruptError("Decoding doclen in "
422 "glass docdata chunk");
423 if (doclen > doclen_max)
424 doclen_max = doclen;
425 unsigned char buf[4];
426 unaligned_write4(buf, doclen);
427 newtag.append(reinterpret_cast<char*>(buf), 4);
428 if (d == e)
429 break;
430 Xapian::docid gap_size;
431 if (!unpack_uint(&d, e, &gap_size))
432 throw Xapian::DatabaseCorruptError("Decoding docid "
433 "gap_size in glass "
434 "docdata chunk");
435 // FIXME: Split chunk if the gap_size is at all large.
436 newtag.append(4 * gap_size, '\xff');
439 Assert(!startswith(newtag, "\xff\xff\xff\xff"));
440 Assert(!endswith(newtag, "\xff\xff\xff\xff"));
442 chunk_lastdid = firstdid - 1 + newtag.size() / 4;
444 // Only encode document lengths using a whole number of bytes for
445 // now. We could allow arbitrary bit widths, but it complicates
446 // encoding and decoding so we should consider if the fairly small
447 // additional saving is worth it.
448 if (doclen_max >= 0xffff) {
449 if (doclen_max >= 0xffffff) {
450 newtag.insert(0, 1, char(32));
451 swap(tag, newtag);
452 } else if (doclen_max >= 0xffffffff) {
453 // FIXME: Handle these.
454 const char* m = "Document length values >= 0xffffffff not "
455 "currently handled";
456 throw Xapian::FeatureUnavailableError(m);
457 } else {
458 tag.assign(1, char(24));
459 for (size_t i = 1; i < newtag.size(); i += 4)
460 tag.append(newtag, i, 3);
462 } else {
463 if (doclen_max >= 0xff) {
464 tag.assign(1, char(16));
465 for (size_t i = 2; i < newtag.size(); i += 4)
466 tag.append(newtag, i, 2);
467 } else {
468 tag.assign(1, char(8));
469 for (size_t i = 3; i < newtag.size(); i += 4)
470 tag.append(newtag, i, 1);
474 return true;
477 // Adjust key if this is *NOT* an initial chunk.
478 // key is: pack_string_preserving_sort(key, tname)
479 // plus optionally: pack_uint_preserving_sort(key, did)
480 const char * d = key.data();
481 const char * e = d + key.size();
482 string tname;
483 if (!unpack_string_preserving_sort(&d, e, tname))
484 throw Xapian::DatabaseCorruptError("Bad postlist key");
486 if (d == e) {
487 // This is an initial chunk for a term, so adjust tag header.
488 d = tag.data();
489 e = d + tag.size();
490 if (!unpack_uint(&d, e, &tf) ||
491 !unpack_uint(&d, e, &cf) ||
492 !unpack_uint(&d, e, &firstdid)) {
493 throw Xapian::DatabaseCorruptError("Bad postlist key");
495 ++firstdid;
496 tag.erase(0, d - tag.data());
497 wdf_max = 0;
498 } else {
499 // Not an initial chunk, so adjust key.
500 size_t tmp = d - key.data();
501 if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
502 throw Xapian::DatabaseCorruptError("Bad postlist key");
503 key.erase(tmp - 1);
505 firstdid += offset;
507 d = tag.data();
508 e = d + tag.size();
510 // Convert posting chunk to honey format, but without any header.
511 string newtag;
513 // Skip the "last chunk" flag; decode increase_to_last.
514 if (d == e)
515 throw Xapian::DatabaseCorruptError("No last chunk flag in glass "
516 "posting chunk");
517 ++d;
518 Xapian::docid increase_to_last;
519 if (!unpack_uint(&d, e, &increase_to_last))
520 throw Xapian::DatabaseCorruptError("Decoding last docid delta in "
521 "glass posting chunk");
522 chunk_lastdid = firstdid + increase_to_last;
523 if (!unpack_uint(&d, e, &first_wdf))
524 throw Xapian::DatabaseCorruptError("Decoding first wdf in glass "
525 "posting chunk");
526 wdf_max = max(wdf_max, first_wdf);
528 while (d != e) {
529 Xapian::docid delta;
530 if (!unpack_uint(&d, e, &delta))
531 throw Xapian::DatabaseCorruptError("Decoding docid delta in "
532 "glass posting chunk");
533 pack_uint(newtag, delta);
534 Xapian::termcount wdf;
535 if (!unpack_uint(&d, e, &wdf))
536 throw Xapian::DatabaseCorruptError("Decoding wdf in glass "
537 "posting chunk");
538 pack_uint(newtag, wdf);
539 wdf_max = max(wdf_max, wdf);
542 swap(tag, newtag);
544 return true;
547 #endif
/** Cursor over a honey postlist table, for honey-to-honey compaction.
 *
 *  As with the glass version, every entry is normalised to the non-initial
 *  chunk form; initial chunks are reconstructed as the merge writes output.
 */
template<>
class PostlistCursor<const HoneyTable&> : private HoneyCursor {
    /// Docid offset to apply to entries from this source database.
    Xapian::docid offset;

  public:
    string key, tag;
    Xapian::docid firstdid;
    Xapian::docid chunk_lastdid;
    Xapian::termcount tf, cf;
    Xapian::termcount first_wdf;
    Xapian::termcount wdf_max;

    PostlistCursor(const HoneyTable *in, Xapian::docid offset_)
	: HoneyCursor(in), offset(offset_), firstdid(0)
    {
	rewind();
	next();
    }

    /// Advance to the next entry.  Returns false at the end of the table.
    bool next() {
	if (!HoneyCursor::next()) return false;
	// We put all chunks into the non-initial chunk form here, then fix up
	// the first chunk for each term in the merged database as we merge.
	read_tag();
	key = current_key;
	tag = current_tag;
	// Note: cf is deliberately not reset here - for non-initial posting
	// chunks we reuse the cf decoded from the term's initial chunk.
	tf = 0;
	switch (key_type(key)) {
	    case KEY_USER_METADATA:
	    case KEY_VALUE_STATS:
		return true;
	    case KEY_VALUE_CHUNK: {
		// Re-encode the value chunk key with the docid offset added.
		const char * p = key.data();
		const char * end = p + key.length();
		p += 2;
		Xapian::valueno slot;
		if (!unpack_uint(&p, end, &slot))
		    throw Xapian::DatabaseCorruptError("bad value key");
		Xapian::docid did;
		if (!unpack_uint_preserving_sort(&p, end, &did))
		    throw Xapian::DatabaseCorruptError("bad value key");
		did += offset;

		key.assign("\0\xd8", 2);
		pack_uint(key, slot);
		pack_uint_preserving_sort(key, did);
		return true;
	    }
	    case KEY_DOCLEN_CHUNK: {
		// A doclen key with no docid suffix means the chunk starts
		// at docid 1.
		const char * p = key.data();
		const char * end = p + key.length();
		p += 2;
		Xapian::docid did = 1;
		if (p != end &&
		    (!unpack_uint_preserving_sort(&p, end, &did) || p != end)) {
		    throw Xapian::DatabaseCorruptError("Bad doclen key");
		}
		did += offset;

		key.erase(2);
		if (did != 1) {
		    pack_uint_preserving_sort(key, did);
		}
		return true;
	    }
	    case KEY_POSTING_CHUNK:
		break;
	    default:
		throw Xapian::DatabaseCorruptError("Bad postlist table key "
						   "type");
	}

	// Adjust key if this is *NOT* an initial chunk.
	// key is: pack_string_preserving_sort(key, term)
	// plus optionally: pack_uint_preserving_sort(key, did)
	const char * d = key.data();
	const char * e = d + key.size();
	string term;
	if (!unpack_string_preserving_sort(&d, e, term))
	    throw Xapian::DatabaseCorruptError("Bad postlist key");

	if (d == e) {
	    // This is an initial chunk for a term, so remove tag header.
	    d = tag.data();
	    e = d + tag.size();

	    Xapian::docid lastdid;
	    if (!decode_initial_chunk_header(&d, e, tf, cf,
					     firstdid, lastdid, chunk_lastdid,
					     first_wdf, wdf_max)) {
		throw Xapian::DatabaseCorruptError("Bad postlist initial "
						   "chunk header");
	    }
	    // Ignore lastdid - we'll need to recalculate it (at least when
	    // merging, and for simplicity we always do).
	    (void)lastdid;
	    tag.erase(0, d - tag.data());
	} else {
	    // Not an initial chunk, so adjust key and remove tag header.
	    size_t tmp = d - key.data();
	    if (!unpack_uint_preserving_sort(&d, e, &chunk_lastdid) ||
		d != e) {
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    }
	    // -1 to remove the terminating zero byte too.
	    key.erase(tmp - 1);

	    d = tag.data();
	    e = d + tag.size();

	    // Work out whether this term's chunks carry wdf values.  cf and
	    // tf here were decoded from the term's initial chunk, seen on an
	    // earlier call.
	    bool have_wdfs = (cf != 0);
	    if (have_wdfs && tf > 2) {
		Xapian::termcount remaining_cf_for_flat_wdf =
		    (tf - 1) * wdf_max;
		// Check this matches and that it isn't a false match due
		// to overflow of the multiplication above.
		if (cf - first_wdf == remaining_cf_for_flat_wdf &&
		    usual(remaining_cf_for_flat_wdf / wdf_max == tf - 1)) {
		    // The wdf is flat so we don't need to store it.
		    have_wdfs = false;
		}
	    }

	    if (have_wdfs) {
		if (!decode_delta_chunk_header(&d, e, chunk_lastdid, firstdid,
					       first_wdf)) {
		    throw Xapian::DatabaseCorruptError("Bad postlist delta "
						       "chunk header");
		}
		tag.erase(0, d - tag.data());
	    } else {
		if (!decode_delta_chunk_header_no_wdf(&d, e, chunk_lastdid,
						      firstdid)) {
		    throw Xapian::DatabaseCorruptError("Bad postlist delta "
						       "chunk header");
		}
		first_wdf = 0;
		// Splice in a 0 wdf value after each docid delta.
		string newtag;
		for (size_t i = d - tag.data(); i < tag.size(); i += 4) {
		    newtag.append(tag, i, 4);
		    newtag.append(4, '\0');
		}
		tag = std::move(newtag);
	    }
	}
	firstdid += offset;
	chunk_lastdid += offset;
	return true;
    }
};
/// Non-const HoneyTable cursor - just forwards to the const version above.
template<>
class PostlistCursor<HoneyTable&> : public PostlistCursor<const HoneyTable&> {
  public:
    PostlistCursor(HoneyTable *in, Xapian::docid offset_)
	: PostlistCursor<const HoneyTable&>(in, offset_) {}
};
template<typename T>
class PostlistCursorGt {
  public:
    /** Return true if and only if a's key is strictly greater than b's key.
     *
     *  Equal keys are ordered by firstdid, so the cursor whose chunk starts
     *  at the lower docid sorts first.
     */
    bool operator()(const T* a, const T* b) const {
	if (a->key != b->key)
	    return a->key > b->key;
	return a->firstdid > b->firstdid;
    }
};
721 static string
722 encode_valuestats(Xapian::doccount freq,
723 const string & lbound, const string & ubound)
725 string value;
726 pack_uint(value, freq);
727 pack_string(value, lbound);
728 // We don't store or count empty values, so neither of the bounds
729 // can be empty. So we can safely store an empty upper bound when
730 // the bounds are equal.
731 if (lbound != ubound) value += ubound;
732 return value;
// U : vector<HoneyTable*>::const_iterator
/** Merge the postlist tables of several source databases into @a out.
 *
 *  @param compactor  Optional Xapian::Compactor used to resolve duplicate
 *                    user metadata; may be NULL.
 *  @param out        Destination (honey) table.
 *  @param offset     Per-source docid offsets, parallel to [b, e).
 *  @param b, e       Range of pointers to the source tables.
 *
 *  Sources are merged via a priority queue of cursors, processing keys in
 *  type order: user metadata, value stats, value chunks, doclen chunks and
 *  finally posting chunks (whose initial chunk headers are rebuilt here).
 */
template<typename T, typename U> void
merge_postlists(Xapian::Compactor * compactor,
		T * out, vector<Xapian::docid>::const_iterator offset,
		U b, U e)
{
    typedef decltype(**b) table_type; // E.g. HoneyTable
    typedef PostlistCursor<table_type> cursor_type;
    typedef PostlistCursorGt<cursor_type> gt_type;
    priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
    for ( ; b != e; ++b, ++offset) {
	auto in = *b;
	if (in->empty()) {
	    // Skip empty tables.
	    continue;
	}

	pq.push(new cursor_type(in, *offset));
    }

    string last_key;
    {
	// Merge user metadata.
	vector<string> tags;
	while (!pq.empty()) {
	    cursor_type * cur = pq.top();
	    const string& key = cur->key;
	    if (key_type(key) != KEY_USER_METADATA) break;

	    if (key != last_key) {
		if (!tags.empty()) {
		    if (tags.size() > 1 && compactor) {
			Assert(!last_key.empty());
			// FIXME: It would be better to merge all duplicates
			// for a key in one call, but currently we don't in
			// multipass mode.
			const string & resolved_tag =
			    compactor->resolve_duplicate_metadata(last_key,
								  tags.size(),
								  &tags[0]);
			out->add(last_key, resolved_tag);
		    } else {
			Assert(!last_key.empty());
			out->add(last_key, tags[0]);
		    }
		    tags.resize(0);
		}
		last_key = key;
	    }
	    tags.push_back(cur->tag);

	    pq.pop();
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}
	// Flush the entries collected for the final metadata key.
	if (!tags.empty()) {
	    if (tags.size() > 1 && compactor) {
		Assert(!last_key.empty());
		const string & resolved_tag =
		    compactor->resolve_duplicate_metadata(last_key,
							  tags.size(),
							  &tags[0]);
		out->add(last_key, resolved_tag);
	    } else {
		Assert(!last_key.empty());
		out->add(last_key, tags[0]);
	    }
	}
    }

    // Merge valuestats.
    Xapian::doccount freq = 0;
    string lbound, ubound;

    while (!pq.empty()) {
	cursor_type * cur = pq.top();
	const string& key = cur->key;
	if (key_type(key) != KEY_VALUE_STATS) break;
	if (key != last_key) {
	    // For the first valuestats key, last_key will be the previous
	    // key we wrote, which we don't want to overwrite.  This is the
	    // only time that freq will be 0, so check that.
	    if (freq) {
		out->add(last_key, encode_valuestats(freq, lbound, ubound));
		freq = 0;
	    }
	    last_key = key;
	}

	const string & tag = cur->tag;

	const char * pos = tag.data();
	const char * end = pos + tag.size();

	Xapian::doccount f;
	string l, u;
	if (!unpack_uint(&pos, end, &f)) {
	    if (*pos == 0)
		throw Xapian::DatabaseCorruptError("Incomplete stats item "
						   "in value table");
	    throw Xapian::RangeError("Frequency statistic in value table "
				     "is too large");
	}
	if (!unpack_string(&pos, end, l)) {
	    if (*pos == 0)
		throw Xapian::DatabaseCorruptError("Incomplete stats item "
						   "in value table");
	    throw Xapian::RangeError("Lower bound in value table is too "
				     "large");
	}
	size_t len = end - pos;
	// An empty stored upper bound means it equals the lower bound (see
	// encode_valuestats()).
	if (len == 0) {
	    u = l;
	} else {
	    u.assign(pos, len);
	}
	if (freq == 0) {
	    freq = f;
	    lbound = l;
	    ubound = u;
	} else {
	    // Accumulate across sources: sum freqs, widen the bounds.
	    freq += f;
	    if (l < lbound) lbound = l;
	    if (u > ubound) ubound = u;
	}

	pq.pop();
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }

    // Flush the stats for the final value slot.
    if (freq) {
	out->add(last_key, encode_valuestats(freq, lbound, ubound));
    }

    // Merge valuestream chunks.
    while (!pq.empty()) {
	cursor_type * cur = pq.top();
	const string & key = cur->key;
	if (key_type(key) != KEY_VALUE_CHUNK) break;
	out->add(key, cur->tag);
	pq.pop();
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }

    // Merge doclen chunks.
    while (!pq.empty()) {
	cursor_type * cur = pq.top();
	string & key = cur->key;
	if (key_type(key) != KEY_DOCLEN_CHUNK) break;
	// The cursor stripped any docid suffix; re-append the chunk's last
	// docid (with offset applied) to form the output key.
	pack_uint_preserving_sort(key, cur->chunk_lastdid);
	out->add(key, cur->tag);
	pq.pop();
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }

    Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.

    // One buffered chunk of the term currently being merged.
    struct HoneyPostListChunk {
	Xapian::docid first, last;
	Xapian::termcount first_wdf;
	Xapian::termcount wdf_max;
	string data;

	HoneyPostListChunk(Xapian::docid first_,
			   Xapian::docid last_,
			   Xapian::termcount first_wdf_,
			   Xapian::termcount wdf_max_,
			   string&& data_)
	    : first(first_),
	      last(last_),
	      first_wdf(first_wdf_),
	      wdf_max(wdf_max_),
	      data(data_) {}
    };
    vector<HoneyPostListChunk> tags;
    while (true) {
	cursor_type * cur = NULL;
	if (!pq.empty()) {
	    cur = pq.top();
	    pq.pop();
	}
	if (cur) {
	    AssertEq(key_type(cur->key), KEY_POSTING_CHUNK);
	}
	// On a change of term (or at the end), write out the chunks
	// buffered for the previous term.
	if (cur == NULL || cur->key != last_key) {
	    if (!tags.empty()) {
		Xapian::termcount first_wdf = tags[0].first_wdf;
		Xapian::docid chunk_lastdid = tags[0].last;
		Xapian::docid last_did = tags.back().last;
		Xapian::termcount wdf_max = tags.back().wdf_max;

		string first_tag;
		encode_initial_chunk_header(tf, cf, tags[0].first, last_did,
					    chunk_lastdid,
					    first_wdf, wdf_max, first_tag);
		bool have_wdfs = (cf != 0);
		if (have_wdfs && tf > 2) {
		    Xapian::termcount remaining_cf_for_flat_wdf =
			(tf - 1) * wdf_max;
		    // Check this matches and that it isn't a false match due
		    // to overflow of the multiplication above.
		    if (cf - first_wdf == remaining_cf_for_flat_wdf &&
			usual(remaining_cf_for_flat_wdf / wdf_max == tf - 1)) {
			// The wdf is flat so we don't need to store it.
			have_wdfs = false;
		    }
		}

		if (tf > 2) {
		    if (have_wdfs) {
			first_tag += tags[0].data;
		    } else {
			const char* pos = tags[0].data.data();
			const char* pos_end = pos + tags[0].data.size();
			while (pos != pos_end) {
			    Xapian::docid delta;
			    if (!unpack_uint(&pos, pos_end, &delta))
				throw_database_corrupt("Decoding docid "
						       "delta", pos);
			    pack_uint(first_tag, delta);
			    Xapian::termcount wdf;
			    if (!unpack_uint(&pos, pos_end, &wdf))
				throw_database_corrupt("Decoding wdf",
						       pos);
			    // Only copy over the docid deltas, not the
			    // wdfs.
			    (void)wdf;
			}
		    }
		}
		out->add(last_key, first_tag);

		// If tf == 2, the data could be split over two tags when
		// merging, but we only output an initial tag in this case.
		if (tf > 2 && tags.size() > 1) {
		    string term;
		    const char* p = last_key.data();
		    const char* end = p + last_key.size();
		    if (!unpack_string_preserving_sort(&p, end, term) ||
			p != end) {
			throw Xapian::DatabaseCorruptError("Bad postlist "
							   "chunk key");
		    }

		    auto i = tags.begin();
		    while (++i != tags.end()) {
			last_did = i->last;
			const string& key = pack_honey_postlist_key(term,
								    last_did);
			string tag;
			if (have_wdfs) {
			    encode_delta_chunk_header(i->first,
						      last_did,
						      i->first_wdf,
						      tag);
			    tag += i->data;
			} else {
			    encode_delta_chunk_header_no_wdf(i->first,
							     last_did,
							     tag);
			    const char* pos = i->data.data();
			    const char* pos_end = pos + i->data.size();
			    while (pos != pos_end) {
				Xapian::docid delta;
				if (!unpack_uint(&pos, pos_end, &delta))
				    throw_database_corrupt("Decoding docid "
							   "delta", pos);
				pack_uint(tag, delta);
				Xapian::termcount wdf;
				if (!unpack_uint(&pos, pos_end, &wdf))
				    throw_database_corrupt("Decoding wdf",
							   pos);
				// Only copy over the docid deltas, not the
				// wdfs.
				(void)wdf;
			    }
			}
			out->add(key, tag);
		    }
		}
	    }
	    tags.clear();
	    if (cur == NULL) break;
	    tf = cf = 0;
	    last_key = cur->key;
	}
	// cur->tf is only non-zero for a term's initial chunk.
	if (cur->tf) {
	    tf += cur->tf;
	    cf += cur->cf;
	}
	tags.push_back(HoneyPostListChunk(cur->firstdid,
					  cur->chunk_lastdid,
					  cur->first_wdf,
					  cur->wdf_max,
					  std::move(cur->tag)));
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }
}
1056 template<typename T> struct MergeCursor;
1058 #ifndef DISABLE_GPL_LIBXAPIAN
/// Merge cursor over a glass table, positioned on the first entry.
template<>
struct MergeCursor<const GlassTable&> : public GlassCursor {
    explicit MergeCursor(const GlassTable *in) : GlassCursor(in) {
	rewind();
	next();
    }
};
/// Merge cursor over a honey table, positioned on the first entry.
template<>
struct MergeCursor<const HoneyTable&> : public HoneyCursor {
    explicit MergeCursor(const HoneyTable *in) : HoneyCursor(in) {
	rewind();
	next();
    }
};
template<typename T>
struct CursorGt {
    /** Return true if and only if a's key is strictly greater than b's key.
     *
     *  A cursor which is after_end() sorts above every live cursor.
     */
    bool operator()(const T* a, const T* b) const {
	// Nothing compares greater than an exhausted b.
	if (b->after_end()) return false;
	// An exhausted a compares greater than any live b; otherwise order
	// by key.
	return a->after_end() || a->current_key > b->current_key;
    }
};
1086 #ifndef DISABLE_GPL_LIBXAPIAN
1087 // Convert glass to honey.
/** Merge glass spelling tables into a honey spelling table.
 *
 *  @param out   Destination (honey) spelling table.
 *  @param b, e  Range of pointers to the source glass tables.
 *
 *  Keys are translated from the glass prefix scheme to the honey one (see
 *  the table in the loop body); entries sharing a key are merged - fragment
 *  entries take the union of their word lists, word entries sum their
 *  frequencies.
 */
static void
merge_spellings(HoneyTable* out,
		vector<const GlassTable*>::const_iterator b,
		vector<const GlassTable*>::const_iterator e)
{
    typedef MergeCursor<const GlassTable&> cursor_type;
    typedef CursorGt<cursor_type> gt_type;
    priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
    for ( ; b != e; ++b) {
	auto in = *b;
	if (!in->empty()) {
	    pq.push(new cursor_type(in));
	}
    }

    while (!pq.empty()) {
	cursor_type * cur = pq.top();
	pq.pop();

	// Glass vs honey spelling key prefix map:
	//
	// Type	    Glass	Honey
	//
	// bookend  Bbd		\x00bd
	// head	    Hhe		\x01he
	// middle   Mddl	\x02ddl
	// tail	    Til		\x03il
	// word	    Wword	word
	//
	// In honey, if the word's first byte is <= '\x04', we add a prefix
	// of '\x04' (which is unlikely in real world use but ensures that
	// we can handle arbitrary strings).
	//
	// The prefixes for honey are chosen such that we save a byte for the
	// key of all real-world words, and so that more first bytes are used
	// so that a top-level array index is more effective, especially for
	// random-access lookup of word entries (which we do to check the
	// frequency of potential spelling candidates).
	//
	// The other prefixes are chosen such that we can do look up in key
	// order at search time, which is more efficient for a cursor which can
	// only efficiently move forwards.
	//
	// Note that the key ordering is the same for glass and honey, which
	// makes translating during compaction simpler.
	string key = cur->current_key;
	switch (key[0]) {
	    case 'B':
		key[0] = Honey::KEY_PREFIX_BOOKEND;
		break;
	    case 'H':
		key[0] = Honey::KEY_PREFIX_HEAD;
		break;
	    case 'M':
		key[0] = Honey::KEY_PREFIX_MIDDLE;
		break;
	    case 'T':
		key[0] = Honey::KEY_PREFIX_TAIL;
		break;
	    case 'W':
		if (static_cast<unsigned char>(key[1]) > Honey::KEY_PREFIX_WORD)
		    key.erase(0, 1);
		else
		    key[0] = Honey::KEY_PREFIX_WORD;
		break;
	    default: {
		string m = "Bad spelling key prefix: ";
		m += static_cast<unsigned char>(key[0]);
		throw Xapian::DatabaseCorruptError(m);
	    }
	}

	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;
	if (static_cast<unsigned char>(key[0]) < Honey::KEY_PREFIX_WORD) {
	    // We just want the union of words, so copy over the first instance
	    // and skip any identical ones.
	    priority_queue<PrefixCompressedStringItor *,
			   vector<PrefixCompressedStringItor *>,
			   PrefixCompressedStringItorGt> pqtag;
	    // Stick all the cursor_type pointers in a vector because their
	    // current_tag members must remain valid while we're merging their
	    // tags, but we need to call next() on them all afterwards.
	    vector<cursor_type *> vec;
	    vec.reserve(pq.size());

	    // Collect every cursor whose (translated) key matches.
	    while (true) {
		cur->read_tag();
		pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
		vec.push_back(cur);
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }

	    // Emit the words in sorted order, dropping duplicates.
	    PrefixCompressedStringWriter wr(tag);
	    string lastword;
	    while (!pqtag.empty()) {
		PrefixCompressedStringItor * it = pqtag.top();
		pqtag.pop();
		string word = **it;
		if (word != lastword) {
		    lastword = word;
		    wr.append(lastword);
		}
		++*it;
		if (!it->at_end()) {
		    pqtag.push(it);
		} else {
		    delete it;
		}
	    }

	    for (auto i : vec) {
		cur = i;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
	    }
	} else {
	    // We want to sum the frequencies from tags for the same key.
	    Xapian::termcount tot_freq = 0;
	    while (true) {
		cur->read_tag();
		Xapian::termcount freq;
		const char * p = cur->current_tag.data();
		const char * end = p + cur->current_tag.size();
		if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
		    throw_database_corrupt("Bad spelling word freq", p);
		}
		tot_freq += freq;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }
	    tag.resize(0);
	    pack_uint_last(tag, tot_freq);
	}
	out->add(key, tag);
    }
}
1249 #endif
1251 static void
1252 merge_spellings(HoneyTable* out,
1253 vector<const HoneyTable*>::const_iterator b,
1254 vector<const HoneyTable*>::const_iterator e)
1256 typedef MergeCursor<const HoneyTable&> cursor_type;
1257 typedef CursorGt<cursor_type> gt_type;
1258 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1259 for ( ; b != e; ++b) {
1260 auto in = *b;
1261 if (!in->empty()) {
1262 pq.push(new cursor_type(in));
1266 while (!pq.empty()) {
1267 cursor_type * cur = pq.top();
1268 pq.pop();
1270 string key = cur->current_key;
1271 if (pq.empty() || pq.top()->current_key > key) {
1272 // No need to merge the tags, just copy the (possibly compressed)
1273 // tag value.
1274 bool compressed = cur->read_tag(true);
1275 out->add(key, cur->current_tag, compressed);
1276 if (cur->next()) {
1277 pq.push(cur);
1278 } else {
1279 delete cur;
1281 continue;
1284 // Merge tag values with the same key:
1285 string tag;
1286 if (static_cast<unsigned char>(key[0]) < Honey::KEY_PREFIX_WORD) {
1287 // We just want the union of words, so copy over the first instance
1288 // and skip any identical ones.
1289 priority_queue<PrefixCompressedStringItor *,
1290 vector<PrefixCompressedStringItor *>,
1291 PrefixCompressedStringItorGt> pqtag;
1292 // Stick all the cursor_type pointers in a vector because their
1293 // current_tag members must remain valid while we're merging their
1294 // tags, but we need to call next() on them all afterwards.
1295 vector<cursor_type *> vec;
1296 vec.reserve(pq.size());
1298 while (true) {
1299 cur->read_tag();
1300 pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
1301 vec.push_back(cur);
1302 if (pq.empty() || pq.top()->current_key != key) break;
1303 cur = pq.top();
1304 pq.pop();
1307 PrefixCompressedStringWriter wr(tag);
1308 string lastword;
1309 while (!pqtag.empty()) {
1310 PrefixCompressedStringItor * it = pqtag.top();
1311 pqtag.pop();
1312 string word = **it;
1313 if (word != lastword) {
1314 lastword = word;
1315 wr.append(lastword);
1317 ++*it;
1318 if (!it->at_end()) {
1319 pqtag.push(it);
1320 } else {
1321 delete it;
1325 for (auto i : vec) {
1326 cur = i;
1327 if (cur->next()) {
1328 pq.push(cur);
1329 } else {
1330 delete cur;
1333 } else {
1334 // We want to sum the frequencies from tags for the same key.
1335 Xapian::termcount tot_freq = 0;
1336 while (true) {
1337 cur->read_tag();
1338 Xapian::termcount freq;
1339 const char * p = cur->current_tag.data();
1340 const char * end = p + cur->current_tag.size();
1341 if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
1342 throw_database_corrupt("Bad spelling word freq", p);
1344 tot_freq += freq;
1345 if (cur->next()) {
1346 pq.push(cur);
1347 } else {
1348 delete cur;
1350 if (pq.empty() || pq.top()->current_key != key) break;
1351 cur = pq.top();
1352 pq.pop();
1354 tag.resize(0);
1355 pack_uint_last(tag, tot_freq);
1357 out->add(key, tag);
1361 // U : vector<HoneyTable*>::const_iterator
1362 template<typename T, typename U> void
1363 merge_synonyms(T* out, U b, U e)
1365 typedef decltype(**b) table_type; // E.g. HoneyTable
1366 typedef MergeCursor<table_type> cursor_type;
1367 typedef CursorGt<cursor_type> gt_type;
1368 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1369 for ( ; b != e; ++b) {
1370 auto in = *b;
1371 if (!in->empty()) {
1372 pq.push(new cursor_type(in));
1376 while (!pq.empty()) {
1377 cursor_type * cur = pq.top();
1378 pq.pop();
1380 string key = cur->current_key;
1381 if (pq.empty() || pq.top()->current_key > key) {
1382 // No need to merge the tags, just copy the (possibly compressed)
1383 // tag value.
1384 bool compressed = cur->read_tag(true);
1385 out->add(key, cur->current_tag, compressed);
1386 if (cur->next()) {
1387 pq.push(cur);
1388 } else {
1389 delete cur;
1391 continue;
1394 // Merge tag values with the same key:
1395 string tag;
1397 // We just want the union of words, so copy over the first instance
1398 // and skip any identical ones.
1399 priority_queue<ByteLengthPrefixedStringItor *,
1400 vector<ByteLengthPrefixedStringItor *>,
1401 ByteLengthPrefixedStringItorGt> pqtag;
1402 vector<cursor_type *> vec;
1404 while (true) {
1405 cur->read_tag();
1406 pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
1407 vec.push_back(cur);
1408 if (pq.empty() || pq.top()->current_key != key) break;
1409 cur = pq.top();
1410 pq.pop();
1413 string lastword;
1414 while (!pqtag.empty()) {
1415 ByteLengthPrefixedStringItor * it = pqtag.top();
1416 pqtag.pop();
1417 if (**it != lastword) {
1418 lastword = **it;
1419 tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
1420 tag += lastword;
1422 ++*it;
1423 if (!it->at_end()) {
1424 pqtag.push(it);
1425 } else {
1426 delete it;
1430 for (auto i : vec) {
1431 cur = i;
1432 if (cur->next()) {
1433 pq.push(cur);
1434 } else {
1435 delete cur;
1439 out->add(key, tag);
1443 template<typename T, typename U> void
1444 multimerge_postlists(Xapian::Compactor * compactor,
1445 T* out, const char * tmpdir,
1446 const vector<U*>& in,
1447 vector<Xapian::docid> off)
1449 if (in.size() <= 3) {
1450 merge_postlists(compactor, out, off.begin(), in.begin(), in.end());
1451 return;
1453 unsigned int c = 0;
1454 vector<HoneyTable *> tmp;
1455 tmp.reserve(in.size() / 2);
1457 vector<Xapian::docid> newoff;
1458 newoff.resize(in.size() / 2);
1459 for (unsigned int i = 0, j; i < in.size(); i = j) {
1460 j = i + 2;
1461 if (j == in.size() - 1) ++j;
1463 string dest = tmpdir;
1464 char buf[64];
1465 sprintf(buf, "/tmp%u_%u.", c, i / 2);
1466 dest += buf;
1468 HoneyTable * tmptab = new HoneyTable("postlist", dest, false);
1470 // Use maximum blocksize for temporary tables. And don't compress
1471 // entries in temporary tables, even if the final table would do
1472 // so. Any already compressed entries will get copied in
1473 // compressed form. (FIXME: HoneyTable has no blocksize)
1474 Honey::RootInfo root_info;
1475 root_info.init(65536, 0);
1476 const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
1477 tmptab->create_and_open(flags, root_info);
1479 merge_postlists(compactor, tmptab, off.begin() + i,
1480 in.begin() + i, in.begin() + j);
1481 tmp.push_back(tmptab);
1482 tmptab->flush_db();
1483 tmptab->commit(1, &root_info);
1484 AssertRel(root_info.get_blocksize(),==,65536);
1486 swap(off, newoff);
1487 ++c;
1490 while (tmp.size() > 3) {
1491 vector<HoneyTable *> tmpout;
1492 tmpout.reserve(tmp.size() / 2);
1493 vector<Xapian::docid> newoff;
1494 newoff.resize(tmp.size() / 2);
1495 for (unsigned int i = 0, j; i < tmp.size(); i = j) {
1496 j = i + 2;
1497 if (j == tmp.size() - 1) ++j;
1499 string dest = tmpdir;
1500 char buf[64];
1501 sprintf(buf, "/tmp%u_%u.", c, i / 2);
1502 dest += buf;
1504 HoneyTable * tmptab = new HoneyTable("postlist", dest, false);
1506 // Use maximum blocksize for temporary tables. And don't compress
1507 // entries in temporary tables, even if the final table would do
1508 // so. Any already compressed entries will get copied in
1509 // compressed form. (FIXME: HoneyTable has no blocksize)
1510 Honey::RootInfo root_info;
1511 root_info.init(65536, 0);
1512 const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
1513 tmptab->create_and_open(flags, root_info);
1515 merge_postlists(compactor, tmptab, off.begin() + i,
1516 tmp.begin() + i, tmp.begin() + j);
1517 if (c > 0) {
1518 for (unsigned int k = i; k < j; ++k) {
1519 // FIXME: unlink(tmp[k]->get_path().c_str());
1520 delete tmp[k];
1521 tmp[k] = NULL;
1524 tmpout.push_back(tmptab);
1525 tmptab->flush_db();
1526 tmptab->commit(1, &root_info);
1527 AssertRel(root_info.get_blocksize(),==,65536);
1529 swap(tmp, tmpout);
1530 swap(off, newoff);
1531 ++c;
1533 merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
1534 if (c > 0) {
1535 for (size_t k = 0; k < tmp.size(); ++k) {
1536 // FIXME: unlink(tmp[k]->get_path().c_str());
1537 delete tmp[k];
1538 tmp[k] = NULL;
1543 template<typename T> class PositionCursor;
1545 #ifndef DISABLE_GPL_LIBXAPIAN
1546 template<>
1547 class PositionCursor<const GlassTable&> : private GlassCursor {
1548 Xapian::docid offset;
1550 public:
1551 string key;
1552 Xapian::docid firstdid;
1554 PositionCursor(const GlassTable *in, Xapian::docid offset_)
1555 : GlassCursor(in), offset(offset_), firstdid(0) {
1556 rewind();
1557 next();
1560 bool next() {
1561 if (!GlassCursor::next()) return false;
1562 read_tag();
1563 const char * d = current_key.data();
1564 const char * e = d + current_key.size();
1565 string term;
1566 Xapian::docid did;
1567 if (!unpack_string_preserving_sort(&d, e, term) ||
1568 !unpack_uint_preserving_sort(&d, e, &did) ||
1569 d != e) {
1570 throw Xapian::DatabaseCorruptError("Bad position key");
1573 key.resize(0);
1574 pack_string_preserving_sort(key, term);
1575 pack_uint_preserving_sort(key, did + offset);
1576 return true;
1579 const string & get_tag() const {
1580 return current_tag;
1583 #endif
1585 template<>
1586 class PositionCursor<const HoneyTable&> : private HoneyCursor {
1587 Xapian::docid offset;
1589 public:
1590 string key;
1591 Xapian::docid firstdid;
1593 PositionCursor(const HoneyTable *in, Xapian::docid offset_)
1594 : HoneyCursor(in), offset(offset_), firstdid(0) {
1595 rewind();
1596 next();
1599 bool next() {
1600 if (!HoneyCursor::next()) return false;
1601 read_tag();
1602 const char * d = current_key.data();
1603 const char * e = d + current_key.size();
1604 string term;
1605 Xapian::docid did;
1606 if (!unpack_string_preserving_sort(&d, e, term) ||
1607 !unpack_uint_preserving_sort(&d, e, &did) ||
1608 d != e) {
1609 throw Xapian::DatabaseCorruptError("Bad position key");
1612 key.resize(0);
1613 pack_string_preserving_sort(key, term);
1614 pack_uint_preserving_sort(key, did + offset);
1615 return true;
1618 const string & get_tag() const {
1619 return current_tag;
/// Comparator used so a priority_queue yields the cursor with the
/// lexicographically smallest key first.
template<typename T>
class PositionCursorGt {
  public:
    /** Return true if and only if lhs's key is strictly greater than rhs's
     *  key.
     */
    bool operator()(const T* lhs, const T* rhs) const {
	return rhs->key < lhs->key;
    }
};
1633 template<typename T, typename U> void
1634 merge_positions(T* out, const vector<U*> & inputs,
1635 const vector<Xapian::docid> & offset)
1637 typedef decltype(*inputs[0]) table_type; // E.g. HoneyTable
1638 typedef PositionCursor<table_type> cursor_type;
1639 typedef PositionCursorGt<cursor_type> gt_type;
1640 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1641 for (size_t i = 0; i < inputs.size(); ++i) {
1642 auto in = inputs[i];
1643 if (in->empty()) {
1644 // Skip empty tables.
1645 continue;
1648 pq.push(new cursor_type(in, offset[i]));
1651 while (!pq.empty()) {
1652 cursor_type * cur = pq.top();
1653 pq.pop();
1654 out->add(cur->key, cur->get_tag());
1655 if (cur->next()) {
1656 pq.push(cur);
1657 } else {
1658 delete cur;
1663 template<typename T, typename U> void
1664 merge_docid_keyed(T *out, const vector<U*> & inputs,
1665 const vector<Xapian::docid> & offset,
1666 int = 0)
1668 for (size_t i = 0; i < inputs.size(); ++i) {
1669 Xapian::docid off = offset[i];
1671 auto in = inputs[i];
1672 if (in->empty()) continue;
1674 HoneyCursor cur(in);
1675 cur.rewind();
1677 string key;
1678 while (cur.next()) {
1679 // Adjust the key if this isn't the first database.
1680 if (off) {
1681 Xapian::docid did;
1682 const char * d = cur.current_key.data();
1683 const char * e = d + cur.current_key.size();
1684 if (!unpack_uint_preserving_sort(&d, e, &did)) {
1685 string msg = "Bad key in ";
1686 msg += inputs[i]->get_path();
1687 throw Xapian::DatabaseCorruptError(msg);
1689 did += off;
1690 key.resize(0);
1691 pack_uint_preserving_sort(key, did);
1692 if (d != e) {
1693 // Copy over anything extra in the key (e.g. the zero byte
1694 // at the end of "used value slots" in the termlist table).
1695 key.append(d, e - d);
1697 } else {
1698 key = cur.current_key;
1700 bool compressed = cur.read_tag(true);
1701 out->add(key, cur.current_tag, compressed);
1706 #ifndef DISABLE_GPL_LIBXAPIAN
1707 template<typename T> void
1708 merge_docid_keyed(T *out, const vector<const GlassTable*> & inputs,
1709 const vector<Xapian::docid> & offset,
1710 int table_type = 0)
1712 for (size_t i = 0; i < inputs.size(); ++i) {
1713 Xapian::docid off = offset[i];
1715 auto in = inputs[i];
1716 if (in->empty()) continue;
1718 GlassCursor cur(in);
1719 cur.rewind();
1721 string key;
1722 while (cur.next()) {
1723 next_without_next:
1724 // Adjust the key if this isn't the first database.
1725 if (off) {
1726 Xapian::docid did;
1727 const char * d = cur.current_key.data();
1728 const char * e = d + cur.current_key.size();
1729 if (!unpack_uint_preserving_sort(&d, e, &did)) {
1730 string msg = "Bad key in ";
1731 msg += inputs[i]->get_path();
1732 throw Xapian::DatabaseCorruptError(msg);
1734 did += off;
1735 key.resize(0);
1736 pack_uint_preserving_sort(key, did);
1737 if (d != e) {
1738 // Copy over anything extra in the key (e.g. the zero byte
1739 // at the end of "used value slots" in the termlist table).
1740 key.append(d, e - d);
1742 } else {
1743 key = cur.current_key;
1745 if (table_type == Honey::TERMLIST) {
1746 if (termlist_key_is_values_used(key)) {
1747 throw Xapian::DatabaseCorruptError("value slots used "
1748 "entry without a "
1749 "corresponding "
1750 "termlist entry");
1752 cur.read_tag();
1753 string tag = cur.current_tag;
1755 bool next_result = cur.next();
1756 bool next_already_done = true;
1757 unsigned bitmap_slots_used = 0;
1758 string encoded_slots_used;
1759 if (next_result &&
1760 termlist_key_is_values_used(cur.current_key)) {
1761 next_already_done = false;
1762 cur.read_tag();
1763 const string& valtag = cur.current_tag;
1765 const char* p = valtag.data();
1766 const char* end = p + valtag.size();
1768 Xapian::VecCOW<Xapian::termpos> slots;
1770 Xapian::valueno first_slot;
1771 if (!unpack_uint(&p, end, &first_slot)) {
1772 throw_database_corrupt("Value slot encoding corrupt",
1775 slots.push_back(first_slot);
1776 Xapian::valueno last_slot = first_slot;
1777 while (p != end) {
1778 Xapian::valueno slot;
1779 if (!unpack_uint(&p, end, &slot)) {
1780 throw_database_corrupt("Value slot encoding "
1781 "corrupt", p);
1783 slot += last_slot + 1;
1784 last_slot = slot;
1786 slots.push_back(slot);
1789 if (slots.back() <= 6) {
1790 // Encode as a bitmap if only slots in the range 0-6
1791 // are used.
1792 for (auto slot : slots) {
1793 bitmap_slots_used |= 1 << slot;
1795 } else {
1796 string enc;
1797 pack_uint(enc, last_slot);
1798 if (slots.size() > 1) {
1799 BitWriter slots_used(enc);
1800 slots_used.encode(first_slot, last_slot);
1801 slots_used.encode(slots.size() - 2,
1802 last_slot - first_slot);
1803 slots_used.encode_interpolative(slots, 0,
1804 slots.size() - 1);
1805 encoded_slots_used = slots_used.freeze();
1806 } else {
1807 encoded_slots_used = std::move(enc);
1812 const char* pos = tag.data();
1813 const char* end = pos + tag.size();
1815 string newtag;
1816 if (encoded_slots_used.empty()) {
1817 newtag += char(bitmap_slots_used);
1818 } else {
1819 auto size = encoded_slots_used.size();
1820 if (size < 0x80) {
1821 newtag += char(0x80 | size);
1822 } else {
1823 newtag += '\x80';
1824 pack_uint(newtag, size);
1826 newtag += encoded_slots_used;
1829 if (pos != end) {
1830 Xapian::termcount doclen;
1831 if (!unpack_uint(&pos, end, &doclen)) {
1832 throw_database_corrupt("doclen", pos);
1834 Xapian::termcount termlist_size;
1835 if (!unpack_uint(&pos, end, &termlist_size)) {
1836 throw_database_corrupt("termlist length", pos);
1838 pack_uint(newtag, termlist_size - 1);
1839 pack_uint(newtag, doclen);
1841 string current_term;
1842 while (pos != end) {
1843 Xapian::termcount current_wdf = 0;
1845 if (!current_term.empty()) {
1846 size_t reuse = static_cast<unsigned char>(*pos++);
1847 newtag += char(reuse);
1849 if (reuse > current_term.size()) {
1850 current_wdf = reuse / (current_term.size() + 1);
1851 reuse = reuse % (current_term.size() + 1);
1853 current_term.resize(reuse);
1855 if (pos == end)
1856 throw_database_corrupt("term", NULL);
1859 size_t append = static_cast<unsigned char>(*pos++);
1860 if (size_t(end - pos) < append)
1861 throw_database_corrupt("term", NULL);
1863 current_term.append(pos, append);
1864 pos += append;
1866 if (current_wdf) {
1867 --current_wdf;
1868 } else {
1869 if (!unpack_uint(&pos, end, &current_wdf)) {
1870 throw_database_corrupt("wdf", pos);
1872 pack_uint(newtag, current_wdf);
1875 newtag += char(append);
1876 newtag.append(current_term.end() - append,
1877 current_term.end());
1880 if (!newtag.empty())
1881 out->add(key, newtag);
1882 if (!next_result) break;
1883 if (next_already_done) goto next_without_next;
1884 } else {
1885 bool compressed = cur.read_tag(true);
1886 out->add(key, cur.current_tag, compressed);
1891 #endif
1895 using namespace HoneyCompact;
1897 void
1898 HoneyDatabase::compact(Xapian::Compactor* compactor,
1899 const char* destdir,
1900 int fd,
1901 int source_backend,
1902 const vector<const Xapian::Database::Internal*>& sources,
1903 const vector<Xapian::docid>& offset,
1904 size_t block_size,
1905 Xapian::Compactor::compaction_level compaction,
1906 unsigned flags,
1907 Xapian::docid last_docid)
1909 struct table_list {
1910 // The "base name" of the table.
1911 char name[9];
1912 // The type.
1913 Honey::table_type type;
1914 // Create tables after position lazily.
1915 bool lazy;
1918 static const table_list tables[] = {
1919 // name type lazy
1920 { "postlist", Honey::POSTLIST, false },
1921 { "docdata", Honey::DOCDATA, true },
1922 { "termlist", Honey::TERMLIST, false },
1923 { "position", Honey::POSITION, true },
1924 { "spelling", Honey::SPELLING, true },
1925 { "synonym", Honey::SYNONYM, true }
1927 const table_list * tables_end = tables +
1928 (sizeof(tables) / sizeof(tables[0]));
1930 const int FLAGS = Xapian::DB_DANGEROUS;
1932 bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
1933 bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
1934 if (single_file) {
1935 // FIXME: Support this combination - we need to put temporary files
1936 // somewhere.
1937 multipass = false;
1940 if (single_file) {
1941 for (size_t i = 0; i != sources.size(); ++i) {
1942 bool has_uncommitted_changes;
1943 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1944 auto db = static_cast<const GlassDatabase*>(sources[i]);
1945 has_uncommitted_changes = db->has_uncommitted_changes();
1946 } else {
1947 auto db = static_cast<const HoneyDatabase*>(sources[i]);
1948 has_uncommitted_changes = db->has_uncommitted_changes();
1950 if (has_uncommitted_changes) {
1951 const char * m =
1952 "Can't compact from a WritableDatabase with uncommitted "
1953 "changes - either call commit() first, or create a new "
1954 "Database object from the filename on disk";
1955 throw Xapian::InvalidOperationError(m);
1960 if (block_size < HONEY_MIN_BLOCKSIZE ||
1961 block_size > HONEY_MAX_BLOCKSIZE ||
1962 (block_size & (block_size - 1)) != 0) {
1963 block_size = HONEY_DEFAULT_BLOCKSIZE;
1966 FlintLock lock(destdir ? destdir : "");
1967 if (!single_file) {
1968 string explanation;
1969 FlintLock::reason why = lock.lock(true, false, explanation);
1970 if (why != FlintLock::SUCCESS) {
1971 lock.throw_databaselockerror(why, destdir, explanation);
1975 unique_ptr<HoneyVersion> version_file_out;
1976 if (single_file) {
1977 if (destdir) {
1978 fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
1979 if (fd < 0) {
1980 throw Xapian::DatabaseCreateError("open() failed", errno);
1983 version_file_out.reset(new HoneyVersion(fd));
1984 } else {
1985 fd = -1;
1986 version_file_out.reset(new HoneyVersion(destdir));
1989 version_file_out->create(block_size);
1990 for (size_t i = 0; i != sources.size(); ++i) {
1991 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1992 #ifdef DISABLE_GPL_LIBXAPIAN
1993 Assert(false);
1994 #else
1995 auto db = static_cast<const GlassDatabase*>(sources[i]);
1996 auto& v_in = db->version_file;
1997 auto& v_out = version_file_out;
1998 v_out->merge_stats(v_in.get_doccount(),
1999 v_in.get_doclength_lower_bound(),
2000 v_in.get_doclength_upper_bound(),
2001 v_in.get_wdf_upper_bound(),
2002 v_in.get_total_doclen(),
2003 v_in.get_spelling_wordfreq_upper_bound());
2004 #endif
2005 } else {
2006 auto db = static_cast<const HoneyDatabase*>(sources[i]);
2007 version_file_out->merge_stats(db->version_file);
2011 string fl_serialised;
2012 #if 0
2013 if (single_file) {
2014 HoneyFreeList fl;
2015 fl.set_first_unused_block(1); // FIXME: Assumption?
2016 fl.pack(fl_serialised);
2018 #endif
2020 // Set to true if stat() failed (which can happen if the files are > 2GB
2021 // and off_t is 32 bit) or one of the totals overflowed.
2022 bool bad_totals = false;
2023 off_t in_total = 0, out_total = 0;
2025 // FIXME: sort out indentation.
2026 if (source_backend == Xapian::DB_BACKEND_GLASS) {
2027 #ifdef DISABLE_GPL_LIBXAPIAN
2028 throw Xapian::FeatureUnavailableError("Glass backend disabled");
2029 #else
2030 vector<HoneyTable *> tabs;
2031 tabs.reserve(tables_end - tables);
2032 off_t prev_size = block_size;
2033 for (const table_list * t = tables; t < tables_end; ++t) {
2034 // The postlist table requires an N-way merge, adjusting the
2035 // headers of various blocks. The spelling and synonym tables also
2036 // need special handling. The other tables have keys sorted in
2037 // docid order, so we can merge them by simply copying all the keys
2038 // from each source table in turn.
2039 if (compactor)
2040 compactor->set_status(t->name, string());
2042 string dest;
2043 if (!single_file) {
2044 dest = destdir;
2045 dest += '/';
2046 dest += t->name;
2047 dest += '.';
2050 bool output_will_exist = !t->lazy;
2052 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
2053 // on certain systems).
2054 bool bad_stat = false;
2056 // We can't currently report input sizes if there's a single file DB
2057 // amongst the inputs.
2058 bool single_file_in = false;
2060 off_t in_size = 0;
2062 vector<const GlassTable*> inputs;
2063 inputs.reserve(sources.size());
2064 size_t inputs_present = 0;
2065 for (auto src : sources) {
2066 auto db = static_cast<const GlassDatabase*>(src);
2067 const GlassTable * table;
2068 switch (t->type) {
2069 case Honey::POSTLIST:
2070 table = &(db->postlist_table);
2071 break;
2072 case Honey::DOCDATA:
2073 table = &(db->docdata_table);
2074 break;
2075 case Honey::TERMLIST:
2076 table = &(db->termlist_table);
2077 break;
2078 case Honey::POSITION:
2079 table = &(db->position_table);
2080 break;
2081 case Honey::SPELLING:
2082 table = &(db->spelling_table);
2083 break;
2084 case Honey::SYNONYM:
2085 table = &(db->synonym_table);
2086 break;
2087 default:
2088 Assert(false);
2089 return;
2092 if (db->single_file()) {
2093 if (t->lazy && table->empty()) {
2094 // Essentially doesn't exist.
2095 } else {
2096 // FIXME: Find actual size somehow?
2097 // in_size += table->size() / 1024;
2098 single_file_in = true;
2099 bad_totals = true;
2100 output_will_exist = true;
2101 ++inputs_present;
2103 } else {
2104 off_t db_size = file_size(table->get_path());
2105 if (errno == 0) {
2106 // FIXME: check overflow and set bad_totals
2107 in_total += db_size;
2108 in_size += db_size / 1024;
2109 output_will_exist = true;
2110 ++inputs_present;
2111 } else if (errno != ENOENT) {
2112 // We get ENOENT for an optional table.
2113 bad_totals = bad_stat = true;
2114 output_will_exist = true;
2115 ++inputs_present;
2118 inputs.push_back(table);
2121 // If any inputs lack a termlist table, suppress it in the output.
2122 if (t->type == Honey::TERMLIST && inputs_present != sources.size()) {
2123 if (inputs_present != 0) {
2124 if (compactor) {
2125 string m = str(inputs_present);
2126 m += " of ";
2127 m += str(sources.size());
2128 m += " inputs present, so suppressing output";
2129 compactor->set_status(t->name, m);
2131 continue;
2133 output_will_exist = false;
2136 if (!output_will_exist) {
2137 if (compactor)
2138 compactor->set_status(t->name, "doesn't exist");
2139 continue;
2142 HoneyTable * out;
2143 off_t table_start_offset = -1;
2144 if (single_file) {
2145 if (t == tables) {
2146 // Start first table HONEY_VERSION_MAX_SIZE bytes in to allow
2147 // space for version file. It's tricky to exactly know the
2148 // size of the version file beforehand.
2149 table_start_offset = HONEY_VERSION_MAX_SIZE;
2150 if (lseek(fd, table_start_offset, SEEK_SET) < 0)
2151 throw Xapian::DatabaseError("lseek() failed", errno);
2152 } else {
2153 table_start_offset = lseek(fd, 0, SEEK_CUR);
2155 out = new HoneyTable(t->name, fd, version_file_out->get_offset(),
2156 false, false);
2157 } else {
2158 out = new HoneyTable(t->name, dest, false, t->lazy);
2160 tabs.push_back(out);
2161 Honey::RootInfo * root_info = version_file_out->root_to_set(t->type);
2162 if (single_file) {
2163 root_info->set_free_list(fl_serialised);
2164 root_info->set_offset(table_start_offset);
2165 out->open(FLAGS,
2166 version_file_out->get_root(t->type),
2167 version_file_out->get_revision());
2168 } else {
2169 out->create_and_open(FLAGS, *root_info);
2172 out->set_full_compaction(compaction != compactor->STANDARD);
2173 if (compaction == compactor->FULLER) out->set_max_item_size(1);
2175 switch (t->type) {
2176 case Honey::POSTLIST: {
2177 if (multipass && inputs.size() > 3) {
2178 multimerge_postlists(compactor, out, destdir,
2179 inputs, offset);
2180 } else {
2181 merge_postlists(compactor, out, offset.begin(),
2182 inputs.begin(), inputs.end());
2184 break;
2186 case Honey::SPELLING:
2187 merge_spellings(out, inputs.cbegin(), inputs.cend());
2188 break;
2189 case Honey::SYNONYM:
2190 merge_synonyms(out, inputs.begin(), inputs.end());
2191 break;
2192 case Honey::POSITION:
2193 merge_positions(out, inputs, offset);
2194 break;
2195 default:
2196 // DocData, Termlist
2197 merge_docid_keyed(out, inputs, offset, t->type);
2198 break;
2201 // Commit as revision 1.
2202 out->flush_db();
2203 out->commit(1, root_info);
2204 out->sync();
2205 if (single_file) fl_serialised = root_info->get_free_list();
2207 off_t out_size = 0;
2208 if (!bad_stat && !single_file_in) {
2209 off_t db_size;
2210 if (single_file) {
2211 db_size = file_size(fd);
2212 } else {
2213 db_size = file_size(dest + HONEY_TABLE_EXTENSION);
2215 if (errno == 0) {
2216 if (single_file) {
2217 off_t old_prev_size = max(prev_size, off_t(block_size));
2218 prev_size = db_size;
2219 db_size -= old_prev_size;
2221 // FIXME: check overflow and set bad_totals
2222 out_total += db_size;
2223 out_size = db_size / 1024;
2224 } else if (errno != ENOENT) {
2225 bad_totals = bad_stat = true;
2228 if (bad_stat) {
2229 if (compactor)
2230 compactor->set_status(t->name,
2231 "Done (couldn't stat all the DB files)");
2232 } else if (single_file_in) {
2233 if (compactor)
2234 compactor->set_status(t->name,
2235 "Done (table sizes unknown for single "
2236 "file DB input)");
2237 } else {
2238 string status;
2239 if (out_size == in_size) {
2240 status = "Size unchanged (";
2241 } else {
2242 off_t delta;
2243 if (out_size < in_size) {
2244 delta = in_size - out_size;
2245 status = "Reduced by ";
2246 } else {
2247 delta = out_size - in_size;
2248 status = "INCREASED by ";
2250 if (in_size) {
2251 status += str(100 * delta / in_size);
2252 status += "% ";
2254 status += str(delta);
2255 status += "K (";
2256 status += str(in_size);
2257 status += "K -> ";
2259 status += str(out_size);
2260 status += "K)";
2261 if (compactor)
2262 compactor->set_status(t->name, status);
2266 // If compacting to a single file output and all the tables are empty, pad
2267 // the output so that it isn't mistaken for a stub database when we try to
2268 // open it. For this it needs to be a multiple of 2KB in size.
2269 if (single_file && prev_size < off_t(block_size)) {
2270 #ifdef HAVE_FTRUNCATE
2271 if (ftruncate(fd, block_size) < 0) {
2272 throw Xapian::DatabaseError("Failed to set size of output "
2273 "database", errno);
2275 #else
2276 const off_t off = block_size - 1;
2277 if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
2278 throw Xapian::DatabaseError("Failed to set size of output "
2279 "database", errno);
2281 #endif
2284 if (single_file) {
2285 if (lseek(fd, version_file_out->get_offset(), SEEK_SET) == -1) {
2286 throw Xapian::DatabaseError("lseek() failed", errno);
2289 version_file_out->set_last_docid(last_docid);
2290 string tmpfile = version_file_out->write(1, FLAGS);
2291 if (single_file) {
2292 off_t version_file_size = lseek(fd, 0, SEEK_CUR);
2293 if (version_file_size < 0) {
2294 throw Xapian::DatabaseError("lseek() failed", errno);
2296 if (version_file_size > HONEY_VERSION_MAX_SIZE) {
2297 throw Xapian::DatabaseError("Didn't allow enough space for "
2298 "version file data");
2301 for (unsigned j = 0; j != tabs.size(); ++j) {
2302 tabs[j]->sync();
2304 // Commit with revision 1.
2305 version_file_out->sync(tmpfile, 1, FLAGS);
2306 for (unsigned j = 0; j != tabs.size(); ++j) {
2307 delete tabs[j];
2309 #endif
2310 } else {
2311 vector<HoneyTable *> tabs;
2312 tabs.reserve(tables_end - tables);
2313 off_t prev_size = block_size;
2314 for (const table_list * t = tables; t < tables_end; ++t) {
2315 // The postlist table requires an N-way merge, adjusting the
2316 // headers of various blocks. The spelling and synonym tables also
2317 // need special handling. The other tables have keys sorted in
2318 // docid order, so we can merge them by simply copying all the keys
2319 // from each source table in turn.
2320 if (compactor)
2321 compactor->set_status(t->name, string());
2323 string dest;
2324 if (!single_file) {
2325 dest = destdir;
2326 dest += '/';
2327 dest += t->name;
2328 dest += '.';
2331 bool output_will_exist = !t->lazy;
2333 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
2334 // on certain systems).
2335 bool bad_stat = false;
2337 // We can't currently report input sizes if there's a single file DB
2338 // amongst the inputs.
2339 bool single_file_in = false;
2341 off_t in_size = 0;
2343 vector<const HoneyTable*> inputs;
2344 inputs.reserve(sources.size());
2345 size_t inputs_present = 0;
2346 for (auto src : sources) {
2347 auto db = static_cast<const HoneyDatabase*>(src);
2348 const HoneyTable * table;
2349 switch (t->type) {
2350 case Honey::POSTLIST:
2351 table = &(db->postlist_table);
2352 break;
2353 case Honey::DOCDATA:
2354 table = &(db->docdata_table);
2355 break;
2356 case Honey::TERMLIST:
2357 table = &(db->termlist_table);
2358 break;
2359 case Honey::POSITION:
2360 table = &(db->position_table);
2361 break;
2362 case Honey::SPELLING:
2363 table = &(db->spelling_table);
2364 break;
2365 case Honey::SYNONYM:
2366 table = &(db->synonym_table);
2367 break;
2368 default:
2369 Assert(false);
2370 return;
2373 if (db->single_file()) {
2374 if (t->lazy && table->empty()) {
2375 // Essentially doesn't exist.
2376 } else {
2377 // FIXME: Find actual size somehow?
2378 // in_size += table->size() / 1024;
2379 single_file_in = true;
2380 bad_totals = true;
2381 output_will_exist = true;
2382 ++inputs_present;
2384 } else {
2385 off_t db_size = file_size(table->get_path());
2386 if (errno == 0) {
2387 // FIXME: check overflow and set bad_totals
2388 in_total += db_size;
2389 in_size += db_size / 1024;
2390 output_will_exist = true;
2391 ++inputs_present;
2392 } else if (errno != ENOENT) {
2393 // We get ENOENT for an optional table.
2394 bad_totals = bad_stat = true;
2395 output_will_exist = true;
2396 ++inputs_present;
2399 inputs.push_back(table);
2402 // If any inputs lack a termlist table, suppress it in the output.
2403 if (t->type == Honey::TERMLIST && inputs_present != sources.size()) {
2404 if (inputs_present != 0) {
2405 if (compactor) {
2406 string m = str(inputs_present);
2407 m += " of ";
2408 m += str(sources.size());
2409 m += " inputs present, so suppressing output";
2410 compactor->set_status(t->name, m);
2412 continue;
2414 output_will_exist = false;
2417 if (!output_will_exist) {
2418 if (compactor)
2419 compactor->set_status(t->name, "doesn't exist");
2420 continue;
2423 HoneyTable * out;
2424 off_t table_start_offset = -1;
2425 if (single_file) {
2426 if (t == tables) {
2427 // Start first table HONEY_VERSION_MAX_SIZE bytes in to allow
2428 // space for version file. It's tricky to exactly know the
2429 // size of the version file beforehand.
2430 table_start_offset = HONEY_VERSION_MAX_SIZE;
2431 if (lseek(fd, table_start_offset, SEEK_SET) < 0)
2432 throw Xapian::DatabaseError("lseek() failed", errno);
2433 } else {
2434 table_start_offset = lseek(fd, 0, SEEK_CUR);
2436 out = new HoneyTable(t->name, fd, version_file_out->get_offset(),
2437 false, false);
2438 } else {
2439 out = new HoneyTable(t->name, dest, false, t->lazy);
2441 tabs.push_back(out);
2442 Honey::RootInfo * root_info = version_file_out->root_to_set(t->type);
2443 if (single_file) {
2444 root_info->set_free_list(fl_serialised);
2445 root_info->set_offset(table_start_offset);
2446 out->open(FLAGS,
2447 version_file_out->get_root(t->type),
2448 version_file_out->get_revision());
2449 } else {
2450 out->create_and_open(FLAGS, *root_info);
2453 out->set_full_compaction(compaction != compactor->STANDARD);
2454 if (compaction == compactor->FULLER) out->set_max_item_size(1);
2456 switch (t->type) {
2457 case Honey::POSTLIST: {
2458 if (multipass && inputs.size() > 3) {
2459 multimerge_postlists(compactor, out, destdir,
2460 inputs, offset);
2461 } else {
2462 merge_postlists(compactor, out, offset.begin(),
2463 inputs.begin(), inputs.end());
2465 break;
2467 case Honey::SPELLING:
2468 merge_spellings(out, inputs.begin(), inputs.end());
2469 break;
2470 case Honey::SYNONYM:
2471 merge_synonyms(out, inputs.begin(), inputs.end());
2472 break;
2473 case Honey::POSITION:
2474 merge_positions(out, inputs, offset);
2475 break;
2476 default:
2477 // DocData, Termlist
2478 merge_docid_keyed(out, inputs, offset);
2479 break;
2482 // Commit as revision 1.
2483 out->flush_db();
2484 out->commit(1, root_info);
2485 out->sync();
2486 if (single_file) fl_serialised = root_info->get_free_list();
2488 off_t out_size = 0;
2489 if (!bad_stat && !single_file_in) {
2490 off_t db_size;
2491 if (single_file) {
2492 db_size = file_size(fd);
2493 } else {
2494 db_size = file_size(dest + HONEY_TABLE_EXTENSION);
2496 if (errno == 0) {
2497 if (single_file) {
2498 off_t old_prev_size = max(prev_size, off_t(block_size));
2499 prev_size = db_size;
2500 db_size -= old_prev_size;
2502 // FIXME: check overflow and set bad_totals
2503 out_total += db_size;
2504 out_size = db_size / 1024;
2505 } else if (errno != ENOENT) {
2506 bad_totals = bad_stat = true;
2509 if (bad_stat) {
2510 if (compactor)
2511 compactor->set_status(t->name,
2512 "Done (couldn't stat all the DB files)");
2513 } else if (single_file_in) {
2514 if (compactor)
2515 compactor->set_status(t->name,
2516 "Done (table sizes unknown for single "
2517 "file DB input)");
2518 } else {
2519 string status;
2520 if (out_size == in_size) {
2521 status = "Size unchanged (";
2522 } else {
2523 off_t delta;
2524 if (out_size < in_size) {
2525 delta = in_size - out_size;
2526 status = "Reduced by ";
2527 } else {
2528 delta = out_size - in_size;
2529 status = "INCREASED by ";
2531 if (in_size) {
2532 status += str(100 * delta / in_size);
2533 status += "% ";
2535 status += str(delta);
2536 status += "K (";
2537 status += str(in_size);
2538 status += "K -> ";
2540 status += str(out_size);
2541 status += "K)";
2542 if (compactor)
2543 compactor->set_status(t->name, status);
2547 // If compacting to a single file output and all the tables are empty, pad
2548 // the output so that it isn't mistaken for a stub database when we try to
2549 // open it. For this it needs to be a multiple of 2KB in size.
2550 if (single_file && prev_size < off_t(block_size)) {
2551 #ifdef HAVE_FTRUNCATE
2552 if (ftruncate(fd, block_size) < 0) {
2553 throw Xapian::DatabaseError("Failed to set size of output "
2554 "database", errno);
2556 #else
2557 const off_t off = block_size - 1;
2558 if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
2559 throw Xapian::DatabaseError("Failed to set size of output "
2560 "database", errno);
2562 #endif
2565 if (single_file) {
2566 if (lseek(fd, version_file_out->get_offset(), SEEK_SET) == -1) {
2567 throw Xapian::DatabaseError("lseek() failed", errno);
2570 version_file_out->set_last_docid(last_docid);
2571 string tmpfile = version_file_out->write(1, FLAGS);
2572 for (unsigned j = 0; j != tabs.size(); ++j) {
2573 tabs[j]->sync();
2575 // Commit with revision 1.
2576 version_file_out->sync(tmpfile, 1, FLAGS);
2577 for (unsigned j = 0; j != tabs.size(); ++j) {
2578 delete tabs[j];
2582 if (!single_file) lock.release();
2584 if (!bad_totals && compactor) {
2585 string status;
2586 in_total /= 1024;
2587 out_total /= 1024;
2588 if (out_total == in_total) {
2589 status = "Size unchanged (";
2590 } else {
2591 off_t delta;
2592 if (out_total < in_total) {
2593 delta = in_total - out_total;
2594 status = "Reduced by ";
2595 } else {
2596 delta = out_total - in_total;
2597 status = "INCREASED by ";
2599 if (in_total) {
2600 status += str(100 * delta / in_total);
2601 status += "% ";
2603 status += str(delta);
2604 status += "K (";
2605 status += str(in_total);
2606 status += "K -> ";
2608 status += str(out_total);
2609 status += "K)";
2610 compactor->set_status("Total", status);