1 /** @file brass_values.cc
2 * @brief BrassValueManager class
4 /* Copyright (C) 2008,2009,2010,2012 Olly Betts
5 * Copyright (C) 2008,2009 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 #include "brass_values.h"
26 #include "brass_cursor.h"
27 #include "brass_postlist.h"
28 #include "brass_termlist.h"
33 #include "xapian/error.h"
34 #include "xapian/valueiterator.h"
39 using namespace Brass
;
43 // * put the "used slots" entry in the same termlist tag as the terms?
45 // * values named instead of numbered?
47 /** Generate a key for the "used slots" data. */
49 make_slot_key(Xapian::docid did
)
51 LOGCALL_STATIC(DB
, string
, "make_slot_key", did
);
52 // Add an extra character so that it can't clash with a termlist entry key
53 // and will sort just after the corresponding termlist entry key.
54 // FIXME: should we store this in the *same entry* as the list of terms?
56 pack_uint_preserving_sort(key
, did
);
61 /** Generate a key for a value statistics item. */
63 make_valuestats_key(Xapian::valueno slot
)
65 LOGCALL_STATIC(DB
, string
, "make_valuestats_key", slot
);
66 string
key("\0\xd0", 2);
67 pack_uint_last(key
, slot
);
72 ValueChunkReader::assign(const char * p_
, size_t len
, Xapian::docid did_
)
77 if (!unpack_string(&p
, end
, value
))
78 throw Xapian::DatabaseCorruptError("Failed to unpack first value");
82 ValueChunkReader::next()
90 if (!unpack_uint(&p
, end
, &delta
))
91 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
93 if (!unpack_string(&p
, end
, value
))
94 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
98 ValueChunkReader::skip_to(Xapian::docid target
)
100 if (p
== NULL
|| target
<= did
)
105 // Get the next docid
107 if (rare(!unpack_uint(&p
, end
, &delta
)))
108 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
111 // Get the length of the string
112 if (rare(!unpack_uint(&p
, end
, &value_len
))) {
113 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value length");
116 // Check that it's not too long
117 if (rare(value_len
> size_t(end
- p
))) {
118 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
121 // Assign the value and return only if we've reached the target
123 value
.assign(p
, value_len
);
133 BrassValueManager::add_value(Xapian::docid did
, Xapian::valueno slot
,
136 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::iterator i
;
137 i
= changes
.find(slot
);
138 if (i
== changes
.end()) {
139 i
= changes
.insert(make_pair(slot
, map
<Xapian::docid
, string
>())).first
;
141 i
->second
[did
] = val
;
145 BrassValueManager::remove_value(Xapian::docid did
, Xapian::valueno slot
)
147 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::iterator i
;
148 i
= changes
.find(slot
);
149 if (i
== changes
.end()) {
150 i
= changes
.insert(make_pair(slot
, map
<Xapian::docid
, string
>())).first
;
152 i
->second
[did
] = string();
156 BrassValueManager::get_chunk_containing_did(Xapian::valueno slot
,
160 LOGCALL(DB
, Xapian::docid
, "BrassValueManager::get_chunk_containing_did", slot
| did
| chunk
);
162 cursor
.reset(postlist_table
->cursor_get());
163 if (!cursor
.get()) RETURN(0);
165 bool exact
= cursor
->find_entry(make_valuechunk_key(slot
, did
));
167 // If we didn't find a chunk starting with docid did, then we need
168 // to check if the chunk contains did.
169 const char * p
= cursor
->current_key
.data();
170 const char * end
= p
+ cursor
->current_key
.size();
172 // Check that it is a value stream chunk.
173 if (end
- p
< 2 || *p
++ != '\0' || *p
++ != '\xd8') RETURN(0);
175 // Check that it's for the right value slot.
177 if (!unpack_uint(&p
, end
, &v
)) {
178 throw Xapian::DatabaseCorruptError("Bad value key");
180 if (v
!= slot
) RETURN(0);
182 // And get the first docid for the chunk so we can return it.
183 if (!unpack_uint_preserving_sort(&p
, end
, &did
) || p
!= end
) {
184 throw Xapian::DatabaseCorruptError("Bad value key");
189 swap(chunk
, cursor
->current_tag
);
194 static const size_t CHUNK_SIZE_THRESHOLD
= 2000;
196 static const Xapian::docid MAX_DOCID
= static_cast<Xapian::docid
>(-1);
201 BrassPostListTable
* table
;
203 Xapian::valueno slot
;
207 ValueChunkReader reader
;
211 Xapian::docid prev_did
;
213 Xapian::docid first_did
;
215 Xapian::docid new_first_did
;
217 Xapian::docid last_allowed_did
;
219 void append_to_stream(Xapian::docid did
, const string
& value
) {
224 AssertRel(did
,>,prev_did
);
225 pack_uint(tag
, did
- prev_did
- 1);
228 pack_string(tag
, value
);
229 if (tag
.size() >= CHUNK_SIZE_THRESHOLD
) write_tag();
233 // If the first docid has changed, delete the old entry.
234 if (first_did
&& new_first_did
!= first_did
) {
235 table
->del(make_valuechunk_key(slot
, first_did
));
238 table
->add(make_valuechunk_key(slot
, new_first_did
), tag
);
245 ValueUpdater(BrassPostListTable
* table_
, Xapian::valueno slot_
)
246 : table(table_
), slot(slot_
), first_did(0), last_allowed_did(0) { }
249 while (!reader
.at_end()) {
250 // FIXME: use skip_to and some splicing magic instead?
251 append_to_stream(reader
.get_docid(), reader
.get_value());
257 void update(Xapian::docid did
, const string
& value
) {
258 if (last_allowed_did
&& did
> last_allowed_did
) {
259 // The next change needs to go in a later existing chunk than the
260 // one we're currently updating, so we copy over the rest of the
261 // entries from the current chunk, write out the updated chunk and
262 // drop through to the case below will read in that later chunk.
263 // FIXME: use some string splicing magic instead of this loop.
264 while (!reader
.at_end()) {
265 // last_allowed_did should be an upper bound for this chunk.
266 AssertRel(reader
.get_docid(),<=,last_allowed_did
);
267 append_to_stream(reader
.get_docid(), reader
.get_value());
271 last_allowed_did
= 0;
273 if (last_allowed_did
== 0) {
274 last_allowed_did
= MAX_DOCID
;
277 AutoPtr
<BrassCursor
> cursor(table
->cursor_get());
278 if (cursor
->find_entry(make_valuechunk_key(slot
, did
))) {
279 // We found an exact match, so the first docid is the one
283 Assert(!cursor
->after_end());
284 // Otherwise we need to unpack it from the key we found.
285 // We may have found a non-value-chunk entry in which case
286 // docid_from_key() returns 0.
287 first_did
= docid_from_key(slot
, cursor
->current_key
);
290 // If there are no further chunks, then the last docid that can go
291 // in this chunk is the highest valid docid. If there are further
292 // chunks then it's one less than the first docid of the next
295 // We found a value chunk.
297 // FIXME:swap(cursor->current_tag, ctag);
298 ctag
= cursor
->current_tag
;
299 reader
.assign(ctag
.data(), ctag
.size(), first_did
);
301 if (cursor
->next()) {
302 const string
& key
= cursor
->current_key
;
303 Xapian::docid next_first_did
= docid_from_key(slot
, key
);
304 if (next_first_did
) last_allowed_did
= next_first_did
- 1;
305 Assert(last_allowed_did
);
306 AssertRel(last_allowed_did
,>=,first_did
);
310 // Copy over entries until we get to the one we want to
311 // add/modify/delete.
312 // FIXME: use skip_to and some splicing magic instead?
313 while (!reader
.at_end() && reader
.get_docid() < did
) {
314 append_to_stream(reader
.get_docid(), reader
.get_value());
317 if (!reader
.at_end() && reader
.get_docid() == did
) reader
.next();
318 if (!value
.empty()) {
319 // Add/update entry for did.
320 append_to_stream(did
, value
);
328 BrassValueManager::merge_changes()
330 if (termlist_table
->is_open()) {
331 map
<Xapian::docid
, string
>::const_iterator i
;
332 for (i
= slots
.begin(); i
!= slots
.end(); ++i
) {
333 const string
& enc
= i
->second
;
334 string key
= make_slot_key(i
->first
);
336 termlist_table
->add(key
, i
->second
);
338 termlist_table
->del(key
);
345 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::const_iterator i
;
346 for (i
= changes
.begin(); i
!= changes
.end(); ++i
) {
347 Xapian::valueno slot
= i
->first
;
348 Brass::ValueUpdater
updater(postlist_table
, slot
);
349 const map
<Xapian::docid
, string
> & slot_changes
= i
->second
;
350 map
<Xapian::docid
, string
>::const_iterator j
;
351 for (j
= slot_changes
.begin(); j
!= slot_changes
.end(); ++j
) {
352 updater
.update(j
->first
, j
->second
);
360 BrassValueManager::add_document(Xapian::docid did
, const Xapian::Document
&doc
,
361 map
<Xapian::valueno
, ValueStats
> & value_stats
)
363 // FIXME: Use BitWriter and interpolative coding? Or is it not worthwhile
366 Xapian::valueno prev_slot
= static_cast<Xapian::valueno
>(-1);
367 Xapian::ValueIterator it
= doc
.values_begin();
368 while (it
!= doc
.values_end()) {
369 Xapian::valueno slot
= it
.get_valueno();
372 // Update the statistics.
373 std::pair
<map
<Xapian::valueno
, ValueStats
>::iterator
, bool> i
;
374 i
= value_stats
.insert(make_pair(slot
, ValueStats()));
375 ValueStats
& stats
= i
.first
->second
;
377 // There were no statistics stored already, so read them.
378 get_value_stats(slot
, stats
);
381 // Now, modify the stored statistics.
382 if ((stats
.freq
)++ == 0) {
383 // If the value count was previously zero, set the upper and lower
384 // bounds to the newly added value.
385 stats
.lower_bound
= value
;
386 stats
.upper_bound
= value
;
388 // Otherwise, simply make sure they reflect the new value.
389 if (value
< stats
.lower_bound
) {
390 stats
.lower_bound
= value
;
391 } else if (value
> stats
.upper_bound
) {
392 stats
.upper_bound
= value
;
396 add_value(did
, slot
, value
);
397 if (termlist_table
->is_open()) {
398 pack_uint(slots_used
, slot
- prev_slot
- 1);
403 if (slots_used
.empty() && slots
.find(did
) == slots
.end()) {
404 // Adding a new document with no values which we didn't just remove.
406 swap(slots
[did
], slots_used
);
411 BrassValueManager::delete_document(Xapian::docid did
,
412 map
<Xapian::valueno
, ValueStats
> & value_stats
)
414 Assert(termlist_table
->is_open());
415 map
<Xapian::docid
, string
>::iterator it
= slots
.find(did
);
417 if (it
!= slots
.end()) {
420 // Get from table, making a swift exit if this document has no values.
421 if (!termlist_table
->get_exact_entry(make_slot_key(did
), s
)) return;
422 slots
.insert(make_pair(did
, string()));
424 const char * p
= s
.data();
425 const char * end
= p
+ s
.size();
426 Xapian::valueno prev_slot
= static_cast<Xapian::valueno
>(-1);
428 Xapian::valueno slot
;
429 if (!unpack_uint(&p
, end
, &slot
)) {
430 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
432 slot
+= prev_slot
+ 1;
435 std::pair
<map
<Xapian::valueno
, ValueStats
>::iterator
, bool> i
;
436 i
= value_stats
.insert(make_pair(slot
, ValueStats()));
437 ValueStats
& stats
= i
.first
->second
;
439 // There were no statistics stored already, so read them.
440 get_value_stats(slot
, stats
);
443 // Now, modify the stored statistics.
444 AssertRelParanoid(stats
.freq
, >, 0);
445 if (--(stats
.freq
) == 0) {
446 stats
.lower_bound
.resize(0);
447 stats
.upper_bound
.resize(0);
450 remove_value(did
, slot
);
455 BrassValueManager::replace_document(Xapian::docid did
,
456 const Xapian::Document
&doc
,
457 map
<Xapian::valueno
, ValueStats
> & value_stats
)
459 // Load the values into the document from the database, if they haven't
460 // been already. (If we don't do this before deleting the old values,
461 // replacing a document with itself will lose the values.)
462 doc
.internal
->need_values();
463 delete_document(did
, value_stats
);
464 add_document(did
, doc
, value_stats
);
468 BrassValueManager::get_value(Xapian::docid did
, Xapian::valueno slot
) const
470 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::const_iterator i
;
471 i
= changes
.find(slot
);
472 if (i
!= changes
.end()) {
473 map
<Xapian::docid
, string
>::const_iterator j
;
474 j
= i
->second
.find(did
);
475 if (j
!= i
->second
.end()) return j
->second
;
478 // Read it from the table.
480 Xapian::docid first_did
;
481 first_did
= get_chunk_containing_did(slot
, did
, chunk
);
482 if (first_did
== 0) return string();
484 ValueChunkReader
reader(chunk
.data(), chunk
.size(), first_did
);
486 if (reader
.at_end() || reader
.get_docid() != did
) return string();
487 return reader
.get_value();
491 BrassValueManager::get_all_values(map
<Xapian::valueno
, string
> & values
,
492 Xapian::docid did
) const
494 Assert(values
.empty());
495 if (!termlist_table
->is_open()) {
496 // Either the database has been closed, or else there's no termlist table.
497 // Check if the postlist table is open to determine which is the case.
498 if (!postlist_table
->is_open())
499 BrassTable::throw_database_closed();
500 throw Xapian::FeatureUnavailableError("Database has no termlist");
502 map
<Xapian::docid
, string
>::const_iterator i
= slots
.find(did
);
504 if (i
!= slots
.end()) {
508 if (!termlist_table
->get_exact_entry(make_slot_key(did
), s
)) return;
510 const char * p
= s
.data();
511 const char * end
= p
+ s
.size();
512 Xapian::valueno prev_slot
= static_cast<Xapian::valueno
>(-1);
514 Xapian::valueno slot
;
515 if (!unpack_uint(&p
, end
, &slot
)) {
516 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
518 slot
+= prev_slot
+ 1;
520 values
.insert(make_pair(slot
, get_value(did
, slot
)));
525 BrassValueManager::get_value_stats(Xapian::valueno slot
) const
527 LOGCALL_VOID(DB
, "BrassValueManager::get_value_stats", slot
);
528 // Invalidate the cache first in case an exception is thrown.
529 mru_slot
= Xapian::BAD_VALUENO
;
530 get_value_stats(slot
, mru_valstats
);
535 BrassValueManager::get_value_stats(Xapian::valueno slot
, ValueStats
& stats
) const
537 LOGCALL_VOID(DB
, "BrassValueManager::get_value_stats", slot
| Literal("[stats]"));
538 // Invalidate the cache first in case an exception is thrown.
539 mru_slot
= Xapian::BAD_VALUENO
;
542 if (postlist_table
->get_exact_entry(make_valuestats_key(slot
), tag
)) {
543 const char * pos
= tag
.data();
544 const char * end
= pos
+ tag
.size();
546 if (!unpack_uint(&pos
, end
, &(stats
.freq
))) {
547 if (pos
== 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
548 throw Xapian::RangeError("Frequency statistic in value table is too large");
550 if (!unpack_string(&pos
, end
, stats
.lower_bound
)) {
551 if (pos
== 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
552 throw Xapian::RangeError("Lower bound in value table is too large");
554 size_t len
= end
- pos
;
556 stats
.upper_bound
= stats
.lower_bound
;
558 stats
.upper_bound
.assign(pos
, len
);
568 BrassValueManager::set_value_stats(map
<Xapian::valueno
, ValueStats
> & value_stats
)
570 LOGCALL_VOID(DB
, "BrassValueManager::set_value_stats", value_stats
);
571 map
<Xapian::valueno
, ValueStats
>::const_iterator i
;
572 for (i
= value_stats
.begin(); i
!= value_stats
.end(); ++i
) {
573 string key
= make_valuestats_key(i
->first
);
574 const ValueStats
& stats
= i
->second
;
575 if (stats
.freq
!= 0) {
577 pack_uint(new_value
, stats
.freq
);
578 pack_string(new_value
, stats
.lower_bound
);
579 // We don't store or count empty values, so neither of the bounds
580 // can be empty. So we can safely store an empty upper bound when
581 // the bounds are equal.
582 if (stats
.lower_bound
!= stats
.upper_bound
)
583 new_value
+= stats
.upper_bound
;
584 postlist_table
->add(key
, new_value
);
586 postlist_table
->del(key
);
590 mru_slot
= Xapian::BAD_VALUENO
;