[honey] Use pack_uint() for postlist chunk data
[xapian.git] / xapian-core / backends / honey / honey_compact.cc
blob db55de7448f498a6349b8c2acf2f9ef4f4f053ef
1 /** @file honey_compact.cc
2 * @brief Compact a honey database, or merge and compact several.
3 */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2018 Olly Betts
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation; either version 2 of the
9 * License, or (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19 * USA
22 #include <config.h>
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
29 #include <algorithm>
30 #include <iostream>
31 #include <memory>
32 #include <queue>
33 #include <type_traits>
35 #include <cstdio>
36 #include <sys/uio.h>
38 #include "safeerrno.h"
40 #include "backends/flint_lock.h"
41 #include "compression_stream.h"
42 #include "honey_cursor.h"
43 #include "honey_database.h"
44 #include "honey_defs.h"
45 #include "honey_postlist_encodings.h"
46 #include "honey_table.h"
47 #include "honey_version.h"
48 #include "filetests.h"
49 #include "internaltypes.h"
50 #include "pack.h"
51 #include "backends/valuestats.h"
52 #include "wordaccess.h"
54 #include "../byte_length_strings.h"
55 #include "../prefix_compressed_strings.h"
57 #ifndef DISABLE_GPL_LIBXAPIAN
58 # include "../glass/glass_database.h"
59 # include "../glass/glass_table.h"
60 #endif
62 using namespace std;
64 #ifndef DISABLE_GPL_LIBXAPIAN
65 [[noreturn]]
66 static void
67 throw_database_corrupt(const char* item, const char* pos)
69 string message;
70 if (pos != NULL) {
71 message = "Value overflow unpacking termlist: ";
72 } else {
73 message = "Out of data unpacking termlist: ";
75 message += item;
76 throw Xapian::DatabaseCorruptError(message);
namespace GlassCompact {

// Special keys in a glass postlist table start with a '\0' byte followed by
// a marker byte identifying the kind of entry.
static inline bool
key_has_marker(const std::string & key, char marker)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == marker;
}

/// True for a user metadata entry's key ('\0', '\xc0').
static inline bool
is_user_metadata_key(const std::string & key)
{
    return key_has_marker(key, '\xc0');
}

/// True for a value statistics entry's key ('\0', '\xd0').
static inline bool
is_valuestats_key(const std::string & key)
{
    return key_has_marker(key, '\xd0');
}

/// True for a value stream chunk's key ('\0', '\xd8').
static inline bool
is_valuechunk_key(const std::string & key)
{
    return key_has_marker(key, '\xd8');
}

/// True for a document length chunk's key ('\0', '\xe0').
static inline bool
is_doclenchunk_key(const std::string & key)
{
    return key_has_marker(key, '\xe0');
}

}
106 #endif
108 inline static bool
109 termlist_key_is_values_used(const string& key)
111 const char* p = key.data();
112 const char* e = p + key.size();
113 Xapian::docid did;
114 if (unpack_uint_preserving_sort(&p, e, &did)) {
115 if (p == e)
116 return false;
117 if (e - p == 1 && *p == '\0')
118 return true;
120 throw Xapian::DatabaseCorruptError("termlist key format");
123 // Put all the helpers in a namespace to avoid symbols colliding with those of
124 // the same name in other flint-derived backends.
125 namespace HoneyCompact {
/// Key type markers (matched against the second byte of special keys by
/// key_type() below, except KEY_POSTING_CHUNK which covers non-'\0' keys).
enum {
    KEY_USER_METADATA = 0x00,
    KEY_VALUE_STATS = 0x01,
    KEY_VALUE_CHUNK = 0xd8,
    KEY_DOCLEN_CHUNK = 0xe0,
    KEY_POSTING_CHUNK = 0xff
};
135 /// Return one of the KEY_* constants, or a different value for an invalid key.
136 static inline int
137 key_type(const string& key)
139 if (key[0] != '\0')
140 return KEY_POSTING_CHUNK;
142 if (key.size() <= 1)
143 return -1;
145 switch (static_cast<unsigned char>(key[1])) {
146 case 0x01: case 0x02: case 0x03: case 0x04:
147 return KEY_VALUE_STATS;
150 // If key[1] is \xff then this correctly returns KEY_POSTING_CHUNK.
151 return static_cast<unsigned char>(key[1]);
// Minimal buffered file wrapper used while compacting: wraps a plain fd as
// either a read stream or a write stream with one 4KB buffer.
// NOTE(review): several short-write/EINTR FIXMEs below are from the original;
// partial writes are not fully handled here.
154 class BufferedFile {
155 int fd = -1;
156 bool read_only = true;
// For reads, valid unconsumed data occupies buf[sizeof(buf)-buf_end, sizeof(buf));
// for writes, pending data occupies buf[0, buf_end).
157 mutable size_t buf_end = 0;
158 mutable char buf[4096];
160 public:
161 BufferedFile() { }
// NOTE(review): the destructor closes fd without flushing any pending write
// buffer - callers apparently must call flush() themselves; confirm.
163 ~BufferedFile() {
164 if (fd >= 0) close(fd);
167 bool open(const std::string& path, bool read_only_) {
// Re-opening silently closes any previously opened fd.
168 if (fd >= 0) close(fd);
169 read_only = read_only_;
170 if (read_only) {
171 // FIXME: add new io_open_stream_rd() etc?
172 fd = io_open_block_rd(path);
173 } else {
174 fd = io_open_block_wr(path, true); // FIXME: Always create anew for now...
// Returns true if the open succeeded.
176 return fd >= 0;
// Logical position: the OS file offset adjusted for data buffered but not
// yet consumed (reading) or not yet flushed (writing).
179 off_t get_pos() const {
180 return read_only ?
181 lseek(fd, 0, SEEK_CUR) - buf_end :
182 lseek(fd, 0, SEEK_CUR) + buf_end;
185 bool empty() const {
186 if (buf_end) return false;
187 struct stat sbuf;
// An unopened or unstattable file is reported as empty.
188 if (fd == -1 || fstat(fd, &sbuf) < 0) return true;
189 return (sbuf.st_size == 0);
// Append a single byte, flushing the buffer to the fd when full.
192 void write(unsigned char ch) {
193 if (buf_end == sizeof(buf)) {
194 // writev()?
195 if (::write(fd, buf, buf_end)) {
196 // FIXME: retry short write
198 buf_end = 0;
200 buf[buf_end++] = ch;
203 void write(const char* p, size_t len) {
// Fast path: the new data fits in the current buffer.
204 if (buf_end + len <= sizeof(buf)) {
205 memcpy(buf + buf_end, p, len);
206 buf_end += len;
207 return;
// Slow path: hand buffered data plus new data to the OS with writev(),
// looping until everything has been written.
210 while (true) {
211 struct iovec iov[2];
212 iov[0].iov_base = buf;
213 iov[0].iov_len = buf_end;
214 iov[1].iov_base = const_cast<char*>(p);
215 iov[1].iov_len = len;
216 ssize_t n_ = writev(fd, iov, 2);
217 if (n_ < 0) abort();
218 size_t n = n_;
219 if (n == buf_end + len) {
220 // Wrote everything.
221 buf_end = 0;
222 return;
224 if (n >= buf_end) {
225 // Wrote all of buf.
226 n -= buf_end;
227 p += n;
228 len -= n;
229 if (::write(fd, p, len)) {
230 // FIXME: retry short write
232 buf_end = 0;
233 return;
// Partial write of buf: shift the unwritten tail to the front of buf.
235 buf_end -= n;
236 memmove(buf, buf + n, buf_end);
// Read one byte, or EOF at end of file.
240 int read() const {
241 if (buf_end == 0) {
242 ssize_t r = ::read(fd, buf, sizeof(buf));
243 if (r == 0) return EOF;
244 // FIXME: retry on EINTR
245 if (r < 0) abort();
246 buf_end = r;
// NOTE(review): the refill above reads into buf[0, r), but consumption below
// indexes from the end (buf[sizeof(buf) - buf_end]); these only coincide when
// r == sizeof(buf). Confirm against upstream whether short reads are
// repositioned in lines missing from this view.
248 return static_cast<unsigned char>(buf[sizeof(buf) - buf_end--]);
// Read exactly len bytes into p; returns false on short read.
251 bool read(char* p, size_t len) const {
252 if (len <= buf_end) {
253 memcpy(p, buf + sizeof(buf) - buf_end, len);
254 buf_end -= len;
255 return true;
// Drain what's buffered, then read the remainder straight from the fd.
257 memcpy(p, buf + sizeof(buf) - buf_end, buf_end);
258 p += buf_end;
259 len -= buf_end;
260 buf_end = 0;
261 // FIXME: refill buffer if len < sizeof(buf)
262 return ::read(fd, p, len) == ssize_t(len);
// Write out any pending buffered data (no-op in read mode).
265 void flush() {
266 if (!read_only && buf_end) {
267 if (::write(fd, buf, buf_end)) {
268 // FIXME: retry short write
270 buf_end = 0;
274 void sync() {
275 io_sync(fd);
// Switch to reading from the start of the file; buffered state is discarded.
278 void rewind() {
279 read_only = true;
280 lseek(fd, 0, SEEK_SET);
281 buf_end = 0;
285 template<typename T> class PostlistCursor;
287 #ifndef DISABLE_GPL_LIBXAPIAN
288 // Convert glass to honey.
// Cursor over a glass postlist table which rewrites each entry into honey's
// key format and (for postlist/doclen chunks) honey's non-initial chunk tag
// format as it advances, applying a docid offset for multi-database merging.
289 template<>
290 class PostlistCursor<const GlassTable&> : private GlassCursor {
291 Xapian::docid offset;
293 public:
294 string key, tag;
295 Xapian::docid firstdid;
296 Xapian::docid chunk_lastdid;
297 Xapian::termcount tf, cf;
298 Xapian::termcount first_wdf;
300 PostlistCursor(const GlassTable *in, Xapian::docid offset_)
301 : GlassCursor(in), offset(offset_), firstdid(0)
303 find_entry(string());
304 next();
// Advance to the next entry, converting it; returns false at end of table.
307 bool next() {
308 if (!GlassCursor::next()) return false;
309 // We put all chunks into the non-initial chunk form here, then fix up
310 // the first chunk for each term in the merged database as we merge.
311 read_tag();
312 key = current_key;
313 tag = current_tag;
314 tf = cf = 0;
315 if (GlassCompact::is_user_metadata_key(key)) {
316 key[1] = KEY_USER_METADATA;
317 return true;
319 if (GlassCompact::is_valuestats_key(key)) {
320 // Adjust key.
321 const char * p = key.data();
322 const char * end = p + key.length();
323 p += 2;
324 Xapian::valueno slot;
325 if (!unpack_uint_last(&p, end, &slot))
326 throw Xapian::DatabaseCorruptError("bad value stats key");
327 key = pack_honey_valuestats_key(slot);
328 return true;
330 if (GlassCompact::is_valuechunk_key(key)) {
331 const char * p = key.data();
332 const char * end = p + key.length();
333 p += 2;
334 Xapian::valueno slot;
335 if (!unpack_uint(&p, end, &slot))
336 throw Xapian::DatabaseCorruptError("bad value key");
337 Xapian::docid did;
338 if (!unpack_uint_preserving_sort(&p, end, &did))
339 throw Xapian::DatabaseCorruptError("bad value key");
340 did += offset;
// Rebuild the key in honey form: '\0' '\xd8' + slot + offset docid.
342 key.assign("\0\xd8", 2);
343 pack_uint(key, slot);
344 pack_uint_preserving_sort(key, did);
345 return true;
348 if (GlassCompact::is_doclenchunk_key(key)) {
349 const char * d = key.data();
350 const char * e = d + key.size();
351 d += 2;
353 if (d == e) {
354 // This is an initial chunk, so adjust tag header.
355 d = tag.data();
356 e = d + tag.size();
357 if (!unpack_uint(&d, e, &tf) ||
358 !unpack_uint(&d, e, &cf) ||
359 !unpack_uint(&d, e, &firstdid)) {
360 throw Xapian::DatabaseCorruptError("Bad postlist key");
// Glass stores firstdid - 1 in the header; undo that bias.
362 ++firstdid;
363 tag.erase(0, d - tag.data());
364 } else {
365 // Not an initial chunk, so adjust key.
366 size_t tmp = d - key.data();
367 if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
368 throw Xapian::DatabaseCorruptError("Bad postlist key");
369 key.erase(tmp);
371 firstdid += offset;
373 d = tag.data();
374 e = d + tag.size();
376 // Convert doclen chunk to honey format.
377 string newtag;
379 // Skip the "last chunk" flag and increase_to_last.
380 if (d == e)
381 throw Xapian::DatabaseCorruptError("No last chunk flag in glass docdata chunk");
382 ++d;
383 Xapian::docid increase_to_last;
384 if (!unpack_uint(&d, e, &increase_to_last))
385 throw Xapian::DatabaseCorruptError("Decoding last docid delta in glass docdata chunk");
// Honey doclen chunks are fixed-width 4-byte doclens, with 0xffffffff
// runs filling gaps in the docid range.
387 while (true) {
388 Xapian::termcount doclen;
389 if (!unpack_uint(&d, e, &doclen))
390 throw Xapian::DatabaseCorruptError("Decoding doclen in glass docdata chunk");
391 Assert(doclen != 0xffffffff);
392 unsigned char buf[4];
393 unaligned_write4(buf, doclen);
394 newtag.append(reinterpret_cast<char*>(buf), 4);
395 if (d == e)
396 break;
397 Xapian::docid gap_size;
398 if (!unpack_uint(&d, e, &gap_size))
399 throw Xapian::DatabaseCorruptError("Decoding docid gap_size in glass docdata chunk");
400 // FIXME: Split chunk if the gap_size is at all large.
401 newtag.append(4 * gap_size, '\xff');
// A chunk should never start or end with a gap filler.
404 Assert(!startswith(newtag, "\xff\xff\xff\xff"));
405 Assert(!endswith(newtag, "\xff\xff\xff\xff"));
407 swap(tag, newtag);
409 return true;
// Remaining case: a term's posting chunk.
412 // Adjust key if this is *NOT* an initial chunk.
413 // key is: pack_string_preserving_sort(key, tname)
414 // plus optionally: pack_uint_preserving_sort(key, did)
415 const char * d = key.data();
416 const char * e = d + key.size();
417 string tname;
418 if (!unpack_string_preserving_sort(&d, e, tname))
419 throw Xapian::DatabaseCorruptError("Bad postlist key");
421 if (d == e) {
422 // This is an initial chunk for a term, so adjust tag header.
423 d = tag.data();
424 e = d + tag.size();
425 if (!unpack_uint(&d, e, &tf) ||
426 !unpack_uint(&d, e, &cf) ||
427 !unpack_uint(&d, e, &firstdid)) {
428 throw Xapian::DatabaseCorruptError("Bad postlist key");
430 ++firstdid;
431 tag.erase(0, d - tag.data());
432 } else {
433 // Not an initial chunk, so adjust key.
434 size_t tmp = d - key.data();
435 if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
436 throw Xapian::DatabaseCorruptError("Bad postlist key");
// NOTE(review): erases from tmp - 1 here (dropping a terminator byte),
// but the doclen branch above erases from tmp - confirm this asymmetry
// against the glass key encodings.
437 key.erase(tmp - 1);
439 firstdid += offset;
441 d = tag.data();
442 e = d + tag.size();
444 // Convert posting chunk to honey format, but without any header.
445 string newtag;
447 // Skip the "last chunk" flag; decode increase_to_last.
448 if (d == e)
449 throw Xapian::DatabaseCorruptError("No last chunk flag in glass posting chunk");
450 ++d;
451 Xapian::docid increase_to_last;
452 if (!unpack_uint(&d, e, &increase_to_last))
453 throw Xapian::DatabaseCorruptError("Decoding last docid delta in glass posting chunk");
454 chunk_lastdid = firstdid + increase_to_last;
455 if (!unpack_uint(&d, e, &first_wdf))
456 throw Xapian::DatabaseCorruptError("Decoding first wdf in glass posting chunk");
// Re-encode the (docid delta, wdf) pairs with pack_uint().
458 while (d != e) {
459 Xapian::docid delta;
460 if (!unpack_uint(&d, e, &delta))
461 throw Xapian::DatabaseCorruptError("Decoding docid delta in glass posting chunk");
462 pack_uint(newtag, delta);
463 Xapian::termcount wdf;
464 if (!unpack_uint(&d, e, &wdf))
465 throw Xapian::DatabaseCorruptError("Decoding wdf in glass posting chunk");
466 pack_uint(newtag, wdf);
469 swap(tag, newtag);
471 return true;
474 #endif
// Cursor over a honey postlist table (used when merging/compacting honey to
// honey): normalises every chunk into non-initial form and applies the docid
// offset, so the merge loop can regenerate initial chunks itself.
476 template<>
477 class PostlistCursor<const HoneyTable&> : private HoneyCursor {
478 Xapian::docid offset;
480 public:
481 string key, tag;
482 Xapian::docid firstdid;
483 Xapian::docid chunk_lastdid;
484 Xapian::termcount tf, cf;
485 Xapian::termcount first_wdf;
487 PostlistCursor(const HoneyTable *in, Xapian::docid offset_)
488 : HoneyCursor(in), offset(offset_), firstdid(0)
490 find_entry(string());
491 next();
494 bool next() {
495 if (!HoneyCursor::next()) return false;
496 // We put all chunks into the non-initial chunk form here, then fix up
497 // the first chunk for each term in the merged database as we merge.
498 read_tag();
499 key = current_key;
500 tag = current_tag;
// NOTE(review): only tf is reset here (the glass cursor resets both tf and
// cf). For a non-initial posting chunk, cf deliberately still holds the
// value decoded from this term's initial chunk, and the `if (cf)` below
// relies on it to pick the right delta chunk decoder - confirm upstream.
501 tf = 0;
502 switch (key_type(key)) {
503 case KEY_USER_METADATA:
504 case KEY_VALUE_STATS:
// Already in honey form - pass through unchanged.
505 return true;
506 case KEY_VALUE_CHUNK: {
507 const char * p = key.data();
508 const char * end = p + key.length();
509 p += 2;
510 Xapian::valueno slot;
511 if (!unpack_uint(&p, end, &slot))
512 throw Xapian::DatabaseCorruptError("bad value key");
513 Xapian::docid did;
514 if (!unpack_uint_preserving_sort(&p, end, &did))
515 throw Xapian::DatabaseCorruptError("bad value key");
516 did += offset;
// Rebuild the key with the offset docid.
518 key.assign("\0\xd8", 2);
519 pack_uint(key, slot);
520 pack_uint_preserving_sort(key, did);
521 return true;
523 case KEY_DOCLEN_CHUNK: {
524 const char * p = key.data();
525 const char * end = p + key.length();
526 p += 2;
// A bare 2-byte key means the chunk starting at docid 1.
527 Xapian::docid did = 1;
528 if (p != end &&
529 (!unpack_uint_preserving_sort(&p, end, &did) || p != end)) {
530 throw Xapian::DatabaseCorruptError("Bad doclen key");
532 did += offset;
534 key.erase(2);
535 if (did != 1) {
536 pack_uint_preserving_sort(key, did);
538 return true;
540 case KEY_POSTING_CHUNK:
541 break;
542 default:
543 throw Xapian::DatabaseCorruptError("Bad postlist table key type");
546 // Adjust key if this is *NOT* an initial chunk.
547 // key is: pack_string_preserving_sort(key, term)
548 // plus optionally: pack_uint_preserving_sort(key, did)
549 const char * d = key.data();
550 const char * e = d + key.size();
551 string term;
552 if (!unpack_string_preserving_sort(&d, e, term))
553 throw Xapian::DatabaseCorruptError("Bad postlist key");
555 if (d == e) {
556 // This is an initial chunk for a term, so remove tag header.
557 d = tag.data();
558 e = d + tag.size();
560 Xapian::docid lastdid;
561 if (!decode_initial_chunk_header(&d, e, tf, cf,
562 firstdid, lastdid, chunk_lastdid,
563 first_wdf)) {
564 throw Xapian::DatabaseCorruptError("Bad postlist initial chunk header");
566 // Ignore lastdid - we'll need to recalculate it (at least when
567 // merging, and for simplicity we always do).
568 (void)lastdid;
569 tag.erase(0, d - tag.data());
570 } else {
571 // Not an initial chunk, so adjust key and remove tag header.
572 size_t tmp = d - key.data();
573 if (!unpack_uint_preserving_sort(&d, e, &chunk_lastdid) ||
574 d != e) {
575 throw Xapian::DatabaseCorruptError("Bad postlist key");
577 // -1 to remove the terminating zero byte too.
578 key.erase(tmp - 1);
580 d = tag.data();
581 e = d + tag.size();
// cf != 0 means wdf values are stored interleaved with the docid deltas;
// cf == 0 means they were elided and must be spliced back in as zeros.
583 if (cf) {
584 if (!decode_delta_chunk_header(&d, e, chunk_lastdid, firstdid,
585 first_wdf)) {
586 throw Xapian::DatabaseCorruptError("Bad postlist delta chunk header");
588 tag.erase(0, d - tag.data());
589 } else {
590 if (!decode_delta_chunk_header_bool(&d, e, chunk_lastdid, firstdid)) {
591 throw Xapian::DatabaseCorruptError("Bad postlist delta chunk header");
593 first_wdf = 0;
594 // Splice in a 0 wdf value after each docid delta.
595 string newtag;
596 for (size_t i = d - tag.data(); i < tag.size(); i += 4) {
597 newtag.append(tag, i, 4);
598 newtag.append(4, '\0');
600 tag = std::move(newtag);
603 firstdid += offset;
604 chunk_lastdid += offset;
605 return true;
// The non-const HoneyTable& variant simply reuses the const specialisation;
// it exists so decltype(**b) in the merge templates resolves for both cases.
609 template<>
610 class PostlistCursor<HoneyTable&> : public PostlistCursor<const HoneyTable&> {
611 public:
612 PostlistCursor(HoneyTable *in, Xapian::docid offset_)
613 : PostlistCursor<const HoneyTable&>(in, offset_) {}
template<typename T>
class PostlistCursorGt {
  public:
    /** Return true if and only if a's key is strictly greater than b's key.
     *
     *  Ties on the key are broken by firstdid, so of two cursors positioned
     *  on the same key, the one with the smaller firstdid sorts first.
     */
    bool operator()(const T* a, const T* b) const {
	if (a->key != b->key)
	    return a->key > b->key;
	return a->firstdid > b->firstdid;
    }
};
628 static string
629 encode_valuestats(Xapian::doccount freq,
630 const string & lbound, const string & ubound)
632 string value;
633 pack_uint(value, freq);
634 pack_string(value, lbound);
635 // We don't store or count empty values, so neither of the bounds
636 // can be empty. So we can safely store an empty upper bound when
637 // the bounds are equal.
638 if (lbound != ubound) value += ubound;
639 return value;
642 // U : vector<HoneyTable*>::const_iterator
// k-way merge of postlist tables into `out`, applying per-source docid
// offsets.  Entries are processed in key-type order: user metadata, value
// stats, value chunks, doclen chunks, then posting chunks (the PostlistCursor
// specialisations normalise keys so this ordering holds).
643 template<typename T, typename U> void
644 merge_postlists(Xapian::Compactor * compactor,
645 T * out, vector<Xapian::docid>::const_iterator offset,
646 U b, U e)
648 typedef decltype(**b) table_type; // E.g. HoneyTable
649 typedef PostlistCursor<table_type> cursor_type;
650 typedef PostlistCursorGt<cursor_type> gt_type;
// Min-heap of cursors (gt_type inverts the comparison), one per input table.
651 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
652 for ( ; b != e; ++b, ++offset) {
653 auto in = *b;
654 if (in->empty()) {
655 // Skip empty tables.
656 continue;
659 pq.push(new cursor_type(in, *offset));
662 string last_key;
664 // Merge user metadata.
665 vector<string> tags;
666 while (!pq.empty()) {
667 cursor_type * cur = pq.top();
668 const string& key = cur->key;
669 if (key_type(key) != KEY_USER_METADATA) break;
671 if (key != last_key) {
// Key changed: emit the collected tags for the previous key.
672 if (!tags.empty()) {
673 if (tags.size() > 1 && compactor) {
674 Assert(!last_key.empty());
675 // FIXME: It would be better to merge all duplicates
676 // for a key in one call, but currently we don't in
677 // multipass mode.
678 const string & resolved_tag =
679 compactor->resolve_duplicate_metadata(last_key,
680 tags.size(),
681 &tags[0]);
682 out->add(last_key, resolved_tag);
683 } else {
684 Assert(!last_key.empty());
685 out->add(last_key, tags[0]);
687 tags.resize(0);
689 last_key = key;
691 tags.push_back(cur->tag);
693 pq.pop();
694 if (cur->next()) {
695 pq.push(cur);
696 } else {
697 delete cur;
// Flush the final user metadata key.
700 if (!tags.empty()) {
701 if (tags.size() > 1 && compactor) {
702 Assert(!last_key.empty());
703 const string & resolved_tag =
704 compactor->resolve_duplicate_metadata(last_key,
705 tags.size(),
706 &tags[0]);
707 out->add(last_key, resolved_tag);
708 } else {
709 Assert(!last_key.empty());
710 out->add(last_key, tags[0]);
716 // Merge valuestats.
717 Xapian::doccount freq = 0;
718 string lbound, ubound;
720 while (!pq.empty()) {
721 cursor_type * cur = pq.top();
722 const string& key = cur->key;
723 if (key_type(key) != KEY_VALUE_STATS) break;
724 if (key != last_key) {
725 // For the first valuestats key, last_key will be the previous
726 // key we wrote, which we don't want to overwrite. This is the
727 // only time that freq will be 0, so check that.
728 if (freq) {
729 out->add(last_key, encode_valuestats(freq, lbound, ubound));
730 freq = 0;
732 last_key = key;
735 const string & tag = cur->tag;
737 const char * pos = tag.data();
738 const char * end = pos + tag.size();
740 Xapian::doccount f;
741 string l, u;
742 if (!unpack_uint(&pos, end, &f)) {
743 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
744 throw Xapian::RangeError("Frequency statistic in value table is too large");
746 if (!unpack_string(&pos, end, l)) {
747 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
748 throw Xapian::RangeError("Lower bound in value table is too large");
// An omitted upper bound encodes "equal to the lower bound" (see
// encode_valuestats()).
750 size_t len = end - pos;
751 if (len == 0) {
752 u = l;
753 } else {
754 u.assign(pos, len);
// Combine stats from multiple sources: sum freqs, widen bounds.
756 if (freq == 0) {
757 freq = f;
758 lbound = l;
759 ubound = u;
760 } else {
761 freq += f;
762 if (l < lbound) lbound = l;
763 if (u > ubound) ubound = u;
766 pq.pop();
767 if (cur->next()) {
768 pq.push(cur);
769 } else {
770 delete cur;
774 if (freq) {
775 out->add(last_key, encode_valuestats(freq, lbound, ubound));
779 // Merge valuestream chunks.
// Keys already include the offset docid, so chunks can be copied verbatim.
780 while (!pq.empty()) {
781 cursor_type * cur = pq.top();
782 const string & key = cur->key;
783 if (key_type(key) != KEY_VALUE_CHUNK) break;
784 out->add(key, cur->tag);
785 pq.pop();
786 if (cur->next()) {
787 pq.push(cur);
788 } else {
789 delete cur;
793 // Merge doclen chunks.
794 while (!pq.empty()) {
795 cursor_type * cur = pq.top();
796 string & key = cur->key;
797 if (key_type(key) != KEY_DOCLEN_CHUNK) break;
// The chunk starting at docid 1 keeps the bare key; others append firstdid.
798 if (cur->firstdid != 1)
799 pack_uint_preserving_sort(key, cur->firstdid);
800 out->add(key, cur->tag);
801 pq.pop();
802 if (cur->next()) {
803 pq.push(cur);
804 } else {
805 delete cur;
809 Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
// One normalised chunk of a term's posting list, queued until we've seen all
// chunks for the term and can write the initial chunk header.
811 struct HoneyPostListChunk {
812 Xapian::docid first, last;
813 Xapian::termcount first_wdf;
814 string data;
816 HoneyPostListChunk(Xapian::docid first_,
817 Xapian::docid last_,
818 Xapian::termcount first_wdf_,
819 string&& data_)
820 : first(first_),
821 last(last_),
822 first_wdf(first_wdf_),
823 data(data_) {}
825 vector<HoneyPostListChunk> tags;
// Merge posting chunks.  Collect all chunks for a term, then emit an initial
// chunk (with regenerated header) plus re-keyed continuation chunks.
826 while (true) {
827 cursor_type * cur = NULL;
828 if (!pq.empty()) {
829 cur = pq.top();
830 pq.pop();
832 Assert(cur == NULL || key_type(cur->key) == KEY_POSTING_CHUNK);
833 if (cur == NULL || cur->key != last_key) {
834 if (!tags.empty()) {
835 Xapian::termcount first_wdf = tags[0].first_wdf;
836 Xapian::docid chunk_lastdid = tags[0].last;
837 Xapian::docid last_did = tags.back().last;
839 string first_tag;
840 encode_initial_chunk_header(tf, cf, tags[0].first, last_did,
841 chunk_lastdid,
842 first_wdf, first_tag);
// For tf <= 2 the header alone fully describes the postings.
843 if (tf > 2) {
844 first_tag += tags[0].data;
846 out->add(last_key, first_tag);
848 // If tf == 2, the data could be split over two tags when
849 // merging, but we only output an initial tag in this case.
850 if (tf > 2 && tags.size() > 1) {
851 string term;
852 const char* p = last_key.data();
853 const char* end = p + last_key.size();
854 if (!unpack_string_preserving_sort(&p, end, term) ||
855 p != end) {
856 throw Xapian::DatabaseCorruptError("Bad postlist "
857 "chunk key");
// Write each continuation chunk keyed by term + its last docid.
860 auto i = tags.begin();
861 while (++i != tags.end()) {
862 last_did = i->last;
863 const string& key = pack_honey_postlist_key(term,
864 last_did);
865 string tag;
866 if (cf) {
867 encode_delta_chunk_header(i->first,
868 last_did,
869 i->first_wdf,
870 tag);
871 tag += i->data;
872 } else {
// cf == 0: wdfs are all zero, so store docid deltas only.
873 encode_delta_chunk_header_bool(i->first,
874 last_did,
875 tag);
876 const char* pos = i->data.data();
877 const char* pos_end = pos + i->data.size();
878 while (pos != pos_end) {
879 Xapian::docid delta;
880 if (!unpack_uint(&pos, pos_end, &delta))
881 throw Xapian::DatabaseCorruptError("Decoding docid delta");
882 pack_uint(tag, delta);
883 Xapian::termcount wdf;
884 if (!unpack_uint(&pos, pos_end, &wdf))
885 throw Xapian::DatabaseCorruptError("Decoding wdf");
886 // Only copy over the docid deltas, not the wdfs.
887 (void)wdf;
891 out->add(key, tag);
895 tags.clear();
896 if (cur == NULL) break;
897 tf = cf = 0;
898 last_key = cur->key;
// Only initial chunks carry tf/cf (non-initial chunks report tf == 0).
900 if (cur->tf) {
901 tf += cur->tf;
902 cf += cur->cf;
904 tags.push_back(HoneyPostListChunk(cur->firstdid,
905 cur->chunk_lastdid,
906 cur->first_wdf,
907 std::move(cur->tag)));
908 if (cur->next()) {
909 pq.push(cur);
910 } else {
911 delete cur;
// Simple merge cursors (used for spelling/synonym tables): just position at
// the first entry; the underlying cursor classes supply key/tag access.
916 template<typename T> struct MergeCursor;
918 #ifndef DISABLE_GPL_LIBXAPIAN
919 template<>
920 struct MergeCursor<const GlassTable&> : public GlassCursor {
921 explicit MergeCursor(const GlassTable *in) : GlassCursor(in) {
922 find_entry(string());
923 next();
926 #endif
928 template<>
929 struct MergeCursor<const HoneyTable&> : public HoneyCursor {
930 explicit MergeCursor(const HoneyTable *in) : HoneyCursor(in) {
931 find_entry(string());
932 next();
// Streaming cursor over a writable HoneyTable (used for the temporary tables
// produced by multipass merging): reads items sequentially rather than via
// HoneyCursor, decompressing tags on demand.
936 template<>
937 struct MergeCursor<HoneyTable&> {
938 HoneyTable* table;
939 string current_key, current_tag;
940 bool current_compressed;
941 mutable CompressionStream comp_stream;
943 explicit MergeCursor(HoneyTable *in) : table(in), comp_stream(Z_DEFAULT_STRATEGY) {
944 next();
947 bool next() {
948 return table->read_item(current_key, current_tag, current_compressed);
// Returns whether current_tag is (still) compressed after the call; with
// keep_compressed == false the tag is decompressed in place first.
951 bool read_tag(bool keep_compressed) {
952 if (!keep_compressed && current_compressed) {
953 // Need to decompress.
954 comp_stream.decompress_start();
955 string new_tag;
956 if (!comp_stream.decompress_chunk(current_tag.data(), current_tag.size(), new_tag)) {
957 // Decompression didn't complete.
958 abort();
960 swap(current_tag, new_tag);
961 current_compressed = false;
963 return current_compressed;
template<typename T>
struct CursorGt {
    /// Return true if and only if a's key is strictly greater than b's key.
    bool operator()(const T* a, const T* b) const {
	// A cursor which is after_end() compares greater than any live
	// cursor, so it sinks to the bottom of the priority queue.
	if (b->after_end())
	    return false;
	return a->after_end() || a->current_key > b->current_key;
    }
};
977 // U : vector<HoneyTable*>::const_iterator
// k-way merge of spelling tables.  Word-list entries are unioned; 'W'-prefixed
// word frequency entries are summed.
978 template<typename T, typename U> void
979 merge_spellings(T* out, U b, U e)
981 typedef decltype(**b) table_type; // E.g. HoneyTable
982 typedef MergeCursor<table_type> cursor_type;
983 typedef CursorGt<cursor_type> gt_type;
984 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
985 for ( ; b != e; ++b) {
986 auto in = *b;
987 if (!in->empty()) {
988 pq.push(new cursor_type(in));
992 while (!pq.empty()) {
993 cursor_type * cur = pq.top();
994 pq.pop();
996 string key = cur->current_key;
// Key unique across inputs: copy straight through without decompressing.
997 if (pq.empty() || pq.top()->current_key > key) {
998 // No need to merge the tags, just copy the (possibly compressed)
999 // tag value.
1000 bool compressed = cur->read_tag(true);
1001 out->add(key, cur->current_tag, compressed);
1002 if (cur->next()) {
1003 pq.push(cur);
1004 } else {
1005 delete cur;
1007 continue;
1010 // Merge tag values with the same key:
1011 string tag;
1012 if (key[0] != 'W') {
1013 // We just want the union of words, so copy over the first instance
1014 // and skip any identical ones.
1015 priority_queue<PrefixCompressedStringItor *,
1016 vector<PrefixCompressedStringItor *>,
1017 PrefixCompressedStringItorGt> pqtag;
1018 // Stick all the cursor_type pointers in a vector because their
1019 // current_tag members must remain valid while we're merging their
1020 // tags, but we need to call next() on them all afterwards.
1021 vector<cursor_type *> vec;
1022 vec.reserve(pq.size());
1024 while (true) {
1025 cur->read_tag();
1026 pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
1027 vec.push_back(cur);
1028 if (pq.empty() || pq.top()->current_key != key) break;
1029 cur = pq.top();
1030 pq.pop();
// Emit the sorted union of words, skipping duplicates.
1033 PrefixCompressedStringWriter wr(tag);
1034 string lastword;
1035 while (!pqtag.empty()) {
1036 PrefixCompressedStringItor * it = pqtag.top();
1037 pqtag.pop();
1038 string word = **it;
1039 if (word != lastword) {
1040 lastword = word;
1041 wr.append(lastword);
1043 ++*it;
1044 if (!it->at_end()) {
1045 pqtag.push(it);
1046 } else {
1047 delete it;
// Now it's safe to advance the source cursors.
1051 for (auto i : vec) {
1052 cur = i;
1053 if (cur->next()) {
1054 pq.push(cur);
1055 } else {
1056 delete cur;
1059 } else {
1060 // We want to sum the frequencies from tags for the same key.
1061 Xapian::termcount tot_freq = 0;
1062 while (true) {
1063 cur->read_tag();
1064 Xapian::termcount freq;
1065 const char * p = cur->current_tag.data();
1066 const char * end = p + cur->current_tag.size();
1067 if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
1068 throw Xapian::DatabaseCorruptError("Bad spelling word freq");
1070 tot_freq += freq;
1071 if (cur->next()) {
1072 pq.push(cur);
1073 } else {
1074 delete cur;
1076 if (pq.empty() || pq.top()->current_key != key) break;
1077 cur = pq.top();
1078 pq.pop();
1080 tag.resize(0);
1081 pack_uint_last(tag, tot_freq);
1083 out->add(key, tag);
1087 // U : vector<HoneyTable*>::const_iterator
// k-way merge of synonym tables: for each key, take the sorted union of the
// byte-length-prefixed synonym strings from all inputs.
1088 template<typename T, typename U> void
1089 merge_synonyms(T* out, U b, U e)
1091 typedef decltype(**b) table_type; // E.g. HoneyTable
1092 typedef MergeCursor<table_type> cursor_type;
1093 typedef CursorGt<cursor_type> gt_type;
1094 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1095 for ( ; b != e; ++b) {
1096 auto in = *b;
1097 if (!in->empty()) {
1098 pq.push(new cursor_type(in));
1102 while (!pq.empty()) {
1103 cursor_type * cur = pq.top();
1104 pq.pop();
1106 string key = cur->current_key;
// Key unique across inputs: copy straight through without decompressing.
1107 if (pq.empty() || pq.top()->current_key > key) {
1108 // No need to merge the tags, just copy the (possibly compressed)
1109 // tag value.
1110 bool compressed = cur->read_tag(true);
1111 out->add(key, cur->current_tag, compressed);
1112 if (cur->next()) {
1113 pq.push(cur);
1114 } else {
1115 delete cur;
1117 continue;
1120 // Merge tag values with the same key:
1121 string tag;
1123 // We just want the union of words, so copy over the first instance
1124 // and skip any identical ones.
1125 priority_queue<ByteLengthPrefixedStringItor *,
1126 vector<ByteLengthPrefixedStringItor *>,
1127 ByteLengthPrefixedStringItorGt> pqtag;
// Keep the cursors alive (their current_tag backs the iterators above)
// until the merge of this key's tags is finished.
1128 vector<cursor_type *> vec;
1130 while (true) {
1131 cur->read_tag();
1132 pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
1133 vec.push_back(cur);
1134 if (pq.empty() || pq.top()->current_key != key) break;
1135 cur = pq.top();
1136 pq.pop();
1139 string lastword;
1140 while (!pqtag.empty()) {
1141 ByteLengthPrefixedStringItor * it = pqtag.top();
1142 pqtag.pop();
1143 if (**it != lastword) {
1144 lastword = **it;
// Re-encode: XORed length byte followed by the synonym bytes.
1145 tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
1146 tag += lastword;
1148 ++*it;
1149 if (!it->at_end()) {
1150 pqtag.push(it);
1151 } else {
1152 delete it;
// Now it's safe to advance the source cursors.
1156 for (auto i : vec) {
1157 cur = i;
1158 if (cur->next()) {
1159 pq.push(cur);
1160 } else {
1161 delete cur;
1165 out->add(key, tag);
// Merge many postlist tables by repeatedly merging pairs into temporary
// HoneyTables under tmpdir until at most 3 remain, then merging those into
// `out`.  Bounds the number of cursors open at once.
// NOTE(review): the two pass loops below are near-duplicates and both use
// sprintf() into a fixed 64-byte buffer - snprintf() (or extracting a shared
// helper) would be safer if tmp file naming ever changes.
1169 template<typename T, typename U> void
1170 multimerge_postlists(Xapian::Compactor * compactor,
1171 T* out, const char * tmpdir,
1172 const vector<U*>& in,
1173 vector<Xapian::docid> off)
// Few enough inputs: merge directly without temporary tables.
1175 if (in.size() <= 3) {
1176 merge_postlists(compactor, out, off.begin(), in.begin(), in.end());
1177 return;
1179 unsigned int c = 0;
1180 vector<HoneyTable *> tmp;
1181 tmp.reserve(in.size() / 2);
1183 vector<Xapian::docid> newoff;
1184 newoff.resize(in.size() / 2);
// First pass: merge the original inputs two at a time (three for the last
// group when the count is odd).
1185 for (unsigned int i = 0, j; i < in.size(); i = j) {
1186 j = i + 2;
1187 if (j == in.size() - 1) ++j;
1189 string dest = tmpdir;
1190 char buf[64];
1191 sprintf(buf, "/tmp%u_%u.", c, i / 2);
1192 dest += buf;
1194 HoneyTable * tmptab = new HoneyTable("postlist", dest, false);
1196 // Use maximum blocksize for temporary tables. And don't compress
1197 // entries in temporary tables, even if the final table would do
1198 // so. Any already compressed entries will get copied in
1199 // compressed form. (FIXME: HoneyTable has no blocksize)
1200 Honey::RootInfo root_info;
1201 root_info.init(65536, 0);
1202 const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
1203 tmptab->create_and_open(flags, root_info);
1205 merge_postlists(compactor, tmptab, off.begin() + i,
1206 in.begin() + i, in.begin() + j);
1207 tmp.push_back(tmptab);
1208 tmptab->flush_db();
1209 tmptab->commit(1, &root_info);
1210 AssertRel(root_info.get_blocksize(),==,65536);
1212 swap(off, newoff);
1213 ++c;
// Subsequent passes over the temporary tables, deleting intermediates
// (c > 0 guards the original input tables, which we don't own).
1216 while (tmp.size() > 3) {
1217 vector<HoneyTable *> tmpout;
1218 tmpout.reserve(tmp.size() / 2);
1219 vector<Xapian::docid> newoff;
1220 newoff.resize(tmp.size() / 2);
1221 for (unsigned int i = 0, j; i < tmp.size(); i = j) {
1222 j = i + 2;
1223 if (j == tmp.size() - 1) ++j;
1225 string dest = tmpdir;
1226 char buf[64];
1227 sprintf(buf, "/tmp%u_%u.", c, i / 2);
1228 dest += buf;
1230 HoneyTable * tmptab = new HoneyTable("postlist", dest, false);
1232 // Use maximum blocksize for temporary tables. And don't compress
1233 // entries in temporary tables, even if the final table would do
1234 // so. Any already compressed entries will get copied in
1235 // compressed form. (FIXME: HoneyTable has no blocksize)
1236 Honey::RootInfo root_info;
1237 root_info.init(65536, 0);
1238 const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
1239 tmptab->create_and_open(flags, root_info);
1241 merge_postlists(compactor, tmptab, off.begin() + i,
1242 tmp.begin() + i, tmp.begin() + j);
1243 if (c > 0) {
1244 for (unsigned int k = i; k < j; ++k) {
1245 // FIXME: unlink(tmp[k]->get_path().c_str());
1246 delete tmp[k];
1247 tmp[k] = NULL;
1250 tmpout.push_back(tmptab);
1251 tmptab->flush_db();
1252 tmptab->commit(1, &root_info);
1253 AssertRel(root_info.get_blocksize(),==,65536);
1255 swap(tmp, tmpout);
1256 swap(off, newoff);
1257 ++c;
// Final merge of the surviving (at most 3) tables into the real output.
1259 merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
1260 if (c > 0) {
1261 for (size_t k = 0; k < tmp.size(); ++k) {
1262 // FIXME: unlink(tmp[k]->get_path().c_str());
1263 delete tmp[k];
1264 tmp[k] = NULL;
1269 template<typename T> class PositionCursor;
1271 #ifndef DISABLE_GPL_LIBXAPIAN
1272 template<>
1273 class PositionCursor<const GlassTable&> : private GlassCursor {
1274 Xapian::docid offset;
1276 public:
1277 string key;
1278 Xapian::docid firstdid;
1280 PositionCursor(const GlassTable *in, Xapian::docid offset_)
1281 : GlassCursor(in), offset(offset_), firstdid(0) {
1282 find_entry(string());
1283 next();
1286 bool next() {
1287 if (!GlassCursor::next()) return false;
1288 read_tag();
1289 const char * d = current_key.data();
1290 const char * e = d + current_key.size();
1291 string term;
1292 Xapian::docid did;
1293 if (!unpack_string_preserving_sort(&d, e, term) ||
1294 !unpack_uint_preserving_sort(&d, e, &did) ||
1295 d != e) {
1296 throw Xapian::DatabaseCorruptError("Bad position key");
1299 key.resize(0);
1300 pack_string_preserving_sort(key, term);
1301 pack_uint_preserving_sort(key, did + offset);
1302 return true;
1305 const string & get_tag() const {
1306 return current_tag;
1309 #endif
1311 template<>
1312 class PositionCursor<const HoneyTable&> : private HoneyCursor {
1313 Xapian::docid offset;
1315 public:
1316 string key;
1317 Xapian::docid firstdid;
1319 PositionCursor(const HoneyTable *in, Xapian::docid offset_)
1320 : HoneyCursor(in), offset(offset_), firstdid(0) {
1321 find_entry(string());
1322 next();
1325 bool next() {
1326 if (!HoneyCursor::next()) return false;
1327 read_tag();
1328 const char * d = current_key.data();
1329 const char * e = d + current_key.size();
1330 string term;
1331 Xapian::docid did;
1332 if (!unpack_string_preserving_sort(&d, e, term) ||
1333 !unpack_uint_preserving_sort(&d, e, &did) ||
1334 d != e) {
1335 throw Xapian::DatabaseCorruptError("Bad position key");
1338 key.resize(0);
1339 pack_string_preserving_sort(key, term);
1340 pack_uint_preserving_sort(key, did + offset);
1341 return true;
1344 const string & get_tag() const {
1345 return current_tag;
1349 template<>
1350 class PositionCursor<HoneyTable&> {
1351 HoneyTable* table;
1352 Xapian::docid offset;
1354 public:
1355 string key;
1356 string current_key, current_tag;
1357 Xapian::docid firstdid;
1359 PositionCursor(HoneyTable *in, Xapian::docid offset_)
1360 : table(in), offset(offset_), firstdid(0) {
1361 next();
1364 bool next() {
1365 bool compressed;
1366 if (!table->read_item(current_key, current_tag, compressed))
1367 return false;
1368 if (compressed) abort();
1369 const char * d = current_key.data();
1370 const char * e = d + current_key.size();
1371 string term;
1372 Xapian::docid did;
1373 if (!unpack_string_preserving_sort(&d, e, term) ||
1374 !unpack_uint_preserving_sort(&d, e, &did) ||
1375 d != e) {
1376 throw Xapian::DatabaseCorruptError("Bad position key");
1379 key.resize(0);
1380 pack_string_preserving_sort(key, term);
1381 pack_uint_preserving_sort(key, did + offset);
1382 return true;
1385 const string & get_tag() const {
1386 return current_tag;
/** Compare PositionCursor pointers by key.
 *
 *  Used as the priority_queue comparator so the cursor with the smallest key
 *  is popped first.
 */
template<typename T>
class PositionCursorGt {
  public:
    /// Return true if and only if a's key orders strictly after b's key.
    bool operator()(const T* a, const T* b) const {
	return b->key < a->key;
    }
};
1400 template<typename T, typename U> void
1401 merge_positions(T* out, const vector<U*> & inputs,
1402 const vector<Xapian::docid> & offset)
1404 typedef decltype(*inputs[0]) table_type; // E.g. HoneyTable
1405 typedef PositionCursor<table_type> cursor_type;
1406 typedef PositionCursorGt<cursor_type> gt_type;
1407 priority_queue<cursor_type *, vector<cursor_type *>, gt_type> pq;
1408 for (size_t i = 0; i < inputs.size(); ++i) {
1409 auto in = inputs[i];
1410 if (in->empty()) {
1411 // Skip empty tables.
1412 continue;
1415 pq.push(new cursor_type(in, offset[i]));
1418 while (!pq.empty()) {
1419 cursor_type * cur = pq.top();
1420 pq.pop();
1421 out->add(cur->key, cur->get_tag());
1422 if (cur->next()) {
1423 pq.push(cur);
1424 } else {
1425 delete cur;
1430 template<typename T, typename U> void
1431 merge_docid_keyed(T *out, const vector<U*> & inputs,
1432 const vector<Xapian::docid> & offset,
1433 int = 0)
1435 for (size_t i = 0; i < inputs.size(); ++i) {
1436 Xapian::docid off = offset[i];
1438 auto in = inputs[i];
1439 if (in->empty()) continue;
1441 HoneyCursor cur(in);
1442 cur.find_entry(string());
1444 string key;
1445 while (cur.next()) {
1446 // Adjust the key if this isn't the first database.
1447 if (off) {
1448 Xapian::docid did;
1449 const char * d = cur.current_key.data();
1450 const char * e = d + cur.current_key.size();
1451 if (!unpack_uint_preserving_sort(&d, e, &did)) {
1452 string msg = "Bad key in ";
1453 msg += inputs[i]->get_path();
1454 throw Xapian::DatabaseCorruptError(msg);
1456 did += off;
1457 key.resize(0);
1458 pack_uint_preserving_sort(key, did);
1459 if (d != e) {
1460 // Copy over anything extra in the key (e.g. the zero byte
1461 // at the end of "used value slots" in the termlist table).
1462 key.append(d, e - d);
1464 } else {
1465 key = cur.current_key;
1467 bool compressed = cur.read_tag(true);
1468 out->add(key, cur.current_tag, compressed);
#ifndef DISABLE_GPL_LIBXAPIAN
/** Merge docid-keyed tables from glass-format inputs into a honey output.
 *
 *  Each key starts with a docid packed with pack_uint_preserving_sort(),
 *  which is adjusted by the corresponding entry in @a offset.
 *
 *  For the termlist table (table_type == Honey::TERMLIST) the tag data is
 *  re-encoded: the separate glass "used value slots" entry which follows a
 *  document's termlist entry is folded into the start of the honey termlist
 *  entry, which requires looking ahead one entry (hence the goto below).
 */
template<typename T> void
merge_docid_keyed(T *out, const vector<const GlassTable*> & inputs,
		  const vector<Xapian::docid> & offset,
		  int table_type = 0)
{
    for (size_t i = 0; i < inputs.size(); ++i) {
	Xapian::docid off = offset[i];

	auto in = inputs[i];
	if (in->empty()) continue;

	GlassCursor cur(in);
	cur.find_entry(string());

	string key;
	while (cur.next()) {
next_without_next:
	    // Jump target for when the termlist handling below has already
	    // advanced the cursor to the next entry to process.

	    // Adjust the key if this isn't the first database.
	    if (off) {
		Xapian::docid did;
		const char * d = cur.current_key.data();
		const char * e = d + cur.current_key.size();
		if (!unpack_uint_preserving_sort(&d, e, &did)) {
		    string msg = "Bad key in ";
		    msg += inputs[i]->get_path();
		    throw Xapian::DatabaseCorruptError(msg);
		}
		did += off;
		key.resize(0);
		pack_uint_preserving_sort(key, did);
		if (d != e) {
		    // Copy over anything extra in the key (e.g. the zero byte
		    // at the end of "used value slots" in the termlist table).
		    key.append(d, e - d);
		}
	    } else {
		key = cur.current_key;
	    }
	    if (table_type == Honey::TERMLIST) {
		// A "used value slots" key should only ever be seen via the
		// lookahead below, directly after its termlist entry.
		if (termlist_key_is_values_used(key)) {
		    throw Xapian::DatabaseCorruptError("value slots used entry without a corresponding termlist entry");
		}
		cur.read_tag();
		string tag = cur.current_tag;

		// Look ahead: if the next entry is this document's "used
		// value slots" entry, consume and re-encode it here.
		bool next_result = cur.next();
		bool next_already_done = true;
		string encoded_slots_used;
		if (next_result &&
		    termlist_key_is_values_used(cur.current_key)) {
		    next_already_done = false;
		    cur.read_tag();
		    const string& valtag = cur.current_tag;

		    const char* p = valtag.data();
		    const char* end = p + valtag.size();

		    Xapian::VecCOW<Xapian::termpos> slots;

		    // The glass encoding is the first slot number followed by
		    // deltas (minus one) between consecutive used slots.
		    Xapian::valueno first_slot;
		    if (!unpack_uint(&p, end, &first_slot)) {
			throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
		    }
		    slots.push_back(first_slot);
		    Xapian::valueno last_slot = first_slot;
		    while (p != end) {
			Xapian::valueno slot;
			if (!unpack_uint(&p, end, &slot)) {
			    throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
			}
			slot += last_slot + 1;
			last_slot = slot;

			slots.push_back(slot);
		    }

		    // Re-encode for honey: last slot, then (for more than one
		    // slot) the full set interpolatively coded.
		    string enc;
		    pack_uint(enc, last_slot);
		    if (slots.size() > 1) {
			BitWriter slots_used(enc);
			slots_used.encode(first_slot, last_slot);
			slots_used.encode(slots.size() - 2, last_slot - first_slot);
			slots_used.encode_interpolative(slots, 0, slots.size() - 1);
			encoded_slots_used = slots_used.freeze();
		    } else {
			encoded_slots_used = std::move(enc);
		    }
		}

		const char* pos = tag.data();
		const char* end = pos + tag.size();

		// The honey termlist tag starts with the size of the encoded
		// slots-used data followed by that data (both may be empty).
		string newtag;
		pack_uint(newtag, encoded_slots_used.size());
		newtag += encoded_slots_used;

		if (pos != end) {
		    Xapian::termcount doclen;
		    if (!unpack_uint(&pos, end, &doclen)) {
			throw_database_corrupt("doclen", pos);
		    }
		    Xapian::termcount termlist_size;
		    if (!unpack_uint(&pos, end, &termlist_size)) {
			throw_database_corrupt("termlist length", pos);
		    }
		    // Honey stores (termlist_size - 1) then doclen.
		    pack_uint(newtag, termlist_size - 1);
		    pack_uint(newtag, doclen);

		    // Copy the prefix-compressed term entries across,
		    // re-packing each wdf with pack_uint().
		    string current_term;
		    while (pos != end) {
			Xapian::termcount current_wdf = 0;

			if (!current_term.empty()) {
			    size_t reuse = static_cast<unsigned char>(*pos++);
			    newtag += char(reuse);

			    if (reuse > current_term.size()) {
				// NOTE(review): a reuse byte larger than the
				// previous term's length appears to combine
				// the wdf with the reuse length - confirm
				// against the termlist encoding this reads.
				current_wdf = reuse / (current_term.size() + 1);
				reuse = reuse % (current_term.size() + 1);
			    }
			    current_term.resize(reuse);

			    if (pos == end)
				throw_database_corrupt("term", NULL);
			}

			size_t append = static_cast<unsigned char>(*pos++);
			if (size_t(end - pos) < append)
			    throw_database_corrupt("term", NULL);

			current_term.append(pos, append);
			pos += append;

			if (current_wdf) {
			    // wdf was combined into the reuse byte above (the
			    // combined form stores wdf + 1).
			    --current_wdf;
			} else {
			    if (!unpack_uint(&pos, end, &current_wdf)) {
				throw_database_corrupt("wdf", pos);
			    }
			    pack_uint(newtag, current_wdf);
			}

			newtag += char(append);
			newtag.append(current_term.end() - append, current_term.end());
		    }
		}
		if (!newtag.empty())
		    out->add(key, newtag);
		if (!next_result) break;
		// If the lookahead entry wasn't a "used value slots" entry it
		// still needs processing, and the cursor is already on it.
		if (next_already_done) goto next_without_next;
	    } else {
		bool compressed = cur.read_tag(true);
		out->add(key, cur.current_tag, compressed);
	    }
	}
    }
}
#endif
1635 using namespace HoneyCompact;
1637 void
1638 HoneyDatabase::compact(Xapian::Compactor* compactor,
1639 const char* destdir,
1640 int fd,
1641 int source_backend,
1642 const vector<const Xapian::Database::Internal*>& sources,
1643 const vector<Xapian::docid>& offset,
1644 size_t block_size,
1645 Xapian::Compactor::compaction_level compaction,
1646 unsigned flags,
1647 Xapian::docid last_docid)
1649 struct table_list {
1650 // The "base name" of the table.
1651 char name[9];
1652 // The type.
1653 Honey::table_type type;
1654 // Create tables after position lazily.
1655 bool lazy;
1658 static const table_list tables[] = {
1659 // name type lazy
1660 { "postlist", Honey::POSTLIST, false },
1661 { "docdata", Honey::DOCDATA, true },
1662 { "termlist", Honey::TERMLIST, false },
1663 { "position", Honey::POSITION, true },
1664 { "spelling", Honey::SPELLING, true },
1665 { "synonym", Honey::SYNONYM, true }
1667 const table_list * tables_end = tables +
1668 (sizeof(tables) / sizeof(tables[0]));
1670 const int FLAGS = Xapian::DB_DANGEROUS;
1672 bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
1673 bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
1674 if (single_file) {
1675 // FIXME: Support this combination - we need to put temporary files
1676 // somewhere.
1677 multipass = false;
1680 if (single_file) {
1681 for (size_t i = 0; i != sources.size(); ++i) {
1682 bool has_uncommitted_changes;
1683 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1684 auto db = static_cast<const GlassDatabase*>(sources[i]);
1685 has_uncommitted_changes = db->has_uncommitted_changes();
1686 } else {
1687 auto db = static_cast<const HoneyDatabase*>(sources[i]);
1688 has_uncommitted_changes = db->has_uncommitted_changes();
1690 if (has_uncommitted_changes) {
1691 const char * m =
1692 "Can't compact from a WritableDatabase with uncommitted "
1693 "changes - either call commit() first, or create a new "
1694 "Database object from the filename on disk";
1695 throw Xapian::InvalidOperationError(m);
1700 if (block_size < HONEY_MIN_BLOCKSIZE ||
1701 block_size > HONEY_MAX_BLOCKSIZE ||
1702 (block_size & (block_size - 1)) != 0) {
1703 block_size = HONEY_DEFAULT_BLOCKSIZE;
1706 FlintLock lock(destdir ? destdir : "");
1707 if (!single_file) {
1708 string explanation;
1709 FlintLock::reason why = lock.lock(true, false, explanation);
1710 if (why != FlintLock::SUCCESS) {
1711 lock.throw_databaselockerror(why, destdir, explanation);
1715 unique_ptr<HoneyVersion> version_file_out;
1716 if (single_file) {
1717 if (destdir) {
1718 fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
1719 if (fd < 0) {
1720 throw Xapian::DatabaseCreateError("open() failed", errno);
1723 version_file_out.reset(new HoneyVersion(fd));
1724 } else {
1725 fd = -1;
1726 version_file_out.reset(new HoneyVersion(destdir));
1729 version_file_out->create(block_size);
1730 for (size_t i = 0; i != sources.size(); ++i) {
1731 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1732 #ifdef DISABLE_GPL_LIBXAPIAN
1733 Assert(false);
1734 #else
1735 auto db = static_cast<const GlassDatabase*>(sources[i]);
1736 version_file_out->merge_stats(db->version_file.get_doccount(),
1737 db->version_file.get_doclength_lower_bound(),
1738 db->version_file.get_doclength_upper_bound(),
1739 db->version_file.get_wdf_upper_bound(),
1740 db->version_file.get_total_doclen(),
1741 db->version_file.get_spelling_wordfreq_upper_bound());
1742 #endif
1743 } else {
1744 auto db = static_cast<const HoneyDatabase*>(sources[i]);
1745 version_file_out->merge_stats(db->version_file);
1749 string fl_serialised;
1750 #if 0
1751 if (single_file) {
1752 HoneyFreeList fl;
1753 fl.set_first_unused_block(1); // FIXME: Assumption?
1754 fl.pack(fl_serialised);
1756 #endif
1758 // Set to true if stat() failed (which can happen if the files are > 2GB
1759 // and off_t is 32 bit) or one of the totals overflowed.
1760 bool bad_totals = false;
1761 off_t in_total = 0, out_total = 0;
1763 // FIXME: sort out indentation.
1764 if (source_backend == Xapian::DB_BACKEND_GLASS) {
1765 #ifdef DISABLE_GPL_LIBXAPIAN
1766 throw Xapian::FeatureUnavailableError("Glass backend disabled");
1767 #else
1768 vector<HoneyTable *> tabs;
1769 tabs.reserve(tables_end - tables);
1770 off_t prev_size = block_size;
1771 for (const table_list * t = tables; t < tables_end; ++t) {
1772 // The postlist table requires an N-way merge, adjusting the
1773 // headers of various blocks. The spelling and synonym tables also
1774 // need special handling. The other tables have keys sorted in
1775 // docid order, so we can merge them by simply copying all the keys
1776 // from each source table in turn.
1777 if (compactor)
1778 compactor->set_status(t->name, string());
1780 string dest;
1781 if (!single_file) {
1782 dest = destdir;
1783 dest += '/';
1784 dest += t->name;
1785 dest += '.';
1788 bool output_will_exist = !t->lazy;
1790 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
1791 // on certain systems).
1792 bool bad_stat = false;
1794 // We can't currently report input sizes if there's a single file DB
1795 // amongst the inputs.
1796 bool single_file_in = false;
1798 off_t in_size = 0;
1800 vector<const GlassTable*> inputs;
1801 inputs.reserve(sources.size());
1802 size_t inputs_present = 0;
1803 for (auto src : sources) {
1804 auto db = static_cast<const GlassDatabase*>(src);
1805 const GlassTable * table;
1806 switch (t->type) {
1807 case Honey::POSTLIST:
1808 table = &(db->postlist_table);
1809 break;
1810 case Honey::DOCDATA:
1811 table = &(db->docdata_table);
1812 break;
1813 case Honey::TERMLIST:
1814 table = &(db->termlist_table);
1815 break;
1816 case Honey::POSITION:
1817 table = &(db->position_table);
1818 break;
1819 case Honey::SPELLING:
1820 table = &(db->spelling_table);
1821 break;
1822 case Honey::SYNONYM:
1823 table = &(db->synonym_table);
1824 break;
1825 default:
1826 Assert(false);
1827 return;
1830 if (db->single_file()) {
1831 if (t->lazy && table->empty()) {
1832 // Essentially doesn't exist.
1833 } else {
1834 // FIXME: Find actual size somehow?
1835 // in_size += table->size() / 1024;
1836 single_file_in = true;
1837 bad_totals = true;
1838 output_will_exist = true;
1839 ++inputs_present;
1841 } else {
1842 off_t db_size = file_size(table->get_path());
1843 if (errno == 0) {
1844 // FIXME: check overflow and set bad_totals
1845 in_total += db_size;
1846 in_size += db_size / 1024;
1847 output_will_exist = true;
1848 ++inputs_present;
1849 } else if (errno != ENOENT) {
1850 // We get ENOENT for an optional table.
1851 bad_totals = bad_stat = true;
1852 output_will_exist = true;
1853 ++inputs_present;
1856 inputs.push_back(table);
1859 // If any inputs lack a termlist table, suppress it in the output.
1860 if (t->type == Honey::TERMLIST && inputs_present != sources.size()) {
1861 if (inputs_present != 0) {
1862 if (compactor) {
1863 string m = str(inputs_present);
1864 m += " of ";
1865 m += str(sources.size());
1866 m += " inputs present, so suppressing output";
1867 compactor->set_status(t->name, m);
1869 continue;
1871 output_will_exist = false;
1874 if (!output_will_exist) {
1875 if (compactor)
1876 compactor->set_status(t->name, "doesn't exist");
1877 continue;
1880 HoneyTable * out;
1881 off_t table_start_offset = -1;
1882 if (single_file) {
1883 if (t == tables) {
1884 // Start first table HONEY_VERSION_MAX_SIZE bytes in to allow
1885 // space for version file. It's tricky to exactly know the
1886 // size of the version file beforehand.
1887 table_start_offset = HONEY_VERSION_MAX_SIZE;
1888 if (lseek(fd, table_start_offset, SEEK_SET) < 0)
1889 throw Xapian::DatabaseError("lseek() failed", errno);
1890 } else {
1891 table_start_offset = lseek(fd, 0, SEEK_CUR);
1893 out = new HoneyTable(t->name, fd, version_file_out->get_offset(),
1894 false, false);
1895 } else {
1896 out = new HoneyTable(t->name, dest, false, t->lazy);
1898 tabs.push_back(out);
1899 Honey::RootInfo * root_info = version_file_out->root_to_set(t->type);
1900 if (single_file) {
1901 root_info->set_free_list(fl_serialised);
1902 root_info->set_offset(table_start_offset);
1903 out->open(FLAGS, version_file_out->get_root(t->type), version_file_out->get_revision());
1904 } else {
1905 out->create_and_open(FLAGS, *root_info);
1908 out->set_full_compaction(compaction != compactor->STANDARD);
1909 if (compaction == compactor->FULLER) out->set_max_item_size(1);
1911 switch (t->type) {
1912 case Honey::POSTLIST: {
1913 if (multipass && inputs.size() > 3) {
1914 multimerge_postlists(compactor, out, destdir,
1915 inputs, offset);
1916 } else {
1917 merge_postlists(compactor, out, offset.begin(),
1918 inputs.begin(), inputs.end());
1920 break;
1922 case Honey::SPELLING:
1923 merge_spellings(out, inputs.begin(), inputs.end());
1924 break;
1925 case Honey::SYNONYM:
1926 merge_synonyms(out, inputs.begin(), inputs.end());
1927 break;
1928 case Honey::POSITION:
1929 merge_positions(out, inputs, offset);
1930 break;
1931 default:
1932 // DocData, Termlist
1933 merge_docid_keyed(out, inputs, offset, t->type);
1934 break;
1937 // Commit as revision 1.
1938 out->flush_db();
1939 out->commit(1, root_info);
1940 out->sync();
1941 if (single_file) fl_serialised = root_info->get_free_list();
1943 off_t out_size = 0;
1944 if (!bad_stat && !single_file_in) {
1945 off_t db_size;
1946 if (single_file) {
1947 db_size = file_size(fd);
1948 } else {
1949 db_size = file_size(dest + HONEY_TABLE_EXTENSION);
1951 if (errno == 0) {
1952 if (single_file) {
1953 off_t old_prev_size = max(prev_size, off_t(block_size));
1954 prev_size = db_size;
1955 db_size -= old_prev_size;
1957 // FIXME: check overflow and set bad_totals
1958 out_total += db_size;
1959 out_size = db_size / 1024;
1960 } else if (errno != ENOENT) {
1961 bad_totals = bad_stat = true;
1964 if (bad_stat) {
1965 if (compactor)
1966 compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
1967 } else if (single_file_in) {
1968 if (compactor)
1969 compactor->set_status(t->name, "Done (table sizes unknown for single file DB input)");
1970 } else {
1971 string status;
1972 if (out_size == in_size) {
1973 status = "Size unchanged (";
1974 } else {
1975 off_t delta;
1976 if (out_size < in_size) {
1977 delta = in_size - out_size;
1978 status = "Reduced by ";
1979 } else {
1980 delta = out_size - in_size;
1981 status = "INCREASED by ";
1983 if (in_size) {
1984 status += str(100 * delta / in_size);
1985 status += "% ";
1987 status += str(delta);
1988 status += "K (";
1989 status += str(in_size);
1990 status += "K -> ";
1992 status += str(out_size);
1993 status += "K)";
1994 if (compactor)
1995 compactor->set_status(t->name, status);
1999 // If compacting to a single file output and all the tables are empty, pad
2000 // the output so that it isn't mistaken for a stub database when we try to
2001 // open it. For this it needs to be a multiple of 2KB in size.
2002 if (single_file && prev_size < off_t(block_size)) {
2003 #ifdef HAVE_FTRUNCATE
2004 if (ftruncate(fd, block_size) < 0) {
2005 throw Xapian::DatabaseError("Failed to set size of output database", errno);
2007 #else
2008 const off_t off = block_size - 1;
2009 if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
2010 throw Xapian::DatabaseError("Failed to set size of output database", errno);
2012 #endif
2015 if (single_file) {
2016 if (lseek(fd, version_file_out->get_offset(), SEEK_SET) == -1) {
2017 throw Xapian::DatabaseError("lseek() failed", errno);
2020 version_file_out->set_last_docid(last_docid);
2021 string tmpfile = version_file_out->write(1, FLAGS);
2022 if (single_file) {
2023 off_t version_file_size = lseek(fd, 0, SEEK_CUR);
2024 if (version_file_size < 0) {
2025 throw Xapian::DatabaseError("lseek() failed", errno);
2027 if (version_file_size > HONEY_VERSION_MAX_SIZE) {
2028 throw Xapian::DatabaseError("Didn't allow enough space for version file data");
2031 for (unsigned j = 0; j != tabs.size(); ++j) {
2032 tabs[j]->sync();
2034 // Commit with revision 1.
2035 version_file_out->sync(tmpfile, 1, FLAGS);
2036 for (unsigned j = 0; j != tabs.size(); ++j) {
2037 delete tabs[j];
2039 #endif
2040 } else {
2041 vector<HoneyTable *> tabs;
2042 tabs.reserve(tables_end - tables);
2043 off_t prev_size = block_size;
2044 for (const table_list * t = tables; t < tables_end; ++t) {
2045 // The postlist table requires an N-way merge, adjusting the
2046 // headers of various blocks. The spelling and synonym tables also
2047 // need special handling. The other tables have keys sorted in
2048 // docid order, so we can merge them by simply copying all the keys
2049 // from each source table in turn.
2050 if (compactor)
2051 compactor->set_status(t->name, string());
2053 string dest;
2054 if (!single_file) {
2055 dest = destdir;
2056 dest += '/';
2057 dest += t->name;
2058 dest += '.';
2061 bool output_will_exist = !t->lazy;
2063 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
2064 // on certain systems).
2065 bool bad_stat = false;
2067 // We can't currently report input sizes if there's a single file DB
2068 // amongst the inputs.
2069 bool single_file_in = false;
2071 off_t in_size = 0;
2073 vector<const HoneyTable*> inputs;
2074 inputs.reserve(sources.size());
2075 size_t inputs_present = 0;
2076 for (auto src : sources) {
2077 auto db = static_cast<const HoneyDatabase*>(src);
2078 const HoneyTable * table;
2079 switch (t->type) {
2080 case Honey::POSTLIST:
2081 table = &(db->postlist_table);
2082 break;
2083 case Honey::DOCDATA:
2084 table = &(db->docdata_table);
2085 break;
2086 case Honey::TERMLIST:
2087 table = &(db->termlist_table);
2088 break;
2089 case Honey::POSITION:
2090 table = &(db->position_table);
2091 break;
2092 case Honey::SPELLING:
2093 table = &(db->spelling_table);
2094 break;
2095 case Honey::SYNONYM:
2096 table = &(db->synonym_table);
2097 break;
2098 default:
2099 Assert(false);
2100 return;
2103 if (db->single_file()) {
2104 if (t->lazy && table->empty()) {
2105 // Essentially doesn't exist.
2106 } else {
2107 // FIXME: Find actual size somehow?
2108 // in_size += table->size() / 1024;
2109 single_file_in = true;
2110 bad_totals = true;
2111 output_will_exist = true;
2112 ++inputs_present;
2114 } else {
2115 off_t db_size = file_size(table->get_path());
2116 if (errno == 0) {
2117 // FIXME: check overflow and set bad_totals
2118 in_total += db_size;
2119 in_size += db_size / 1024;
2120 output_will_exist = true;
2121 ++inputs_present;
2122 } else if (errno != ENOENT) {
2123 // We get ENOENT for an optional table.
2124 bad_totals = bad_stat = true;
2125 output_will_exist = true;
2126 ++inputs_present;
2129 inputs.push_back(table);
2132 // If any inputs lack a termlist table, suppress it in the output.
2133 if (t->type == Honey::TERMLIST && inputs_present != sources.size()) {
2134 if (inputs_present != 0) {
2135 if (compactor) {
2136 string m = str(inputs_present);
2137 m += " of ";
2138 m += str(sources.size());
2139 m += " inputs present, so suppressing output";
2140 compactor->set_status(t->name, m);
2142 continue;
2144 output_will_exist = false;
2147 if (!output_will_exist) {
2148 if (compactor)
2149 compactor->set_status(t->name, "doesn't exist");
2150 continue;
2153 HoneyTable * out;
2154 off_t table_start_offset = -1;
2155 if (single_file) {
2156 if (t == tables) {
2157 // Start first table HONEY_VERSION_MAX_SIZE bytes in to allow
2158 // space for version file. It's tricky to exactly know the
2159 // size of the version file beforehand.
2160 table_start_offset = HONEY_VERSION_MAX_SIZE;
2161 if (lseek(fd, table_start_offset, SEEK_SET) < 0)
2162 throw Xapian::DatabaseError("lseek() failed", errno);
2163 } else {
2164 table_start_offset = lseek(fd, 0, SEEK_CUR);
2166 out = new HoneyTable(t->name, fd, version_file_out->get_offset(),
2167 false, false);
2168 } else {
2169 out = new HoneyTable(t->name, dest, false, t->lazy);
2171 tabs.push_back(out);
2172 Honey::RootInfo * root_info = version_file_out->root_to_set(t->type);
2173 if (single_file) {
2174 root_info->set_free_list(fl_serialised);
2175 root_info->set_offset(table_start_offset);
2176 out->open(FLAGS, version_file_out->get_root(t->type), version_file_out->get_revision());
2177 } else {
2178 out->create_and_open(FLAGS, *root_info);
2181 out->set_full_compaction(compaction != compactor->STANDARD);
2182 if (compaction == compactor->FULLER) out->set_max_item_size(1);
2184 switch (t->type) {
2185 case Honey::POSTLIST: {
2186 if (multipass && inputs.size() > 3) {
2187 multimerge_postlists(compactor, out, destdir,
2188 inputs, offset);
2189 } else {
2190 merge_postlists(compactor, out, offset.begin(),
2191 inputs.begin(), inputs.end());
2193 break;
2195 case Honey::SPELLING:
2196 merge_spellings(out, inputs.begin(), inputs.end());
2197 break;
2198 case Honey::SYNONYM:
2199 merge_synonyms(out, inputs.begin(), inputs.end());
2200 break;
2201 case Honey::POSITION:
2202 merge_positions(out, inputs, offset);
2203 break;
2204 default:
2205 // DocData, Termlist
2206 merge_docid_keyed(out, inputs, offset);
2207 break;
2210 // Commit as revision 1.
2211 out->flush_db();
2212 out->commit(1, root_info);
2213 out->sync();
2214 if (single_file) fl_serialised = root_info->get_free_list();
2216 off_t out_size = 0;
2217 if (!bad_stat && !single_file_in) {
2218 off_t db_size;
2219 if (single_file) {
2220 db_size = file_size(fd);
2221 } else {
2222 db_size = file_size(dest + HONEY_TABLE_EXTENSION);
2224 if (errno == 0) {
2225 if (single_file) {
2226 off_t old_prev_size = max(prev_size, off_t(block_size));
2227 prev_size = db_size;
2228 db_size -= old_prev_size;
2230 // FIXME: check overflow and set bad_totals
2231 out_total += db_size;
2232 out_size = db_size / 1024;
2233 } else if (errno != ENOENT) {
2234 bad_totals = bad_stat = true;
2237 if (bad_stat) {
2238 if (compactor)
2239 compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
2240 } else if (single_file_in) {
2241 if (compactor)
2242 compactor->set_status(t->name, "Done (table sizes unknown for single file DB input)");
2243 } else {
2244 string status;
2245 if (out_size == in_size) {
2246 status = "Size unchanged (";
2247 } else {
2248 off_t delta;
2249 if (out_size < in_size) {
2250 delta = in_size - out_size;
2251 status = "Reduced by ";
2252 } else {
2253 delta = out_size - in_size;
2254 status = "INCREASED by ";
2256 if (in_size) {
2257 status += str(100 * delta / in_size);
2258 status += "% ";
2260 status += str(delta);
2261 status += "K (";
2262 status += str(in_size);
2263 status += "K -> ";
2265 status += str(out_size);
2266 status += "K)";
2267 if (compactor)
2268 compactor->set_status(t->name, status);
2272 // If compacting to a single file output and all the tables are empty, pad
2273 // the output so that it isn't mistaken for a stub database when we try to
2274 // open it. For this it needs to be a multiple of 2KB in size.
2275 if (single_file && prev_size < off_t(block_size)) {
2276 #ifdef HAVE_FTRUNCATE
2277 if (ftruncate(fd, block_size) < 0) {
2278 throw Xapian::DatabaseError("Failed to set size of output database", errno);
2280 #else
2281 const off_t off = block_size - 1;
2282 if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
2283 throw Xapian::DatabaseError("Failed to set size of output database", errno);
2285 #endif
2288 if (single_file) {
2289 if (lseek(fd, version_file_out->get_offset(), SEEK_SET) == -1) {
2290 throw Xapian::DatabaseError("lseek() failed", errno);
2293 version_file_out->set_last_docid(last_docid);
2294 string tmpfile = version_file_out->write(1, FLAGS);
2295 for (unsigned j = 0; j != tabs.size(); ++j) {
2296 tabs[j]->sync();
2298 // Commit with revision 1.
2299 version_file_out->sync(tmpfile, 1, FLAGS);
2300 for (unsigned j = 0; j != tabs.size(); ++j) {
2301 delete tabs[j];
2305 if (!single_file) lock.release();
2307 if (!bad_totals && compactor) {
2308 string status;
2309 in_total /= 1024;
2310 out_total /= 1024;
2311 if (out_total == in_total) {
2312 status = "Size unchanged (";
2313 } else {
2314 off_t delta;
2315 if (out_total < in_total) {
2316 delta = in_total - out_total;
2317 status = "Reduced by ";
2318 } else {
2319 delta = out_total - in_total;
2320 status = "INCREASED by ";
2322 if (in_total) {
2323 status += str(100 * delta / in_total);
2324 status += "% ";
2326 status += str(delta);
2327 status += "K (";
2328 status += str(in_total);
2329 status += "K -> ";
2331 status += str(out_total);
2332 status += "K)";
2333 compactor->set_status("Total", status);