1 /** @file remote-database.cc
2 * @brief Remote backend database class
4 /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
5 * Copyright (C) 2007,2009,2010 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 #include "remote-database.h"
28 #include "api/msetinternal.h"
29 #include "api/smallvector.h"
30 #include "backends/inmemory/inmemory_positionlist.h"
31 #include "net_postlist.h"
32 #include "net_termlist.h"
33 #include "remote-document.h"
36 #include "net/length.h"
37 #include "net/serialise.h"
38 #include "net/serialise-error.h"
39 #include "serialise-double.h"
41 #include "stringutils.h" // For STRINGIZE().
42 #include "weight/weightinternal.h"
49 #include "xapian/constants.h"
50 #include "xapian/error.h"
51 #include "xapian/matchspy.h"
54 using Xapian::Internal::intrusive_ptr
;
58 throw_handshake_failed(const string
& context
)
60 throw Xapian::NetworkError("Handshake failed - is this a Xapian server?",
66 throw_connection_closed_unexpectedly()
68 throw Xapian::NetworkError("Connection closed unexpectedly");
71 RemoteDatabase::RemoteDatabase(int fd
, double timeout_
,
72 const string
& context_
, bool writable
,
74 : Xapian::Database::Internal(writable
?
76 TRANSACTION_READONLY
),
77 link(fd
, fd
, context_
),
81 mru_slot(Xapian::BAD_VALUENO
),
85 // It's simplest to just ignore SIGPIPE. We'll still know if the
86 // connection dies because we'll get EPIPE back from write().
87 if (signal(SIGPIPE
, SIG_IGN
) == SIG_ERR
) {
88 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno
);
92 update_stats(MSG_MAX
);
95 if (flags
& Xapian::DB_RETRY_LOCK
) {
96 const string
& body
= encode_length(flags
& Xapian::DB_RETRY_LOCK
);
97 update_stats(MSG_WRITEACCESS
, body
);
99 update_stats(MSG_WRITEACCESS
);
105 RemoteDatabase::positionlist_count(Xapian::docid did
,
106 const std::string
& term
) const
108 if (cached_stats_valid
&& !has_positional_info
)
111 send_message(MSG_POSITIONLISTCOUNT
, encode_length(did
) + term
);
114 get_message(message
, REPLY_POSITIONLISTCOUNT
);
115 const char * p
= message
.data();
116 const char * p_end
= p
+ message
.size();
117 Xapian::termcount count
;
118 decode_length(&p
, p_end
, count
);
120 throw Xapian::NetworkError("Bad REPLY_POSITIONLISTCOUNT message received", context
);
126 RemoteDatabase::keep_alive()
128 send_message(MSG_KEEPALIVE
, string());
130 get_message(message
, REPLY_DONE
);
134 RemoteDatabase::open_metadata_keylist(const std::string
&prefix
) const
136 // Ensure that total_length and doccount are up-to-date.
137 if (!cached_stats_valid
) update_stats();
139 send_message(MSG_METADATAKEYLIST
, prefix
);
142 unique_ptr
<NetworkTermList
> tlist(
143 new NetworkTermList(0, doccount
,
144 intrusive_ptr
<const RemoteDatabase
>(this),
146 vector
<NetworkTermListItem
> & items
= tlist
->items
;
148 string term
= prefix
;
149 while (get_message_or_done(message
, REPLY_METADATAKEYLIST
)) {
150 NetworkTermListItem item
;
151 term
.resize(size_t(static_cast<unsigned char>(message
[0])));
152 term
.append(message
, 1, string::npos
);
154 items
.push_back(item
);
157 tlist
->current_position
= tlist
->items
.begin();
158 return tlist
.release();
162 RemoteDatabase::open_term_list(Xapian::docid did
) const
166 // Ensure that total_length and doccount are up-to-date.
167 if (!cached_stats_valid
) update_stats();
169 send_message(MSG_TERMLIST
, encode_length(did
));
172 get_message(message
, REPLY_DOCLENGTH
);
173 const char * p
= message
.c_str();
174 const char * p_end
= p
+ message
.size();
175 Xapian::termcount doclen
;
176 decode_length(&p
, p_end
, doclen
);
178 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context
);
181 unique_ptr
<NetworkTermList
> tlist(
182 new NetworkTermList(doclen
, doccount
,
183 intrusive_ptr
<const RemoteDatabase
>(this),
185 vector
<NetworkTermListItem
> & items
= tlist
->items
;
188 while (get_message_or_done(message
, REPLY_TERMLIST
)) {
189 NetworkTermListItem item
;
191 p_end
= p
+ message
.size();
192 decode_length(&p
, p_end
, item
.wdf
);
193 decode_length(&p
, p_end
, item
.termfreq
);
194 term
.resize(size_t(static_cast<unsigned char>(*p
++)));
195 term
.append(p
, p_end
);
197 items
.push_back(item
);
200 tlist
->current_position
= tlist
->items
.begin();
201 return tlist
.release();
205 RemoteDatabase::open_term_list_direct(Xapian::docid did
) const
207 return RemoteDatabase::open_term_list(did
);
211 RemoteDatabase::open_allterms(const string
& prefix
) const {
212 // Ensure that total_length and doccount are up-to-date.
213 if (!cached_stats_valid
) update_stats();
215 send_message(MSG_ALLTERMS
, prefix
);
217 unique_ptr
<NetworkTermList
> tlist(
218 new NetworkTermList(0, doccount
,
219 intrusive_ptr
<const RemoteDatabase
>(this),
221 vector
<NetworkTermListItem
> & items
= tlist
->items
;
223 string term
= prefix
;
225 while (get_message_or_done(message
, REPLY_ALLTERMS
)) {
226 NetworkTermListItem item
;
227 const char * p
= message
.data();
228 const char * p_end
= p
+ message
.size();
229 decode_length(&p
, p_end
, item
.termfreq
);
230 term
.resize(size_t(static_cast<unsigned char>(*p
++)));
231 term
.append(p
, p_end
);
233 items
.push_back(item
);
236 tlist
->current_position
= tlist
->items
.begin();
237 return tlist
.release();
241 RemoteDatabase::open_post_list(const string
& term
) const
243 return RemoteDatabase::open_leaf_post_list(term
, false);
247 RemoteDatabase::open_leaf_post_list(const string
& term
, bool) const
249 return new NetworkPostList(intrusive_ptr
<const RemoteDatabase
>(this), term
);
253 RemoteDatabase::read_post_list(const string
&term
, NetworkPostList
& pl
) const
255 send_message(MSG_POSTLIST
, term
);
258 get_message(message
, REPLY_POSTLISTSTART
);
260 const char * p
= message
.data();
261 const char * p_end
= p
+ message
.size();
262 Xapian::doccount termfreq
;
263 decode_length(&p
, p_end
, termfreq
);
265 while (get_message_or_done(message
, REPLY_POSTLISTITEM
)) {
266 pl
.append_posting(message
);
273 RemoteDatabase::open_position_list(Xapian::docid did
, const string
&term
) const
275 send_message(MSG_POSITIONLIST
, encode_length(did
) + term
);
277 Xapian::VecCOW
<Xapian::termpos
> positions
;
280 Xapian::termpos lastpos
= static_cast<Xapian::termpos
>(-1);
281 while (get_message_or_done(message
, REPLY_POSITIONLIST
)) {
282 const char * p
= message
.data();
283 const char * p_end
= p
+ message
.size();
285 decode_length(&p
, p_end
, inc
);
287 positions
.push_back(lastpos
);
290 return new InMemoryPositionList(std::move(positions
));
294 RemoteDatabase::has_positions() const
296 if (!cached_stats_valid
) update_stats();
297 return has_positional_info
;
301 RemoteDatabase::reopen()
303 mru_slot
= Xapian::BAD_VALUENO
;
304 return update_stats(MSG_REOPEN
);
308 RemoteDatabase::close()
313 // Currently lazy is used:
315 // * To implement API flag Xapian::DOC_ASSUME_VALID which can be specified when
316 // calling method Database::get_document()
318 // * To read values for backends without streamed values in SlowValueList
320 // * If you call get_data(), values_begin() or values_count() on a Document
321 // object passed to a KeyMaker, MatchDecider, MatchSpy during the match
323 // The first is relevant to the remote backend, but doesn't happen during
326 // SlowValueList is used with the remote backend, but not to read values
329 // KeyMaker and MatchSpy happens on the server with the remote backend, so
330 // they aren't relevant here.
332 // So the cases which are relevant to the remote backend don't matter during
333 // the match, and so we can ignore the lazy flag here without affecting matcher
335 Xapian::Document::Internal
*
336 RemoteDatabase::open_document(Xapian::docid did
, bool /*lazy*/) const
340 send_message(MSG_DOCUMENT
, encode_length(did
));
342 map
<Xapian::valueno
, string
> values
;
343 get_message(doc_data
, REPLY_DOCDATA
);
346 while (get_message_or_done(message
, REPLY_VALUE
)) {
347 const char * p
= message
.data();
348 const char * p_end
= p
+ message
.size();
349 Xapian::valueno slot
;
350 decode_length(&p
, p_end
, slot
);
351 values
.insert(make_pair(slot
, string(p
, p_end
)));
354 return new RemoteDocument(this, did
, doc_data
, std::move(values
));
358 RemoteDatabase::update_stats(message_type msg_code
, const string
& body
) const
360 // MSG_MAX signals that we're handling the opening greeting, which isn't in
361 // response to an explicit message.
362 if (msg_code
!= MSG_MAX
)
363 send_message(msg_code
, body
);
366 if (!get_message_or_done(message
, REPLY_UPDATE
)) {
367 // The database was already open at the latest revision.
371 if (message
.size() < 3) {
372 throw_handshake_failed(context
);
374 const char *p
= message
.c_str();
375 const char *p_end
= p
+ message
.size();
377 // The protocol major versions must match. The protocol minor version of
378 // the server must be >= that of the client.
379 int protocol_major
= static_cast<unsigned char>(*p
++);
380 int protocol_minor
= static_cast<unsigned char>(*p
++);
381 if (protocol_major
!= XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION
||
382 protocol_minor
< XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION
) {
383 string
errmsg("Server supports protocol version");
384 if (protocol_minor
) {
386 errmsg
+= str(protocol_major
);
389 errmsg
+= str(protocol_major
);
391 errmsg
+= str(protocol_minor
);
393 " - client is using "
394 STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION
)
396 STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION
);
397 throw Xapian::NetworkError(errmsg
, context
);
400 decode_length(&p
, p_end
, doccount
);
401 decode_length(&p
, p_end
, lastdocid
);
402 lastdocid
+= doccount
;
403 decode_length(&p
, p_end
, doclen_lbound
);
404 decode_length(&p
, p_end
, doclen_ubound
);
405 doclen_ubound
+= doclen_lbound
;
407 throw Xapian::NetworkError("Bad stats update message received", context
);
409 has_positional_info
= (*p
++ == '1');
410 decode_length(&p
, p_end
, total_length
);
411 uuid
.assign(p
, p_end
);
412 cached_stats_valid
= true;
417 RemoteDatabase::get_doccount() const
419 if (!cached_stats_valid
) update_stats();
424 RemoteDatabase::get_lastdocid() const
426 if (!cached_stats_valid
) update_stats();
431 RemoteDatabase::get_total_length() const
433 if (!cached_stats_valid
) update_stats();
438 RemoteDatabase::term_exists(const string
& tname
) const
441 return get_doccount() != 0;
443 send_message(MSG_TERMEXISTS
, tname
);
445 reply_type type
= get_message(message
,
447 REPLY_TERMDOESNTEXIST
);
448 return (type
== REPLY_TERMEXISTS
);
452 RemoteDatabase::get_freqs(const string
& term
,
453 Xapian::doccount
* termfreq_ptr
,
454 Xapian::termcount
* collfreq_ptr
) const
456 Assert(!term
.empty());
462 send_message(MSG_FREQS
, term
);
463 get_message(message
, REPLY_FREQS
);
465 send_message(MSG_TERMFREQ
, term
);
466 get_message(message
, REPLY_TERMFREQ
);
469 p_end
= p
+ message
.size();
470 decode_length(&p
, p_end
, *termfreq_ptr
);
471 } else if (collfreq_ptr
) {
472 send_message(MSG_COLLFREQ
, term
);
473 get_message(message
, REPLY_COLLFREQ
);
475 p_end
= p
+ message
.size();
478 decode_length(&p
, p_end
, *collfreq_ptr
);
483 RemoteDatabase::read_value_stats(Xapian::valueno slot
) const
485 if (mru_slot
!= slot
) {
486 send_message(MSG_VALUESTATS
, encode_length(slot
));
488 get_message(message
, REPLY_VALUESTATS
);
489 const char * p
= message
.data();
490 const char * p_end
= p
+ message
.size();
492 decode_length(&p
, p_end
, mru_valstats
.freq
);
494 decode_length_and_check(&p
, p_end
, len
);
495 mru_valstats
.lower_bound
.assign(p
, len
);
497 decode_length_and_check(&p
, p_end
, len
);
498 mru_valstats
.upper_bound
.assign(p
, len
);
501 throw Xapian::NetworkError("Bad REPLY_VALUESTATS message received", context
);
507 RemoteDatabase::get_value_freq(Xapian::valueno slot
) const
509 read_value_stats(slot
);
510 return mru_valstats
.freq
;
514 RemoteDatabase::get_value_lower_bound(Xapian::valueno slot
) const
516 read_value_stats(slot
);
517 return mru_valstats
.lower_bound
;
521 RemoteDatabase::get_value_upper_bound(Xapian::valueno slot
) const
523 read_value_stats(slot
);
524 return mru_valstats
.upper_bound
;
528 RemoteDatabase::get_doclength_lower_bound() const
530 return doclen_lbound
;
534 RemoteDatabase::get_doclength_upper_bound() const
536 return doclen_ubound
;
540 RemoteDatabase::get_wdf_upper_bound(const string
&) const
542 // The default implementation returns get_collection_freq(), but we
543 // don't want the overhead of a remote message and reply per query
544 // term, and we can get called in the middle of a remote exchange
545 // too. FIXME: handle this bound in the stats local/remote code...
546 return doclen_ubound
;
550 RemoteDatabase::get_doclength(Xapian::docid did
) const
553 send_message(MSG_DOCLENGTH
, encode_length(did
));
555 get_message(message
, REPLY_DOCLENGTH
);
556 const char * p
= message
.c_str();
557 const char * p_end
= p
+ message
.size();
558 Xapian::termcount doclen
;
559 decode_length(&p
, p_end
, doclen
);
561 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context
);
567 RemoteDatabase::get_unique_terms(Xapian::docid did
) const
570 send_message(MSG_UNIQUETERMS
, encode_length(did
));
572 get_message(message
, REPLY_UNIQUETERMS
);
573 const char * p
= message
.c_str();
574 const char * p_end
= p
+ message
.size();
575 Xapian::termcount doclen
;
576 decode_length(&p
, p_end
, doclen
);
578 throw Xapian::NetworkError("Bad REPLY_UNIQUETERMS message received", context
);
584 RemoteDatabase::get_message(string
&result
,
585 reply_type required_type
,
586 reply_type required_type2
) const
588 double end_time
= RealTime::end_time(timeout
);
589 int type
= link
.get_message(result
, end_time
);
591 throw_connection_closed_unexpectedly();
592 if (rare(type
) >= REPLY_MAX
) {
593 if (required_type
== REPLY_UPDATE
)
594 throw_handshake_failed(context
);
595 string
errmsg("Invalid reply type ");
597 throw Xapian::NetworkError(errmsg
);
599 if (type
== REPLY_EXCEPTION
) {
600 unserialise_error(result
, "REMOTE:", context
);
602 if (type
!= required_type
&& type
!= required_type2
) {
603 string
errmsg("Expecting reply type ");
604 errmsg
+= str(int(required_type
));
605 if (required_type2
!= required_type
) {
607 errmsg
+= str(int(required_type2
));
611 throw Xapian::NetworkError(errmsg
);
614 return static_cast<reply_type
>(type
);
618 RemoteDatabase::send_message(message_type type
, const string
&message
) const
620 double end_time
= RealTime::end_time(timeout
);
621 link
.send_message(static_cast<unsigned char>(type
), message
, end_time
);
625 RemoteDatabase::do_close()
627 bool writable
= !is_read_only();
629 // The dtor hasn't really been called! FIXME: This works, but means any
630 // exceptions from end_transaction()/commit() are swallowed, which is
631 // not entirely desirable.
634 // If we're writable, wait for a confirmation of the close, so we know that
635 // changes have been written and flushed, and the database write lock
636 // released. For the non-writable case, there's no need to wait, so don't
637 // slow down searching by waiting here.
638 link
.do_close(writable
);
642 RemoteDatabase::set_query(const Xapian::Query
& query
,
643 Xapian::termcount qlen
,
644 Xapian::valueno collapse_key
,
645 Xapian::doccount collapse_max
,
646 Xapian::Enquire::docid_order order
,
647 Xapian::valueno sort_key
,
648 Xapian::Enquire::Internal::sort_setting sort_by
,
649 bool sort_value_forward
,
651 int percent_threshold
, double weight_threshold
,
652 const Xapian::Weight
& wtscheme
,
653 const Xapian::RSet
&omrset
,
654 const vector
<opt_ptr_spy
>& matchspies
) const
656 string tmp
= query
.serialise();
657 string message
= encode_length(tmp
.size());
660 // Serialise assorted Enquire settings.
661 message
+= encode_length(qlen
);
662 message
+= encode_length(collapse_max
);
663 if (collapse_max
) message
+= encode_length(collapse_key
);
664 message
+= char('0' + order
);
665 message
+= encode_length(sort_key
);
666 message
+= char('0' + sort_by
);
667 message
+= char('0' + sort_value_forward
);
668 message
+= serialise_double(time_limit
);
669 message
+= char(percent_threshold
);
670 message
+= serialise_double(weight_threshold
);
672 tmp
= wtscheme
.name();
673 message
+= encode_length(tmp
.size());
676 tmp
= wtscheme
.serialise();
677 message
+= encode_length(tmp
.size());
680 tmp
= serialise_rset(omrset
);
681 message
+= encode_length(tmp
.size());
684 for (auto i
: matchspies
) {
687 throw Xapian::UnimplementedError("MatchSpy subclass not suitable for use with remote searches - name() method returned empty string");
689 message
+= encode_length(tmp
.size());
692 tmp
= i
->serialise();
693 message
+= encode_length(tmp
.size());
697 send_message(MSG_QUERY
, message
);
701 RemoteDatabase::get_remote_stats(bool block
,
702 Xapian::Weight::Internal
& out
) const
704 if (!block
&& !link
.ready_to_read()) return false;
707 get_message(message
, REPLY_STATS
);
708 unserialise_stats(message
, out
);
714 RemoteDatabase::send_global_stats(Xapian::doccount first
,
715 Xapian::doccount maxitems
,
716 Xapian::doccount check_at_least
,
717 const Xapian::Weight::Internal
&stats
) const
719 string message
= encode_length(first
);
720 message
+= encode_length(maxitems
);
721 message
+= encode_length(check_at_least
);
722 message
+= serialise_stats(stats
);
723 send_message(MSG_GETMSET
, message
);
727 RemoteDatabase::get_mset(const vector
<opt_ptr_spy
>& matchspies
) const
730 get_message(message
, REPLY_RESULTS
);
731 const char * p
= message
.data();
732 const char * p_end
= p
+ message
.size();
734 for (auto i
: matchspies
) {
736 throw Xapian::NetworkError("Expected serialised matchspy");
738 decode_length_and_check(&p
, p_end
, len
);
739 string
spyresults(p
, len
);
741 i
->merge_results(spyresults
);
744 mset
.internal
->unserialise(p
, p_end
);
749 RemoteDatabase::commit()
751 send_message(MSG_COMMIT
, string());
753 // We need to wait for a response to ensure documents have been committed.
755 get_message(message
, REPLY_DONE
);
759 RemoteDatabase::cancel()
761 cached_stats_valid
= false;
762 mru_slot
= Xapian::BAD_VALUENO
;
764 send_message(MSG_CANCEL
, string());
768 RemoteDatabase::add_document(const Xapian::Document
& doc
)
770 cached_stats_valid
= false;
771 mru_slot
= Xapian::BAD_VALUENO
;
773 send_message(MSG_ADDDOCUMENT
, serialise_document(doc
));
776 get_message(message
, REPLY_ADDDOCUMENT
);
778 const char * p
= message
.data();
779 const char * p_end
= p
+ message
.size();
781 decode_length(&p
, p_end
, did
);
786 RemoteDatabase::delete_document(Xapian::docid did
)
788 cached_stats_valid
= false;
789 mru_slot
= Xapian::BAD_VALUENO
;
791 send_message(MSG_DELETEDOCUMENT
, encode_length(did
));
793 get_message(dummy
, REPLY_DONE
);
797 RemoteDatabase::delete_document(const std::string
& unique_term
)
799 cached_stats_valid
= false;
800 mru_slot
= Xapian::BAD_VALUENO
;
802 send_message(MSG_DELETEDOCUMENTTERM
, unique_term
);
806 RemoteDatabase::replace_document(Xapian::docid did
,
807 const Xapian::Document
& doc
)
809 cached_stats_valid
= false;
810 mru_slot
= Xapian::BAD_VALUENO
;
812 string message
= encode_length(did
);
813 message
+= serialise_document(doc
);
815 send_message(MSG_REPLACEDOCUMENT
, message
);
819 RemoteDatabase::replace_document(const std::string
& unique_term
,
820 const Xapian::Document
& doc
)
822 cached_stats_valid
= false;
823 mru_slot
= Xapian::BAD_VALUENO
;
825 string message
= encode_length(unique_term
.size());
826 message
+= unique_term
;
827 message
+= serialise_document(doc
);
829 send_message(MSG_REPLACEDOCUMENTTERM
, message
);
831 get_message(message
, REPLY_ADDDOCUMENT
);
833 const char * p
= message
.data();
834 const char * p_end
= p
+ message
.size();
836 decode_length(&p
, p_end
, did
);
841 RemoteDatabase::get_uuid() const
847 RemoteDatabase::get_metadata(const string
& key
) const
849 send_message(MSG_GETMETADATA
, key
);
851 get_message(metadata
, REPLY_METADATA
);
856 RemoteDatabase::set_metadata(const string
& key
, const string
& value
)
858 string data
= encode_length(key
.size());
861 send_message(MSG_SETMETADATA
, data
);
865 RemoteDatabase::add_spelling(const string
& word
,
866 Xapian::termcount freqinc
) const
868 string data
= encode_length(freqinc
);
870 send_message(MSG_ADDSPELLING
, data
);
874 RemoteDatabase::remove_spelling(const string
& word
,
875 Xapian::termcount freqdec
) const
877 string data
= encode_length(freqdec
);
879 send_message(MSG_REMOVESPELLING
, data
);
882 get_message(message
, REPLY_REMOVESPELLING
);
883 const char * p
= message
.data();
884 const char * p_end
= p
+ message
.size();
885 Xapian::termcount result
;
886 decode_length(&p
, p_end
, result
);
888 throw Xapian::NetworkError("Bad REPLY_REMOVESPELLING message received", context
);
894 RemoteDatabase::locked() const
896 throw Xapian::UnimplementedError("Database::locked() not implemented for remote backend");
900 RemoteDatabase::get_description() const
902 string desc
= "Remote(context=";