Skip lockfilealreadyopen1 under __CYGWIN__ &__WIN32__
[xapian.git] / xapian-core / api / replication.cc
bloba53a572589cddc55257edec977d7df13fbc5bf88
1 /** @file replication.cc
2 * @brief Replication support for Xapian databases.
3 */
4 /* Copyright (C) 2008 Lemur Consulting Ltd
5 * Copyright (C) 2008,2009,2010,2011,2012,2013,2014,2015,2016,2017 Olly Betts
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <config.h>
24 #include "replication.h"
26 #include "xapian/intrusive_ptr.h"
27 #include "xapian/constants.h"
28 #include "xapian/dbfactory.h"
29 #include "xapian/error.h"
30 #include "xapian/version.h"
32 #include "backends/databaseinternal.h"
33 #include "backends/databasereplicator.h"
34 #include "debuglog.h"
35 #include "filetests.h"
36 #include "fileutils.h"
37 #include "io_utils.h"
38 #include "omassert.h"
39 #include "realtime.h"
40 #include "net/remoteconnection.h"
41 #include "replicationprotocol.h"
42 #include "safeerrno.h"
43 #include "safesysstat.h"
44 #include "safeunistd.h"
45 #include "net/length.h"
46 #include "str.h"
47 #include "unicode/description_append.h"
49 #include <fstream>
50 #include <memory>
51 #include <string>
53 using namespace std;
54 using namespace Xapian;
56 // The banner comment used at the top of the replica's stub database file.
57 #define REPLICA_STUB_BANNER \
58 "# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
59 "# Do not manually edit - replication operations may regenerate this file.\n"
61 [[noreturn]]
62 static void
63 throw_connection_closed_unexpectedly()
65 throw Xapian::NetworkError("Connection closed unexpectedly");
68 void
69 DatabaseMaster::write_changesets_to_fd(int fd,
70 const string & start_revision,
71 ReplicationInfo * info) const
73 LOGCALL_VOID(REPLICA, "DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
74 if (info != NULL)
75 info->clear();
76 Database db;
77 try {
78 db = Database(path);
79 } catch (const Xapian::DatabaseError & e) {
80 RemoteConnection conn(-1, fd);
81 conn.send_message(REPL_REPLY_FAIL,
82 "Can't open database: " + e.get_msg(),
83 0.0);
84 return;
86 if (db.internal->size() != 1) {
87 throw Xapian::InvalidOperationError("DatabaseMaster needs to be pointed at exactly one subdatabase");
90 // Extract the UUID from start_revision and compare it to the database.
91 bool need_whole_db = false;
92 string revision;
93 if (start_revision.empty()) {
94 need_whole_db = true;
95 } else {
96 const char * ptr = start_revision.data();
97 const char * end = ptr + start_revision.size();
98 size_t uuid_length;
99 decode_length_and_check(&ptr, end, uuid_length);
100 string request_uuid(ptr, uuid_length);
101 ptr += uuid_length;
102 string db_uuid = db.internal->get_uuid();
103 if (request_uuid != db_uuid) {
104 need_whole_db = true;
106 revision.assign(ptr, end - ptr);
109 db.internal->write_changesets_to_fd(fd, revision, need_whole_db, info);
112 string
113 DatabaseMaster::get_description() const
115 string desc = "DatabaseMaster(";
116 description_append(desc, path);
117 desc += ")";
118 return desc;
121 /// Internal implementation of DatabaseReplica
122 class DatabaseReplica::Internal : public Xapian::Internal::intrusive_base {
123 /// Don't allow assignment.
124 void operator=(const Internal &);
126 /// Don't allow copying.
127 Internal(const Internal &);
129 /// The path to the replica directory.
130 string path;
132 /// The id of the currently live database in the replica (0 or 1).
133 int live_id;
135 /** The live database being replicated.
137 * This needs to be mutable because it is sometimes lazily opened.
139 mutable WritableDatabase live_db;
141 /// Do we need to heal the replica?
142 bool live_db_corrupt = false;
144 /** Do we have an offline database currently?
146 * The offline database is a new copy of the database we're bringing up
147 * to the required revision, which can't yet be made live.
149 bool have_offline_db;
151 /** Flag to indicate that the only valid operation next is a full copy.
153 bool need_copy_next;
155 /** The revision that the secondary database has been updated to.
157 string offline_revision;
159 /** The UUID of the secondary database.
161 string offline_uuid;
163 /** The revision that the secondary database must reach before it can be
164 * made live.
166 string offline_needed_revision;
168 /** The time at which a changeset was last applied to the live database.
170 * Set to 0 if no changeset applied to the live database so far.
172 double last_live_changeset_time;
174 /// The remote connection we're using.
175 RemoteConnection * conn;
177 /** Update the stub database which points to a single database.
179 * The stub database file is created at a separate path, and then
180 * atomically moved into place to replace the old stub database. This
181 * should allow searches to continue uninterrupted.
183 void update_stub_database() const;
185 /** Delete the offline database. */
186 void remove_offline_db();
188 /** Apply a set of DB copy messages from the connection.
190 void apply_db_copy(double end_time);
192 /** Check that a message type is as expected.
194 * Throws a NetworkError if the type is not the expected one.
196 void check_message_type(int type, int expected) const;
198 /** Check if the offline database has reached the required version.
200 * If so, make it live, and remove the old live database.
202 * @return true iff the offline database is made live
204 bool possibly_make_offline_live();
206 string get_replica_path(int id) const {
207 string p = path;
208 p += "/replica_";
209 p += char('0' + id);
210 return p;
213 public:
214 /// Open a new DatabaseReplica::Internal for the specified path.
215 explicit Internal(const string & path_);
217 /// Destructor.
218 ~Internal() { delete conn; }
220 /// Get a string describing the current revision of the replica.
221 string get_revision_info() const;
223 /// Set the file descriptor to read changesets from.
224 void set_read_fd(int fd);
226 /// Read and apply the next changeset.
227 bool apply_next_changeset(ReplicationInfo * info,
228 double reader_close_time);
230 /// Return a string describing this object.
231 string get_description() const { return path; }
234 // Methods of DatabaseReplica
236 DatabaseReplica::DatabaseReplica(const string & path)
237 : internal(new DatabaseReplica::Internal(path))
239 LOGCALL_CTOR(REPLICA, "DatabaseReplica", path);
242 DatabaseReplica::~DatabaseReplica()
244 LOGCALL_DTOR(REPLICA, "DatabaseReplica");
245 delete internal;
248 string
249 DatabaseReplica::get_revision_info() const
251 LOGCALL(REPLICA, string, "DatabaseReplica::get_revision_info", NO_ARGS);
252 RETURN(internal->get_revision_info());
255 void
256 DatabaseReplica::set_read_fd(int fd)
258 LOGCALL_VOID(REPLICA, "DatabaseReplica::set_read_fd", fd);
259 internal->set_read_fd(fd);
262 bool
263 DatabaseReplica::apply_next_changeset(ReplicationInfo * info,
264 double reader_close_time)
266 LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time);
267 if (info != NULL)
268 info->clear();
269 RETURN(internal->apply_next_changeset(info, reader_close_time));
272 string
273 DatabaseReplica::get_description() const
275 string desc("DatabaseReplica(");
276 desc += internal->get_description();
277 desc += ')';
278 return desc;
281 // Methods of DatabaseReplica::Internal
283 void
284 DatabaseReplica::Internal::update_stub_database() const
286 string stub_path = path;
287 stub_path += "/XAPIANDB";
288 string tmp_path = stub_path;
289 tmp_path += ".tmp";
291 ofstream stub(tmp_path.c_str());
292 stub << REPLICA_STUB_BANNER
293 "auto replica_" << live_id << endl;
295 if (!io_tmp_rename(tmp_path, stub_path)) {
296 string msg("Failed to update stub db file for replica: ");
297 msg += path;
298 throw Xapian::DatabaseOpeningError(msg, errno);
302 DatabaseReplica::Internal::Internal(const string & path_)
303 : path(path_), live_id(0), live_db(), have_offline_db(false),
304 need_copy_next(false), offline_revision(), offline_needed_revision(),
305 last_live_changeset_time(), conn(NULL)
307 LOGCALL_CTOR(REPLICA, "DatabaseReplica::Internal", path_);
308 #ifndef XAPIAN_HAS_GLASS_BACKEND
309 throw FeatureUnavailableError("Replication requires the glass backend to be enabled");
310 #else
311 if (mkdir(path.c_str(), 0777) == 0) {
312 // The database doesn't already exist - make a directory, containing a
313 // stub database, and point it to a new database.
315 // Create an empty database - the backend doesn't matter as if the
316 // master is a different type, then the replica will become that type
317 // automatically.
318 live_db = WritableDatabase(get_replica_path(live_id),
319 Xapian::DB_CREATE);
320 update_stub_database();
321 } else {
322 if (errno != EEXIST) {
323 throw DatabaseOpeningError("Couldn't create directory '" + path + "'", errno);
325 if (!dir_exists(path)) {
326 throw DatabaseOpeningError("Replica path must be a directory");
328 string stub_path = path;
329 stub_path += "/XAPIANDB";
330 try {
331 live_db = WritableDatabase(stub_path,
332 Xapian::DB_OPEN|Xapian::DB_BACKEND_STUB);
333 } catch (const Xapian::DatabaseCorruptError &) {
334 // If the database is too corrupt to open, force a full copy so we
335 // auto-heal from this condition. Instance seen in the wild was
336 // that the replica had all files truncated to size 0.
337 live_db_corrupt = true;
339 // FIXME: simplify all this?
340 ifstream stub(stub_path.c_str());
341 string line;
342 while (getline(stub, line)) {
343 if (!line.empty() && line[0] != '#') {
344 live_id = line[line.size() - 1] - '0';
345 break;
349 #endif
352 string
353 DatabaseReplica::Internal::get_revision_info() const
355 LOGCALL(REPLICA, string, "DatabaseReplica::Internal::get_revision_info", NO_ARGS);
356 if (live_db_corrupt) {
357 RETURN(string());
360 switch (live_db.internal->size()) {
361 case 0:
362 live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
363 break;
364 case 1:
365 // OK
366 break;
367 default:
368 throw Xapian::InvalidOperationError("DatabaseReplica needs to be "
369 "pointed at exactly one "
370 "subdatabase");
373 string uuid = live_db.get_uuid();
374 string buf = encode_length(uuid.size());
375 buf += uuid;
376 buf += live_db.internal->get_revision_info();
377 RETURN(buf);
380 void
381 DatabaseReplica::Internal::remove_offline_db()
383 // Delete the offline database.
384 removedir(get_replica_path(live_id ^ 1));
385 have_offline_db = false;
388 void
389 DatabaseReplica::Internal::apply_db_copy(double end_time)
391 have_offline_db = true;
392 last_live_changeset_time = 0;
393 string offline_path = get_replica_path(live_id ^ 1);
394 // If there's already an offline database, discard it. This happens if one
395 // copy of the database was sent, but further updates were needed before it
396 // could be made live, and the remote end was then unable to send those
397 // updates (probably due to not having changesets available, or the remote
398 // database being replaced by a new database).
399 removedir(offline_path);
400 if (mkdir(offline_path.c_str(), 0777)) {
401 throw Xapian::DatabaseError("Cannot make directory '" +
402 offline_path + "'", errno);
406 string buf;
407 int type = conn->get_message(buf, end_time);
408 check_message_type(type, REPL_REPLY_DB_HEADER);
409 const char * ptr = buf.data();
410 const char * end = ptr + buf.size();
411 size_t uuid_length;
412 decode_length_and_check(&ptr, end, uuid_length);
413 offline_uuid.assign(ptr, uuid_length);
414 offline_revision.assign(buf, ptr + uuid_length - buf.data(), buf.npos);
417 // Now, read the files for the database from the connection and create it.
418 while (true) {
419 string filename;
420 int type = conn->sniff_next_message_type(end_time);
421 if (type < 0 || type == REPL_REPLY_FAIL)
422 return;
423 if (type == REPL_REPLY_DB_FOOTER)
424 break;
426 type = conn->get_message(filename, end_time);
427 check_message_type(type, REPL_REPLY_DB_FILENAME);
429 // Check that the filename doesn't contain '..'. No valid database
430 // file contains .., so we don't need to check that the .. is a path.
431 if (filename.find("..") != string::npos) {
432 throw NetworkError("Filename in database contains '..'");
435 type = conn->sniff_next_message_type(end_time);
436 if (type < 0 || type == REPL_REPLY_FAIL)
437 return;
439 string filepath = offline_path + "/" + filename;
440 type = conn->receive_file(filepath, end_time);
441 if (type < 0)
442 throw_connection_closed_unexpectedly();
443 check_message_type(type, REPL_REPLY_DB_FILEDATA);
445 int type = conn->get_message(offline_needed_revision, end_time);
446 check_message_type(type, REPL_REPLY_DB_FOOTER);
447 need_copy_next = false;
450 void
451 DatabaseReplica::Internal::check_message_type(int type, int expected) const
453 if (type != expected) {
454 if (type < 0)
455 throw_connection_closed_unexpectedly();
456 string m = "Expected replication protocol message type #";
457 m += str(expected);
458 m += ", got #";
459 m += str(type);
460 throw NetworkError(m);
464 bool
465 DatabaseReplica::Internal::possibly_make_offline_live()
467 string replica_path(get_replica_path(live_id ^ 1));
468 unique_ptr<DatabaseReplicator> replicator;
469 try {
470 replicator.reset(DatabaseReplicator::open(replica_path));
471 } catch (const Xapian::DatabaseError &) {
472 return false;
474 if (offline_needed_revision.empty()) {
475 return false;
477 if (!replicator->check_revision_at_least(offline_revision,
478 offline_needed_revision)) {
479 return false;
482 string replicated_uuid = replicator->get_uuid();
483 if (replicated_uuid.empty()) {
484 return false;
487 if (replicated_uuid != offline_uuid) {
488 return false;
491 live_id ^= 1;
492 // Open the database first, so that if there's a problem, an exception
493 // will be thrown before we make the new database live.
494 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
495 live_db_corrupt = false;
496 update_stub_database();
497 remove_offline_db();
498 return true;
501 void
502 DatabaseReplica::Internal::set_read_fd(int fd)
504 delete conn;
505 conn = NULL;
506 conn = new RemoteConnection(fd, -1);
509 bool
510 DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,
511 double reader_close_time)
513 LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
514 while (true) {
515 int type = conn->sniff_next_message_type(0.0);
516 switch (type) {
517 case REPL_REPLY_END_OF_CHANGES: {
518 string buf;
519 type = conn->get_message(buf, 0.0);
520 check_message_type(type, REPL_REPLY_END_OF_CHANGES);
521 RETURN(false);
523 case REPL_REPLY_DB_HEADER:
524 // Apply the copy - remove offline db in case of any error.
525 try {
526 apply_db_copy(0.0);
527 if (info != NULL)
528 ++(info->fullcopy_count);
529 string replica_uuid;
531 unique_ptr<DatabaseReplicator> replicator(
532 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
533 replica_uuid = replicator->get_uuid();
535 if (replica_uuid != offline_uuid) {
536 remove_offline_db();
537 // We've been sent an database with the wrong uuid,
538 // which only happens if the database at the server
539 // got changed during the copy, so the only safe
540 // action next is a new copy. Set a flag to ensure
541 // that this happens, or we're at risk of database
542 // corruption.
543 need_copy_next = true;
545 } catch (...) {
546 remove_offline_db();
547 throw;
549 if (possibly_make_offline_live()) {
550 if (info != NULL)
551 info->changed = true;
553 break;
554 case REPL_REPLY_CHANGESET:
555 if (need_copy_next) {
556 throw NetworkError("Needed a database copy next");
558 if (!have_offline_db) {
559 // Close the live db.
560 string replica_path(get_replica_path(live_id));
561 live_db = WritableDatabase();
563 if (last_live_changeset_time != 0.0) {
564 // Wait until at least "reader_close_time" seconds have
565 // passed since the last changeset was applied, to
566 // allow any active readers to finish and be reopened.
567 double until;
568 until = last_live_changeset_time + reader_close_time;
569 RealTime::sleep(until);
572 // Open a replicator for the live path, and apply the
573 // changeset.
575 unique_ptr<DatabaseReplicator> replicator(
576 DatabaseReplicator::open(replica_path));
578 // Ignore the returned revision number, since we are
579 // live so the changeset must be safe to apply to a
580 // live DB.
581 replicator->apply_changeset_from_conn(*conn, 0.0, true);
583 last_live_changeset_time = RealTime::now();
585 if (info != NULL) {
586 ++(info->changeset_count);
587 info->changed = true;
589 // Now the replicator is closed, open the live db again.
590 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
591 live_db_corrupt = false;
592 RETURN(true);
596 unique_ptr<DatabaseReplicator> replicator(
597 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
599 offline_revision = replicator->
600 apply_changeset_from_conn(*conn, 0.0, false);
602 if (info != NULL) {
603 ++(info->changeset_count);
606 if (possibly_make_offline_live()) {
607 if (info != NULL)
608 info->changed = true;
610 RETURN(true);
611 case REPL_REPLY_FAIL: {
612 string buf;
613 if (conn->get_message(buf, 0.0) < 0)
614 throw_connection_closed_unexpectedly();
615 throw NetworkError("Unable to fully synchronise: " + buf);
617 case -1:
618 throw_connection_closed_unexpectedly();
619 default:
620 throw NetworkError("Unknown replication protocol message (" +
621 str(type) + ")");