Refactor to simplify compaction code slightly
[xapian.git] / xapian-core / api / compactor.cc
blob5798278dc602f68e7e132af484d8fcaa97f44c4a
1 /** @file compactor.cc
2 * @brief Compact a database, or merge and compact several.
3 */
4 /* Copyright (C) 2003,2004,2005,2006,2007,2008,2009,2010,2011,2015 Olly Betts
5 * Copyright (C) 2008 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
20 * USA
23 #include <config.h>
25 #include <xapian/compactor.h>
27 #include "safeerrno.h"
29 #include <algorithm>
30 #include <fstream>
32 #include <cstdio> // for rename()
33 #include <cstring>
34 #include <ctime>
35 #include "safesysstat.h"
36 #include <sys/types.h>
38 #include "safeunistd.h"
39 #include "safefcntl.h"
41 #include "noreturn.h"
42 #include "omassert.h"
43 #include "fileutils.h"
44 #ifdef __WIN32__
45 # include "msvc_posix_wrapper.h"
46 #endif
47 #include "stringutils.h"
48 #include "str.h"
49 #include "utils.h"
51 #include "backends/brass/brass_compact.h"
52 #include "backends/brass/brass_version.h"
53 #include "backends/chert/chert_compact.h"
54 #include "backends/chert/chert_version.h"
55 #include "backends/flint/flint_compact.h"
56 #include "backends/flint/flint_version.h"
58 #include <xapian.h>
60 using namespace std;
62 class CmpByFirstUsed {
63 const vector<pair<Xapian::docid, Xapian::docid> > & used_ranges;
65 public:
66 CmpByFirstUsed(const vector<pair<Xapian::docid, Xapian::docid> > & ur)
67 : used_ranges(ur) { }
69 bool operator()(size_t a, size_t b) {
70 return used_ranges[a].first < used_ranges[b].first;
/** Backend names for use in error messages.
 *
 *  Indexed by the values of the backend enum in Compactor::Internal
 *  (UNKNOWN, BRASS, CHERT, FLINT), so entry 0 (UNKNOWN) has no name and
 *  must not be used as a string.
 *
 *  Both the characters and the pointers are const: this is a read-only
 *  lookup table.
 */
static const char * const backend_names[] = {
    NULL,
    "brass",
    "chert",
    "flint"
};

/** How the destination relates to a stub database.
 *
 *  STUB_NO must stay 0 because the value is tested for truth (e.g.
 *  `if (compact_to_stub)`).  STUB_FILE means the destination is a regular
 *  file; STUB_DIR means it is a directory containing a regular file named
 *  "XAPIANDB".
 */
enum { STUB_NO, STUB_FILE, STUB_DIR };
83 namespace Xapian {
85 class Compactor::Internal : public Xapian::Internal::RefCntBase {
86 friend class Compactor;
88 string destdir;
89 bool renumber;
90 bool multipass;
91 int compact_to_stub;
92 size_t block_size;
93 compaction_level compaction;
95 Xapian::docid tot_off;
96 Xapian::docid last_docid;
98 enum { UNKNOWN, BRASS, CHERT, FLINT } backend;
100 struct stat sb;
102 string first_source;
104 vector<string> sources;
105 vector<Xapian::docid> offset;
106 vector<pair<Xapian::docid, Xapian::docid> > used_ranges;
107 public:
108 Internal()
109 : renumber(true), multipass(false),
110 block_size(8192), compaction(FULL), tot_off(0),
111 last_docid(0), backend(UNKNOWN)
115 void set_destdir(const string & destdir_);
117 void add_source(const string & srcdir);
119 void compact(Xapian::Compactor & compactor);
122 Compactor::Compactor() : internal(new Compactor::Internal()) { }
124 Compactor::~Compactor() { }
126 void
127 Compactor::set_block_size(size_t block_size)
129 internal->block_size = block_size;
132 void
133 Compactor::set_renumber(bool renumber)
135 internal->renumber = renumber;
138 void
139 Compactor::set_multipass(bool multipass)
141 internal->multipass = multipass;
144 void
145 Compactor::set_compaction_level(compaction_level compaction)
147 internal->compaction = compaction;
150 void
151 Compactor::set_destdir(const string & destdir)
153 internal->set_destdir(destdir);
156 void
157 Compactor::add_source(const string & srcdir)
159 internal->add_source(srcdir);
162 void
163 Compactor::compact()
165 internal->compact(*this);
168 void
169 Compactor::set_status(const string & table, const string & status)
171 (void)table;
172 (void)status;
175 string
176 Compactor::resolve_duplicate_metadata(const string & key,
177 size_t num_tags, const std::string tags[])
179 (void)key;
180 (void)num_tags;
181 return tags[0];
186 XAPIAN_NORETURN(
187 static void
188 backend_mismatch(const string &dbpath1, int backend1,
189 const string &dbpath2, int backend2)
191 static void
192 backend_mismatch(const string &dbpath1, int backend1,
193 const string &dbpath2, int backend2)
195 string msg = "All databases must be the same type ('";
196 msg += dbpath1;
197 msg += "' is ";
198 msg += backend_names[backend1];
199 msg += ", but '";
200 msg += dbpath2;
201 msg += "' is ";
202 msg += backend_names[backend2];
203 msg += ')';
204 throw Xapian::InvalidArgumentError(msg);
207 namespace Xapian {
209 void
210 Compactor::Internal::set_destdir(const string & destdir_) {
211 destdir = destdir_;
212 compact_to_stub = STUB_NO;
213 if (stat(destdir, &sb) == 0 && S_ISREG(sb.st_mode)) {
214 // Stub file.
215 compact_to_stub = STUB_FILE;
216 } else if (stat(destdir + "/XAPIANDB", &sb) == 0 && S_ISREG(sb.st_mode)) {
217 // Stub directory.
218 compact_to_stub = STUB_DIR;
222 void
223 Compactor::Internal::add_source(const string & srcdir)
225 // Check destdir isn't the same as any source directory, unless it is a
226 // stub database.
227 if (!compact_to_stub && srcdir == destdir) {
228 throw Xapian::InvalidArgumentError("destination may not be the same as any source directory, unless it is a stub database");
231 if (stat(srcdir, &sb) == 0) {
232 bool is_stub = false;
233 string file = srcdir;
234 if (S_ISREG(sb.st_mode)) {
235 // Stub database file.
236 is_stub = true;
237 } else if (S_ISDIR(sb.st_mode)) {
238 file += "/XAPIANDB";
239 if (stat(file.c_str(), &sb) == 0 && S_ISREG(sb.st_mode)) {
240 // Stub database directory.
241 is_stub = true;
244 if (is_stub) {
245 ifstream stub(file.c_str());
246 string line;
247 unsigned int line_no = 0;
248 while (getline(stub, line)) {
249 ++line_no;
250 if (line.empty() || line[0] == '#')
251 continue;
252 string::size_type space = line.find(' ');
253 if (space == string::npos) space = line.size();
255 string type(line, 0, space);
256 line.erase(0, space + 1);
258 if (type == "auto" || type == "chert" || type == "flint" ||
259 type == "brass") {
260 resolve_relative_path(line, file);
261 add_source(line);
262 continue;
265 if (type == "remote" || type == "inmemory") {
266 string msg = "Can't compact stub entry of type '";
267 msg += type;
268 msg += '\'';
269 throw Xapian::InvalidOperationError(msg);
272 throw Xapian::DatabaseError("Bad line in stub file");
274 return;
278 if (stat(srcdir + "/iamflint", &sb) == 0) {
279 if (backend == UNKNOWN) {
280 backend = FLINT;
281 } else if (backend != FLINT) {
282 backend_mismatch(first_source, backend, srcdir, FLINT);
284 } else if (stat(srcdir + "/iamchert", &sb) == 0) {
285 if (backend == UNKNOWN) {
286 backend = CHERT;
287 } else if (backend != CHERT) {
288 backend_mismatch(first_source, backend, srcdir, CHERT);
290 } else if (stat(srcdir + "/iambrass", &sb) == 0) {
291 if (backend == UNKNOWN) {
292 backend = BRASS;
293 } else if (backend != BRASS) {
294 backend_mismatch(first_source, backend, srcdir, BRASS);
296 } else {
297 string msg = srcdir;
298 msg += ": not a flint, chert or brass database";
299 throw Xapian::InvalidArgumentError(msg);
302 if (first_source.empty())
303 first_source = srcdir;
305 Xapian::Database db(srcdir);
306 Xapian::docid first = 0, last = 0;
308 // "Empty" databases might have spelling or synonym data so can't
309 // just be completely ignored.
310 Xapian::doccount num_docs = db.get_doccount();
311 if (num_docs != 0) {
312 Xapian::PostingIterator it = db.postlist_begin(string());
313 // This test should never fail, since db.get_doccount() is
314 // non-zero!
315 Assert(it != db.postlist_end(string()));
316 first = *it;
318 if (renumber && first) {
319 // Prune any unused docids off the start of this source
320 // database.
322 // tot_off could wrap here, but it's unsigned, so that's
323 // OK.
324 tot_off -= (first - 1);
327 // There may be unused documents at the end of the range.
328 // Binary chop using skip_to to find the last actually used
329 // document id.
330 last = db.get_lastdocid();
331 Xapian::docid last_lbound = first + num_docs - 1;
332 while (last_lbound < last) {
333 Xapian::docid mid;
334 mid = last_lbound + (last - last_lbound + 1) / 2;
335 it.skip_to(mid);
336 if (it == db.postlist_end(string())) {
337 last = mid - 1;
338 it = db.postlist_begin(string());
339 continue;
341 last_lbound = *it;
344 offset.push_back(tot_off);
345 if (renumber)
346 tot_off += last;
347 else if (last_docid < db.get_lastdocid())
348 last_docid = db.get_lastdocid();
349 used_ranges.push_back(make_pair(first, last));
351 sources.push_back(srcdir + '/');
354 void
355 Compactor::Internal::compact(Xapian::Compactor & compactor)
357 if (renumber)
358 last_docid = tot_off;
360 if (!renumber && sources.size() > 1) {
361 // We want to process the sources in ascending order of first
362 // docid. So we create a vector "order" with ascending integers
363 // and then sort so the indirected order is right. Then we reorder
364 // the vectors into that order and check the ranges are disjoint.
365 vector<size_t> order;
366 order.reserve(sources.size());
367 for (size_t i = 0; i < sources.size(); ++i)
368 order.push_back(i);
370 sort(order.begin(), order.end(), CmpByFirstUsed(used_ranges));
372 // Reorder the vectors to be in ascending of first docid, and
373 // set all the offsets to 0.
374 vector<string> sources_(sources.size());
375 vector<pair<Xapian::docid, Xapian::docid> > used_ranges_;
376 used_ranges_.reserve(sources.size());
378 Xapian::docid last_start = 0, last_end = 0;
379 for (size_t j = 0; j != order.size(); ++j) {
380 size_t n = order[j];
382 swap(sources_[j], sources[n]);
383 used_ranges_.push_back(used_ranges[n]);
385 const pair<Xapian::docid, Xapian::docid> p = used_ranges[n];
386 // Skip empty databases.
387 if (p.first == 0 && p.second == 0)
388 continue;
389 // Check for overlap with the previous database's range.
390 if (p.first <= last_end) {
391 string msg = "when merging databases, --no-renumber is only currently supported if the databases have disjoint ranges of used document ids: ";
392 msg += sources[order[j - 1]];
393 msg += " has range ";
394 msg += str(last_start);
395 msg += '-';
396 msg += str(last_end);
397 msg += ", ";
398 msg += sources[n];
399 msg += " has range ";
400 msg += str(p.first);
401 msg += '-';
402 msg += str(p.second);
403 throw Xapian::InvalidOperationError(msg);
405 last_start = p.first;
406 last_end = p.second;
409 swap(sources, sources_);
410 swap(used_ranges, used_ranges_);
413 string stub_file;
414 if (compact_to_stub) {
415 stub_file = destdir;
416 if (compact_to_stub == STUB_DIR) {
417 stub_file += "/XAPIANDB";
418 destdir += '/';
419 } else {
420 destdir += '_';
422 size_t sfx = destdir.size();
423 time_t now = time(NULL);
424 while (true) {
425 destdir.resize(sfx);
426 destdir += str(now++);
427 if (mkdir(destdir, 0755) == 0)
428 break;
429 if (errno != EEXIST) {
430 string msg = destdir;
431 msg += ": mkdir failed";
432 throw Xapian::DatabaseError(msg, errno);
435 } else {
436 // If the destination database directory doesn't exist, create it.
437 if (mkdir(destdir, 0755) < 0) {
438 // Check why mkdir failed. It's ok if the directory already
439 // exists, but we also get EEXIST if there's an existing file with
440 // that name.
441 if (errno == EEXIST) {
442 if (stat(destdir, &sb) == 0 && S_ISDIR(sb.st_mode))
443 errno = 0;
444 else
445 errno = EEXIST; // stat might have changed it
447 if (errno) {
448 string msg = destdir;
449 msg += ": cannot create directory";
450 throw Xapian::DatabaseError(msg, errno);
455 if (backend == CHERT) {
456 #ifdef XAPIAN_HAS_CHERT_BACKEND
457 compact_chert(compactor, destdir.c_str(), sources, offset, block_size,
458 compaction, multipass, last_docid);
460 // Create the version file ("iamchert").
462 // This file contains a UUID, and we want the copy to have a fresh
463 // UUID since its revision counter is reset to 1.
464 ChertVersion(destdir).create();
465 #else
466 throw Xapian::FeatureUnavailableError("Chert backend disabled at build time");
467 #endif
468 } else if (backend == BRASS) {
469 #ifdef XAPIAN_HAS_BRASS_BACKEND
470 compact_brass(compactor, destdir.c_str(), sources, offset, block_size,
471 compaction, multipass, last_docid);
472 // Create the version file ("iambrass").
474 // This file contains a UUID, and we want the copy to have a fresh
475 // UUID since its revision counter is reset to 1.
476 BrassVersion(destdir).create();
477 #else
478 throw Xapian::FeatureUnavailableError("Brass backend disabled at build time");
479 #endif
480 } else {
481 #ifdef XAPIAN_HAS_FLINT_BACKEND
482 compact_flint(compactor, destdir.c_str(), sources, offset, block_size,
483 compaction, multipass, last_docid);
484 // Create the version file ("iamflint").
486 // This file contains a UUID, and we want the copy to have a fresh
487 // UUID since its revision counter is reset to 1.
488 FlintVersion(destdir).create();
489 #else
490 throw Xapian::FeatureUnavailableError("Flint backend disabled at build time");
491 #endif
494 if (compact_to_stub) {
495 string new_stub_file = destdir;
496 new_stub_file += "/new_stub.tmp";
498 ofstream new_stub(new_stub_file.c_str());
499 #ifndef __WIN32__
500 size_t slash = destdir.find_last_of('/');
501 #else
502 size_t slash = destdir.find_last_of("/\\");
503 #endif
504 new_stub << "auto " << destdir.substr(slash + 1) << '\n';
506 #ifndef __WIN32__
507 if (rename(new_stub_file.c_str(), stub_file.c_str()) < 0) {
508 #else
509 if (msvc_posix_rename(new_stub_file.c_str(), stub_file.c_str()) < 0) {
510 #endif
511 // FIXME: try to clean up?
512 string msg = "Cannot rename '";
513 msg += new_stub_file;
514 msg += "' to '";
515 msg += stub_file;
516 msg += '\'';
517 throw Xapian::DatabaseError(msg, errno);