[honey] New format for more keys in postlist table
[xapian.git] / xapian-core / backends / honey / honey_values.cc
blobd6569715e67a7a35cd89191416fec6b0839f5e90
1 /** @file honey_values.cc
2 * @brief HoneyValueManager class
3 */
4 /* Copyright (C) 2008,2009,2010,2011,2012,2016,2017,2018 Olly Betts
5 * Copyright (C) 2008,2009 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <config.h>
24 #include "honey_values.h"
26 #include "honey_cursor.h"
27 #include "honey_postlist.h"
28 #include "honey_postlisttable.h"
29 #include "honey_termlist.h"
30 #include "honey_termlisttable.h"
32 #include "bitstream.h"
33 #include "debuglog.h"
34 #include "backends/documentinternal.h"
35 #include "pack.h"
37 #include "xapian/error.h"
38 #include "xapian/valueiterator.h"
40 #include <algorithm>
41 #include <memory>
43 using namespace Honey;
44 using namespace std;
46 // FIXME:
47 // * multi-values?
48 // * values named instead of numbered?
50 void
51 ValueChunkReader::assign(const char * p_, size_t len, Xapian::docid last_did)
53 p = p_;
54 end = p_ + len;
55 if (!unpack_uint(&p, end, &did))
56 throw Xapian::DatabaseCorruptError("Failed to unpack docid delta");
57 did = last_did - did;
58 if (!unpack_string(&p, end, value))
59 throw Xapian::DatabaseCorruptError("Failed to unpack first value");
62 void
63 ValueChunkReader::next()
65 if (p == end) {
66 p = NULL;
67 return;
70 Xapian::docid delta;
71 if (!unpack_uint(&p, end, &delta)) {
72 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value "
73 "docid");
75 did += delta + 1;
76 if (!unpack_string(&p, end, value))
77 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
80 void
81 ValueChunkReader::skip_to(Xapian::docid target)
83 if (p == NULL || target <= did)
84 return;
86 size_t value_len;
87 while (p != end) {
88 // Get the next docid
89 Xapian::docid delta;
90 if (rare(!unpack_uint(&p, end, &delta))) {
91 throw Xapian::DatabaseCorruptError("Failed to unpack streamed "
92 "value docid");
94 did += delta + 1;
96 // Get the length of the string
97 if (rare(!unpack_uint(&p, end, &value_len))) {
98 throw Xapian::DatabaseCorruptError("Failed to unpack streamed "
99 "value length");
102 // Check that it's not too long
103 if (rare(value_len > size_t(end - p))) {
104 throw Xapian::DatabaseCorruptError("Failed to unpack streamed "
105 "value");
108 // Assign the value and return only if we've reached the target
109 if (did >= target) {
110 value.assign(p, value_len);
111 p += value_len;
112 return;
114 p += value_len;
116 p = NULL;
119 void
120 HoneyValueManager::add_value(Xapian::docid did, Xapian::valueno slot,
121 const string & val)
123 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
124 i = changes.find(slot);
125 if (i == changes.end()) {
126 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
128 i->second[did] = val;
131 void
132 HoneyValueManager::remove_value(Xapian::docid did, Xapian::valueno slot)
134 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
135 i = changes.find(slot);
136 if (i == changes.end()) {
137 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
139 i->second[did] = string();
142 Xapian::docid
143 HoneyValueManager::get_chunk_containing_did(Xapian::valueno slot,
144 Xapian::docid did,
145 string &chunk) const
147 LOGCALL(DB, Xapian::docid, "HoneyValueManager::get_chunk_containing_did", slot | did | chunk);
148 if (!cursor.get())
149 cursor.reset(postlist_table.cursor_get());
150 if (!cursor.get()) RETURN(0);
152 bool exact = cursor->find_entry_ge(make_valuechunk_key(slot, did));
153 if (!exact) {
154 // The chunk doesn't end with docid did, so we need to get
155 // the last docid in the chunk from the key to return.
156 did = docid_from_key(slot, cursor->current_key);
159 cursor->read_tag();
160 chunk = cursor->current_tag;
161 // FIXME: fails, not sure why: swap(chunk, cursor->current_tag);
163 RETURN(did);
166 static const size_t CHUNK_SIZE_THRESHOLD = 2000;
168 namespace Honey {
170 class ValueUpdater {
171 HoneyPostListTable& table;
173 Xapian::valueno slot;
175 string ctag;
177 ValueChunkReader reader;
179 string tag;
181 Xapian::docid prev_did;
183 Xapian::docid last_did;
185 Xapian::docid new_last_did;
187 Xapian::docid last_allowed_did;
189 void append_to_stream(Xapian::docid did, const string & value) {
190 Assert(did);
191 if (!tag.empty()) {
192 AssertRel(did,>,prev_did);
193 pack_uint(tag, did - prev_did - 1);
195 prev_did = did;
196 new_last_did = did;
197 pack_string(tag, value);
198 if (tag.size() >= CHUNK_SIZE_THRESHOLD) write_tag();
201 void write_tag() {
202 // If the last docid has changed, delete the old entry.
203 if (last_did && new_last_did != last_did) {
204 table.del(make_valuechunk_key(slot, last_did));
206 if (!tag.empty()) {
207 table.add(make_valuechunk_key(slot, new_last_did), tag);
209 last_did = 0;
210 tag.resize(0);
213 public:
214 ValueUpdater(HoneyPostListTable& table_, Xapian::valueno slot_)
215 : table(table_), slot(slot_), last_did(0), last_allowed_did(0) { }
217 ~ValueUpdater() {
218 while (!reader.at_end()) {
219 // FIXME: use skip_to and some splicing magic instead?
220 append_to_stream(reader.get_docid(), reader.get_value());
221 reader.next();
223 write_tag();
226 void update(Xapian::docid did, const string & value) {
227 if (last_allowed_did && did > last_allowed_did) {
228 // The next change needs to go in a later existing chunk than the
229 // one we're currently updating, so we copy over the rest of the
230 // entries from the current chunk, write out the updated chunk and
231 // drop through to the case below will read in that later chunk.
232 // FIXME: use some string splicing magic instead of this loop.
233 while (!reader.at_end()) {
234 // last_allowed_did should be an upper bound for this chunk.
235 AssertRel(reader.get_docid(),<=,last_allowed_did);
236 append_to_stream(reader.get_docid(), reader.get_value());
237 reader.next();
239 write_tag();
240 last_allowed_did = 0;
242 if (last_allowed_did == 0) {
243 last_allowed_did = HONEY_MAX_DOCID;
244 Assert(tag.empty());
245 new_last_did = 0;
246 unique_ptr<HoneyCursor> cursor(table.cursor_get());
247 if (cursor->find_entry_ge(make_valuechunk_key(slot, did))) {
248 // We found an exact match, so the last docid is the one
249 // we looked for.
250 last_did = did;
251 } else {
252 Assert(!cursor->after_end());
253 // Otherwise we need to unpack it from the key we found.
254 // We may have found a non-value-chunk entry in which case
255 // docid_from_key() returns 0.
256 last_did = docid_from_key(slot, cursor->current_key);
259 // If there are no further chunks, then the last docid that can go
260 // in this chunk is the highest valid docid. If there are further
261 // chunks then it's one less than the first docid of the next
262 // chunk.
263 if (last_did) {
264 // We found a value chunk.
265 cursor->read_tag();
266 // FIXME:swap(cursor->current_tag, ctag);
267 ctag = cursor->current_tag;
268 reader.assign(ctag.data(), ctag.size(), last_did);
270 if (cursor->next()) {
271 const string & key = cursor->current_key;
272 Xapian::docid next_last_did = docid_from_key(slot, key);
273 if (next_last_did) {
274 cursor->read_tag();
275 Xapian::docid delta;
276 const char* p = cursor->current_tag.data();
277 const char* e = p + cursor->current_tag.size();
278 if (!unpack_uint(&p, e, &delta)) {
279 throw Xapian::DatabaseCorruptError("Failed to unpack "
280 "docid delta");
282 Xapian::docid next_first_did = next_last_did - delta;
283 last_allowed_did = next_first_did - 1;
285 Assert(last_allowed_did);
286 AssertRel(last_allowed_did,>=,last_did);
290 // Copy over entries until we get to the one we want to
291 // add/modify/delete.
292 // FIXME: use skip_to and some splicing magic instead?
293 while (!reader.at_end() && reader.get_docid() < did) {
294 append_to_stream(reader.get_docid(), reader.get_value());
295 reader.next();
297 if (!reader.at_end() && reader.get_docid() == did) reader.next();
298 if (!value.empty()) {
299 // Add/update entry for did.
300 append_to_stream(did, value);
307 void
308 HoneyValueManager::merge_changes()
310 for (auto&& i : changes) {
311 Xapian::valueno slot = i.first;
312 Honey::ValueUpdater updater(postlist_table, slot);
313 for (auto&& j : i.second) {
314 updater.update(j.first, j.second);
317 changes.clear();
320 string
321 HoneyValueManager::add_document(Xapian::docid did, const Xapian::Document &doc,
322 map<Xapian::valueno, ValueStats> &val_stats)
324 Xapian::ValueIterator it = doc.values_begin();
325 if (it == doc.values_end()) {
326 // No document values.
327 auto i = slots.find(did);
328 if (i != slots.end()) {
329 // Document's values already added or modified in this batch.
330 i->second = string();
332 return string();
335 Xapian::valueno count = doc.internal->values_count();
336 Xapian::VecCOW<Xapian::termpos> slotvec(count);
338 Xapian::valueno first_slot = it.get_valueno();
339 Xapian::valueno last_slot = first_slot;
340 while (it != doc.values_end()) {
341 Xapian::valueno slot = it.get_valueno();
342 slotvec.push_back(slot);
343 const string& value = *it;
345 // Update the statistics.
346 auto i = val_stats.insert(make_pair(slot, ValueStats()));
347 ValueStats& stats = i.first->second;
348 if (i.second) {
349 // There were no statistics stored already, so read them.
350 get_value_stats(slot, stats);
353 // Now, modify the stored statistics.
354 if ((stats.freq)++ == 0) {
355 // If the value count was previously zero, set the upper and lower
356 // bounds to the newly added value.
357 stats.lower_bound = value;
358 stats.upper_bound = value;
359 } else {
360 // Otherwise, simply make sure they reflect the new value.
362 // Check the upper bound first, as for some common uses of value
363 // slots (dates) the values will tend to get larger not smaller
364 // over time.
365 int cmp = value.compare(stats.upper_bound);
366 if (cmp >= 0) {
367 if (cmp > 0) stats.upper_bound = value;
368 } else if (value < stats.lower_bound) {
369 stats.lower_bound = value;
373 add_value(did, slot, value);
374 last_slot = slot;
375 ++it;
378 if (!termlist_table.is_open()) {
379 return string();
382 string enc;
383 pack_uint(enc, last_slot);
384 BitWriter slots_used(enc);
385 if (count > 1) {
386 slots_used.encode(first_slot, last_slot);
387 slots_used.encode(count - 2, last_slot - first_slot);
388 slots_used.encode_interpolative(slotvec, 0, slotvec.size() - 1);
391 return slots_used.freeze();
394 void
395 HoneyValueManager::delete_document(Xapian::docid did,
396 map<Xapian::valueno, ValueStats>& val_stats)
398 Assert(termlist_table.is_open());
399 map<Xapian::docid, string>::iterator it = slots.find(did);
400 string s;
401 if (it != slots.end()) {
402 swap(s, it->second);
403 } else {
404 // Get from table, making a swift exit if this document has no terms or
405 // values.
406 if (!termlist_table.get_exact_entry(termlist_table.make_key(did), s))
407 return;
408 slots.insert(make_pair(did, string()));
411 const char* p = s.data();
412 const char* end = p + s.size();
413 size_t slot_enc_size;
414 if (!unpack_uint(&p, end, &slot_enc_size)) {
415 throw Xapian::DatabaseCorruptError("Termlist encoding corrupt");
417 if (slot_enc_size == 0)
418 return;
420 end = p + slot_enc_size;
421 Xapian::valueno last_slot;
422 if (!unpack_uint(&p, end, &last_slot)) {
423 throw Xapian::DatabaseCorruptError("Slots used data corrupt");
426 if (p != end) {
427 BitReader rd(p, end);
428 Xapian::valueno first_slot = rd.decode(last_slot);
429 Xapian::valueno slot_count = rd.decode(last_slot - first_slot) + 2;
430 rd.decode_interpolative(0, slot_count - 1, first_slot, last_slot);
432 Xapian::valueno slot = first_slot;
433 while (slot != last_slot) {
434 auto i = val_stats.insert(make_pair(slot, ValueStats()));
435 ValueStats & stats = i.first->second;
436 if (i.second) {
437 // There were no statistics stored already, so read them.
438 get_value_stats(slot, stats);
441 // Now, modify the stored statistics.
442 AssertRelParanoid(stats.freq, >, 0);
443 if (--(stats.freq) == 0) {
444 stats.lower_bound.resize(0);
445 stats.upper_bound.resize(0);
448 remove_value(did, slot);
450 slot = rd.decode_interpolative_next();
455 Xapian::valueno slot = last_slot;
458 // FIXME: share code with above
459 auto i = val_stats.insert(make_pair(slot, ValueStats()));
460 ValueStats & stats = i.first->second;
461 if (i.second) {
462 // There were no statistics stored already, so read them.
463 get_value_stats(slot, stats);
466 // Now, modify the stored statistics.
467 AssertRelParanoid(stats.freq, >, 0);
468 if (--(stats.freq) == 0) {
469 stats.lower_bound.resize(0);
470 stats.upper_bound.resize(0);
473 remove_value(did, slot);
478 string
479 HoneyValueManager::replace_document(Xapian::docid did,
480 const Xapian::Document &doc,
481 map<Xapian::valueno, ValueStats>& val_stats)
483 if (doc.get_docid() == did) {
484 // If we're replacing a document with itself, but the optimisation for
485 // this higher up hasn't kicked in (e.g. because we've added/replaced
486 // a document since this one was read) and the values haven't changed,
487 // then the call to delete_document() below will remove the values
488 // before the subsequent add_document() can read them.
490 // The simplest way to handle this is to force the document to read its
491 // values, which we only need to do this is the docid matches. Note
492 // that this check can give false positives as we don't also check the
493 // database, so for example replacing document 4 in one database with
494 // document 4 from another will unnecessarily trigger this, but forcing
495 // the values to be read is fairly harmless, and this is unlikely to be
496 // a common case. (Currently we will end up fetching the values
497 // anyway, but there's scope to change that in the future).
498 doc.internal->ensure_values_fetched();
500 delete_document(did, val_stats);
501 return add_document(did, doc, val_stats);
504 string
505 HoneyValueManager::get_value(Xapian::docid did, Xapian::valueno slot) const
507 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
508 i = changes.find(slot);
509 if (i != changes.end()) {
510 map<Xapian::docid, string>::const_iterator j;
511 j = i->second.find(did);
512 if (j != i->second.end()) return j->second;
515 // Read it from the table.
516 string chunk;
517 Xapian::docid last_did;
518 last_did = get_chunk_containing_did(slot, did, chunk);
519 if (last_did == 0) return string();
521 ValueChunkReader reader(chunk.data(), chunk.size(), last_did);
522 reader.skip_to(did);
523 if (reader.at_end() || reader.get_docid() != did) return string();
524 return reader.get_value();
527 void
528 HoneyValueManager::get_all_values(map<Xapian::valueno, string> & values,
529 Xapian::docid did) const
531 Assert(values.empty());
532 if (!termlist_table.is_open()) {
533 // Either the database has been closed, or else there's no termlist
534 // table. Check if the postlist table is open to determine which is
535 // the case.
536 if (!postlist_table.is_open())
537 HoneyTable::throw_database_closed();
538 throw Xapian::FeatureUnavailableError("Database has no termlist");
541 string s;
542 if (!termlist_table.get_exact_entry(termlist_table.make_key(did), s))
543 return;
545 const char* p = s.data();
546 const char* end = p + s.size();
547 size_t slot_enc_size = *p++;
549 if ((slot_enc_size & 0x80) == 0) {
550 // If the top bit is clear we have a 7-bit bitmap of slots used.
551 Xapian::valueno slot = 0;
552 while (slot_enc_size) {
553 if (slot_enc_size & 1) {
554 values.insert(make_pair(slot, get_value(did, slot)));
556 ++slot;
557 slot_enc_size >>= 1;
559 return;
562 slot_enc_size &= 0x7f;
563 if (slot_enc_size == 0) {
564 if (!unpack_uint(&p, end, &slot_enc_size)) {
565 throw Xapian::DatabaseCorruptError("Termlist encoding corrupt");
569 end = p + slot_enc_size;
570 Xapian::valueno last_slot;
571 if (!unpack_uint(&p, end, &last_slot)) {
572 throw Xapian::DatabaseCorruptError("Slots used data corrupt");
575 if (p != end) {
576 BitReader rd(p, end);
577 Xapian::valueno first_slot = rd.decode(last_slot);
578 Xapian::valueno slot_count = rd.decode(last_slot - first_slot) + 2;
579 rd.decode_interpolative(0, slot_count - 1, first_slot, last_slot);
581 Xapian::valueno slot = first_slot;
582 while (slot != last_slot) {
583 values.insert(make_pair(slot, get_value(did, slot)));
584 slot = rd.decode_interpolative_next();
587 values.insert(make_pair(last_slot, get_value(did, last_slot)));
590 void
591 HoneyValueManager::get_value_stats(Xapian::valueno slot) const
593 LOGCALL_VOID(DB, "HoneyValueManager::get_value_stats", slot);
594 // Invalidate the cache first in case an exception is thrown.
595 mru_slot = Xapian::BAD_VALUENO;
596 get_value_stats(slot, mru_valstats);
597 mru_slot = slot;
600 void
601 HoneyValueManager::get_value_stats(Xapian::valueno slot,
602 ValueStats& stats) const
604 LOGCALL_VOID(DB, "HoneyValueManager::get_value_stats", slot | Literal("[stats]"));
606 string tag;
607 if (postlist_table.get_exact_entry(Honey::make_valuestats_key(slot), tag)) {
608 const char * pos = tag.data();
609 const char * end = pos + tag.size();
611 if (!unpack_uint(&pos, end, &(stats.freq))) {
612 if (pos == 0) {
613 throw Xapian::DatabaseCorruptError("Incomplete stats item in "
614 "value table");
616 throw Xapian::RangeError("Frequency statistic in value table is "
617 "too large");
619 if (!unpack_string(&pos, end, stats.lower_bound)) {
620 if (pos == 0) {
621 throw Xapian::DatabaseCorruptError("Incomplete stats item in "
622 "value table");
624 throw Xapian::RangeError("Lower bound in value table is too "
625 "large");
627 size_t len = end - pos;
628 if (len == 0) {
629 stats.upper_bound = stats.lower_bound;
630 } else {
631 stats.upper_bound.assign(pos, len);
633 } else {
634 stats.clear();
638 void
639 HoneyValueManager::set_value_stats(map<Xapian::valueno, ValueStats>& val_stats)
641 LOGCALL_VOID(DB, "HoneyValueManager::set_value_stats", val_stats);
642 map<Xapian::valueno, ValueStats>::const_iterator i;
643 for (i = val_stats.begin(); i != val_stats.end(); ++i) {
644 string key = Honey::make_valuestats_key(i->first);
645 const ValueStats & stats = i->second;
646 if (stats.freq != 0) {
647 string new_value;
648 pack_uint(new_value, stats.freq);
649 pack_string(new_value, stats.lower_bound);
650 // We don't store or count empty values, so neither of the bounds
651 // can be empty. So we can safely store an empty upper bound when
652 // the bounds are equal.
653 if (stats.lower_bound != stats.upper_bound)
654 new_value += stats.upper_bound;
655 postlist_table.add(key, new_value);
656 } else {
657 postlist_table.del(key);
660 val_stats.clear();
661 mru_slot = Xapian::BAD_VALUENO;