No longer need to initialise weights under synonym
[xapian.git] / xapian-core / api / replication.cc
blobcb0a97db2cd9b43cf2251ef5277459c5dff59d2b
1 /** @file replication.cc
2 * @brief Replication support for Xapian databases.
3 */
4 /* Copyright (C) 2008 Lemur Consulting Ltd
5 * Copyright (C) 2008,2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <config.h>
24 #include "replication.h"
26 #include "xapian/intrusive_ptr.h"
27 #include "xapian/constants.h"
28 #include "xapian/dbfactory.h"
29 #include "xapian/error.h"
30 #include "xapian/version.h"
32 #include "backends/database.h"
33 #include "backends/databasereplicator.h"
34 #include "debuglog.h"
35 #include "filetests.h"
36 #include "fileutils.h"
37 #include "io_utils.h"
38 #include "omassert.h"
39 #include "realtime.h"
40 #include "net/remoteconnection.h"
41 #include "noreturn.h"
42 #include "replicationprotocol.h"
43 #include "safeerrno.h"
44 #include "safesysstat.h"
45 #include "safeunistd.h"
46 #include "net/length.h"
47 #include "str.h"
48 #include "unicode/description_append.h"
50 #include "autoptr.h"
51 #include <fstream>
52 #include <string>
54 using namespace std;
55 using namespace Xapian;
// Comment lines written at the start of the stub database file that
// DatabaseReplica generates; they identify the generating library version and
// warn that the file may be regenerated.
#define REPLICA_STUB_BANNER \
"# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
"# Do not manually edit - replication operations may regenerate this file.\n"
62 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
63 static void
64 throw_connection_closed_unexpectedly()
66 throw Xapian::NetworkError("Connection closed unexpectedly");
69 void
70 DatabaseMaster::write_changesets_to_fd(int fd,
71 const string & start_revision,
72 ReplicationInfo * info) const
74 LOGCALL_VOID(REPLICA, "DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
75 if (info != NULL)
76 info->clear();
77 Database db;
78 try {
79 db = Database(path);
80 } catch (const Xapian::DatabaseError & e) {
81 RemoteConnection conn(-1, fd);
82 conn.send_message(REPL_REPLY_FAIL,
83 "Can't open database: " + e.get_msg(),
84 0.0);
85 return;
87 if (db.internal.size() != 1) {
88 throw Xapian::InvalidOperationError("DatabaseMaster needs to be pointed at exactly one subdatabase");
91 // Extract the UUID from start_revision and compare it to the database.
92 bool need_whole_db = false;
93 string revision;
94 if (start_revision.empty()) {
95 need_whole_db = true;
96 } else {
97 const char * ptr = start_revision.data();
98 const char * end = ptr + start_revision.size();
99 size_t uuid_length;
100 decode_length_and_check(&ptr, end, uuid_length);
101 string request_uuid(ptr, uuid_length);
102 ptr += uuid_length;
103 string db_uuid = db.internal[0]->get_uuid();
104 if (request_uuid != db_uuid) {
105 need_whole_db = true;
107 revision.assign(ptr, end - ptr);
110 db.internal[0]->write_changesets_to_fd(fd, revision, need_whole_db, info);
113 string
114 DatabaseMaster::get_description() const
116 string desc = "DatabaseMaster(";
117 description_append(desc, path);
118 desc += ")";
119 return desc;
122 /// Internal implementation of DatabaseReplica
123 class DatabaseReplica::Internal : public Xapian::Internal::intrusive_base {
124 /// Don't allow assignment.
125 void operator=(const Internal &);
127 /// Don't allow copying.
128 Internal(const Internal &);
130 /// The path to the replica directory.
131 string path;
133 /// The id of the currently live database in the replica (0 or 1).
134 int live_id;
136 /** The live database being replicated.
138 * This needs to be mutable because it is sometimes lazily opened.
140 mutable WritableDatabase live_db;
142 /** Do we have an offline database currently?
144 * The offline database is a new copy of the database we're bringing up
145 * to the required revision, which can't yet be made live.
147 bool have_offline_db;
149 /** Flag to indicate that the only valid operation next is a full copy.
151 bool need_copy_next;
153 /** The revision that the secondary database has been updated to.
155 string offline_revision;
157 /** The UUID of the secondary database.
159 string offline_uuid;
161 /** The revision that the secondary database must reach before it can be
162 * made live.
164 string offline_needed_revision;
166 /** The time at which a changeset was last applied to the live database.
168 * Set to 0 if no changeset applied to the live database so far.
170 double last_live_changeset_time;
172 /// The remote connection we're using.
173 RemoteConnection * conn;
175 /** Update the stub database which points to a single database.
177 * The stub database file is created at a separate path, and then
178 * atomically moved into place to replace the old stub database. This
179 * should allow searches to continue uninterrupted.
181 void update_stub_database() const;
183 /** Delete the offline database. */
184 void remove_offline_db();
186 /** Apply a set of DB copy messages from the connection.
188 void apply_db_copy(double end_time);
190 /** Check that a message type is as expected.
192 * Throws a NetworkError if the type is not the expected one.
194 void check_message_type(int type, int expected) const;
196 /** Check if the offline database has reached the required version.
198 * If so, make it live, and remove the old live database.
200 * @return true iff the offline database is made live
202 bool possibly_make_offline_live();
204 string get_replica_path(int id) const {
205 string p = path;
206 p += "/replica_";
207 p += char('0' + id);
208 return p;
211 public:
212 /// Open a new DatabaseReplica::Internal for the specified path.
213 explicit Internal(const string & path_);
215 /// Destructor.
216 ~Internal() { delete conn; }
218 /// Get a string describing the current revision of the replica.
219 string get_revision_info() const;
221 /// Set the file descriptor to read changesets from.
222 void set_read_fd(int fd);
224 /// Read and apply the next changeset.
225 bool apply_next_changeset(ReplicationInfo * info,
226 double reader_close_time);
228 /// Return a string describing this object.
229 string get_description() const { return path; }
232 // Methods of DatabaseReplica
234 DatabaseReplica::DatabaseReplica(const string & path)
235 : internal(new DatabaseReplica::Internal(path))
237 LOGCALL_CTOR(REPLICA, "DatabaseReplica", path);
240 DatabaseReplica::~DatabaseReplica()
242 LOGCALL_DTOR(REPLICA, "DatabaseReplica");
243 delete internal;
246 string
247 DatabaseReplica::get_revision_info() const
249 LOGCALL(REPLICA, string, "DatabaseReplica::get_revision_info", NO_ARGS);
250 RETURN(internal->get_revision_info());
253 void
254 DatabaseReplica::set_read_fd(int fd)
256 LOGCALL_VOID(REPLICA, "DatabaseReplica::set_read_fd", fd);
257 internal->set_read_fd(fd);
260 bool
261 DatabaseReplica::apply_next_changeset(ReplicationInfo * info,
262 double reader_close_time)
264 LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time);
265 if (info != NULL)
266 info->clear();
267 RETURN(internal->apply_next_changeset(info, reader_close_time));
270 string
271 DatabaseReplica::get_description() const
273 string desc("DatabaseReplica(");
274 desc += internal->get_description();
275 desc += ')';
276 return desc;
279 // Methods of DatabaseReplica::Internal
281 void
282 DatabaseReplica::Internal::update_stub_database() const
284 string stub_path = path;
285 stub_path += "/XAPIANDB";
286 string tmp_path = stub_path;
287 tmp_path += ".tmp";
289 ofstream stub(tmp_path.c_str());
290 stub << REPLICA_STUB_BANNER
291 "auto replica_" << live_id << endl;
293 if (!io_tmp_rename(tmp_path, stub_path)) {
294 string msg("Failed to update stub db file for replica: ");
295 msg += path;
296 throw Xapian::DatabaseOpeningError(msg, errno);
300 DatabaseReplica::Internal::Internal(const string & path_)
301 : path(path_), live_id(0), live_db(), have_offline_db(false),
302 need_copy_next(false), offline_revision(), offline_needed_revision(),
303 last_live_changeset_time(), conn(NULL)
305 LOGCALL_CTOR(REPLICA, "DatabaseReplica::Internal", path_);
306 #if !defined XAPIAN_HAS_CHERT_BACKEND && !defined XAPIAN_HAS_GLASS_BACKEND
307 throw FeatureUnavailableError("Replication requires the chert or glass backends to be enabled");
308 #else
309 if (mkdir(path.c_str(), 0777) == 0) {
310 // The database doesn't already exist - make a directory, containing a
311 // stub database, and point it to a new database.
313 // Create an empty database - the backend doesn't matter as if the
314 // master is a different type, then the replica will become that type
315 // automatically.
316 live_db = WritableDatabase(get_replica_path(live_id),
317 Xapian::DB_CREATE);
318 update_stub_database();
319 } else {
320 if (errno != EEXIST) {
321 throw DatabaseOpeningError("Couldn't create directory '" + path + "'", errno);
323 if (!dir_exists(path)) {
324 throw DatabaseOpeningError("Replica path must be a directory");
326 string stub_path = path;
327 stub_path += "/XAPIANDB";
328 try {
329 live_db = WritableDatabase(stub_path,
330 Xapian::DB_OPEN|Xapian::DB_BACKEND_STUB);
331 } catch (const Xapian::DatabaseCorruptError &) {
332 // If the database is too corrupt to open, force a full copy so we
333 // auto-heal from this condition. Instance seen in the wild was
334 // that the replica had all files truncated to size 0.
335 live_db.internal.push_back(NULL);
337 // FIXME: simplify all this?
338 ifstream stub(stub_path.c_str());
339 string line;
340 while (getline(stub, line)) {
341 if (!line.empty() && line[0] != '#') {
342 live_id = line[line.size() - 1] - '0';
343 break;
347 #endif
350 string
351 DatabaseReplica::Internal::get_revision_info() const
353 LOGCALL(REPLICA, string, "DatabaseReplica::Internal::get_revision_info", NO_ARGS);
354 if (live_db.internal.empty())
355 live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
356 if (live_db.internal.size() != 1)
357 throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase");
359 if (live_db.internal[0].get() == NULL) RETURN(string());
361 string uuid = (live_db.internal[0])->get_uuid();
362 string buf = encode_length(uuid.size());
363 buf += uuid;
364 buf += (live_db.internal[0])->get_revision_info();
365 RETURN(buf);
368 void
369 DatabaseReplica::Internal::remove_offline_db()
371 // Delete the offline database.
372 removedir(get_replica_path(live_id ^ 1));
373 have_offline_db = false;
376 void
377 DatabaseReplica::Internal::apply_db_copy(double end_time)
379 have_offline_db = true;
380 last_live_changeset_time = 0;
381 string offline_path = get_replica_path(live_id ^ 1);
382 // If there's already an offline database, discard it. This happens if one
383 // copy of the database was sent, but further updates were needed before it
384 // could be made live, and the remote end was then unable to send those
385 // updates (probably due to not having changesets available, or the remote
386 // database being replaced by a new database).
387 removedir(offline_path);
388 if (mkdir(offline_path.c_str(), 0777)) {
389 throw Xapian::DatabaseError("Cannot make directory '" +
390 offline_path + "'", errno);
394 string buf;
395 int type = conn->get_message(buf, end_time);
396 check_message_type(type, REPL_REPLY_DB_HEADER);
397 const char * ptr = buf.data();
398 const char * end = ptr + buf.size();
399 size_t uuid_length;
400 decode_length_and_check(&ptr, end, uuid_length);
401 offline_uuid.assign(ptr, uuid_length);
402 offline_revision.assign(buf, ptr + uuid_length - buf.data(), buf.npos);
405 // Now, read the files for the database from the connection and create it.
406 while (true) {
407 string filename;
408 int type = conn->sniff_next_message_type(end_time);
409 if (type < 0 || type == REPL_REPLY_FAIL)
410 return;
411 if (type == REPL_REPLY_DB_FOOTER)
412 break;
414 type = conn->get_message(filename, end_time);
415 check_message_type(type, REPL_REPLY_DB_FILENAME);
417 // Check that the filename doesn't contain '..'. No valid database
418 // file contains .., so we don't need to check that the .. is a path.
419 if (filename.find("..") != string::npos) {
420 throw NetworkError("Filename in database contains '..'");
423 type = conn->sniff_next_message_type(end_time);
424 if (type < 0 || type == REPL_REPLY_FAIL)
425 return;
427 string filepath = offline_path + "/" + filename;
428 type = conn->receive_file(filepath, end_time);
429 if (type < 0)
430 throw_connection_closed_unexpectedly();
431 check_message_type(type, REPL_REPLY_DB_FILEDATA);
433 int type = conn->get_message(offline_needed_revision, end_time);
434 check_message_type(type, REPL_REPLY_DB_FOOTER);
435 need_copy_next = false;
438 void
439 DatabaseReplica::Internal::check_message_type(int type, int expected) const
441 if (type != expected) {
442 if (type < 0)
443 throw_connection_closed_unexpectedly();
444 string m = "Expected replication protocol message type #";
445 m += str(expected);
446 m += ", got #";
447 m += str(type);
448 throw NetworkError(m);
452 bool
453 DatabaseReplica::Internal::possibly_make_offline_live()
455 string replica_path(get_replica_path(live_id ^ 1));
456 AutoPtr<DatabaseReplicator> replicator;
457 try {
458 replicator.reset(DatabaseReplicator::open(replica_path));
459 } catch (const Xapian::DatabaseError &) {
460 return false;
462 if (offline_needed_revision.empty()) {
463 return false;
465 if (!replicator->check_revision_at_least(offline_revision,
466 offline_needed_revision)) {
467 return false;
470 string replicated_uuid = replicator->get_uuid();
471 if (replicated_uuid.empty()) {
472 return false;
475 if (replicated_uuid != offline_uuid) {
476 return false;
479 live_id ^= 1;
480 // Open the database first, so that if there's a problem, an exception
481 // will be thrown before we make the new database live.
482 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
483 update_stub_database();
484 remove_offline_db();
485 return true;
488 void
489 DatabaseReplica::Internal::set_read_fd(int fd)
491 delete conn;
492 conn = NULL;
493 conn = new RemoteConnection(fd, -1);
496 bool
497 DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,
498 double reader_close_time)
500 LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
501 while (true) {
502 int type = conn->sniff_next_message_type(0.0);
503 switch (type) {
504 case REPL_REPLY_END_OF_CHANGES: {
505 string buf;
506 type = conn->get_message(buf, 0.0);
507 check_message_type(type, REPL_REPLY_END_OF_CHANGES);
508 RETURN(false);
510 case REPL_REPLY_DB_HEADER:
511 // Apply the copy - remove offline db in case of any error.
512 try {
513 apply_db_copy(0.0);
514 if (info != NULL)
515 ++(info->fullcopy_count);
516 string replica_uuid;
518 AutoPtr<DatabaseReplicator> replicator(
519 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
520 replica_uuid = replicator->get_uuid();
522 if (replica_uuid != offline_uuid) {
523 remove_offline_db();
524 // We've been sent an database with the wrong uuid,
525 // which only happens if the database at the server
526 // got changed during the copy, so the only safe
527 // action next is a new copy. Set a flag to ensure
528 // that this happens, or we're at risk of database
529 // corruption.
530 need_copy_next = true;
532 } catch (...) {
533 remove_offline_db();
534 throw;
536 if (possibly_make_offline_live()) {
537 if (info != NULL)
538 info->changed = true;
540 break;
541 case REPL_REPLY_CHANGESET:
542 if (need_copy_next) {
543 throw NetworkError("Needed a database copy next");
545 if (!have_offline_db) {
546 // Close the live db.
547 string replica_path(get_replica_path(live_id));
548 live_db = WritableDatabase();
550 if (last_live_changeset_time != 0.0) {
551 // Wait until at least "reader_close_time" seconds have
552 // passed since the last changeset was applied, to
553 // allow any active readers to finish and be reopened.
554 double until;
555 until = last_live_changeset_time + reader_close_time;
556 RealTime::sleep(until);
559 // Open a replicator for the live path, and apply the
560 // changeset.
562 AutoPtr<DatabaseReplicator> replicator(
563 DatabaseReplicator::open(replica_path));
565 // Ignore the returned revision number, since we are
566 // live so the changeset must be safe to apply to a
567 // live DB.
568 replicator->apply_changeset_from_conn(*conn, 0.0, true);
570 last_live_changeset_time = RealTime::now();
572 if (info != NULL) {
573 ++(info->changeset_count);
574 info->changed = true;
576 // Now the replicator is closed, open the live db again.
577 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
578 RETURN(true);
582 AutoPtr<DatabaseReplicator> replicator(
583 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
585 offline_revision = replicator->
586 apply_changeset_from_conn(*conn, 0.0, false);
588 if (info != NULL) {
589 ++(info->changeset_count);
592 if (possibly_make_offline_live()) {
593 if (info != NULL)
594 info->changed = true;
596 RETURN(true);
597 case REPL_REPLY_FAIL: {
598 string buf;
599 if (conn->get_message(buf, 0.0) < 0)
600 throw_connection_closed_unexpectedly();
601 throw NetworkError("Unable to fully synchronise: " + buf);
603 case -1:
604 throw_connection_closed_unexpectedly();
605 default:
606 throw NetworkError("Unknown replication protocol message (" +
607 str(type) + ")");