[chert] Fix potential SEGV with corrupt value stats
[xapian.git] / xapian-core / backends / chert / chert_values.cc
blob5f0ea6433f0e94dc46c46d824f57a483bedb70d2
1 /** @file chert_values.cc
2 * @brief ChertValueManager class
3 */
4 /* Copyright (C) 2008,2009,2011,2012,2016 Olly Betts
5 * Copyright (C) 2008,2009 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <config.h>
24 #include "chert_values.h"
26 #include "chert_cursor.h"
27 #include "chert_postlist.h"
28 #include "chert_termlist.h"
29 #include "debuglog.h"
30 #include "backends/document.h"
31 #include "pack.h"
33 #include "xapian/error.h"
34 #include "xapian/valueiterator.h"
36 #include <algorithm>
37 #include "autoptr.h"
39 using namespace std;
41 // FIXME:
42 // * put the "used slots" entry in the same termlist tag as the terms?
43 // * multi-values?
44 // * values named instead of numbered?
46 /** Generate a key for the "used slots" data. */
47 inline string
48 make_slot_key(Xapian::docid did)
50 LOGCALL_STATIC(DB, string, "make_slot_key", did);
51 // Add an extra character so that it can't clash with a termlist entry key
52 // and will sort just after the corresponding termlist entry key.
53 // FIXME: should we store this in the *same entry* as the list of terms?
54 string key;
55 C_pack_uint_preserving_sort(key, did);
56 key += '\0';
57 RETURN(key);
60 /** Generate a key for a value statistics item. */
61 inline string
62 make_valuestats_key(Xapian::valueno slot)
64 LOGCALL_STATIC(DB, string, "make_valuestats_key", slot);
65 string key("\0\xd0", 2);
66 pack_uint_last(key, slot);
67 RETURN(key);
70 void
71 ValueChunkReader::assign(const char * p_, size_t len, Xapian::docid did_)
73 p = p_;
74 end = p_ + len;
75 did = did_;
76 if (!unpack_string(&p, end, value))
77 throw Xapian::DatabaseCorruptError("Failed to unpack first value");
80 void
81 ValueChunkReader::next()
83 if (p == end) {
84 p = NULL;
85 return;
88 Xapian::docid delta;
89 if (!unpack_uint(&p, end, &delta))
90 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
91 did += delta + 1;
92 if (!unpack_string(&p, end, value))
93 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
96 void
97 ValueChunkReader::skip_to(Xapian::docid target)
99 if (p == NULL || target <= did)
100 return;
102 size_t value_len;
103 while (p != end) {
104 // Get the next docid
105 Xapian::docid delta;
106 if (rare(!unpack_uint(&p, end, &delta)))
107 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
108 did += delta + 1;
110 // Get the length of the string
111 if (rare(!unpack_uint(&p, end, &value_len))) {
112 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value length");
115 // Check that it's not too long
116 if (rare(value_len > size_t(end - p))) {
117 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
120 // Assign the value and return only if we've reached the target
121 if (did >= target) {
122 value.assign(p, value_len);
123 p += value_len;
124 return;
126 p += value_len;
128 p = NULL;
131 void
132 ChertValueManager::add_value(Xapian::docid did, Xapian::valueno slot,
133 const string & val)
135 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
136 i = changes.find(slot);
137 if (i == changes.end()) {
138 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
140 i->second[did] = val;
143 void
144 ChertValueManager::remove_value(Xapian::docid did, Xapian::valueno slot)
146 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
147 i = changes.find(slot);
148 if (i == changes.end()) {
149 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
151 i->second[did] = string();
154 Xapian::docid
155 ChertValueManager::get_chunk_containing_did(Xapian::valueno slot,
156 Xapian::docid did,
157 string &chunk) const
159 LOGCALL(DB, Xapian::docid, "ChertValueManager::get_chunk_containing_did", slot | did | chunk);
160 if (!cursor.get())
161 cursor.reset(postlist_table->cursor_get());
162 if (!cursor.get()) RETURN(0);
164 bool exact = cursor->find_entry(make_valuechunk_key(slot, did));
165 if (!exact) {
166 // If we didn't find a chunk starting with docid did, then we need
167 // to check if the chunk contains did.
168 const char * p = cursor->current_key.data();
169 const char * end = p + cursor->current_key.size();
171 // Check that it is a value stream chunk.
172 if (end - p < 2 || *p++ != '\0' || *p++ != '\xd8') RETURN(0);
174 // Check that it's for the right value slot.
175 Xapian::valueno v;
176 if (!unpack_uint(&p, end, &v)) {
177 throw Xapian::DatabaseCorruptError("Bad value key");
179 if (v != slot) RETURN(0);
181 // And get the first docid for the chunk so we can return it.
182 if (!C_unpack_uint_preserving_sort(&p, end, &did) || p != end) {
183 throw Xapian::DatabaseCorruptError("Bad value key");
187 cursor->read_tag();
188 swap(chunk, cursor->current_tag);
190 RETURN(did);
193 static const size_t CHUNK_SIZE_THRESHOLD = 2000;
195 class ValueUpdater {
196 ChertPostListTable * table;
198 Xapian::valueno slot;
200 string ctag;
202 ValueChunkReader reader;
204 string tag;
206 Xapian::docid prev_did;
208 Xapian::docid first_did;
210 Xapian::docid new_first_did;
212 Xapian::docid last_allowed_did;
214 void append_to_stream(Xapian::docid did, const string & value) {
215 Assert(did);
216 if (tag.empty()) {
217 new_first_did = did;
218 } else {
219 AssertRel(did,>,prev_did);
220 pack_uint(tag, did - prev_did - 1);
222 prev_did = did;
223 pack_string(tag, value);
224 if (tag.size() >= CHUNK_SIZE_THRESHOLD) write_tag();
227 void write_tag() {
228 // If the first docid has changed, delete the old entry.
229 if (first_did && new_first_did != first_did) {
230 table->del(make_valuechunk_key(slot, first_did));
232 if (!tag.empty()) {
233 table->add(make_valuechunk_key(slot, new_first_did), tag);
235 first_did = 0;
236 tag.resize(0);
239 public:
240 ValueUpdater(ChertPostListTable * table_, Xapian::valueno slot_)
241 : table(table_), slot(slot_), first_did(0), last_allowed_did(0) { }
243 ~ValueUpdater() {
244 while (!reader.at_end()) {
245 // FIXME: use skip_to and some splicing magic instead?
246 append_to_stream(reader.get_docid(), reader.get_value());
247 reader.next();
249 write_tag();
252 void update(Xapian::docid did, const string & value) {
253 if (last_allowed_did && did > last_allowed_did) {
254 // The next change needs to go in a later existing chunk than the
255 // one we're currently updating, so we copy over the rest of the
256 // entries from the current chunk, write out the updated chunk and
257 // drop through to the case below will read in that later chunk.
258 // FIXME: use some string splicing magic instead of this loop.
259 while (!reader.at_end()) {
260 // last_allowed_did should be an upper bound for this chunk.
261 AssertRel(reader.get_docid(),<=,last_allowed_did);
262 append_to_stream(reader.get_docid(), reader.get_value());
263 reader.next();
265 write_tag();
266 last_allowed_did = 0;
268 if (last_allowed_did == 0) {
269 last_allowed_did = CHERT_MAX_DOCID;
270 Assert(tag.empty());
271 new_first_did = 0;
272 AutoPtr<ChertCursor> cursor(table->cursor_get());
273 if (cursor->find_entry(make_valuechunk_key(slot, did))) {
274 // We found an exact match, so the first docid is the one
275 // we looked for.
276 first_did = did;
277 } else {
278 Assert(!cursor->after_end());
279 // Otherwise we need to unpack it from the key we found.
280 // We may have found a non-value-chunk entry in which case
281 // docid_from_key() returns 0.
282 first_did = docid_from_key(slot, cursor->current_key);
285 // If there are no further chunks, then the last docid that can go
286 // in this chunk is the highest valid docid. If there are further
287 // chunks then it's one less than the first docid of the next
288 // chunk.
289 if (first_did) {
290 // We found a value chunk.
291 cursor->read_tag();
292 // FIXME:swap(cursor->current_tag, ctag);
293 ctag = cursor->current_tag;
294 reader.assign(ctag.data(), ctag.size(), first_did);
296 if (cursor->next()) {
297 const string & key = cursor->current_key;
298 Xapian::docid next_first_did = docid_from_key(slot, key);
299 if (next_first_did) last_allowed_did = next_first_did - 1;
300 Assert(last_allowed_did);
301 AssertRel(last_allowed_did,>=,first_did);
305 // Copy over entries until we get to the one we want to
306 // add/modify/delete.
307 // FIXME: use skip_to and some splicing magic instead?
308 while (!reader.at_end() && reader.get_docid() < did) {
309 append_to_stream(reader.get_docid(), reader.get_value());
310 reader.next();
312 if (!reader.at_end() && reader.get_docid() == did) reader.next();
313 if (!value.empty()) {
314 // Add/update entry for did.
315 append_to_stream(did, value);
320 void
321 ChertValueManager::merge_changes()
323 if (termlist_table->is_open()) {
324 map<Xapian::docid, string>::const_iterator i;
325 for (i = slots.begin(); i != slots.end(); ++i) {
326 const string & enc = i->second;
327 string key = make_slot_key(i->first);
328 if (!enc.empty()) {
329 termlist_table->add(key, i->second);
330 } else {
331 termlist_table->del(key);
334 slots.clear();
338 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
339 for (i = changes.begin(); i != changes.end(); ++i) {
340 Xapian::valueno slot = i->first;
341 ValueUpdater updater(postlist_table, slot);
342 const map<Xapian::docid, string> & slot_changes = i->second;
343 map<Xapian::docid, string>::const_iterator j;
344 for (j = slot_changes.begin(); j != slot_changes.end(); ++j) {
345 updater.update(j->first, j->second);
348 changes.clear();
352 void
353 ChertValueManager::add_document(Xapian::docid did, const Xapian::Document &doc,
354 map<Xapian::valueno, ValueStats> & value_stats)
356 // FIXME: Use BitWriter and interpolative coding? Or is it not worthwhile
357 // for this?
358 string slots_used;
359 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
360 Xapian::ValueIterator it = doc.values_begin();
361 while (it != doc.values_end()) {
362 Xapian::valueno slot = it.get_valueno();
363 string value = *it;
365 // Update the statistics.
366 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
367 i = value_stats.insert(make_pair(slot, ValueStats()));
368 ValueStats & stats = i.first->second;
369 if (i.second) {
370 // There were no statistics stored already, so read them.
371 get_value_stats(slot, stats);
374 // Now, modify the stored statistics.
375 if ((stats.freq)++ == 0) {
376 // If the value count was previously zero, set the upper and lower
377 // bounds to the newly added value.
378 stats.lower_bound = value;
379 stats.upper_bound = value;
380 } else {
381 // Otherwise, simply make sure they reflect the new value.
382 if (value < stats.lower_bound) {
383 stats.lower_bound = value;
384 } else if (value > stats.upper_bound) {
385 stats.upper_bound = value;
389 add_value(did, slot, value);
390 if (termlist_table->is_open()) {
391 pack_uint(slots_used, slot - prev_slot - 1);
392 prev_slot = slot;
394 ++it;
396 if (slots_used.empty() && slots.find(did) == slots.end()) {
397 // Adding a new document with no values which we didn't just remove.
398 } else {
399 swap(slots[did], slots_used);
403 void
404 ChertValueManager::delete_document(Xapian::docid did,
405 map<Xapian::valueno, ValueStats> & value_stats)
407 Assert(termlist_table->is_open());
408 map<Xapian::docid, string>::iterator it = slots.find(did);
409 string s;
410 if (it != slots.end()) {
411 swap(s, it->second);
412 } else {
413 // Get from table, making a swift exit if this document has no values.
414 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
415 slots.insert(make_pair(did, string()));
417 const char * p = s.data();
418 const char * end = p + s.size();
419 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
420 while (p != end) {
421 Xapian::valueno slot;
422 if (!unpack_uint(&p, end, &slot)) {
423 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
425 slot += prev_slot + 1;
426 prev_slot = slot;
428 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
429 i = value_stats.insert(make_pair(slot, ValueStats()));
430 ValueStats & stats = i.first->second;
431 if (i.second) {
432 // There were no statistics stored already, so read them.
433 get_value_stats(slot, stats);
436 // Now, modify the stored statistics.
437 AssertRelParanoid(stats.freq, >, 0);
438 if (--(stats.freq) == 0) {
439 stats.lower_bound.resize(0);
440 stats.upper_bound.resize(0);
443 remove_value(did, slot);
447 void
448 ChertValueManager::replace_document(Xapian::docid did,
449 const Xapian::Document &doc,
450 map<Xapian::valueno, ValueStats> & value_stats)
452 // Load the values into the document from the database, if they haven't
453 // been already. (If we don't do this before deleting the old values,
454 // replacing a document with itself will lose the values.)
455 doc.internal->need_values();
456 delete_document(did, value_stats);
457 add_document(did, doc, value_stats);
460 string
461 ChertValueManager::get_value(Xapian::docid did, Xapian::valueno slot) const
463 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
464 i = changes.find(slot);
465 if (i != changes.end()) {
466 map<Xapian::docid, string>::const_iterator j;
467 j = i->second.find(did);
468 if (j != i->second.end()) return j->second;
471 // Read it from the table.
472 string chunk;
473 Xapian::docid first_did;
474 first_did = get_chunk_containing_did(slot, did, chunk);
475 if (first_did == 0) return string();
477 ValueChunkReader reader(chunk.data(), chunk.size(), first_did);
478 reader.skip_to(did);
479 if (reader.at_end() || reader.get_docid() != did) return string();
480 return reader.get_value();
483 void
484 ChertValueManager::get_all_values(map<Xapian::valueno, string> & values,
485 Xapian::docid did) const
487 Assert(values.empty());
488 if (!termlist_table->is_open()) {
489 // Either the database has been closed, or else there's no termlist table.
490 // Check if the postlist table is open to determine which is the case.
491 if (!postlist_table->is_open())
492 ChertTable::throw_database_closed();
493 throw Xapian::FeatureUnavailableError("Database has no termlist");
495 map<Xapian::docid, string>::const_iterator i = slots.find(did);
496 string s;
497 if (i != slots.end()) {
498 s = i->second;
499 } else {
500 // Get from table.
501 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
503 const char * p = s.data();
504 const char * end = p + s.size();
505 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
506 while (p != end) {
507 Xapian::valueno slot;
508 if (!unpack_uint(&p, end, &slot)) {
509 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
511 slot += prev_slot + 1;
512 prev_slot = slot;
513 values.insert(make_pair(slot, get_value(did, slot)));
517 void
518 ChertValueManager::get_value_stats(Xapian::valueno slot) const
520 LOGCALL_VOID(DB, "ChertValueManager::get_value_stats", slot);
521 // Invalidate the cache first in case an exception is thrown.
522 mru_slot = Xapian::BAD_VALUENO;
523 get_value_stats(slot, mru_valstats);
524 mru_slot = slot;
527 void
528 ChertValueManager::get_value_stats(Xapian::valueno slot, ValueStats & stats) const
530 LOGCALL_VOID(DB, "ChertValueManager::get_value_stats", slot | Literal("[stats]"));
531 // Invalidate the cache first in case an exception is thrown.
532 mru_slot = Xapian::BAD_VALUENO;
534 string tag;
535 if (postlist_table->get_exact_entry(make_valuestats_key(slot), tag)) {
536 const char * pos = tag.data();
537 const char * end = pos + tag.size();
539 if (!unpack_uint(&pos, end, &(stats.freq))) {
540 if (pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
541 throw Xapian::RangeError("Frequency statistic in value table is too large");
543 if (!unpack_string(&pos, end, stats.lower_bound)) {
544 if (pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
545 throw Xapian::RangeError("Lower bound in value table is too large");
547 size_t len = end - pos;
548 if (len == 0) {
549 stats.upper_bound = stats.lower_bound;
550 } else {
551 stats.upper_bound.assign(pos, len);
553 } else {
554 stats.clear();
557 mru_slot = slot;
560 void
561 ChertValueManager::set_value_stats(map<Xapian::valueno, ValueStats> & value_stats)
563 LOGCALL_VOID(DB, "ChertValueManager::set_value_stats", value_stats);
564 map<Xapian::valueno, ValueStats>::const_iterator i;
565 for (i = value_stats.begin(); i != value_stats.end(); ++i) {
566 string key = make_valuestats_key(i->first);
567 const ValueStats & stats = i->second;
568 if (stats.freq != 0) {
569 string new_value;
570 pack_uint(new_value, stats.freq);
571 pack_string(new_value, stats.lower_bound);
572 // We don't store or count empty values, so neither of the bounds
573 // can be empty. So we can safely store an empty upper bound when
574 // the bounds are equal.
575 if (stats.lower_bound != stats.upper_bound)
576 new_value += stats.upper_bound;
577 postlist_table->add(key, new_value);
578 } else {
579 postlist_table->del(key);
582 value_stats.clear();
583 mru_slot = Xapian::BAD_VALUENO;