1 /** @file replication.cc
2 * @brief Replication support for Xapian databases.
4 /* Copyright (C) 2008 Lemur Consulting Ltd
5 * Copyright (C) 2008,2009,2010,2011,2012,2013,2014,2015,2016,2017 Olly Betts
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 #include "replication.h"
26 #include "xapian/intrusive_ptr.h"
27 #include "xapian/constants.h"
28 #include "xapian/dbfactory.h"
29 #include "xapian/error.h"
30 #include "xapian/version.h"
32 #include "backends/databaseinternal.h"
33 #include "backends/databasereplicator.h"
35 #include "filetests.h"
36 #include "fileutils.h"
41 #include "net/remoteconnection.h"
42 #include "replicationprotocol.h"
43 #include "safeerrno.h"
44 #include "safesysstat.h"
45 #include "safeunistd.h"
46 #include "net/length.h"
48 #include "unicode/description_append.h"
55 using namespace Xapian
;
57 // The banner comment used at the top of the replica's stub database file.
//
// The banner is built by adjacent string-literal concatenation, so
// XAPIAN_VERSION must expand to a string literal (presumably supplied by
// xapian/version.h - confirm).  Every banner line starts with '#';
// update_stub_database() writes the banner immediately before the
// "auto replica_<id>" line, and the stub parser in the Internal constructor
// skips lines whose first character is '#'.
58 #define REPLICA_STUB_BANNER \
59 "# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
60 "# Do not manually edit - replication operations may regenerate this file.\n"
64 throw_connection_closed_unexpectedly()
66 throw Xapian::NetworkError("Connection closed unexpectedly");
70 DatabaseMaster::write_changesets_to_fd(int fd
,
71 const string
& start_revision
,
72 ReplicationInfo
* info
) const
74 LOGCALL_VOID(REPLICA
, "DatabaseMaster::write_changesets_to_fd", fd
| start_revision
| info
);
80 } catch (const Xapian::DatabaseError
& e
) {
81 RemoteConnection
conn(-1, fd
);
82 conn
.send_message(REPL_REPLY_FAIL
,
83 "Can't open database: " + e
.get_msg(),
87 if (db
.internal
->size() != 1) {
88 throw Xapian::InvalidOperationError("DatabaseMaster needs to be pointed at exactly one subdatabase");
91 // Extract the UUID from start_revision and compare it to the database.
92 bool need_whole_db
= false;
94 if (start_revision
.empty()) {
97 const char * ptr
= start_revision
.data();
98 const char * end
= ptr
+ start_revision
.size();
100 decode_length_and_check(&ptr
, end
, uuid_length
);
101 string
request_uuid(ptr
, uuid_length
);
103 string db_uuid
= db
.internal
->get_uuid();
104 if (request_uuid
!= db_uuid
) {
105 need_whole_db
= true;
107 revision
.assign(ptr
, end
- ptr
);
110 db
.internal
->write_changesets_to_fd(fd
, revision
, need_whole_db
, info
);
114 DatabaseMaster::get_description() const
116 string desc
= "DatabaseMaster(";
117 description_append(desc
, path
);
122 /// Internal implementation of DatabaseReplica
123 class DatabaseReplica::Internal
: public Xapian::Internal::intrusive_base
{
124 /// Don't allow assignment.
125 void operator=(const Internal
&);
127 /// Don't allow copying.
128 Internal(const Internal
&);
130 /// The path to the replica directory.
133 /// The id of the currently live database in the replica (0 or 1).
136 /** The live database being replicated.
138 * This needs to be mutable because it is sometimes lazily opened.
140 mutable WritableDatabase live_db
;
142 /// Do we need to heal the replica?
143 bool live_db_corrupt
= false;
145 /** Do we have an offline database currently?
147 * The offline database is a new copy of the database we're bringing up
148 * to the required revision, which can't yet be made live.
150 bool have_offline_db
;
152 /** Flag to indicate that the only valid operation next is a full copy.
156 /** The revision that the secondary database has been updated to.
158 string offline_revision
;
160 /** The UUID of the secondary database.
164 /** The revision that the secondary database must reach before it can be
167 string offline_needed_revision
;
169 /** The time at which a changeset was last applied to the live database.
171 * Set to 0 if no changeset applied to the live database so far.
173 double last_live_changeset_time
;
175 /// The remote connection we're using.
176 RemoteConnection
* conn
;
178 /** Update the stub database which points to a single database.
180 * The stub database file is created at a separate path, and then
181 * atomically moved into place to replace the old stub database. This
182 * should allow searches to continue uninterrupted.
184 void update_stub_database() const;
186 /** Delete the offline database. */
187 void remove_offline_db();
189 /** Apply a set of DB copy messages from the connection.
191 void apply_db_copy(double end_time
);
193 /** Check that a message type is as expected.
195 * Throws a NetworkError if the type is not the expected one.
197 void check_message_type(int type
, int expected
) const;
199 /** Check if the offline database has reached the required version.
201 * If so, make it live, and remove the old live database.
203 * @return true iff the offline database is made live
205 bool possibly_make_offline_live();
207 string
get_replica_path(int id
) const {
215 /// Open a new DatabaseReplica::Internal for the specified path.
216 explicit Internal(const string
& path_
);
/// Destructor: deletes the RemoteConnection pointed to by conn.
///
/// In the code visible here conn is either NULL (set by the constructor)
/// or an object allocated with new in set_read_fd(), so the delete is
/// safe in both states.
219 ~Internal() { delete conn
; }
221 /// Get a string describing the current revision of the replica.
222 string
get_revision_info() const;
224 /// Set the file descriptor to read changesets from.
225 void set_read_fd(int fd
);
227 /// Read and apply the next changeset.
228 bool apply_next_changeset(ReplicationInfo
* info
,
229 double reader_close_time
);
231 /// Return a string describing this object.
///
/// The description is just a copy of the `path` member, which the
/// constructor initialises from its path_ argument.
/// DatabaseReplica::get_description() appends this text after its
/// "DatabaseReplica(" prefix.
232 string
get_description() const { return path
; }
235 // Methods of DatabaseReplica
237 DatabaseReplica::DatabaseReplica(const string
& path
)
238 : internal(new DatabaseReplica::Internal(path
))
240 LOGCALL_CTOR(REPLICA
, "DatabaseReplica", path
);
243 DatabaseReplica::~DatabaseReplica()
245 LOGCALL_DTOR(REPLICA
, "DatabaseReplica");
250 DatabaseReplica::get_revision_info() const
252 LOGCALL(REPLICA
, string
, "DatabaseReplica::get_revision_info", NO_ARGS
);
253 RETURN(internal
->get_revision_info());
257 DatabaseReplica::set_read_fd(int fd
)
259 LOGCALL_VOID(REPLICA
, "DatabaseReplica::set_read_fd", fd
);
260 internal
->set_read_fd(fd
);
264 DatabaseReplica::apply_next_changeset(ReplicationInfo
* info
,
265 double reader_close_time
)
267 LOGCALL(REPLICA
, bool, "DatabaseReplica::apply_next_changeset", info
| reader_close_time
);
270 RETURN(internal
->apply_next_changeset(info
, reader_close_time
));
274 DatabaseReplica::get_description() const
276 string
desc("DatabaseReplica(");
277 desc
+= internal
->get_description();
282 // Methods of DatabaseReplica::Internal
285 DatabaseReplica::Internal::update_stub_database() const
287 string stub_path
= path
;
288 stub_path
+= "/XAPIANDB";
289 string tmp_path
= stub_path
;
292 ofstream
stub(tmp_path
.c_str());
293 stub
<< REPLICA_STUB_BANNER
294 "auto replica_" << live_id
<< endl
;
296 if (!io_tmp_rename(tmp_path
, stub_path
)) {
297 string
msg("Failed to update stub db file for replica: ");
299 throw Xapian::DatabaseOpeningError(msg
, errno
);
303 DatabaseReplica::Internal::Internal(const string
& path_
)
304 : path(path_
), live_id(0), live_db(), have_offline_db(false),
305 need_copy_next(false), offline_revision(), offline_needed_revision(),
306 last_live_changeset_time(), conn(NULL
)
308 LOGCALL_CTOR(REPLICA
, "DatabaseReplica::Internal", path_
);
309 #ifndef XAPIAN_HAS_GLASS_BACKEND
310 throw FeatureUnavailableError("Replication requires the glass backend to be enabled");
312 if (mkdir(path
.c_str(), 0777) == 0) {
313 // The database doesn't already exist - make a directory, containing a
314 // stub database, and point it to a new database.
316 // Create an empty database - the backend doesn't matter as if the
317 // master is a different type, then the replica will become that type
319 live_db
= WritableDatabase(get_replica_path(live_id
),
321 update_stub_database();
323 if (errno
!= EEXIST
) {
324 throw DatabaseOpeningError("Couldn't create directory '" + path
+ "'", errno
);
326 if (!dir_exists(path
)) {
327 throw DatabaseOpeningError("Replica path must be a directory");
329 string stub_path
= path
;
330 stub_path
+= "/XAPIANDB";
332 live_db
= WritableDatabase(stub_path
,
333 Xapian::DB_OPEN
|Xapian::DB_BACKEND_STUB
);
334 } catch (const Xapian::DatabaseCorruptError
&) {
335 // If the database is too corrupt to open, force a full copy so we
336 // auto-heal from this condition. Instance seen in the wild was
337 // that the replica had all files truncated to size 0.
338 live_db_corrupt
= true;
340 // FIXME: simplify all this?
341 ifstream
stub(stub_path
.c_str());
343 while (getline(stub
, line
)) {
344 if (!line
.empty() && line
[0] != '#') {
345 live_id
= line
[line
.size() - 1] - '0';
354 DatabaseReplica::Internal::get_revision_info() const
356 LOGCALL(REPLICA
, string
, "DatabaseReplica::Internal::get_revision_info", NO_ARGS
);
357 if (live_db_corrupt
) {
361 switch (live_db
.internal
->size()) {
363 live_db
= WritableDatabase(get_replica_path(live_id
), Xapian::DB_OPEN
);
369 throw Xapian::InvalidOperationError("DatabaseReplica needs to be "
370 "pointed at exactly one "
374 string uuid
= live_db
.get_uuid();
375 string buf
= encode_length(uuid
.size());
377 pack_uint(buf
, live_db
.get_revision());
382 DatabaseReplica::Internal::remove_offline_db()
384 // Delete the offline database.
385 removedir(get_replica_path(live_id
^ 1));
386 have_offline_db
= false;
390 DatabaseReplica::Internal::apply_db_copy(double end_time
)
392 have_offline_db
= true;
393 last_live_changeset_time
= 0;
394 string offline_path
= get_replica_path(live_id
^ 1);
395 // If there's already an offline database, discard it. This happens if one
396 // copy of the database was sent, but further updates were needed before it
397 // could be made live, and the remote end was then unable to send those
398 // updates (probably due to not having changesets available, or the remote
399 // database being replaced by a new database).
400 removedir(offline_path
);
401 if (mkdir(offline_path
.c_str(), 0777)) {
402 throw Xapian::DatabaseError("Cannot make directory '" +
403 offline_path
+ "'", errno
);
408 int type
= conn
->get_message(buf
, end_time
);
409 check_message_type(type
, REPL_REPLY_DB_HEADER
);
410 const char * ptr
= buf
.data();
411 const char * end
= ptr
+ buf
.size();
413 decode_length_and_check(&ptr
, end
, uuid_length
);
414 offline_uuid
.assign(ptr
, uuid_length
);
415 offline_revision
.assign(buf
, ptr
+ uuid_length
- buf
.data(), buf
.npos
);
418 // Now, read the files for the database from the connection and create it.
421 int type
= conn
->sniff_next_message_type(end_time
);
422 if (type
< 0 || type
== REPL_REPLY_FAIL
)
424 if (type
== REPL_REPLY_DB_FOOTER
)
427 type
= conn
->get_message(filename
, end_time
);
428 check_message_type(type
, REPL_REPLY_DB_FILENAME
);
430 // Check that the filename doesn't contain '..'. No valid database
431 // file contains .., so we don't need to check that the .. is a path.
432 if (filename
.find("..") != string::npos
) {
433 throw NetworkError("Filename in database contains '..'");
436 type
= conn
->sniff_next_message_type(end_time
);
437 if (type
< 0 || type
== REPL_REPLY_FAIL
)
440 string filepath
= offline_path
+ "/" + filename
;
441 type
= conn
->receive_file(filepath
, end_time
);
443 throw_connection_closed_unexpectedly();
444 check_message_type(type
, REPL_REPLY_DB_FILEDATA
);
446 int type
= conn
->get_message(offline_needed_revision
, end_time
);
447 check_message_type(type
, REPL_REPLY_DB_FOOTER
);
448 need_copy_next
= false;
452 DatabaseReplica::Internal::check_message_type(int type
, int expected
) const
454 if (type
!= expected
) {
456 throw_connection_closed_unexpectedly();
457 string m
= "Expected replication protocol message type #";
461 throw NetworkError(m
);
466 DatabaseReplica::Internal::possibly_make_offline_live()
468 string
replica_path(get_replica_path(live_id
^ 1));
469 unique_ptr
<DatabaseReplicator
> replicator
;
471 replicator
.reset(DatabaseReplicator::open(replica_path
));
472 } catch (const Xapian::DatabaseError
&) {
475 if (offline_needed_revision
.empty()) {
478 if (!replicator
->check_revision_at_least(offline_revision
,
479 offline_needed_revision
)) {
483 string replicated_uuid
= replicator
->get_uuid();
484 if (replicated_uuid
.empty()) {
488 if (replicated_uuid
!= offline_uuid
) {
493 // Open the database first, so that if there's a problem, an exception
494 // will be thrown before we make the new database live.
495 live_db
= WritableDatabase(replica_path
, Xapian::DB_OPEN
);
496 live_db_corrupt
= false;
497 update_stub_database();
503 DatabaseReplica::Internal::set_read_fd(int fd
)
507 conn
= new RemoteConnection(fd
, -1);
511 DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo
* info
,
512 double reader_close_time
)
514 LOGCALL(REPLICA
, bool, "DatabaseReplica::Internal::apply_next_changeset", info
| reader_close_time
);
516 int type
= conn
->sniff_next_message_type(0.0);
518 case REPL_REPLY_END_OF_CHANGES
: {
520 type
= conn
->get_message(buf
, 0.0);
521 check_message_type(type
, REPL_REPLY_END_OF_CHANGES
);
524 case REPL_REPLY_DB_HEADER
:
525 // Apply the copy - remove offline db in case of any error.
529 ++(info
->fullcopy_count
);
532 unique_ptr
<DatabaseReplicator
> replicator(
533 DatabaseReplicator::open(get_replica_path(live_id
^ 1)));
534 replica_uuid
= replicator
->get_uuid();
536 if (replica_uuid
!= offline_uuid
) {
538 // We've been sent a database with the wrong uuid,
539 // which only happens if the database at the server
540 // got changed during the copy, so the only safe
541 // action next is a new copy. Set a flag to ensure
542 // that this happens, or we're at risk of database
544 need_copy_next
= true;
550 if (possibly_make_offline_live()) {
552 info
->changed
= true;
555 case REPL_REPLY_CHANGESET
:
556 if (need_copy_next
) {
557 throw NetworkError("Needed a database copy next");
559 if (!have_offline_db
) {
560 // Close the live db.
561 string
replica_path(get_replica_path(live_id
));
562 live_db
= WritableDatabase();
564 if (last_live_changeset_time
!= 0.0) {
565 // Wait until at least "reader_close_time" seconds have
566 // passed since the last changeset was applied, to
567 // allow any active readers to finish and be reopened.
569 until
= last_live_changeset_time
+ reader_close_time
;
570 RealTime::sleep(until
);
573 // Open a replicator for the live path, and apply the
576 unique_ptr
<DatabaseReplicator
> replicator(
577 DatabaseReplicator::open(replica_path
));
579 // Ignore the returned revision number, since we are
580 // live so the changeset must be safe to apply to a
582 replicator
->apply_changeset_from_conn(*conn
, 0.0, true);
584 last_live_changeset_time
= RealTime::now();
587 ++(info
->changeset_count
);
588 info
->changed
= true;
590 // Now the replicator is closed, open the live db again.
591 live_db
= WritableDatabase(replica_path
, Xapian::DB_OPEN
);
592 live_db_corrupt
= false;
597 unique_ptr
<DatabaseReplicator
> replicator(
598 DatabaseReplicator::open(get_replica_path(live_id
^ 1)));
600 offline_revision
= replicator
->
601 apply_changeset_from_conn(*conn
, 0.0, false);
604 ++(info
->changeset_count
);
607 if (possibly_make_offline_live()) {
609 info
->changed
= true;
612 case REPL_REPLY_FAIL
: {
614 if (conn
->get_message(buf
, 0.0) < 0)
615 throw_connection_closed_unexpectedly();
616 throw NetworkError("Unable to fully synchronise: " + buf
);
619 throw_connection_closed_unexpectedly();
621 throw NetworkError("Unknown replication protocol message (" +