1 /** @file glass_databasereplicator.cc
2 * @brief Support for glass database replication
 */
4 /* Copyright 2008 Lemur Consulting Ltd
5 * Copyright 2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
 */
25 #include "glass_databasereplicator.h"
27 #include "xapian/error.h"
29 #include "../flint_lock.h"
30 #include "glass_defs.h"
31 #include "glass_replicate_internal.h"
32 #include "glass_version.h"
33 #include "compression_stream.h"
36 #include "internaltypes.h"
39 #include "posixy_wrapper.h"
40 #include "net/remoteconnection.h"
41 #include "replicationprotocol.h"
43 #include "stringutils.h"
// Report that the remote end went away: the visible body unconditionally
// throws Xapian::NetworkError with a fixed message.  The changeset readers in
// this file call it, e.g. when RemoteConnection::get_message_chunk() returns
// a negative value.
// NOTE(review): the return type, any attributes and the braces are not
// visible in this extract.
50 throw_connection_closed_unexpectedly()
52 throw Xapian::NetworkError("Connection closed unexpectedly");
56 using namespace Xapian
;
// Table file names (each starting with '/', ready to append to the database
// directory) packed into ONE string literal as fixed-width, NUL-terminated
// records.
//
// process_changeset_chunk_blocks() locates a record with
//     dbnames + table * (11 + CONST_STRLEN(GLASS_TABLE_EXTENSION))
// so every record except the last must occupy exactly
// 11 + strlen(GLASS_TABLE_EXTENSION) bytes:
//   - names with a 10 character stem ("/postlist." etc.) are followed by a
//     single "\0";
//   - the 9 character stem "/docdata." is followed by "\0\0" to pad it to the
//     same width.
// The final record ("/synonym.") relies on the literal's implicit terminating
// NUL and needs no padding because nothing is indexed after it.
// NOTE(review): the record order presumably mirrors the Glass::table_type
// enumerators - confirm against glass_defs.h before reordering.
58 static const char * dbnames
=
59 "/postlist." GLASS_TABLE_EXTENSION
"\0"
60 "/docdata." GLASS_TABLE_EXTENSION
"\0\0"
61 "/termlist." GLASS_TABLE_EXTENSION
"\0"
62 "/position." GLASS_TABLE_EXTENSION
"\0"
63 "/spelling." GLASS_TABLE_EXTENSION
"\0"
64 "/synonym." GLASS_TABLE_EXTENSION
;
// Construct a replicator for the database directory db_dir_.
// NOTE(review): the member initialiser list and braces are not visible in
// this extract; presumably db_dir_ is stored in the db_dir member used by the
// other methods - confirm against the full source.
66 GlassDatabaseReplicator::GlassDatabaseReplicator(const string
& db_dir_
)
// Set every element of the fds array to -1 (element count derived from the
// array's own size).  Presumably -1 marks "no descriptor open for this
// table" - TODO confirm against the header.
69 std::fill_n(fds
, sizeof(fds
) / sizeof(fds
[0]), -1);
// Commit step for the replicator: iterates over every table slot
// (0 .. Glass::MAX_ - 1).
// NOTE(review): the loop body is not visible in this extract, apart from the
// start of a disabled "#if 0" section whose FIXME asks whether descriptors
// should be closed here or kept open.  Presumably the live code flushes each
// open descriptor in fds - confirm against the full source.
73 GlassDatabaseReplicator::commit() const
75 for (size_t i
= 0; i
!= Glass::MAX_
; ++i
) {
79 #if 0 // FIXME: close or keep open?
// Destructor: iterates over every table slot (0 .. Glass::MAX_ - 1).
// NOTE(review): the loop body is not visible in this extract; presumably it
// closes any descriptor still held in fds - confirm against the full source.
87 GlassDatabaseReplicator::~GlassDatabaseReplicator()
89 for (size_t i
= 0; i
!= Glass::MAX_
; ++i
) {
// Compare two serialised revision numbers.
//
// @param rev     Serialised revision, decoded with unpack_uint().
// @param target  Serialised revision to compare against, decoded the same way.
// @return        (via RETURN) true when the decoded rev is >= the decoded
//                target.
// @throws NetworkError if either string fails to decode.
98 GlassDatabaseReplicator::check_revision_at_least(const string
& rev
,
99 const string
& target
) const
101 LOGCALL(DB
, bool, "GlassDatabaseReplicator::check_revision_at_least", rev
| target
);
103 glass_revision_number_t rev_val
;
104 glass_revision_number_t target_val
;
// Decode rev from the start of its buffer; unpack_uint() takes &ptr, so it
// can advance ptr past the bytes it consumes.
106 const char * ptr
= rev
.data();
107 const char * end
= ptr
+ rev
.size();
108 if (!unpack_uint(&ptr
, end
, &rev_val
)) {
109 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
// NOTE(review): original lines 110-112 are not visible in this extract.  As
// shown, ptr still refers to rev's buffer here, so a statement re-pointing it
// at target.data() must precede this line for the bound below to be valid -
// confirm against the full source.
113 end
= ptr
+ target
.size();
114 if (!unpack_uint(&ptr
, end
, &target_val
)) {
115 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
118 RETURN(rev_val
>= target_val
);
// Handle a "version file" chunk of a changeset: read the new version file
// from the connection and install it as <db_dir>/iamglass.
//
// @param buf       Buffered changeset data; on entry it starts with a packed
//                  revision followed by a packed size.  Consumed header bytes
//                  are erased from it.
// @param conn      Connection the rest of the chunk is read from.
// @param end_time  Passed through to conn.get_message_chunk().
// @throws NetworkError   on a malformed header or a short read.
// @throws DatabaseError  (with errno) if the temporary file cannot be opened
//                        or renamed into place.
// NOTE(review): several guard conditions and the closing statements are not
// visible in this extract.
122 GlassDatabaseReplicator::process_changeset_chunk_version(string
& buf
,
123 RemoteConnection
& conn
,
124 double end_time
) const
126 const char *ptr
= buf
.data();
127 const char *end
= ptr
+ buf
.size();
// The revision is decoded (and so validated); no later use of rev is visible
// in this extract.
129 glass_revision_number_t rev
;
130 if (!unpack_uint(&ptr
, end
, &rev
))
131 throw NetworkError("Invalid revision in changeset");
// Length in bytes of the version file payload that follows.
133 string::size_type size
;
134 if (!unpack_uint(&ptr
, end
, &size
))
135 throw NetworkError("Invalid version file size in changeset");
137 // Get the new version file into buf.
138 buf
.erase(0, ptr
- buf
.data());
139 int res
= conn
.get_message_chunk(buf
, size
, end_time
);
// NOTE(review): the tests on res that select between these two throws are
// not visible in this extract.
142 throw_connection_closed_unexpectedly();
143 throw NetworkError("Unexpected end of changeset (6)");
146 // Write size bytes from start of buf to new version file.
// The data goes to a temporary name first and is renamed over the real
// version file afterwards.
147 string tmpfile
= db_dir
;
148 tmpfile
+= "/v.rtmp";
149 int fd
= posixy_open(tmpfile
.c_str(), O_WRONLY
| O_CREAT
| O_TRUNC
| O_CLOEXEC
, 0666);
// NOTE(review): the failure test on fd guarding this throw is not visible.
151 string msg
= "Failed to open ";
153 throw DatabaseError(msg
, errno
);
157 io_write(fd
, buf
.data(), size
);
160 string version_file
= db_dir
;
161 version_file
+= "/iamglass";
162 if (!io_tmp_rename(tmpfile
, version_file
)) {
163 string
msg("Couldn't create new version file ");
165 throw DatabaseError(msg
, errno
);
// Handle a "table block" chunk of a changeset: read one block from the
// connection and write it into the file of the given table.
//
// @param table     Table whose file receives the block.
// @param conn      Connection the block data is read from.
// @param end_time  Passed through to conn.get_message_chunk().
// @throws NetworkError   on an invalid block size, bad block number or short
//                        read.
// @throws DatabaseError  (with errno) if the table file cannot be opened.
// NOTE(review): original lines 173-174 of the parameter list are not visible;
// the call site passes (table, v, buf, conn, end_time), and the body uses v
// as a block-size shift and buf as the data buffer.
172 GlassDatabaseReplicator::process_changeset_chunk_blocks(Glass::table_type table
,
175 RemoteConnection
& conn
,
176 double end_time
) const
178 const char *ptr
= buf
.data();
179 const char *end
= ptr
+ buf
.size();
// Block size is GLASS_MIN_BLOCKSIZE scaled by 2^v.  Reject anything above
// 64KiB, and anything for which x & (x - 1) is non-zero (i.e. not a power
// of two).
181 unsigned int changeset_blocksize
= GLASS_MIN_BLOCKSIZE
<< v
;
182 if (changeset_blocksize
> 65536 ||
183 (changeset_blocksize
& (changeset_blocksize
- 1))) {
184 throw NetworkError("Invalid blocksize in changeset");
// NOTE(review): the declaration of block_number is not visible in this
// extract.
187 if (!unpack_uint(&ptr
, end
, &block_number
))
188 throw NetworkError("Invalid block number in changeset");
// Drop the header bytes just decoded.
190 buf
.erase(0, ptr
- buf
.data());
// Build the table's path: dbnames holds fixed-width NUL-terminated records,
// so the pointer arithmetic below selects the record for `table` and the
// += appends it up to its first NUL.
194 string db_path
= db_dir
;
195 db_path
+= dbnames
+ table
* (11 + CONST_STRLEN(GLASS_TABLE_EXTENSION
));
// Opened without O_TRUNC.
// NOTE(review): the declaration of fd and any caching in the fds array are
// not visible here.
196 fd
= posixy_open(db_path
.c_str(), O_WRONLY
| O_CREAT
| O_CLOEXEC
, 0666);
// NOTE(review): the failure test on fd guarding this throw is not visible.
198 string msg
= "Failed to open ";
200 throw DatabaseError(msg
, errno
);
205 int res
= conn
.get_message_chunk(buf
, changeset_blocksize
, end_time
);
// NOTE(review): the tests on res that select between these two throws are
// not visible in this extract.
208 throw_connection_closed_unexpectedly();
209 throw NetworkError("Unexpected end of changeset (4)");
// Write the block, then remove it from the front of buf.  Presumably
// io_write_block() positions the write using block_number and the block
// size - confirm in io_utils.
212 io_write_block(fd
, buf
.data(), changeset_blocksize
, block_number
);
213 buf
.erase(0, changeset_blocksize
);
// Read one changeset from conn and apply it to the database in db_dir.
//
// Visible steps: take the database lock; read the reply type; validate the
// changeset header (magic string, version, start/end revision, changeset
// type); then process chunks until the terminating 0xff chunk, dispatching
// version-file chunks and table-block chunks to the helpers above.  Finally
// the end revision is packed into buf.
//
// @param conn  Connection the changeset is read from.
// @throws NetworkError on any malformed or truncated changeset; lock failures
//         are reported through lock.throw_databaselockerror().
// NOTE(review): the rest of the parameter list (LOGCALL names end_time and
// valid), the loop header, several guard conditions and the return statement
// are not visible in this extract.
217 GlassDatabaseReplicator::apply_changeset_from_conn(RemoteConnection
& conn
,
221 LOGCALL(DB
, string
, "GlassDatabaseReplicator::apply_changeset_from_conn", conn
| end_time
| valid
);
223 // Lock the database to perform modifications.
224 FlintLock
lock(db_dir
);
226 FlintLock::reason why
= lock
.lock(true, false, explanation
);
227 if (why
!= FlintLock::SUCCESS
) {
228 lock
.throw_databaselockerror(why
, db_dir
, explanation
);
// Type of the incoming message; asserted below to be REPL_REPLY_CHANGESET.
231 int type
= conn
.get_message_chunked(end_time
);
233 throw_connection_closed_unexpectedly();
234 AssertEq(type
, REPL_REPLY_CHANGESET
);
237 // Read enough to be certain that we've got the header part of the
240 if (conn
.get_message_chunk(buf
, REASONABLE_CHANGESET_SIZE
, end_time
) < 0)
241 throw_connection_closed_unexpectedly();
242 const char *ptr
= buf
.data();
243 const char *end
= ptr
+ buf
.size();
244 // Check the magic string.
245 if (!startswith(buf
, CHANGES_MAGIC_STRING
)) {
246 throw NetworkError("Invalid ChangeSet magic string");
248 ptr
+= CONST_STRLEN(CHANGES_MAGIC_STRING
);
// NOTE(review): the bounds check guarding this throw is not visible.
250 throw NetworkError("Couldn't read a valid version number from changeset");
// The changeset format version is a single byte straight after the magic.
251 unsigned int changes_version
= *ptr
++;
252 if (changes_version
!= CHANGES_VERSION
)
253 throw NetworkError("Unsupported changeset version");
255 glass_revision_number_t startrev
;
256 glass_revision_number_t endrev
;
258 if (!unpack_uint(&ptr
, end
, &startrev
))
259 throw NetworkError("Couldn't read a valid start revision from changeset");
260 if (!unpack_uint(&ptr
, end
, &endrev
))
261 throw NetworkError("Couldn't read a valid end revision from changeset");
// A changeset must move the revision forwards.
263 if (endrev
<= startrev
)
264 throw NetworkError("End revision in changeset is not later than start revision");
267 throw NetworkError("Unexpected end of changeset (1)");
270 // Check the revision number.
271 // If the database was not known to be valid, we cannot
272 // reliably determine its revision number, so must skip this
// NOTE(review): the condition enclosing this check (presumably on `valid`)
// is not visible in this extract.
274 GlassVersion
version_file(db_dir
);
276 if (startrev
!= version_file
.get_revision())
277 throw NetworkError("Changeset supplied is for wrong revision number");
// Only changeset type 0 is accepted here.
280 unsigned char changes_type
= *ptr
++;
281 if (changes_type
!= 0) {
282 throw NetworkError("Unsupported changeset type: " + str(changes_type
));
283 // FIXME - support changes of type 1, produced when DANGEROUS mode is
287 // Clear the bits of the buffer which have been read.
288 buf
.erase(0, ptr
- buf
.data());
290 // Read the items from the changeset.
292 if (conn
.get_message_chunk(buf
, REASONABLE_CHANGESET_SIZE
, end_time
) < 0)
293 throw_connection_closed_unexpectedly();
295 end
= ptr
+ buf
.size();
297 throw NetworkError("Unexpected end of changeset (3)");
299 // Read the type of the next chunk of data
300 // chunk type can be (in binary):
302 // 11111111 - last chunk
303 // 11111110 - version file
304 // 00BBBTTT - table block:
305 // Block size = (GLASS_MIN_BLOCKSIZE<<BBB) BBB=0..5
306 // Table TTT=0..(Glass::MAX_-1)
307 unsigned char chunk_type
= *ptr
++;
308 if (chunk_type
== 0xff)
310 if (chunk_type
== 0xfe) {
312 buf
.erase(0, ptr
- buf
.data());
313 process_changeset_chunk_version(buf
, conn
, end_time
);
// Low three bits select the table.
316 size_t table_code
= (chunk_type
& 0x07);
317 if (table_code
>= Glass::MAX_
)
318 throw NetworkError("Bad table code in changeset file");
319 Glass::table_type table
= static_cast<Glass::table_type
>(table_code
);
// Bits 3..6 are the block-size shift.  The mask is four bits wide, so values
// beyond the documented BBB range can reach the helper, which rejects block
// sizes above 65536.
320 unsigned char v
= (chunk_type
>> 3) & 0x0f;
323 buf
.erase(0, ptr
- buf
.data());
324 process_changeset_chunk_blocks(table
, v
, buf
, conn
, end_time
);
// NOTE(review): the condition guarding this throw is not visible.
328 throw NetworkError("Junk found at end of changeset");
// Append the packed end revision to buf; presumably this is the value handed
// back to the caller - the return statement is not visible in this extract.
331 pack_uint(buf
, endrev
);
// Return the UUID string of the database in db_dir, as reported by its
// GlassVersion object.
// NOTE(review): the try body and the body of the DatabaseError handler are
// not visible in this extract; presumably the version file is read inside
// the try and a failure produces some fallback result - confirm against the
// full source.
339 GlassDatabaseReplicator::get_uuid() const
341 LOGCALL(DB
, string
, "GlassDatabaseReplicator::get_uuid", NO_ARGS
);
342 GlassVersion
version_file(db_dir
);
345 } catch (const Xapian::DatabaseError
&) {
348 RETURN(version_file
.get_uuid_string());