Fix --disable-remote-backend to disable replication
[xapian.git] / xapian-core / backends / chert / chert_database.cc
blobbc2b2dca96cf710b4535bb305da7de9ae2385d8f
1 /* chert_database.cc: chert database
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2001 Hein Ragas
5 * Copyright 2002 Ananova Ltd
6 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
7 * Copyright 2006,2008 Lemur Consulting Ltd
8 * Copyright 2009,2010 Richard Boulton
9 * Copyright 2009 Kan-Ru Chen
10 * Copyright 2011 Dan Colish
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License as
14 * published by the Free Software Foundation; either version 2 of the
15 * License, or (at your option) any later version.
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, write to the Free Software
24 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
25 * USA
28 #include <config.h>
30 #include "chert_database.h"
32 #include "xapian/constants.h"
33 #include "xapian/error.h"
34 #include "xapian/valueiterator.h"
36 #include "backends/contiguousalldocspostlist.h"
37 #include "chert_alldocsmodifiedpostlist.h"
38 #include "chert_alldocspostlist.h"
39 #include "chert_alltermslist.h"
40 #include "chert_replicate_internal.h"
41 #include "chert_document.h"
42 #include "../flint_lock.h"
43 #include "chert_metadata.h"
44 #include "chert_modifiedpostlist.h"
45 #include "chert_positionlist.h"
46 #include "chert_postlist.h"
47 #include "chert_record.h"
48 #include "chert_spellingwordslist.h"
49 #include "chert_termlist.h"
50 #include "chert_valuelist.h"
51 #include "chert_values.h"
52 #include "debuglog.h"
53 #include "fd.h"
54 #include "io_utils.h"
55 #include "pack.h"
56 #include "posixy_wrapper.h"
57 #include "net/remoteconnection.h"
58 #include "replicate_utils.h"
59 #include "api/replication.h"
60 #include "replicationprotocol.h"
61 #include "net/length.h"
62 #include "str.h"
63 #include "stringutils.h"
64 #include "backends/valuestats.h"
66 #include "safeerrno.h"
67 #include "safesysstat.h"
68 #include <sys/types.h>
70 #include <algorithm>
71 #include "autoptr.h"
72 #include <cstdlib>
73 #include <string>
75 using namespace std;
76 using namespace Xapian;
77 using Xapian::Internal::intrusive_ptr;
79 // The maximum safe term length is determined by the postlist. There we
80 // store the term using pack_string_preserving_sort() which takes the
81 // length of the string plus an extra byte (assuming the string doesn't
82 // contain any zero bytes), followed by the docid encoded with
83 // C_pack_uint_preserving_sort() which takes up to 5 bytes.
85 // The Btree manager's key length limit is 252 bytes so the maximum safe term
86 // length is 252 - 1 - 5 = 246 bytes. We use 245 rather than 246 for
87 // consistency with flint.
89 // If the term contains zero bytes, the limit is lower (by one for each zero
90 // byte in the term).
91 #define MAX_SAFE_TERM_LENGTH 245
93 /** Maximum number of times to try opening the tables to get them at a
94 * consistent revision.
96 * This is mostly just to avoid any chance of an infinite loop - normally
97 * we'll either get them on the first or second try.
99 const int MAX_OPEN_RETRIES = 100;
101 /* This finds the tables, opens them at consistent revisions, manages
102 * determining the current and next revision numbers, and stores handles
103 * to the tables.
105 ChertDatabase::ChertDatabase(const string &chert_dir, int flags,
106 unsigned int block_size)
107 : db_dir(chert_dir),
108 readonly(flags == Xapian::DB_READONLY_),
109 version_file(db_dir),
110 postlist_table(db_dir, readonly),
111 position_table(db_dir, readonly),
112 termlist_table(db_dir, readonly),
113 value_manager(&postlist_table, &termlist_table),
114 synonym_table(db_dir, readonly),
115 spelling_table(db_dir, readonly),
116 record_table(db_dir, readonly),
117 lock(db_dir),
118 max_changesets(0)
120 LOGCALL_CTOR(DB, "ChertDatabase", chert_dir | flags | block_size);
122 if (readonly) {
123 open_tables_consistent();
124 return;
127 int action = flags & Xapian::DB_ACTION_MASK_;
128 if (action != Xapian::DB_OPEN && !database_exists()) {
130 // Create the directory for the database, if it doesn't exist
131 // already.
132 bool fail = false;
133 struct stat statbuf;
134 if (stat(db_dir.c_str(), &statbuf) == 0) {
135 if (!S_ISDIR(statbuf.st_mode)) fail = true;
136 } else if (errno != ENOENT || mkdir(db_dir.c_str(), 0755) == -1) {
137 fail = true;
139 if (fail) {
140 throw Xapian::DatabaseCreateError("Cannot create directory '" +
141 db_dir + "'", errno);
143 get_database_write_lock(flags, true);
145 create_and_open_tables(block_size);
146 return;
149 if (action == Xapian::DB_CREATE) {
150 throw Xapian::DatabaseCreateError("Can't create new database at '" +
151 db_dir + "': a database already exists and I was told "
152 "not to overwrite it");
155 get_database_write_lock(flags, false);
156 // if we're overwriting, pretend the db doesn't exist
157 if (action == Xapian::DB_CREATE_OR_OVERWRITE) {
158 create_and_open_tables(block_size);
159 return;
162 // Get latest consistent version
163 open_tables_consistent();
165 // Check that there are no more recent versions of tables. If there
166 // are, perform recovery by writing a new revision number to all
167 // tables.
168 if (record_table.get_open_revision_number() !=
169 postlist_table.get_latest_revision_number()) {
170 chert_revision_number_t new_revision = get_next_revision_number();
172 set_revision_number(new_revision);
176 ChertDatabase::~ChertDatabase()
178 LOGCALL_DTOR(DB, "ChertDatabase");
181 bool
182 ChertDatabase::database_exists() {
183 LOGCALL(DB, bool, "ChertDatabase::database_exists", NO_ARGS);
184 RETURN(record_table.exists() && postlist_table.exists());
187 void
188 ChertDatabase::create_and_open_tables(unsigned int block_size)
190 LOGCALL_VOID(DB, "ChertDatabase::create_and_open_tables", NO_ARGS);
191 // The caller is expected to create the database directory if it doesn't
192 // already exist.
194 // Create postlist_table first, and record_table last. Existence of
195 // record_table is considered to imply existence of the database.
196 version_file.create();
197 postlist_table.create_and_open(block_size);
198 position_table.create_and_open(block_size);
199 termlist_table.create_and_open(block_size);
200 synonym_table.create_and_open(block_size);
201 spelling_table.create_and_open(block_size);
202 record_table.create_and_open(block_size);
204 Assert(database_exists());
206 // Check consistency
207 chert_revision_number_t revision = record_table.get_open_revision_number();
208 if (revision != postlist_table.get_open_revision_number()) {
209 throw Xapian::DatabaseCreateError("Newly created tables are not in consistent state");
212 stats.zero();
215 bool
216 ChertDatabase::open_tables_consistent()
218 LOGCALL(DB, bool, "ChertDatabase::open_tables_consistent", NO_ARGS);
219 // Open record_table first, since it's the last to be written to,
220 // and hence if a revision is available in it, it should be available
221 // in all the other tables (unless they've moved on already).
223 // If we find that a table can't open the desired revision, we
224 // go back and open record_table again, until record_table has
225 // the same revision as the last time we opened it.
227 chert_revision_number_t cur_rev = record_table.get_open_revision_number();
229 // Check the version file unless we're reopening.
230 if (cur_rev == 0) version_file.read_and_check();
232 record_table.open();
233 chert_revision_number_t revision = record_table.get_open_revision_number();
235 if (cur_rev && cur_rev == revision) {
236 // We're reopening a database and the revision hasn't changed so we
237 // don't need to do anything.
238 RETURN(false);
241 // Set the block_size for optional tables as they may not currently exist.
242 unsigned int block_size = record_table.get_block_size();
243 position_table.set_block_size(block_size);
244 termlist_table.set_block_size(block_size);
245 synonym_table.set_block_size(block_size);
246 spelling_table.set_block_size(block_size);
248 value_manager.reset();
250 bool fully_opened = false;
251 int tries_left = MAX_OPEN_RETRIES;
252 while (!fully_opened && (tries_left--) > 0) {
253 if (spelling_table.open(revision) &&
254 synonym_table.open(revision) &&
255 termlist_table.open(revision) &&
256 position_table.open(revision) &&
257 postlist_table.open(revision)) {
258 // Everything now open at the same revision.
259 fully_opened = true;
260 } else {
261 // Couldn't open consistent revision: two cases possible:
262 // i) An update has completed and a second one has begun since
263 // record was opened. This leaves a consistent revision
264 // available, but not the one we were trying to open.
265 // ii) Tables have become corrupt / have no consistent revision
266 // available. In this case, updates must have ceased.
268 // So, we reopen the record table, and check its revision number,
269 // if it's changed we try the opening again, otherwise we give up.
271 record_table.open();
272 chert_revision_number_t newrevision =
273 record_table.get_open_revision_number();
274 if (revision == newrevision) {
275 // Revision number hasn't changed - therefore a second index
276 // sweep hasn't begun and the system must have failed. Database
277 // is inconsistent.
278 throw Xapian::DatabaseCorruptError("Cannot open tables at consistent revisions");
280 revision = newrevision;
284 if (!fully_opened) {
285 throw Xapian::DatabaseModifiedError("Cannot open tables at stable revision - changing too fast");
288 stats.read(postlist_table);
289 return true;
292 void
293 ChertDatabase::open_tables(chert_revision_number_t revision)
295 LOGCALL_VOID(DB, "ChertDatabase::open_tables", revision);
296 version_file.read_and_check();
297 record_table.open(revision);
299 // Set the block_size for optional tables as they may not currently exist.
300 unsigned int block_size = record_table.get_block_size();
301 position_table.set_block_size(block_size);
302 termlist_table.set_block_size(block_size);
303 synonym_table.set_block_size(block_size);
304 spelling_table.set_block_size(block_size);
306 value_manager.reset();
308 spelling_table.open(revision);
309 synonym_table.open(revision);
310 termlist_table.open(revision);
311 position_table.open(revision);
312 postlist_table.open(revision);
315 chert_revision_number_t
316 ChertDatabase::get_revision_number() const
318 LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_revision_number", NO_ARGS);
319 // We could use any table here, theoretically.
320 RETURN(postlist_table.get_open_revision_number());
323 chert_revision_number_t
324 ChertDatabase::get_next_revision_number() const
326 LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_next_revision_number", NO_ARGS);
327 /* We _must_ use postlist_table here, since it is always the first
328 * to be written, and hence will have the greatest available revision
329 * number.
331 chert_revision_number_t new_revision =
332 postlist_table.get_latest_revision_number();
333 ++new_revision;
334 RETURN(new_revision);
337 void
338 ChertDatabase::get_changeset_revisions(const string & path,
339 chert_revision_number_t * startrev,
340 chert_revision_number_t * endrev) const
342 FD changes_fd(posixy_open(path.c_str(), O_RDONLY | O_CLOEXEC));
343 if (changes_fd < 0) {
344 string message = string("Couldn't open changeset ")
345 + path + " to read";
346 throw Xapian::DatabaseError(message, errno);
349 char buf[REASONABLE_CHANGESET_SIZE];
350 const char *start = buf;
351 const char *end = buf + io_read(changes_fd, buf, REASONABLE_CHANGESET_SIZE);
352 if (size_t(end - start) < CONST_STRLEN(CHANGES_MAGIC_STRING))
353 throw Xapian::DatabaseError("Changeset too short at " + path);
354 if (memcmp(start, CHANGES_MAGIC_STRING,
355 CONST_STRLEN(CHANGES_MAGIC_STRING)) != 0) {
356 string message = string("Changeset at ")
357 + path + " does not contain valid magic string";
358 throw Xapian::DatabaseError(message);
360 start += CONST_STRLEN(CHANGES_MAGIC_STRING);
362 unsigned int changes_version;
363 if (!unpack_uint(&start, end, &changes_version))
364 throw Xapian::DatabaseError("Couldn't read a valid version number for "
365 "changeset at " + path);
366 if (changes_version != CHANGES_VERSION)
367 throw Xapian::DatabaseError("Don't support version of changeset at "
368 + path);
370 if (!unpack_uint(&start, end, startrev))
371 throw Xapian::DatabaseError("Couldn't read a valid start revision from "
372 "changeset at " + path);
374 if (!unpack_uint(&start, end, endrev))
375 throw Xapian::DatabaseError("Couldn't read a valid end revision for "
376 "changeset at " + path);
379 void
380 ChertDatabase::set_revision_number(chert_revision_number_t new_revision)
382 LOGCALL_VOID(DB, "ChertDatabase::set_revision_number", new_revision);
384 value_manager.merge_changes();
386 postlist_table.flush_db();
387 position_table.flush_db();
388 termlist_table.flush_db();
389 synonym_table.flush_db();
390 spelling_table.flush_db();
391 record_table.flush_db();
393 int changes_fd = -1;
394 string changes_name;
396 const char *p = getenv("XAPIAN_MAX_CHANGESETS");
397 if (p) {
398 max_changesets = atoi(p);
399 } else {
400 max_changesets = 0;
403 if (max_changesets > 0) {
404 chert_revision_number_t old_revision = get_revision_number();
405 if (old_revision) {
406 // Don't generate a changeset for the first revision.
407 changes_fd = create_changeset_file(db_dir,
408 "/changes" + str(old_revision),
409 changes_name);
413 try {
414 FD closefd(changes_fd);
415 if (changes_fd >= 0) {
416 string buf;
417 chert_revision_number_t old_revision = get_revision_number();
418 buf += CHANGES_MAGIC_STRING;
419 pack_uint(buf, CHANGES_VERSION);
420 pack_uint(buf, old_revision);
421 pack_uint(buf, new_revision);
423 #ifndef DANGEROUS
424 buf += '\x00'; // Changes can be applied to a live database.
425 #else
426 buf += '\x01';
427 #endif
429 io_write(changes_fd, buf.data(), buf.size());
431 // Write the changes to the blocks in the tables. Do the postlist
432 // table last, so that ends up cached the most, if the cache
433 // available is limited. Do the position table just before that
434 // as having that cached will also improve search performance.
435 termlist_table.write_changed_blocks(changes_fd);
436 synonym_table.write_changed_blocks(changes_fd);
437 spelling_table.write_changed_blocks(changes_fd);
438 record_table.write_changed_blocks(changes_fd);
439 position_table.write_changed_blocks(changes_fd);
440 postlist_table.write_changed_blocks(changes_fd);
443 postlist_table.commit(new_revision, changes_fd);
444 position_table.commit(new_revision, changes_fd);
445 termlist_table.commit(new_revision, changes_fd);
446 synonym_table.commit(new_revision, changes_fd);
447 spelling_table.commit(new_revision, changes_fd);
449 string changes_tail; // Data to be appended to the changes file
450 if (changes_fd >= 0) {
451 changes_tail += '\0';
452 pack_uint(changes_tail, new_revision);
454 record_table.commit(new_revision, changes_fd, &changes_tail);
455 } catch (...) {
456 // Remove the changeset, if there was one.
457 if (changes_fd >= 0) {
458 (void)io_unlink(changes_name);
461 throw;
464 if (changes_fd >= 0 && max_changesets < new_revision) {
465 // While change sets less than N - max_changesets exist, delete them
466 // 1 must be subtracted so we don't delete the changeset we just wrote
467 // when max_changesets = 1
468 unsigned rev = new_revision - max_changesets - 1;
469 while (io_unlink(db_dir + "/changes" + str(rev--))) { }
473 void
474 ChertDatabase::request_document(Xapian::docid did) const
476 record_table.readahead_for_record(did);
479 void
480 ChertDatabase::readahead_for_query(const Xapian::Query &query)
482 Xapian::TermIterator t;
483 for (t = query.get_unique_terms_begin(); t != Xapian::TermIterator(); ++t) {
484 const string & term = *t;
485 if (!postlist_table.readahead_key(ChertPostListTable::make_key(term)))
486 break;
490 bool
491 ChertDatabase::reopen()
493 LOGCALL(DB, bool, "ChertDatabase::reopen", NO_ARGS);
494 if (!readonly) RETURN(false);
495 RETURN(open_tables_consistent());
498 void
499 ChertDatabase::close()
501 LOGCALL_VOID(DB, "ChertDatabase::close", NO_ARGS);
502 postlist_table.close(true);
503 position_table.close(true);
504 termlist_table.close(true);
505 synonym_table.close(true);
506 spelling_table.close(true);
507 record_table.close(true);
508 lock.release();
511 void
512 ChertDatabase::get_database_write_lock(int flags, bool creating)
514 LOGCALL_VOID(DB, "ChertDatabase::get_database_write_lock", flags|creating);
515 string explanation;
516 bool retry = flags & Xapian::DB_RETRY_LOCK;
517 FlintLock::reason why = lock.lock(true, retry, explanation);
518 if (why != FlintLock::SUCCESS) {
519 if (why == FlintLock::UNKNOWN && !creating && !database_exists()) {
520 string msg("No chert database found at path '");
521 msg += db_dir;
522 msg += '\'';
523 throw Xapian::DatabaseOpeningError(msg);
525 lock.throw_databaselockerror(why, db_dir, explanation);
529 void
530 ChertDatabase::send_whole_database(RemoteConnection & conn, double end_time)
532 LOGCALL_VOID(DB, "ChertDatabase::send_whole_database", conn | end_time);
533 #ifdef XAPIAN_HAS_REMOTE_BACKEND
534 // Send the current revision number in the header.
535 string buf;
536 string uuid = get_uuid();
537 buf += encode_length(uuid.size());
538 buf += uuid;
539 pack_uint(buf, get_revision_number());
540 conn.send_message(REPL_REPLY_DB_HEADER, buf, end_time);
542 // Send all the tables. The tables which we want to be cached best after
543 // the copy finished are sent last.
544 static const char filenames[] =
545 "\x0b""termlist.DB""\x0e""termlist.baseA\x0e""termlist.baseB"
546 "\x0a""synonym.DB""\x0d""synonym.baseA\x0d""synonym.baseB"
547 "\x0b""spelling.DB""\x0e""spelling.baseA\x0e""spelling.baseB"
548 "\x09""record.DB""\x0c""record.baseA\x0c""record.baseB"
549 "\x0b""position.DB""\x0e""position.baseA\x0e""position.baseB"
550 "\x0b""postlist.DB""\x0e""postlist.baseA\x0e""postlist.baseB"
551 "\x08""iamchert";
552 string filepath = db_dir;
553 filepath += '/';
554 for (const char * p = filenames; *p; p += *p + 1) {
555 string leaf(p + 1, size_t(static_cast<unsigned char>(*p)));
556 filepath.replace(db_dir.size() + 1, string::npos, leaf);
557 FD fd(posixy_open(filepath.c_str(), O_RDONLY | O_CLOEXEC));
558 if (fd >= 0) {
559 conn.send_message(REPL_REPLY_DB_FILENAME, leaf, end_time);
560 conn.send_file(REPL_REPLY_DB_FILEDATA, fd, end_time);
563 #else
564 (void)conn;
565 (void)end_time;
566 #endif
569 void
570 ChertDatabase::write_changesets_to_fd(int fd,
571 const string & revision,
572 bool need_whole_db,
573 ReplicationInfo * info)
575 LOGCALL_VOID(DB, "ChertDatabase::write_changesets_to_fd", fd | revision | need_whole_db | info);
576 #ifdef XAPIAN_HAS_REMOTE_BACKEND
577 int whole_db_copies_left = MAX_DB_COPIES_PER_CONVERSATION;
578 chert_revision_number_t start_rev_num = 0;
579 string start_uuid = get_uuid();
581 chert_revision_number_t needed_rev_num = 0;
583 const char * rev_ptr = revision.data();
584 const char * rev_end = rev_ptr + revision.size();
585 if (!unpack_uint(&rev_ptr, rev_end, &start_rev_num)) {
586 need_whole_db = true;
589 RemoteConnection conn(-1, fd, string());
591 // While the starting revision number is less than the latest revision
592 // number, look for a changeset, and write it.
594 // FIXME - perhaps we should make hardlinks for all the changesets we're
595 // likely to need, first, and then start sending them, so that there's no
596 // risk of them disappearing while we're sending earlier ones.
597 while (true) {
598 if (need_whole_db) {
599 // Decrease the counter of copies left to be sent, and fail
600 // if we've already copied the database enough. This ensures that
601 // synchronisation attempts always terminate eventually.
602 if (whole_db_copies_left == 0) {
603 conn.send_message(REPL_REPLY_FAIL,
604 "Database changing too fast",
605 0.0);
606 return;
608 whole_db_copies_left--;
610 // Send the whole database across.
611 start_rev_num = get_revision_number();
612 start_uuid = get_uuid();
614 send_whole_database(conn, 0.0);
615 if (info != NULL)
616 ++(info->fullcopy_count);
618 need_whole_db = false;
620 reopen();
621 if (start_uuid == get_uuid()) {
622 // Send the latest revision number after sending the tables.
623 // The update must proceed to that revision number before the
624 // copy is safe to make live.
626 string buf;
627 needed_rev_num = get_revision_number();
628 pack_uint(buf, needed_rev_num);
629 conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
630 if (info != NULL && start_rev_num == needed_rev_num)
631 info->changed = true;
632 } else {
633 // Database has been replaced since we did the copy. Send a
634 // higher revision number than the revision we've just copied,
635 // so that the client doesn't make the copy we've just done
636 // live, and then mark that we need to do a copy again.
637 // The client will never actually get the required revision,
638 // because the next message is going to be the start of a new
639 // database transfer.
641 string buf;
642 pack_uint(buf, start_rev_num + 1);
643 conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
644 need_whole_db = true;
646 } else {
647 // Check if we've sent all the updates.
648 if (start_rev_num >= get_revision_number()) {
649 reopen();
650 if (start_uuid != get_uuid()) {
651 need_whole_db = true;
652 continue;
654 if (start_rev_num >= get_revision_number()) {
655 break;
659 // Look for the changeset for revision start_rev_num.
660 string changes_name = db_dir + "/changes" + str(start_rev_num);
661 FD fd_changes(posixy_open(changes_name.c_str(), O_RDONLY | O_CLOEXEC));
662 if (fd_changes >= 0) {
663 // Send it, and also update start_rev_num to the new value
664 // specified in the changeset.
665 chert_revision_number_t changeset_start_rev_num;
666 chert_revision_number_t changeset_end_rev_num;
667 get_changeset_revisions(changes_name,
668 &changeset_start_rev_num,
669 &changeset_end_rev_num);
670 if (changeset_start_rev_num != start_rev_num) {
671 throw Xapian::DatabaseError("Changeset start revision does not match changeset filename");
673 if (changeset_start_rev_num >= changeset_end_rev_num) {
674 throw Xapian::DatabaseError("Changeset start revision is not less than end revision");
677 conn.send_file(REPL_REPLY_CHANGESET, fd_changes, 0.0);
678 start_rev_num = changeset_end_rev_num;
679 if (info != NULL) {
680 ++(info->changeset_count);
681 if (start_rev_num >= needed_rev_num)
682 info->changed = true;
684 } else {
685 // The changeset doesn't exist: leave the revision number as it
686 // is, and mark for doing a full database copy.
687 need_whole_db = true;
691 conn.send_message(REPL_REPLY_END_OF_CHANGES, string(), 0.0);
692 #else
693 (void)fd;
694 (void)revision;
695 (void)need_whole_db;
696 (void)info;
697 #endif
700 void
701 ChertDatabase::modifications_failed(chert_revision_number_t old_revision,
702 chert_revision_number_t new_revision,
703 const std::string & msg)
705 // Modifications failed. Wipe all the modifications from memory.
706 try {
707 // Discard any buffered changes and reinitialised cached values
708 // from the table.
709 cancel();
711 // Reopen tables with old revision number.
712 open_tables(old_revision);
714 // Increase revision numbers to new revision number plus one,
715 // writing increased numbers to all tables.
716 ++new_revision;
717 set_revision_number(new_revision);
718 } catch (const Xapian::Error &e) {
719 // We can't get the database into a consistent state, so close
720 // it to avoid the risk of database corruption.
721 ChertDatabase::close();
722 throw Xapian::DatabaseError("Modifications failed (" + msg +
723 "), and cannot set consistent table "
724 "revision numbers: " + e.get_msg());
728 void
729 ChertDatabase::apply()
731 LOGCALL_VOID(DB, "ChertDatabase::apply", NO_ARGS);
732 if (!postlist_table.is_modified() &&
733 !position_table.is_modified() &&
734 !termlist_table.is_modified() &&
735 !value_manager.is_modified() &&
736 !synonym_table.is_modified() &&
737 !spelling_table.is_modified() &&
738 !record_table.is_modified()) {
739 return;
742 chert_revision_number_t old_revision = get_revision_number();
743 chert_revision_number_t new_revision = get_next_revision_number();
745 try {
746 set_revision_number(new_revision);
747 } catch (const Xapian::Error &e) {
748 modifications_failed(old_revision, new_revision, e.get_description());
749 throw;
750 } catch (...) {
751 modifications_failed(old_revision, new_revision, "Unknown error");
752 throw;
756 void
757 ChertDatabase::cancel()
759 LOGCALL_VOID(DB, "ChertDatabase::cancel", NO_ARGS);
760 postlist_table.cancel();
761 position_table.cancel();
762 termlist_table.cancel();
763 value_manager.cancel();
764 synonym_table.cancel();
765 spelling_table.cancel();
766 record_table.cancel();
769 Xapian::doccount
770 ChertDatabase::get_doccount() const
772 LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_doccount", NO_ARGS);
773 RETURN(record_table.get_doccount());
776 Xapian::docid
777 ChertDatabase::get_lastdocid() const
779 LOGCALL(DB, Xapian::docid, "ChertDatabase::get_lastdocid", NO_ARGS);
780 RETURN(stats.get_last_docid());
783 Xapian::totallength
784 ChertDatabase::get_total_length() const
786 LOGCALL(DB, Xapian::totallength, "ChertDatabase::get_total_length", NO_ARGS);
787 RETURN(stats.get_total_doclen());
790 Xapian::termcount
791 ChertDatabase::get_doclength(Xapian::docid did) const
793 LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_doclength", did);
794 Assert(did != 0);
795 intrusive_ptr<const ChertDatabase> ptrtothis(this);
796 RETURN(postlist_table.get_doclength(did, ptrtothis));
799 Xapian::termcount
800 ChertDatabase::get_unique_terms(Xapian::docid did) const
802 LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_unique_terms", did);
803 Assert(did != 0);
804 intrusive_ptr<const ChertDatabase> ptrtothis(this);
805 ChertTermList termlist(ptrtothis, did);
806 // Note that the "approximate" size should be exact in this case.
808 // get_unique_terms() really ought to only count terms with wdf > 0, but
809 // that's expensive to calculate on demand, so for now let's just ensure
810 // unique_terms <= doclen.
811 RETURN(min(termlist.get_approx_size(),
812 postlist_table.get_doclength(did, ptrtothis)));
815 void
816 ChertDatabase::get_freqs(const string & term,
817 Xapian::doccount * termfreq_ptr,
818 Xapian::termcount * collfreq_ptr) const
820 LOGCALL_VOID(DB, "ChertDatabase::get_freqs", term | termfreq_ptr | collfreq_ptr);
821 Assert(!term.empty());
822 postlist_table.get_freqs(term, termfreq_ptr, collfreq_ptr);
825 Xapian::doccount
826 ChertDatabase::get_value_freq(Xapian::valueno slot) const
828 LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_value_freq", slot);
829 RETURN(value_manager.get_value_freq(slot));
832 std::string
833 ChertDatabase::get_value_lower_bound(Xapian::valueno slot) const
835 LOGCALL(DB, std::string, "ChertDatabase::get_value_lower_bound", slot);
836 RETURN(value_manager.get_value_lower_bound(slot));
839 std::string
840 ChertDatabase::get_value_upper_bound(Xapian::valueno slot) const
842 LOGCALL(DB, std::string, "ChertDatabase::get_value_upper_bound", slot);
843 RETURN(value_manager.get_value_upper_bound(slot));
846 Xapian::termcount
847 ChertDatabase::get_doclength_lower_bound() const
849 return stats.get_doclength_lower_bound();
852 Xapian::termcount
853 ChertDatabase::get_doclength_upper_bound() const
855 return stats.get_doclength_upper_bound();
858 Xapian::termcount
859 ChertDatabase::get_wdf_upper_bound(const string & term) const
861 Xapian::termcount cf;
862 get_freqs(term, NULL, &cf);
863 return min(cf, stats.get_wdf_upper_bound());
866 bool
867 ChertDatabase::term_exists(const string & term) const
869 LOGCALL(DB, bool, "ChertDatabase::term_exists", term);
870 Assert(!term.empty());
871 RETURN(postlist_table.term_exists(term));
874 bool
875 ChertDatabase::has_positions() const
877 return !position_table.empty();
880 LeafPostList *
881 ChertDatabase::open_post_list(const string& term) const
883 LOGCALL(DB, LeafPostList *, "ChertDatabase::open_post_list", term);
884 intrusive_ptr<const ChertDatabase> ptrtothis(this);
886 if (term.empty()) {
887 Xapian::doccount doccount = get_doccount();
888 if (stats.get_last_docid() == doccount) {
889 RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
891 RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
894 RETURN(new ChertPostList(ptrtothis, term, true));
897 ValueList *
898 ChertDatabase::open_value_list(Xapian::valueno slot) const
900 LOGCALL(DB, ValueList *, "ChertDatabase::open_value_list", slot);
901 intrusive_ptr<const ChertDatabase> ptrtothis(this);
902 RETURN(new ChertValueList(slot, ptrtothis));
905 TermList *
906 ChertDatabase::open_term_list(Xapian::docid did) const
908 LOGCALL(DB, TermList *, "ChertDatabase::open_term_list", did);
909 Assert(did != 0);
910 if (!termlist_table.is_open())
911 throw_termlist_table_close_exception();
912 intrusive_ptr<const ChertDatabase> ptrtothis(this);
913 RETURN(new ChertTermList(ptrtothis, did));
916 Xapian::Document::Internal *
917 ChertDatabase::open_document(Xapian::docid did, bool lazy) const
919 LOGCALL(DB, Xapian::Document::Internal *, "ChertDatabase::open_document", did | lazy);
920 Assert(did != 0);
921 if (!lazy) {
922 // This will throw DocNotFoundError if the document doesn't exist.
923 (void)get_doclength(did);
926 intrusive_ptr<const Database::Internal> ptrtothis(this);
927 RETURN(new ChertDocument(ptrtothis, did, &value_manager, &record_table));
930 PositionList *
931 ChertDatabase::open_position_list(Xapian::docid did, const string & term) const
933 Assert(did != 0);
935 AutoPtr<ChertPositionList> poslist(new ChertPositionList);
936 if (!poslist->read_data(&position_table, did, term)) {
937 // As of 1.1.0, we don't check if the did and term exist - we just
938 // return an empty positionlist. If the user really needs to know,
939 // they can check for themselves.
942 return poslist.release();
945 TermList *
946 ChertDatabase::open_allterms(const string & prefix) const
948 LOGCALL(DB, TermList *, "ChertDatabase::open_allterms", NO_ARGS);
949 RETURN(new ChertAllTermsList(intrusive_ptr<const ChertDatabase>(this),
950 prefix));
953 TermList *
954 ChertDatabase::open_spelling_termlist(const string & word) const
956 return spelling_table.open_termlist(word);
959 TermList *
960 ChertDatabase::open_spelling_wordlist() const
962 ChertCursor * cursor = spelling_table.cursor_get();
963 if (!cursor) return NULL;
964 return new ChertSpellingWordsList(intrusive_ptr<const ChertDatabase>(this),
965 cursor);
968 Xapian::doccount
969 ChertDatabase::get_spelling_frequency(const string & word) const
971 return spelling_table.get_word_frequency(word);
974 TermList *
975 ChertDatabase::open_synonym_termlist(const string & term) const
977 return synonym_table.open_termlist(term);
980 TermList *
981 ChertDatabase::open_synonym_keylist(const string & prefix) const
983 ChertCursor * cursor = synonym_table.cursor_get();
984 if (!cursor) return NULL;
985 return new ChertSynonymTermList(intrusive_ptr<const ChertDatabase>(this),
986 cursor, prefix);
989 string
990 ChertDatabase::get_metadata(const string & key) const
992 LOGCALL(DB, string, "ChertDatabase::get_metadata", key);
993 string btree_key("\x00\xc0", 2);
994 btree_key += key;
995 string tag;
996 (void)postlist_table.get_exact_entry(btree_key, tag);
997 RETURN(tag);
1000 TermList *
1001 ChertDatabase::open_metadata_keylist(const std::string &prefix) const
1003 LOGCALL(DB, TermList *, "ChertDatabase::open_metadata_keylist", NO_ARGS);
1004 ChertCursor * cursor = postlist_table.cursor_get();
1005 if (!cursor) RETURN(NULL);
1006 RETURN(new ChertMetadataTermList(intrusive_ptr<const ChertDatabase>(this),
1007 cursor, prefix));
1010 string
1011 ChertDatabase::get_revision_info() const
1013 LOGCALL(DB, string, "ChertDatabase::get_revision_info", NO_ARGS);
1014 string buf;
1015 pack_uint(buf, get_revision_number());
1016 RETURN(buf);
1019 string
1020 ChertDatabase::get_uuid() const
1022 LOGCALL(DB, string, "ChertDatabase::get_uuid", NO_ARGS);
1023 RETURN(version_file.get_uuid_string());
1026 void
1027 ChertDatabase::throw_termlist_table_close_exception() const
1029 // Either the database has been closed, or else there's no termlist table.
1030 // Check if the postlist table is open to determine which is the case.
1031 if (!postlist_table.is_open())
1032 ChertTable::throw_database_closed();
1033 throw Xapian::FeatureUnavailableError("Database has no termlist");
1036 void
1037 ChertDatabase::get_used_docid_range(Xapian::docid & first,
1038 Xapian::docid & last) const
1040 last = stats.get_last_docid();
1041 if (last == record_table.get_doccount()) {
1042 // Contiguous range starting at 1.
1043 first = 1;
1044 return;
1046 postlist_table.get_used_docid_range(first, last);
1049 bool
1050 ChertDatabase::locked() const
1052 return lock.test();
1055 bool
1056 ChertDatabase::has_uncommitted_changes() const
1058 return false;
1061 ///////////////////////////////////////////////////////////////////////////
1063 ChertWritableDatabase::ChertWritableDatabase(const string &dir, int action,
1064 int block_size)
1065 : ChertDatabase(dir, action, block_size),
1066 freq_deltas(),
1067 doclens(),
1068 mod_plists(),
1069 change_count(0),
1070 flush_threshold(0),
1071 modify_shortcut_document(NULL),
1072 modify_shortcut_docid(0)
1074 LOGCALL_CTOR(DB, "ChertWritableDatabase", dir | action | block_size);
1076 const char *p = getenv("XAPIAN_FLUSH_THRESHOLD");
1077 if (p)
1078 flush_threshold = atoi(p);
1079 if (flush_threshold == 0)
1080 flush_threshold = 10000;
1083 ChertWritableDatabase::~ChertWritableDatabase()
1085 LOGCALL_DTOR(DB, "ChertWritableDatabase");
1086 dtor_called();
1089 void
1090 ChertWritableDatabase::commit()
1092 if (transaction_active())
1093 throw Xapian::InvalidOperationError("Can't commit during a transaction");
1094 if (change_count) flush_postlist_changes();
1095 apply();
1098 void
1099 ChertWritableDatabase::check_flush_threshold()
1101 // FIXME: this should be done by checking memory usage, not the number of
1102 // changes.
1103 // We could also look at:
1104 // * mod_plists.size()
1105 // * doclens.size()
1106 // * freq_deltas.size()
1108 // cout << "+++ mod_plists.size() " << mod_plists.size() <<
1109 // ", doclens.size() " << doclens.size() <<
1110 // ", freq_deltas.size() " << freq_deltas.size() << endl;
1111 if (++change_count >= flush_threshold) {
1112 flush_postlist_changes();
1113 if (!transaction_active()) apply();
1117 void
1118 ChertWritableDatabase::flush_postlist_changes() const
1120 postlist_table.merge_changes(mod_plists, doclens, freq_deltas);
1121 stats.write(postlist_table);
1123 freq_deltas.clear();
1124 doclens.clear();
1125 mod_plists.clear();
1126 change_count = 0;
1129 void
1130 ChertWritableDatabase::close()
1132 LOGCALL_VOID(DB, "ChertWritableDatabase::close", NO_ARGS);
1133 if (!transaction_active()) {
1134 commit();
1135 // FIXME: if commit() throws, should we still close?
1137 ChertDatabase::close();
1140 void
1141 ChertWritableDatabase::apply()
1143 value_manager.set_value_stats(value_stats);
1144 ChertDatabase::apply();
1147 void
1148 ChertWritableDatabase::add_freq_delta(const string & tname,
1149 Xapian::termcount_diff tf_delta,
1150 Xapian::termcount_diff cf_delta)
1152 map<string, pair<termcount_diff, termcount_diff> >::iterator i;
1153 i = freq_deltas.find(tname);
1154 if (i == freq_deltas.end()) {
1155 freq_deltas.insert(make_pair(tname, make_pair(tf_delta, cf_delta)));
1156 } else {
1157 i->second.first += tf_delta;
1158 i->second.second += cf_delta;
1162 void
1163 ChertWritableDatabase::insert_mod_plist(Xapian::docid did,
1164 const string & tname,
1165 Xapian::termcount wdf)
1167 // Find or make the appropriate entry in mod_plists.
1168 map<string, map<docid, pair<char, termcount> > >::iterator j;
1169 j = mod_plists.find(tname);
1170 if (j == mod_plists.end()) {
1171 map<docid, pair<char, termcount> > m;
1172 j = mod_plists.insert(make_pair(tname, m)).first;
1174 j->second[did] = make_pair('A', wdf);
1177 void
1178 ChertWritableDatabase::update_mod_plist(Xapian::docid did,
1179 const string & tname,
1180 char type,
1181 Xapian::termcount wdf)
1183 // Find or make the appropriate entry in mod_plists.
1184 map<string, map<docid, pair<char, termcount> > >::iterator j;
1185 j = mod_plists.find(tname);
1186 if (j == mod_plists.end()) {
1187 map<docid, pair<char, termcount> > m;
1188 j = mod_plists.insert(make_pair(tname, m)).first;
1191 map<docid, pair<char, termcount> >::iterator k;
1192 k = j->second.find(did);
1193 if (k == j->second.end()) {
1194 j->second.insert(make_pair(did, make_pair(type, wdf)));
1195 } else {
1196 if (type == 'A') {
1197 // Adding an entry which has already been deleted.
1198 Assert(k->second.first == 'D');
1199 type = 'M';
1201 k->second = make_pair(type, wdf);
1205 Xapian::docid
1206 ChertWritableDatabase::add_document(const Xapian::Document & document)
1208 LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document", document);
1209 // Make sure the docid counter doesn't overflow.
1210 if (stats.get_last_docid() == CHERT_MAX_DOCID)
1211 throw Xapian::DatabaseError("Run out of docids - you'll have to use copydatabase to eliminate any gaps before you can add more documents");
1212 // Use the next unused document ID.
1213 RETURN(add_document_(stats.get_next_docid(), document));
1216 Xapian::docid
1217 ChertWritableDatabase::add_document_(Xapian::docid did,
1218 const Xapian::Document & document)
1220 LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document_", did | document);
1221 Assert(did != 0);
1222 try {
1223 // Add the record using that document ID.
1224 record_table.replace_record(document.get_data(), did);
1226 // Set the values.
1227 value_manager.add_document(did, document, value_stats);
1229 chert_doclen_t new_doclen = 0;
1231 Xapian::TermIterator term = document.termlist_begin();
1232 for ( ; term != document.termlist_end(); ++term) {
1233 termcount wdf = term.get_wdf();
1234 // Calculate the new document length
1235 new_doclen += wdf;
1236 stats.check_wdf(wdf);
1238 string tname = *term;
1239 if (tname.size() > MAX_SAFE_TERM_LENGTH)
1240 throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + tname);
1241 add_freq_delta(tname, 1, wdf);
1242 insert_mod_plist(did, tname, wdf);
1244 PositionIterator pos = term.positionlist_begin();
1245 if (pos != term.positionlist_end()) {
1246 position_table.set_positionlist(
1247 did, tname,
1248 pos, term.positionlist_end(), false);
1252 LOGLINE(DB, "Calculated doclen for new document " << did << " as " << new_doclen);
1254 // Set the termlist.
1255 if (termlist_table.is_open())
1256 termlist_table.set_termlist(did, document, new_doclen);
1258 // Set the new document length
1259 Assert(doclens.find(did) == doclens.end() || doclens[did] == static_cast<Xapian::termcount>(-1));
1260 doclens[did] = new_doclen;
1261 stats.add_document(new_doclen);
1262 } catch (...) {
1263 // If an error occurs while adding a document, or doing any other
1264 // transaction, the modifications so far must be cleared before
1265 // returning control to the user - otherwise partial modifications will
1266 // persist in memory, and eventually get written to disk.
1267 cancel();
1268 throw;
1271 check_flush_threshold();
1273 RETURN(did);
1276 void
1277 ChertWritableDatabase::delete_document(Xapian::docid did)
1279 LOGCALL_VOID(DB, "ChertWritableDatabase::delete_document", did);
1280 Assert(did != 0);
1282 if (!termlist_table.is_open())
1283 throw_termlist_table_close_exception();
1285 if (rare(modify_shortcut_docid == did)) {
1286 // The modify_shortcut document can't be used for a modification
1287 // shortcut now, because it's been deleted!
1288 modify_shortcut_document = NULL;
1289 modify_shortcut_docid = 0;
1292 // Remove the record. If this fails, just propagate the exception since
1293 // the state should still be consistent (most likely it's
1294 // DocNotFoundError).
1295 record_table.delete_record(did);
1297 try {
1298 // Remove the values.
1299 value_manager.delete_document(did, value_stats);
1301 // OK, now add entries to remove the postings in the underlying record.
1302 intrusive_ptr<const ChertWritableDatabase> ptrtothis(this);
1303 ChertTermList termlist(ptrtothis, did);
1305 stats.delete_document(termlist.get_doclength());
1307 termlist.next();
1308 while (!termlist.at_end()) {
1309 string tname = termlist.get_termname();
1310 position_table.delete_positionlist(did, tname);
1311 termcount wdf = termlist.get_wdf();
1313 add_freq_delta(tname, -1, -wdf);
1314 update_mod_plist(did, tname, 'D', 0u);
1316 termlist.next();
1319 // Remove the termlist.
1320 if (termlist_table.is_open())
1321 termlist_table.delete_termlist(did);
1323 // Mark this document as removed.
1324 doclens[did] = static_cast<Xapian::termcount>(-1);
1325 } catch (...) {
1326 // If an error occurs while deleting a document, or doing any other
1327 // transaction, the modifications so far must be cleared before
1328 // returning control to the user - otherwise partial modifications will
1329 // persist in memory, and eventually get written to disk.
1330 cancel();
1331 throw;
1334 check_flush_threshold();
1337 void
1338 ChertWritableDatabase::replace_document(Xapian::docid did,
1339 const Xapian::Document & document)
1341 LOGCALL_VOID(DB, "ChertWritableDatabase::replace_document", did | document);
1342 Assert(did != 0);
1344 try {
1345 if (did > stats.get_last_docid()) {
1346 stats.set_last_docid(did);
1347 // If this docid is above the highwatermark, then we can't be
1348 // replacing an existing document.
1349 (void)add_document_(did, document);
1350 return;
1353 if (!termlist_table.is_open()) {
1354 // We can replace an *unused* docid <= last_docid too.
1355 intrusive_ptr<const ChertDatabase> ptrtothis(this);
1356 if (!postlist_table.document_exists(did, ptrtothis)) {
1357 (void)add_document_(did, document);
1358 return;
1360 throw_termlist_table_close_exception();
1363 // Check for a document read from this database being replaced - ie, a
1364 // modification operation.
1365 bool modifying = false;
1366 if (modify_shortcut_docid &&
1367 document.internal->get_docid() == modify_shortcut_docid) {
1368 if (document.internal.get() == modify_shortcut_document) {
1369 // We have a docid, it matches, and the pointer matches, so we
1370 // can skip modification of any data which hasn't been modified
1371 // in the document.
1372 if (!document.internal->modified()) {
1373 // If the document is unchanged, we've nothing to do.
1374 return;
1376 modifying = true;
1377 LOGLINE(DB, "Detected potential document modification shortcut.");
1378 } else {
1379 // The modify_shortcut document can't be used for a
1380 // modification shortcut now, because it's about to be
1381 // modified.
1382 modify_shortcut_document = NULL;
1383 modify_shortcut_docid = 0;
1387 if (!modifying || document.internal->terms_modified()) {
1388 bool pos_modified = !modifying ||
1389 document.internal->term_positions_modified();
1390 intrusive_ptr<const ChertWritableDatabase> ptrtothis(this);
1391 ChertTermList termlist(ptrtothis, did);
1392 Xapian::TermIterator term = document.termlist_begin();
1393 chert_doclen_t old_doclen = termlist.get_doclength();
1394 stats.delete_document(old_doclen);
1395 chert_doclen_t new_doclen = old_doclen;
1397 string old_tname, new_tname;
1399 termlist.next();
1400 while (!termlist.at_end() || term != document.termlist_end()) {
1401 int cmp;
1402 if (termlist.at_end()) {
1403 cmp = 1;
1404 new_tname = *term;
1405 } else {
1406 old_tname = termlist.get_termname();
1407 if (term != document.termlist_end()) {
1408 new_tname = *term;
1409 cmp = old_tname.compare(new_tname);
1410 } else {
1411 cmp = -1;
1415 if (cmp < 0) {
1416 // Term old_tname has been deleted.
1417 termcount old_wdf = termlist.get_wdf();
1418 new_doclen -= old_wdf;
1419 add_freq_delta(old_tname, -1, -old_wdf);
1420 if (pos_modified)
1421 position_table.delete_positionlist(did, old_tname);
1422 update_mod_plist(did, old_tname, 'D', 0u);
1423 termlist.next();
1424 } else if (cmp > 0) {
1425 // Term new_tname as been added.
1426 termcount new_wdf = term.get_wdf();
1427 new_doclen += new_wdf;
1428 stats.check_wdf(new_wdf);
1429 if (new_tname.size() > MAX_SAFE_TERM_LENGTH)
1430 throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + new_tname);
1431 add_freq_delta(new_tname, 1, new_wdf);
1432 update_mod_plist(did, new_tname, 'A', new_wdf);
1433 if (pos_modified) {
1434 PositionIterator pos = term.positionlist_begin();
1435 if (pos != term.positionlist_end()) {
1436 position_table.set_positionlist(
1437 did, new_tname,
1438 pos, term.positionlist_end(), false);
1441 ++term;
1442 } else if (cmp == 0) {
1443 // Term already exists: look for wdf and positionlist changes.
1444 termcount old_wdf = termlist.get_wdf();
1445 termcount new_wdf = term.get_wdf();
1447 // Check the stats even if wdf hasn't changed, because
1448 // this is the only document, the stats will have been
1449 // zeroed.
1450 stats.check_wdf(new_wdf);
1452 if (old_wdf != new_wdf) {
1453 new_doclen += new_wdf - old_wdf;
1454 add_freq_delta(new_tname, 0, new_wdf - old_wdf);
1455 update_mod_plist(did, new_tname, 'M', new_wdf);
1458 if (pos_modified) {
1459 PositionIterator pos = term.positionlist_begin();
1460 if (pos != term.positionlist_end()) {
1461 position_table.set_positionlist(did, new_tname, pos,
1462 term.positionlist_end(),
1463 true);
1464 } else {
1465 position_table.delete_positionlist(did, new_tname);
1469 ++term;
1470 termlist.next();
1473 LOGLINE(DB, "Calculated doclen for replacement document " << did << " as " << new_doclen);
1475 // Set the termlist.
1476 if (termlist_table.is_open())
1477 termlist_table.set_termlist(did, document, new_doclen);
1479 // Set the new document length
1480 if (new_doclen != old_doclen)
1481 doclens[did] = new_doclen;
1482 stats.add_document(new_doclen);
1485 if (!modifying || document.internal->data_modified()) {
1486 // Replace the record
1487 record_table.replace_record(document.get_data(), did);
1490 if (!modifying || document.internal->values_modified()) {
1491 // Replace the values.
1492 value_manager.replace_document(did, document, value_stats);
1494 } catch (const Xapian::DocNotFoundError &) {
1495 (void)add_document_(did, document);
1496 return;
1497 } catch (...) {
1498 // If an error occurs while replacing a document, or doing any other
1499 // transaction, the modifications so far must be cleared before
1500 // returning control to the user - otherwise partial modifications will
1501 // persist in memory, and eventually get written to disk.
1502 cancel();
1503 throw;
1506 check_flush_threshold();
1509 Xapian::Document::Internal *
1510 ChertWritableDatabase::open_document(Xapian::docid did, bool lazy) const
1512 LOGCALL(DB, Xapian::Document::Internal *, "ChertWritableDatabase::open_document", did | lazy);
1513 modify_shortcut_document = ChertDatabase::open_document(did, lazy);
1514 // Store the docid only after open_document() successfully returns, so an
1515 // attempt to open a missing document doesn't overwrite this.
1516 modify_shortcut_docid = did;
1517 RETURN(modify_shortcut_document);
1520 Xapian::termcount
1521 ChertWritableDatabase::get_doclength(Xapian::docid did) const
1523 LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_doclength", did);
1524 map<docid, termcount>::const_iterator i = doclens.find(did);
1525 if (i != doclens.end()) {
1526 Xapian::termcount doclen = i->second;
1527 if (doclen == static_cast<Xapian::termcount>(-1)) {
1528 throw Xapian::DocNotFoundError("Document " + str(did) + " not found");
1530 RETURN(doclen);
1532 RETURN(ChertDatabase::get_doclength(did));
1535 Xapian::termcount
1536 ChertWritableDatabase::get_unique_terms(Xapian::docid did) const
1538 LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_unique_terms", did);
1539 Assert(did != 0);
1540 // Note that the "approximate" size should be exact in this case.
1542 // get_unique_terms() really ought to only count terms with wdf > 0, but
1543 // that's expensive to calculate on demand, so for now let's just ensure
1544 // unique_terms <= doclen.
1545 map<docid, termcount>::const_iterator i = doclens.find(did);
1546 if (i != doclens.end()) {
1547 Xapian::termcount doclen = i->second;
1548 if (doclen == static_cast<Xapian::termcount>(-1)) {
1549 throw Xapian::DocNotFoundError("Document " + str(did) + " not found");
1551 intrusive_ptr<const ChertDatabase> ptrtothis(this);
1552 ChertTermList termlist(ptrtothis, did);
1553 RETURN(min(doclen, termlist.get_approx_size()));
1555 RETURN(ChertDatabase::get_unique_terms(did));
1558 void
1559 ChertWritableDatabase::get_freqs(const string & term,
1560 Xapian::doccount * termfreq_ptr,
1561 Xapian::termcount * collfreq_ptr) const
1563 LOGCALL_VOID(DB, "ChertWritableDatabase::get_freqs", term | termfreq_ptr | collfreq_ptr);
1564 Assert(!term.empty());
1565 ChertDatabase::get_freqs(term, termfreq_ptr, collfreq_ptr);
1566 map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
1567 i = freq_deltas.find(term);
1568 if (i != freq_deltas.end()) {
1569 if (termfreq_ptr)
1570 *termfreq_ptr += i->second.first;
1571 if (collfreq_ptr)
1572 *collfreq_ptr += i->second.second;
1576 Xapian::doccount
1577 ChertWritableDatabase::get_value_freq(Xapian::valueno slot) const
1579 LOGCALL(DB, Xapian::doccount, "ChertWritableDatabase::get_value_freq", slot);
1580 map<Xapian::valueno, ValueStats>::const_iterator i;
1581 i = value_stats.find(slot);
1582 if (i != value_stats.end()) RETURN(i->second.freq);
1583 RETURN(ChertDatabase::get_value_freq(slot));
1586 std::string
1587 ChertWritableDatabase::get_value_lower_bound(Xapian::valueno slot) const
1589 LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_lower_bound", slot);
1590 map<Xapian::valueno, ValueStats>::const_iterator i;
1591 i = value_stats.find(slot);
1592 if (i != value_stats.end()) RETURN(i->second.lower_bound);
1593 RETURN(ChertDatabase::get_value_lower_bound(slot));
1596 std::string
1597 ChertWritableDatabase::get_value_upper_bound(Xapian::valueno slot) const
1599 LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_upper_bound", slot);
1600 map<Xapian::valueno, ValueStats>::const_iterator i;
1601 i = value_stats.find(slot);
1602 if (i != value_stats.end()) RETURN(i->second.upper_bound);
1603 RETURN(ChertDatabase::get_value_upper_bound(slot));
1606 bool
1607 ChertWritableDatabase::term_exists(const string & tname) const
1609 LOGCALL(DB, bool, "ChertWritableDatabase::term_exists", tname);
1610 Xapian::doccount tf;
1611 get_freqs(tname, &tf, NULL);
1612 RETURN(tf != 0);
1615 LeafPostList *
1616 ChertWritableDatabase::open_post_list(const string& tname) const
1618 LOGCALL(DB, LeafPostList *, "ChertWritableDatabase::open_post_list", tname);
1619 intrusive_ptr<const ChertWritableDatabase> ptrtothis(this);
1621 if (tname.empty()) {
1622 Xapian::doccount doccount = get_doccount();
1623 if (stats.get_last_docid() == doccount) {
1624 RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
1626 if (doclens.empty()) {
1627 RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
1629 RETURN(new ChertAllDocsModifiedPostList(ptrtothis, doccount, doclens));
1632 map<string, map<docid, pair<char, termcount> > >::const_iterator j;
1633 j = mod_plists.find(tname);
1634 if (j != mod_plists.end()) {
1635 // We've got buffered changes to this term's postlist, so we need to
1636 // use a ChertModifiedPostList.
1637 RETURN(new ChertModifiedPostList(ptrtothis, tname, j->second));
1640 RETURN(new ChertPostList(ptrtothis, tname, true));
1643 ValueList *
1644 ChertWritableDatabase::open_value_list(Xapian::valueno slot) const
1646 LOGCALL(DB, ValueList *, "ChertWritableDatabase::open_value_list", slot);
1647 // If there are changes, we don't have code to iterate the modified value
1648 // list so we need to flush (but don't commit - there may be a transaction
1649 // in progress).
1650 if (change_count) value_manager.merge_changes();
1651 RETURN(ChertDatabase::open_value_list(slot));
1654 TermList *
1655 ChertWritableDatabase::open_allterms(const string & prefix) const
1657 LOGCALL(DB, TermList *, "ChertWritableDatabase::open_allterms", NO_ARGS);
1658 // If there are changes, terms may have been added or removed, and so we
1659 // need to flush (but don't commit - there may be a transaction in
1660 // progress).
1661 if (change_count) flush_postlist_changes();
1662 RETURN(ChertDatabase::open_allterms(prefix));
1665 void
1666 ChertWritableDatabase::cancel()
1668 ChertDatabase::cancel();
1669 stats.read(postlist_table);
1670 freq_deltas.clear();
1671 doclens.clear();
1672 mod_plists.clear();
1673 value_stats.clear();
1674 change_count = 0;
1677 void
1678 ChertWritableDatabase::add_spelling(const string & word,
1679 Xapian::termcount freqinc) const
1681 spelling_table.add_word(word, freqinc);
1684 void
1685 ChertWritableDatabase::remove_spelling(const string & word,
1686 Xapian::termcount freqdec) const
1688 spelling_table.remove_word(word, freqdec);
1691 TermList *
1692 ChertWritableDatabase::open_spelling_wordlist() const
1694 spelling_table.merge_changes();
1695 return ChertDatabase::open_spelling_wordlist();
1698 TermList *
1699 ChertWritableDatabase::open_synonym_keylist(const string & prefix) const
1701 synonym_table.merge_changes();
1702 return ChertDatabase::open_synonym_keylist(prefix);
1705 void
1706 ChertWritableDatabase::add_synonym(const string & term,
1707 const string & synonym) const
1709 synonym_table.add_synonym(term, synonym);
1712 void
1713 ChertWritableDatabase::remove_synonym(const string & term,
1714 const string & synonym) const
1716 synonym_table.remove_synonym(term, synonym);
1719 void
1720 ChertWritableDatabase::clear_synonyms(const string & term) const
1722 synonym_table.clear_synonyms(term);
1725 void
1726 ChertWritableDatabase::set_metadata(const string & key, const string & value)
1728 LOGCALL_VOID(DB, "ChertWritableDatabase::set_metadata", key | value);
1729 string btree_key("\x00\xc0", 2);
1730 btree_key += key;
1731 if (value.empty()) {
1732 postlist_table.del(btree_key);
1733 } else {
1734 postlist_table.add(btree_key, value);
1738 void
1739 ChertWritableDatabase::invalidate_doc_object(Xapian::Document::Internal * obj) const
1741 if (obj == modify_shortcut_document) {
1742 modify_shortcut_document = NULL;
1743 modify_shortcut_docid = 0;
1747 bool
1748 ChertWritableDatabase::has_uncommitted_changes() const
1750 return change_count > 0 ||
1751 postlist_table.is_modified() ||
1752 position_table.is_modified() ||
1753 termlist_table.is_modified() ||
1754 value_manager.is_modified() ||
1755 synonym_table.is_modified() ||
1756 spelling_table.is_modified() ||
1757 record_table.is_modified();