Fix potential SEGV with corrupt value stats
[xapian.git] / xapian-core / backends / brass / brass_values.cc
blob7524555b356688ceff69cfffdec68040011cddb0
1 /** @file brass_values.cc
2 * @brief BrassValueManager class
3 */
4 /* Copyright (C) 2008,2009,2010,2012 Olly Betts
5 * Copyright (C) 2008,2009 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <config.h>
24 #include "brass_values.h"
26 #include "brass_cursor.h"
27 #include "brass_postlist.h"
28 #include "brass_termlist.h"
29 #include "debuglog.h"
30 #include "document.h"
31 #include "pack.h"
33 #include "xapian/error.h"
34 #include "xapian/valueiterator.h"
36 #include <algorithm>
37 #include "autoptr.h"
39 using namespace Brass;
40 using namespace std;
42 // FIXME:
43 // * put the "used slots" entry in the same termlist tag as the terms?
44 // * multi-values?
45 // * values named instead of numbered?
47 /** Generate a key for the "used slots" data. */
48 inline string
49 make_slot_key(Xapian::docid did)
51 LOGCALL_STATIC(DB, string, "make_slot_key", did);
52 // Add an extra character so that it can't clash with a termlist entry key
53 // and will sort just after the corresponding termlist entry key.
54 // FIXME: should we store this in the *same entry* as the list of terms?
55 string key;
56 pack_uint_preserving_sort(key, did);
57 key += '\0';
58 RETURN(key);
61 /** Generate a key for a value statistics item. */
62 inline string
63 make_valuestats_key(Xapian::valueno slot)
65 LOGCALL_STATIC(DB, string, "make_valuestats_key", slot);
66 string key("\0\xd0", 2);
67 pack_uint_last(key, slot);
68 RETURN(key);
71 void
72 ValueChunkReader::assign(const char * p_, size_t len, Xapian::docid did_)
74 p = p_;
75 end = p_ + len;
76 did = did_;
77 if (!unpack_string(&p, end, value))
78 throw Xapian::DatabaseCorruptError("Failed to unpack first value");
81 void
82 ValueChunkReader::next()
84 if (p == end) {
85 p = NULL;
86 return;
89 Xapian::docid delta;
90 if (!unpack_uint(&p, end, &delta))
91 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
92 did += delta + 1;
93 if (!unpack_string(&p, end, value))
94 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
97 void
98 ValueChunkReader::skip_to(Xapian::docid target)
100 if (p == NULL || target <= did)
101 return;
103 size_t value_len;
104 while (p != end) {
105 // Get the next docid
106 Xapian::docid delta;
107 if (rare(!unpack_uint(&p, end, &delta)))
108 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
109 did += delta + 1;
111 // Get the length of the string
112 if (rare(!unpack_uint(&p, end, &value_len))) {
113 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value length");
116 // Check that it's not too long
117 if (rare(value_len > size_t(end - p))) {
118 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
121 // Assign the value and return only if we've reached the target
122 if (did >= target) {
123 value.assign(p, value_len);
124 p += value_len;
125 return;
127 p += value_len;
129 p = NULL;
132 void
133 BrassValueManager::add_value(Xapian::docid did, Xapian::valueno slot,
134 const string & val)
136 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
137 i = changes.find(slot);
138 if (i == changes.end()) {
139 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
141 i->second[did] = val;
144 void
145 BrassValueManager::remove_value(Xapian::docid did, Xapian::valueno slot)
147 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
148 i = changes.find(slot);
149 if (i == changes.end()) {
150 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
152 i->second[did] = string();
155 Xapian::docid
156 BrassValueManager::get_chunk_containing_did(Xapian::valueno slot,
157 Xapian::docid did,
158 string &chunk) const
160 LOGCALL(DB, Xapian::docid, "BrassValueManager::get_chunk_containing_did", slot | did | chunk);
161 if (!cursor.get())
162 cursor.reset(postlist_table->cursor_get());
163 if (!cursor.get()) RETURN(0);
165 bool exact = cursor->find_entry(make_valuechunk_key(slot, did));
166 if (!exact) {
167 // If we didn't find a chunk starting with docid did, then we need
168 // to check if the chunk contains did.
169 const char * p = cursor->current_key.data();
170 const char * end = p + cursor->current_key.size();
172 // Check that it is a value stream chunk.
173 if (end - p < 2 || *p++ != '\0' || *p++ != '\xd8') RETURN(0);
175 // Check that it's for the right value slot.
176 Xapian::valueno v;
177 if (!unpack_uint(&p, end, &v)) {
178 throw Xapian::DatabaseCorruptError("Bad value key");
180 if (v != slot) RETURN(0);
182 // And get the first docid for the chunk so we can return it.
183 if (!unpack_uint_preserving_sort(&p, end, &did) || p != end) {
184 throw Xapian::DatabaseCorruptError("Bad value key");
188 cursor->read_tag();
189 swap(chunk, cursor->current_tag);
191 RETURN(did);
194 static const size_t CHUNK_SIZE_THRESHOLD = 2000;
196 static const Xapian::docid MAX_DOCID = static_cast<Xapian::docid>(-1);
198 namespace Brass {
200 class ValueUpdater {
201 BrassPostListTable * table;
203 Xapian::valueno slot;
205 string ctag;
207 ValueChunkReader reader;
209 string tag;
211 Xapian::docid prev_did;
213 Xapian::docid first_did;
215 Xapian::docid new_first_did;
217 Xapian::docid last_allowed_did;
219 void append_to_stream(Xapian::docid did, const string & value) {
220 Assert(did);
221 if (tag.empty()) {
222 new_first_did = did;
223 } else {
224 AssertRel(did,>,prev_did);
225 pack_uint(tag, did - prev_did - 1);
227 prev_did = did;
228 pack_string(tag, value);
229 if (tag.size() >= CHUNK_SIZE_THRESHOLD) write_tag();
232 void write_tag() {
233 // If the first docid has changed, delete the old entry.
234 if (first_did && new_first_did != first_did) {
235 table->del(make_valuechunk_key(slot, first_did));
237 if (!tag.empty()) {
238 table->add(make_valuechunk_key(slot, new_first_did), tag);
240 first_did = 0;
241 tag.resize(0);
244 public:
245 ValueUpdater(BrassPostListTable * table_, Xapian::valueno slot_)
246 : table(table_), slot(slot_), first_did(0), last_allowed_did(0) { }
248 ~ValueUpdater() {
249 while (!reader.at_end()) {
250 // FIXME: use skip_to and some splicing magic instead?
251 append_to_stream(reader.get_docid(), reader.get_value());
252 reader.next();
254 write_tag();
257 void update(Xapian::docid did, const string & value) {
258 if (last_allowed_did && did > last_allowed_did) {
259 // The next change needs to go in a later existing chunk than the
260 // one we're currently updating, so we copy over the rest of the
261 // entries from the current chunk, write out the updated chunk and
262 // drop through to the case below will read in that later chunk.
263 // FIXME: use some string splicing magic instead of this loop.
264 while (!reader.at_end()) {
265 // last_allowed_did should be an upper bound for this chunk.
266 AssertRel(reader.get_docid(),<=,last_allowed_did);
267 append_to_stream(reader.get_docid(), reader.get_value());
268 reader.next();
270 write_tag();
271 last_allowed_did = 0;
273 if (last_allowed_did == 0) {
274 last_allowed_did = MAX_DOCID;
275 Assert(tag.empty());
276 new_first_did = 0;
277 AutoPtr<BrassCursor> cursor(table->cursor_get());
278 if (cursor->find_entry(make_valuechunk_key(slot, did))) {
279 // We found an exact match, so the first docid is the one
280 // we looked for.
281 first_did = did;
282 } else {
283 Assert(!cursor->after_end());
284 // Otherwise we need to unpack it from the key we found.
285 // We may have found a non-value-chunk entry in which case
286 // docid_from_key() returns 0.
287 first_did = docid_from_key(slot, cursor->current_key);
290 // If there are no further chunks, then the last docid that can go
291 // in this chunk is the highest valid docid. If there are further
292 // chunks then it's one less than the first docid of the next
293 // chunk.
294 if (first_did) {
295 // We found a value chunk.
296 cursor->read_tag();
297 // FIXME:swap(cursor->current_tag, ctag);
298 ctag = cursor->current_tag;
299 reader.assign(ctag.data(), ctag.size(), first_did);
301 if (cursor->next()) {
302 const string & key = cursor->current_key;
303 Xapian::docid next_first_did = docid_from_key(slot, key);
304 if (next_first_did) last_allowed_did = next_first_did - 1;
305 Assert(last_allowed_did);
306 AssertRel(last_allowed_did,>=,first_did);
310 // Copy over entries until we get to the one we want to
311 // add/modify/delete.
312 // FIXME: use skip_to and some splicing magic instead?
313 while (!reader.at_end() && reader.get_docid() < did) {
314 append_to_stream(reader.get_docid(), reader.get_value());
315 reader.next();
317 if (!reader.at_end() && reader.get_docid() == did) reader.next();
318 if (!value.empty()) {
319 // Add/update entry for did.
320 append_to_stream(did, value);
327 void
328 BrassValueManager::merge_changes()
330 if (termlist_table->is_open()) {
331 map<Xapian::docid, string>::const_iterator i;
332 for (i = slots.begin(); i != slots.end(); ++i) {
333 const string & enc = i->second;
334 string key = make_slot_key(i->first);
335 if (!enc.empty()) {
336 termlist_table->add(key, i->second);
337 } else {
338 termlist_table->del(key);
341 slots.clear();
345 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
346 for (i = changes.begin(); i != changes.end(); ++i) {
347 Xapian::valueno slot = i->first;
348 Brass::ValueUpdater updater(postlist_table, slot);
349 const map<Xapian::docid, string> & slot_changes = i->second;
350 map<Xapian::docid, string>::const_iterator j;
351 for (j = slot_changes.begin(); j != slot_changes.end(); ++j) {
352 updater.update(j->first, j->second);
355 changes.clear();
359 void
360 BrassValueManager::add_document(Xapian::docid did, const Xapian::Document &doc,
361 map<Xapian::valueno, ValueStats> & value_stats)
363 // FIXME: Use BitWriter and interpolative coding? Or is it not worthwhile
364 // for this?
365 string slots_used;
366 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
367 Xapian::ValueIterator it = doc.values_begin();
368 while (it != doc.values_end()) {
369 Xapian::valueno slot = it.get_valueno();
370 string value = *it;
372 // Update the statistics.
373 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
374 i = value_stats.insert(make_pair(slot, ValueStats()));
375 ValueStats & stats = i.first->second;
376 if (i.second) {
377 // There were no statistics stored already, so read them.
378 get_value_stats(slot, stats);
381 // Now, modify the stored statistics.
382 if ((stats.freq)++ == 0) {
383 // If the value count was previously zero, set the upper and lower
384 // bounds to the newly added value.
385 stats.lower_bound = value;
386 stats.upper_bound = value;
387 } else {
388 // Otherwise, simply make sure they reflect the new value.
389 if (value < stats.lower_bound) {
390 stats.lower_bound = value;
391 } else if (value > stats.upper_bound) {
392 stats.upper_bound = value;
396 add_value(did, slot, value);
397 if (termlist_table->is_open()) {
398 pack_uint(slots_used, slot - prev_slot - 1);
399 prev_slot = slot;
401 ++it;
403 if (slots_used.empty() && slots.find(did) == slots.end()) {
404 // Adding a new document with no values which we didn't just remove.
405 } else {
406 swap(slots[did], slots_used);
410 void
411 BrassValueManager::delete_document(Xapian::docid did,
412 map<Xapian::valueno, ValueStats> & value_stats)
414 Assert(termlist_table->is_open());
415 map<Xapian::docid, string>::iterator it = slots.find(did);
416 string s;
417 if (it != slots.end()) {
418 swap(s, it->second);
419 } else {
420 // Get from table, making a swift exit if this document has no values.
421 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
422 slots.insert(make_pair(did, string()));
424 const char * p = s.data();
425 const char * end = p + s.size();
426 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
427 while (p != end) {
428 Xapian::valueno slot;
429 if (!unpack_uint(&p, end, &slot)) {
430 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
432 slot += prev_slot + 1;
433 prev_slot = slot;
435 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
436 i = value_stats.insert(make_pair(slot, ValueStats()));
437 ValueStats & stats = i.first->second;
438 if (i.second) {
439 // There were no statistics stored already, so read them.
440 get_value_stats(slot, stats);
443 // Now, modify the stored statistics.
444 AssertRelParanoid(stats.freq, >, 0);
445 if (--(stats.freq) == 0) {
446 stats.lower_bound.resize(0);
447 stats.upper_bound.resize(0);
450 remove_value(did, slot);
454 void
455 BrassValueManager::replace_document(Xapian::docid did,
456 const Xapian::Document &doc,
457 map<Xapian::valueno, ValueStats> & value_stats)
459 // Load the values into the document from the database, if they haven't
460 // been already. (If we don't do this before deleting the old values,
461 // replacing a document with itself will lose the values.)
462 doc.internal->need_values();
463 delete_document(did, value_stats);
464 add_document(did, doc, value_stats);
467 string
468 BrassValueManager::get_value(Xapian::docid did, Xapian::valueno slot) const
470 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
471 i = changes.find(slot);
472 if (i != changes.end()) {
473 map<Xapian::docid, string>::const_iterator j;
474 j = i->second.find(did);
475 if (j != i->second.end()) return j->second;
478 // Read it from the table.
479 string chunk;
480 Xapian::docid first_did;
481 first_did = get_chunk_containing_did(slot, did, chunk);
482 if (first_did == 0) return string();
484 ValueChunkReader reader(chunk.data(), chunk.size(), first_did);
485 reader.skip_to(did);
486 if (reader.at_end() || reader.get_docid() != did) return string();
487 return reader.get_value();
490 void
491 BrassValueManager::get_all_values(map<Xapian::valueno, string> & values,
492 Xapian::docid did) const
494 Assert(values.empty());
495 if (!termlist_table->is_open()) {
496 // Either the database has been closed, or else there's no termlist table.
497 // Check if the postlist table is open to determine which is the case.
498 if (!postlist_table->is_open())
499 BrassTable::throw_database_closed();
500 throw Xapian::FeatureUnavailableError("Database has no termlist");
502 map<Xapian::docid, string>::const_iterator i = slots.find(did);
503 string s;
504 if (i != slots.end()) {
505 s = i->second;
506 } else {
507 // Get from table.
508 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
510 const char * p = s.data();
511 const char * end = p + s.size();
512 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
513 while (p != end) {
514 Xapian::valueno slot;
515 if (!unpack_uint(&p, end, &slot)) {
516 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
518 slot += prev_slot + 1;
519 prev_slot = slot;
520 values.insert(make_pair(slot, get_value(did, slot)));
524 void
525 BrassValueManager::get_value_stats(Xapian::valueno slot) const
527 LOGCALL_VOID(DB, "BrassValueManager::get_value_stats", slot);
528 // Invalidate the cache first in case an exception is thrown.
529 mru_slot = Xapian::BAD_VALUENO;
530 get_value_stats(slot, mru_valstats);
531 mru_slot = slot;
534 void
535 BrassValueManager::get_value_stats(Xapian::valueno slot, ValueStats & stats) const
537 LOGCALL_VOID(DB, "BrassValueManager::get_value_stats", slot | Literal("[stats]"));
538 // Invalidate the cache first in case an exception is thrown.
539 mru_slot = Xapian::BAD_VALUENO;
541 string tag;
542 if (postlist_table->get_exact_entry(make_valuestats_key(slot), tag)) {
543 const char * pos = tag.data();
544 const char * end = pos + tag.size();
546 if (!unpack_uint(&pos, end, &(stats.freq))) {
547 if (pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
548 throw Xapian::RangeError("Frequency statistic in value table is too large");
550 if (!unpack_string(&pos, end, stats.lower_bound)) {
551 if (pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
552 throw Xapian::RangeError("Lower bound in value table is too large");
554 size_t len = end - pos;
555 if (len == 0) {
556 stats.upper_bound = stats.lower_bound;
557 } else {
558 stats.upper_bound.assign(pos, len);
560 } else {
561 stats.clear();
564 mru_slot = slot;
567 void
568 BrassValueManager::set_value_stats(map<Xapian::valueno, ValueStats> & value_stats)
570 LOGCALL_VOID(DB, "BrassValueManager::set_value_stats", value_stats);
571 map<Xapian::valueno, ValueStats>::const_iterator i;
572 for (i = value_stats.begin(); i != value_stats.end(); ++i) {
573 string key = make_valuestats_key(i->first);
574 const ValueStats & stats = i->second;
575 if (stats.freq != 0) {
576 string new_value;
577 pack_uint(new_value, stats.freq);
578 pack_string(new_value, stats.lower_bound);
579 // We don't store or count empty values, so neither of the bounds
580 // can be empty. So we can safely store an empty upper bound when
581 // the bounds are equal.
582 if (stats.lower_bound != stats.upper_bound)
583 new_value += stats.upper_bound;
584 postlist_table->add(key, new_value);
585 } else {
586 postlist_table->del(key);
589 value_stats.clear();
590 mru_slot = Xapian::BAD_VALUENO;