1 /** @file chert_values.cc
2 * @brief ChertValueManager class
4 /* Copyright (C) 2008,2009,2011,2012,2016 Olly Betts
5 * Copyright (C) 2008,2009 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 #include "chert_values.h"
26 #include "chert_cursor.h"
27 #include "chert_postlist.h"
28 #include "chert_termlist.h"
30 #include "backends/document.h"
33 #include "xapian/error.h"
34 #include "xapian/valueiterator.h"
42 // * put the "used slots" entry in the same termlist tag as the terms?
44 // * values named instead of numbered?
46 /** Generate a key for the "used slots" data. */
48 make_slot_key(Xapian::docid did
)
50 LOGCALL_STATIC(DB
, string
, "make_slot_key", did
);
51 // Add an extra character so that it can't clash with a termlist entry key
52 // and will sort just after the corresponding termlist entry key.
53 // FIXME: should we store this in the *same entry* as the list of terms?
55 C_pack_uint_preserving_sort(key
, did
);
60 /** Generate a key for a value statistics item. */
62 make_valuestats_key(Xapian::valueno slot
)
64 LOGCALL_STATIC(DB
, string
, "make_valuestats_key", slot
);
65 string
key("\0\xd0", 2);
66 pack_uint_last(key
, slot
);
71 ValueChunkReader::assign(const char * p_
, size_t len
, Xapian::docid did_
)
76 if (!unpack_string(&p
, end
, value
))
77 throw Xapian::DatabaseCorruptError("Failed to unpack first value");
81 ValueChunkReader::next()
89 if (!unpack_uint(&p
, end
, &delta
))
90 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
92 if (!unpack_string(&p
, end
, value
))
93 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
97 ValueChunkReader::skip_to(Xapian::docid target
)
99 if (p
== NULL
|| target
<= did
)
104 // Get the next docid
106 if (rare(!unpack_uint(&p
, end
, &delta
)))
107 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
110 // Get the length of the string
111 if (rare(!unpack_uint(&p
, end
, &value_len
))) {
112 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value length");
115 // Check that it's not too long
116 if (rare(value_len
> size_t(end
- p
))) {
117 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
120 // Assign the value and return only if we've reached the target
122 value
.assign(p
, value_len
);
132 ChertValueManager::add_value(Xapian::docid did
, Xapian::valueno slot
,
135 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::iterator i
;
136 i
= changes
.find(slot
);
137 if (i
== changes
.end()) {
138 i
= changes
.insert(make_pair(slot
, map
<Xapian::docid
, string
>())).first
;
140 i
->second
[did
] = val
;
144 ChertValueManager::remove_value(Xapian::docid did
, Xapian::valueno slot
)
146 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::iterator i
;
147 i
= changes
.find(slot
);
148 if (i
== changes
.end()) {
149 i
= changes
.insert(make_pair(slot
, map
<Xapian::docid
, string
>())).first
;
151 i
->second
[did
] = string();
155 ChertValueManager::get_chunk_containing_did(Xapian::valueno slot
,
159 LOGCALL(DB
, Xapian::docid
, "ChertValueManager::get_chunk_containing_did", slot
| did
| chunk
);
161 cursor
.reset(postlist_table
->cursor_get());
162 if (!cursor
.get()) RETURN(0);
164 bool exact
= cursor
->find_entry(make_valuechunk_key(slot
, did
));
166 // If we didn't find a chunk starting with docid did, then we need
167 // to check if the chunk contains did.
168 const char * p
= cursor
->current_key
.data();
169 const char * end
= p
+ cursor
->current_key
.size();
171 // Check that it is a value stream chunk.
172 if (end
- p
< 2 || *p
++ != '\0' || *p
++ != '\xd8') RETURN(0);
174 // Check that it's for the right value slot.
176 if (!unpack_uint(&p
, end
, &v
)) {
177 throw Xapian::DatabaseCorruptError("Bad value key");
179 if (v
!= slot
) RETURN(0);
181 // And get the first docid for the chunk so we can return it.
182 if (!C_unpack_uint_preserving_sort(&p
, end
, &did
) || p
!= end
) {
183 throw Xapian::DatabaseCorruptError("Bad value key");
188 swap(chunk
, cursor
->current_tag
);
193 static const size_t CHUNK_SIZE_THRESHOLD
= 2000;
196 ChertPostListTable
* table
;
198 Xapian::valueno slot
;
202 ValueChunkReader reader
;
206 Xapian::docid prev_did
;
208 Xapian::docid first_did
;
210 Xapian::docid new_first_did
;
212 Xapian::docid last_allowed_did
;
214 void append_to_stream(Xapian::docid did
, const string
& value
) {
219 AssertRel(did
,>,prev_did
);
220 pack_uint(tag
, did
- prev_did
- 1);
223 pack_string(tag
, value
);
224 if (tag
.size() >= CHUNK_SIZE_THRESHOLD
) write_tag();
228 // If the first docid has changed, delete the old entry.
229 if (first_did
&& new_first_did
!= first_did
) {
230 table
->del(make_valuechunk_key(slot
, first_did
));
233 table
->add(make_valuechunk_key(slot
, new_first_did
), tag
);
240 ValueUpdater(ChertPostListTable
* table_
, Xapian::valueno slot_
)
241 : table(table_
), slot(slot_
), first_did(0), last_allowed_did(0) { }
244 while (!reader
.at_end()) {
245 // FIXME: use skip_to and some splicing magic instead?
246 append_to_stream(reader
.get_docid(), reader
.get_value());
252 void update(Xapian::docid did
, const string
& value
) {
253 if (last_allowed_did
&& did
> last_allowed_did
) {
254 // The next change needs to go in a later existing chunk than the
255 // one we're currently updating, so we copy over the rest of the
256 // entries from the current chunk, write out the updated chunk and
257 // drop through to the case below will read in that later chunk.
258 // FIXME: use some string splicing magic instead of this loop.
259 while (!reader
.at_end()) {
260 // last_allowed_did should be an upper bound for this chunk.
261 AssertRel(reader
.get_docid(),<=,last_allowed_did
);
262 append_to_stream(reader
.get_docid(), reader
.get_value());
266 last_allowed_did
= 0;
268 if (last_allowed_did
== 0) {
269 last_allowed_did
= CHERT_MAX_DOCID
;
272 AutoPtr
<ChertCursor
> cursor(table
->cursor_get());
273 if (cursor
->find_entry(make_valuechunk_key(slot
, did
))) {
274 // We found an exact match, so the first docid is the one
278 Assert(!cursor
->after_end());
279 // Otherwise we need to unpack it from the key we found.
280 // We may have found a non-value-chunk entry in which case
281 // docid_from_key() returns 0.
282 first_did
= docid_from_key(slot
, cursor
->current_key
);
285 // If there are no further chunks, then the last docid that can go
286 // in this chunk is the highest valid docid. If there are further
287 // chunks then it's one less than the first docid of the next
290 // We found a value chunk.
292 // FIXME:swap(cursor->current_tag, ctag);
293 ctag
= cursor
->current_tag
;
294 reader
.assign(ctag
.data(), ctag
.size(), first_did
);
296 if (cursor
->next()) {
297 const string
& key
= cursor
->current_key
;
298 Xapian::docid next_first_did
= docid_from_key(slot
, key
);
299 if (next_first_did
) last_allowed_did
= next_first_did
- 1;
300 Assert(last_allowed_did
);
301 AssertRel(last_allowed_did
,>=,first_did
);
305 // Copy over entries until we get to the one we want to
306 // add/modify/delete.
307 // FIXME: use skip_to and some splicing magic instead?
308 while (!reader
.at_end() && reader
.get_docid() < did
) {
309 append_to_stream(reader
.get_docid(), reader
.get_value());
312 if (!reader
.at_end() && reader
.get_docid() == did
) reader
.next();
313 if (!value
.empty()) {
314 // Add/update entry for did.
315 append_to_stream(did
, value
);
321 ChertValueManager::merge_changes()
323 if (termlist_table
->is_open()) {
324 map
<Xapian::docid
, string
>::const_iterator i
;
325 for (i
= slots
.begin(); i
!= slots
.end(); ++i
) {
326 const string
& enc
= i
->second
;
327 string key
= make_slot_key(i
->first
);
329 termlist_table
->add(key
, i
->second
);
331 termlist_table
->del(key
);
338 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::const_iterator i
;
339 for (i
= changes
.begin(); i
!= changes
.end(); ++i
) {
340 Xapian::valueno slot
= i
->first
;
341 ValueUpdater
updater(postlist_table
, slot
);
342 const map
<Xapian::docid
, string
> & slot_changes
= i
->second
;
343 map
<Xapian::docid
, string
>::const_iterator j
;
344 for (j
= slot_changes
.begin(); j
!= slot_changes
.end(); ++j
) {
345 updater
.update(j
->first
, j
->second
);
353 ChertValueManager::add_document(Xapian::docid did
, const Xapian::Document
&doc
,
354 map
<Xapian::valueno
, ValueStats
> & value_stats
)
356 // FIXME: Use BitWriter and interpolative coding? Or is it not worthwhile
359 Xapian::valueno prev_slot
= static_cast<Xapian::valueno
>(-1);
360 Xapian::ValueIterator it
= doc
.values_begin();
361 while (it
!= doc
.values_end()) {
362 Xapian::valueno slot
= it
.get_valueno();
365 // Update the statistics.
366 std::pair
<map
<Xapian::valueno
, ValueStats
>::iterator
, bool> i
;
367 i
= value_stats
.insert(make_pair(slot
, ValueStats()));
368 ValueStats
& stats
= i
.first
->second
;
370 // There were no statistics stored already, so read them.
371 get_value_stats(slot
, stats
);
374 // Now, modify the stored statistics.
375 if ((stats
.freq
)++ == 0) {
376 // If the value count was previously zero, set the upper and lower
377 // bounds to the newly added value.
378 stats
.lower_bound
= value
;
379 stats
.upper_bound
= value
;
381 // Otherwise, simply make sure they reflect the new value.
382 if (value
< stats
.lower_bound
) {
383 stats
.lower_bound
= value
;
384 } else if (value
> stats
.upper_bound
) {
385 stats
.upper_bound
= value
;
389 add_value(did
, slot
, value
);
390 if (termlist_table
->is_open()) {
391 pack_uint(slots_used
, slot
- prev_slot
- 1);
396 if (slots_used
.empty() && slots
.find(did
) == slots
.end()) {
397 // Adding a new document with no values which we didn't just remove.
399 swap(slots
[did
], slots_used
);
404 ChertValueManager::delete_document(Xapian::docid did
,
405 map
<Xapian::valueno
, ValueStats
> & value_stats
)
407 Assert(termlist_table
->is_open());
408 map
<Xapian::docid
, string
>::iterator it
= slots
.find(did
);
410 if (it
!= slots
.end()) {
413 // Get from table, making a swift exit if this document has no values.
414 if (!termlist_table
->get_exact_entry(make_slot_key(did
), s
)) return;
415 slots
.insert(make_pair(did
, string()));
417 const char * p
= s
.data();
418 const char * end
= p
+ s
.size();
419 Xapian::valueno prev_slot
= static_cast<Xapian::valueno
>(-1);
421 Xapian::valueno slot
;
422 if (!unpack_uint(&p
, end
, &slot
)) {
423 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
425 slot
+= prev_slot
+ 1;
428 std::pair
<map
<Xapian::valueno
, ValueStats
>::iterator
, bool> i
;
429 i
= value_stats
.insert(make_pair(slot
, ValueStats()));
430 ValueStats
& stats
= i
.first
->second
;
432 // There were no statistics stored already, so read them.
433 get_value_stats(slot
, stats
);
436 // Now, modify the stored statistics.
437 AssertRelParanoid(stats
.freq
, >, 0);
438 if (--(stats
.freq
) == 0) {
439 stats
.lower_bound
.resize(0);
440 stats
.upper_bound
.resize(0);
443 remove_value(did
, slot
);
448 ChertValueManager::replace_document(Xapian::docid did
,
449 const Xapian::Document
&doc
,
450 map
<Xapian::valueno
, ValueStats
> & value_stats
)
452 // Load the values into the document from the database, if they haven't
453 // been already. (If we don't do this before deleting the old values,
454 // replacing a document with itself will lose the values.)
455 doc
.internal
->need_values();
456 delete_document(did
, value_stats
);
457 add_document(did
, doc
, value_stats
);
461 ChertValueManager::get_value(Xapian::docid did
, Xapian::valueno slot
) const
463 map
<Xapian::valueno
, map
<Xapian::docid
, string
> >::const_iterator i
;
464 i
= changes
.find(slot
);
465 if (i
!= changes
.end()) {
466 map
<Xapian::docid
, string
>::const_iterator j
;
467 j
= i
->second
.find(did
);
468 if (j
!= i
->second
.end()) return j
->second
;
471 // Read it from the table.
473 Xapian::docid first_did
;
474 first_did
= get_chunk_containing_did(slot
, did
, chunk
);
475 if (first_did
== 0) return string();
477 ValueChunkReader
reader(chunk
.data(), chunk
.size(), first_did
);
479 if (reader
.at_end() || reader
.get_docid() != did
) return string();
480 return reader
.get_value();
484 ChertValueManager::get_all_values(map
<Xapian::valueno
, string
> & values
,
485 Xapian::docid did
) const
487 Assert(values
.empty());
488 if (!termlist_table
->is_open()) {
489 // Either the database has been closed, or else there's no termlist table.
490 // Check if the postlist table is open to determine which is the case.
491 if (!postlist_table
->is_open())
492 ChertTable::throw_database_closed();
493 throw Xapian::FeatureUnavailableError("Database has no termlist");
495 map
<Xapian::docid
, string
>::const_iterator i
= slots
.find(did
);
497 if (i
!= slots
.end()) {
501 if (!termlist_table
->get_exact_entry(make_slot_key(did
), s
)) return;
503 const char * p
= s
.data();
504 const char * end
= p
+ s
.size();
505 Xapian::valueno prev_slot
= static_cast<Xapian::valueno
>(-1);
507 Xapian::valueno slot
;
508 if (!unpack_uint(&p
, end
, &slot
)) {
509 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
511 slot
+= prev_slot
+ 1;
513 values
.insert(make_pair(slot
, get_value(did
, slot
)));
518 ChertValueManager::get_value_stats(Xapian::valueno slot
) const
520 LOGCALL_VOID(DB
, "ChertValueManager::get_value_stats", slot
);
521 // Invalidate the cache first in case an exception is thrown.
522 mru_slot
= Xapian::BAD_VALUENO
;
523 get_value_stats(slot
, mru_valstats
);
528 ChertValueManager::get_value_stats(Xapian::valueno slot
, ValueStats
& stats
) const
530 LOGCALL_VOID(DB
, "ChertValueManager::get_value_stats", slot
| Literal("[stats]"));
531 // Invalidate the cache first in case an exception is thrown.
532 mru_slot
= Xapian::BAD_VALUENO
;
535 if (postlist_table
->get_exact_entry(make_valuestats_key(slot
), tag
)) {
536 const char * pos
= tag
.data();
537 const char * end
= pos
+ tag
.size();
539 if (!unpack_uint(&pos
, end
, &(stats
.freq
))) {
540 if (pos
== 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
541 throw Xapian::RangeError("Frequency statistic in value table is too large");
543 if (!unpack_string(&pos
, end
, stats
.lower_bound
)) {
544 if (pos
== 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
545 throw Xapian::RangeError("Lower bound in value table is too large");
547 size_t len
= end
- pos
;
549 stats
.upper_bound
= stats
.lower_bound
;
551 stats
.upper_bound
.assign(pos
, len
);
561 ChertValueManager::set_value_stats(map
<Xapian::valueno
, ValueStats
> & value_stats
)
563 LOGCALL_VOID(DB
, "ChertValueManager::set_value_stats", value_stats
);
564 map
<Xapian::valueno
, ValueStats
>::const_iterator i
;
565 for (i
= value_stats
.begin(); i
!= value_stats
.end(); ++i
) {
566 string key
= make_valuestats_key(i
->first
);
567 const ValueStats
& stats
= i
->second
;
568 if (stats
.freq
!= 0) {
570 pack_uint(new_value
, stats
.freq
);
571 pack_string(new_value
, stats
.lower_bound
);
572 // We don't store or count empty values, so neither of the bounds
573 // can be empty. So we can safely store an empty upper bound when
574 // the bounds are equal.
575 if (stats
.lower_bound
!= stats
.upper_bound
)
576 new_value
+= stats
.upper_bound
;
577 postlist_table
->add(key
, new_value
);
579 postlist_table
->del(key
);
583 mru_slot
= Xapian::BAD_VALUENO
;