Drop special handling for Compaq C++
[xapian.git] / xapian-core / backends / glass / glass_databasereplicator.cc
blob5730589052d236f5c7cf3f4edce3b9af2c2bb50f
/** @file glass_databasereplicator.cc
 * @brief Support for glass database replication
 */
/* Copyright 2008 Lemur Consulting Ltd
 * Copyright 2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
 * USA
 */
23 #include <config.h>
25 #include "glass_databasereplicator.h"
27 #include "xapian/error.h"
29 #include "../flint_lock.h"
30 #include "glass_defs.h"
31 #include "glass_replicate_internal.h"
32 #include "glass_version.h"
33 #include "compression_stream.h"
34 #include "debuglog.h"
35 #include "fd.h"
36 #include "internaltypes.h"
37 #include "io_utils.h"
38 #include "pack.h"
39 #include "posixy_wrapper.h"
40 #include "net/remoteconnection.h"
41 #include "replicationprotocol.h"
42 #include "str.h"
43 #include "stringutils.h"
45 #include <algorithm>
46 #include <cerrno>
48 [[noreturn]]
49 static void
50 throw_connection_closed_unexpectedly()
52 throw Xapian::NetworkError("Connection closed unexpectedly");
55 using namespace std;
56 using namespace Xapian;
58 static const char * dbnames =
59 "/postlist." GLASS_TABLE_EXTENSION "\0"
60 "/docdata." GLASS_TABLE_EXTENSION "\0\0"
61 "/termlist." GLASS_TABLE_EXTENSION "\0"
62 "/position." GLASS_TABLE_EXTENSION "\0"
63 "/spelling." GLASS_TABLE_EXTENSION "\0"
64 "/synonym." GLASS_TABLE_EXTENSION;
66 GlassDatabaseReplicator::GlassDatabaseReplicator(const string & db_dir_)
67 : db_dir(db_dir_)
69 std::fill_n(fds, sizeof(fds) / sizeof(fds[0]), -1);
72 void
73 GlassDatabaseReplicator::commit() const
75 for (size_t i = 0; i != Glass::MAX_; ++i) {
76 int fd = fds[i];
77 if (fd >= 0) {
78 io_sync(fd);
79 #if 0 // FIXME: close or keep open?
80 close(fd);
81 fds[i] = -1;
82 #endif
87 GlassDatabaseReplicator::~GlassDatabaseReplicator()
89 for (size_t i = 0; i != Glass::MAX_; ++i) {
90 int fd = fds[i];
91 if (fd >= 0) {
92 close(fd);
97 bool
98 GlassDatabaseReplicator::check_revision_at_least(const string & rev,
99 const string & target) const
101 LOGCALL(DB, bool, "GlassDatabaseReplicator::check_revision_at_least", rev | target);
103 glass_revision_number_t rev_val;
104 glass_revision_number_t target_val;
106 const char * ptr = rev.data();
107 const char * end = ptr + rev.size();
108 if (!unpack_uint(&ptr, end, &rev_val)) {
109 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
112 ptr = target.data();
113 end = ptr + target.size();
114 if (!unpack_uint(&ptr, end, &target_val)) {
115 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
118 RETURN(rev_val >= target_val);
121 void
122 GlassDatabaseReplicator::process_changeset_chunk_version(string & buf,
123 RemoteConnection & conn,
124 double end_time) const
126 const char *ptr = buf.data();
127 const char *end = ptr + buf.size();
129 glass_revision_number_t rev;
130 if (!unpack_uint(&ptr, end, &rev))
131 throw NetworkError("Invalid revision in changeset");
133 string::size_type size;
134 if (!unpack_uint(&ptr, end, &size))
135 throw NetworkError("Invalid version file size in changeset");
137 // Get the new version file into buf.
138 buf.erase(0, ptr - buf.data());
139 int res = conn.get_message_chunk(buf, size, end_time);
140 if (res <= 0) {
141 if (res < 0)
142 throw_connection_closed_unexpectedly();
143 throw NetworkError("Unexpected end of changeset (6)");
146 // Write size bytes from start of buf to new version file.
147 string tmpfile = db_dir;
148 tmpfile += "/v.rtmp";
149 int fd = posixy_open(tmpfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 0666);
150 if (fd == -1) {
151 string msg = "Failed to open ";
152 msg += tmpfile;
153 throw DatabaseError(msg, errno);
156 FD closer(fd);
157 io_write(fd, buf.data(), size);
158 io_sync(fd);
160 string version_file = db_dir;
161 version_file += "/iamglass";
162 if (!io_tmp_rename(tmpfile, version_file)) {
163 string msg("Couldn't create new version file ");
164 msg += version_file;
165 throw DatabaseError(msg, errno);
168 buf.erase(0, size);
171 void
172 GlassDatabaseReplicator::process_changeset_chunk_blocks(Glass::table_type table,
173 unsigned v,
174 string & buf,
175 RemoteConnection & conn,
176 double end_time) const
178 const char *ptr = buf.data();
179 const char *end = ptr + buf.size();
181 unsigned int changeset_blocksize = GLASS_MIN_BLOCKSIZE << v;
182 if (changeset_blocksize > 65536 ||
183 (changeset_blocksize & (changeset_blocksize - 1))) {
184 throw NetworkError("Invalid blocksize in changeset");
186 uint4 block_number;
187 if (!unpack_uint(&ptr, end, &block_number))
188 throw NetworkError("Invalid block number in changeset");
190 buf.erase(0, ptr - buf.data());
192 int fd = fds[table];
193 if (fd == -1) {
194 string db_path = db_dir;
195 db_path += dbnames + table * (11 + CONST_STRLEN(GLASS_TABLE_EXTENSION));
196 fd = posixy_open(db_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
197 if (fd == -1) {
198 string msg = "Failed to open ";
199 msg += db_path;
200 throw DatabaseError(msg, errno);
202 fds[table] = fd;
205 int res = conn.get_message_chunk(buf, changeset_blocksize, end_time);
206 if (res <= 0) {
207 if (res < 0)
208 throw_connection_closed_unexpectedly();
209 throw NetworkError("Unexpected end of changeset (4)");
212 io_write_block(fd, buf.data(), changeset_blocksize, block_number);
213 buf.erase(0, changeset_blocksize);
216 string
217 GlassDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
218 double end_time,
219 bool valid) const
221 LOGCALL(DB, string, "GlassDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
223 // Lock the database to perform modifications.
224 FlintLock lock(db_dir);
225 string explanation;
226 FlintLock::reason why = lock.lock(true, false, explanation);
227 if (why != FlintLock::SUCCESS) {
228 lock.throw_databaselockerror(why, db_dir, explanation);
231 int type = conn.get_message_chunked(end_time);
232 if (type < 0)
233 throw_connection_closed_unexpectedly();
234 AssertEq(type, REPL_REPLY_CHANGESET);
236 string buf;
237 // Read enough to be certain that we've got the header part of the
238 // changeset.
240 if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
241 throw_connection_closed_unexpectedly();
242 const char *ptr = buf.data();
243 const char *end = ptr + buf.size();
244 // Check the magic string.
245 if (!startswith(buf, CHANGES_MAGIC_STRING)) {
246 throw NetworkError("Invalid ChangeSet magic string");
248 ptr += CONST_STRLEN(CHANGES_MAGIC_STRING);
249 if (ptr == end)
250 throw NetworkError("Couldn't read a valid version number from changeset");
251 unsigned int changes_version = *ptr++;
252 if (changes_version != CHANGES_VERSION)
253 throw NetworkError("Unsupported changeset version");
255 glass_revision_number_t startrev;
256 glass_revision_number_t endrev;
258 if (!unpack_uint(&ptr, end, &startrev))
259 throw NetworkError("Couldn't read a valid start revision from changeset");
260 if (!unpack_uint(&ptr, end, &endrev))
261 throw NetworkError("Couldn't read a valid end revision from changeset");
263 if (endrev <= startrev)
264 throw NetworkError("End revision in changeset is not later than start revision");
266 if (ptr == end)
267 throw NetworkError("Unexpected end of changeset (1)");
269 if (valid) {
270 // Check the revision number.
271 // If the database was not known to be valid, we cannot
272 // reliably determine its revision number, so must skip this
273 // check.
274 GlassVersion version_file(db_dir);
275 version_file.read();
276 if (startrev != version_file.get_revision())
277 throw NetworkError("Changeset supplied is for wrong revision number");
280 unsigned char changes_type = *ptr++;
281 if (changes_type != 0) {
282 throw NetworkError("Unsupported changeset type: " + str(changes_type));
283 // FIXME - support changes of type 1, produced when DANGEROUS mode is
284 // on.
287 // Clear the bits of the buffer which have been read.
288 buf.erase(0, ptr - buf.data());
290 // Read the items from the changeset.
291 while (true) {
292 if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
293 throw_connection_closed_unexpectedly();
294 ptr = buf.data();
295 end = ptr + buf.size();
296 if (ptr == end)
297 throw NetworkError("Unexpected end of changeset (3)");
299 // Read the type of the next chunk of data
300 // chunk type can be (in binary):
302 // 11111111 - last chunk
303 // 11111110 - version file
304 // 00BBBTTT - table block:
305 // Block size = (GLASS_MIN_BLOCKSIZE<<BBB) BBB=0..5
306 // Table TTT=0..(Glass::MAX_-1)
307 unsigned char chunk_type = *ptr++;
308 if (chunk_type == 0xff)
309 break;
310 if (chunk_type == 0xfe) {
311 // Version file.
312 buf.erase(0, ptr - buf.data());
313 process_changeset_chunk_version(buf, conn, end_time);
314 continue;
316 size_t table_code = (chunk_type & 0x07);
317 if (table_code >= Glass::MAX_)
318 throw NetworkError("Bad table code in changeset file");
319 Glass::table_type table = static_cast<Glass::table_type>(table_code);
320 unsigned char v = (chunk_type >> 3) & 0x0f;
322 // Process the chunk
323 buf.erase(0, ptr - buf.data());
324 process_changeset_chunk_blocks(table, v, buf, conn, end_time);
327 if (ptr != end)
328 throw NetworkError("Junk found at end of changeset");
330 buf.resize(0);
331 pack_uint(buf, endrev);
333 commit();
335 RETURN(buf);
338 string
339 GlassDatabaseReplicator::get_uuid() const
341 LOGCALL(DB, string, "GlassDatabaseReplicator::get_uuid", NO_ARGS);
342 GlassVersion version_file(db_dir);
343 try {
344 version_file.read();
345 } catch (const Xapian::DatabaseError &) {
346 RETURN(string());
348 RETURN(version_file.get_uuid_string());