1 /** @file remoteserver.cc
2 * @brief Xapian remote backend server base class
4 /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017 Olly Betts
5 * Copyright (C) 2006,2007,2009,2010 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "remoteserver.h"
25 #include "xapian/constants.h"
26 #include "xapian/database.h"
27 #include "xapian/enquire.h"
28 #include "xapian/error.h"
29 #include "xapian/matchspy.h"
30 #include "xapian/query.h"
31 #include "xapian/rset.h"
32 #include "xapian/valueiterator.h"
34 #include "safeerrno.h"
39 #include "api/msetinternal.h"
41 #include "matcher/matcher.h"
44 #include "serialise.h"
45 #include "serialise-double.h"
46 #include "serialise-error.h"
48 #include "stringutils.h"
49 #include "weight/weightinternal.h"
// Fragment of a helper whose signature is not visible in this extract: it
// throws Xapian::InvalidOperationError("Server is read-only").
// NOTE(review): the enclosing function header and braces are missing here;
// presumably this is called when a write is requested but no writable
// database is open — confirm against the full source.
57 throw Xapian::InvalidOperationError("Server is read-only");
// Empty tag type with no members. It is thrown by get_message() on
// MSG_SHUTDOWN (or on EOF while no writable database is open) and caught by
// the message dispatch loop.
60 /// Class to throw when we receive the connection closing message.
61 struct ConnectionClosed
{ };
// Construct a server speaking the remote protocol over fdin_/fdout_.
//
// dbpaths:         one or more database paths. The first is opened read-only
//                  and any further paths are added as sub-databases.
// fdin_, fdout_:   file descriptors handed to the RemoteConnection base.
// active_timeout_: stored; used as the send timeout in send_message() and
//                  while waiting for MSG_GETMSET in msg_query().
// idle_timeout_:   stored; used while waiting for the next request.
// writable_:       stored in `writable`; the database is still opened
//                  read-only, and the client may later ask for write access.
//
// If opening the database throws Xapian::Error, the error is serialised and
// sent to the client as REPLY_EXCEPTION, then (per the comment) rethrown.
// On success SIGPIPE is set to SIG_IGN (NetworkError if that fails) and a
// greeting is sent via msg_update().
// NOTE(review): several original lines are missing from this extract,
// including the parameter declaration for writable_, the `try {`, the
// multi-database branch that sets up `context`, and the rethrow statement.
63 RemoteServer::RemoteServer(const vector
<string
>& dbpaths
,
64 int fdin_
, int fdout_
,
65 double active_timeout_
, double idle_timeout_
,
67 : RemoteConnection(fdin_
, fdout_
, string()),
68 db(NULL
), wdb(NULL
), writable(writable_
),
69 active_timeout(active_timeout_
), idle_timeout(idle_timeout_
)
71 // Catch errors opening the database and propagate them to the client.
73 Assert(!dbpaths
.empty());
74 // We always open the database read-only to start with. If we're
75 // writable, the client can ask to be upgraded to write access once
76 // connected if it wants it.
77 db
= new Xapian::Database(dbpaths
[0]);
78 // Build a better description than Database::get_description() gives
79 // in the variable context. FIXME: improve Database::get_description()
80 // and then just use that instead.
84 vector
<string
>::const_iterator
i(dbpaths
.begin());
85 for (++i
; i
!= dbpaths
.end(); ++i
) {
86 db
->add_database(Xapian::Database(*i
));
91 AssertEq(dbpaths
.size(), 1); // Expecting exactly one database.
93 } catch (const Xapian::Error
&err
) {
94 // Propagate the exception to the client.
95 send_message(REPLY_EXCEPTION
, serialise_error(err
));
96 // And rethrow it so our caller can log it and close the connection.
101 // It's simplest to just ignore SIGPIPE. We'll still know if the
102 // connection dies because we'll get EPIPE back from write().
103 if (signal(SIGPIPE
, SIG_IGN
) == SIG_ERR
)
104 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno
);
107 // Send greeting message.
108 msg_update(string());
// Destructor. Per the visible comment, wdb is either NULL or the same
// object as db, so only one of the two pointers may be deleted.
// NOTE(review): the body's delete statement and braces are not present in
// this extract.
111 RemoteServer::~RemoteServer()
114 // wdb is either NULL or equal to db, so we shouldn't delete it too!
// Read the next message from the client.
//
// timeout:       passed through RealTime::end_time() (presumably converting
//                it to an absolute deadline) before calling
//                RemoteConnection::get_message().
// result:        receives the message payload.
// required_type: if not MSG_MAX, the message must be exactly this type.
// Returns the received message type.
//
// Throws ConnectionClosed on MSG_SHUTDOWN, or on a negative type (EOF) while
// wdb is NULL. Throws Xapian::NetworkError("Connection closed unexpectedly")
// on an unexpected close, on a type >= MSG_MAX, or when required_type is set
// and does not match.
// NOTE(review): the condition guarding the "Connection closed unexpectedly"
// throw, and any text appended to the "Invalid message type " / "Expecting
// message type " messages beyond what is shown, are not visible in this
// extract.
118 RemoteServer::get_message(double timeout
, string
& result
,
119 message_type required_type
)
121 double end_time
= RealTime::end_time(timeout
);
122 int type
= RemoteConnection::get_message(result
, end_time
);
124 // Handle "shutdown connection" message here. Treat EOF here for a read-only
125 // database the same way since a read-only client just closes the
126 // connection when done.
127 if (type
== MSG_SHUTDOWN
|| (type
< 0 && wdb
== NULL
))
128 throw ConnectionClosed();
130 throw Xapian::NetworkError("Connection closed unexpectedly");
131 if (type
>= MSG_MAX
) {
132 string
errmsg("Invalid message type ");
134 throw Xapian::NetworkError(errmsg
);
136 if (required_type
!= MSG_MAX
&& type
!= int(required_type
)) {
137 string
errmsg("Expecting message type ");
138 errmsg
+= str(int(required_type
));
141 throw Xapian::NetworkError(errmsg
);
143 return static_cast<message_type
>(type
);
// Send a reply of the given type with payload `message` to the client.
// The deadline is RealTime::end_time(active_timeout), i.e. presumably
// active_timeout seconds from now. The reply_type is narrowed to a single
// unsigned char before being passed to RemoteConnection::send_message().
147 RemoteServer::send_message(reply_type type
, const string
&message
)
149 double end_time
= RealTime::end_time(active_timeout
);
150 unsigned char type_as_char
= static_cast<unsigned char>(type
);
151 RemoteConnection::send_message(type_as_char
, message
, end_time
);
// Pointer-to-member type for a RemoteServer message handler that takes the
// message payload as `const string&`.
// NOTE(review): no use of dispatch_func is visible in this extract.
154 typedef void (RemoteServer::* dispatch_func
)(const string
&);
// Request-handling loop. The enclosing function header is not visible in
// this extract.
//
// Each message is read with get_message(idle_timeout, message) and
// dispatched by type to the matching msg_* handler. Types with no handler
// raise InvalidArgumentError("Unexpected message type ...").
//
// Error handling, as shown by the visible code and comments:
//  * NetworkTimeoutError: try to send REPLY_EXCEPTION with end_time 1.0 so
//    the send does not block, then rethrow.
//  * other NetworkError: do not try to report it to the client; rethrow.
//  * other Xapian::Error: send REPLY_EXCEPTION and continue handling
//    messages.
//  * ConnectionClosed: handler body not visible here.
//  * presumably a catch-all: send an empty REPLY_EXCEPTION, then rethrow.
// NOTE(review): many `case` labels, `break`s, the loop header and the
// `throw;` statements are missing from this extract.
162 size_t type
= get_message(idle_timeout
, message
);
165 msg_allterms(message
);
168 msg_collfreq(message
);
171 msg_document(message
);
174 msg_termexists(message
);
177 msg_termfreq(message
);
180 msg_valuestats(message
);
183 msg_keepalive(message
);
186 msg_doclength(message
);
192 msg_termlist(message
);
194 case MSG_POSITIONLIST
:
195 msg_positionlist(message
);
198 msg_postlist(message
);
206 case MSG_ADDDOCUMENT
:
207 msg_adddocument(message
);
212 case MSG_DELETEDOCUMENTTERM
:
213 msg_deletedocumentterm(message
);
218 case MSG_REPLACEDOCUMENT
:
219 msg_replacedocument(message
);
221 case MSG_REPLACEDOCUMENTTERM
:
222 msg_replacedocumentterm(message
);
224 case MSG_DELETEDOCUMENT
:
225 msg_deletedocument(message
);
227 case MSG_WRITEACCESS
:
228 msg_writeaccess(message
);
230 case MSG_GETMETADATA
:
231 msg_getmetadata(message
);
233 case MSG_SETMETADATA
:
234 msg_setmetadata(message
);
236 case MSG_ADDSPELLING
:
237 msg_addspelling(message
);
239 case MSG_REMOVESPELLING
:
240 msg_removespelling(message
);
242 case MSG_METADATAKEYLIST
:
243 msg_metadatakeylist(message
);
248 case MSG_UNIQUETERMS
:
249 msg_uniqueterms(message
);
251 case MSG_POSITIONLISTCOUNT
:
252 msg_positionlistcount(message
);
255 // MSG_GETMSET - used during a conversation.
256 // MSG_SHUTDOWN - handled by get_message().
257 string
errmsg("Unexpected message type ");
259 throw Xapian::InvalidArgumentError(errmsg
);
262 } catch (const Xapian::NetworkTimeoutError
& e
) {
264 // We've had a timeout, so the client may not be listening, so
265 // set the end_time to 1 and if we can't send the message right
266 // away, just exit and the client will cope.
// NOTE(review): this three-argument send_message() (explicit end_time) is not
// defined in this extract; presumably an overload declared elsewhere.
267 send_message(REPLY_EXCEPTION
, serialise_error(e
), 1.0);
270 // And rethrow it so our caller can log it and close the
273 } catch (const Xapian::NetworkError
&) {
274 // All other network errors mean we are fatally confused and are
275 // unlikely to be able to communicate further across this
276 // connection. So we don't try to propagate the error to the
277 // client, but instead just rethrow the exception so our caller can
278 // log it and close the connection.
280 } catch (const Xapian::Error
&e
) {
281 // Propagate the exception to the client, then return to the main
282 // message handling loop.
283 send_message(REPLY_EXCEPTION
, serialise_error(e
));
284 } catch (ConnectionClosed
&) {
287 // Propagate an unknown exception to the client.
288 send_message(REPLY_EXCEPTION
, string());
289 // And rethrow it so our caller can log it and close the
// Handle a request to list all terms starting with the prefix in `message`.
//
// Each term is sent as REPLY_ALLTERMS, made up of:
//  * the encoded term frequency;
//  * one byte giving how many leading bytes the term shares with `prev`;
//  * the rest of the term.
// REPLY_DONE ends the list.
// `prev` starts as the prefix itself, so the first term's shared length is
// measured against the prefix.
// NOTE(review): the declaration of `reply`, the body of the
// `prev.size() > 255` guard and the update of `prev` after each term are not
// visible. Presumably prev is truncated so `reuse` fits in one byte — confirm.
297 RemoteServer::msg_allterms(const string
&message
)
299 string prev
= message
;
302 const string
& prefix
= message
;
303 const Xapian::TermIterator end
= db
->allterms_end(prefix
);
304 for (Xapian::TermIterator t
= db
->allterms_begin(prefix
); t
!= end
; ++t
) {
305 if (rare(prev
.size() > 255))
307 const string
& v
= *t
;
308 size_t reuse
= common_prefix_length(prev
, v
);
309 reply
= encode_length(t
.get_termfreq());
310 reply
.append(1, char(reuse
));
311 reply
.append(v
, reuse
, string::npos
);
312 send_message(REPLY_ALLTERMS
, reply
);
316 send_message(REPLY_DONE
, string());
// Handle a termlist request for the document whose id is encoded in
// `message`.
//
// Sends REPLY_DOCLENGTH with the document length first. Then, for each term,
// sends REPLY_TERMLIST containing:
//  * the encoded wdf;
//  * the encoded term frequency;
//  * one byte giving how many leading bytes the term shares with `prev`;
//  * the rest of the term.
// REPLY_DONE ends the list.
// NOTE(review): the declarations of `did` and `prev`, the body of the
// `prev.size() > 255` guard and the update of `prev` are not visible in this
// extract.
320 RemoteServer::msg_termlist(const string
&message
)
322 const char *p
= message
.data();
323 const char *p_end
= p
+ message
.size();
325 decode_length(&p
, p_end
, did
);
327 send_message(REPLY_DOCLENGTH
, encode_length(db
->get_doclength(did
)));
329 const Xapian::TermIterator end
= db
->termlist_end(did
);
330 for (Xapian::TermIterator t
= db
->termlist_begin(did
); t
!= end
; ++t
) {
331 if (rare(prev
.size() > 255))
333 const string
& v
= *t
;
334 size_t reuse
= common_prefix_length(prev
, v
);
335 string reply
= encode_length(t
.get_wdf());
336 reply
+= encode_length(t
.get_termfreq());
337 reply
.append(1, char(reuse
));
338 reply
.append(v
, reuse
, string::npos
);
339 send_message(REPLY_TERMLIST
, reply
);
343 send_message(REPLY_DONE
, string());
// Handle a positionlist request. `message` holds an encoded docid followed
// by the term.
//
// Each position is sent as its own REPLY_POSITIONLIST message holding
// (pos - lastpos - 1). lastpos starts at termpos(-1), so with unsigned
// wraparound the first value sent is the position itself. REPLY_DONE ends
// the list.
// NOTE(review): the declaration of `did`, the loop condition and increment,
// and the `lastpos = pos` update are not visible in this extract.
347 RemoteServer::msg_positionlist(const string
&message
)
349 const char *p
= message
.data();
350 const char *p_end
= p
+ message
.size();
352 decode_length(&p
, p_end
, did
);
353 string
term(p
, p_end
- p
);
355 Xapian::termpos lastpos
= static_cast<Xapian::termpos
>(-1);
356 const Xapian::PositionIterator end
= db
->positionlist_end(did
, term
);
357 for (Xapian::PositionIterator i
= db
->positionlist_begin(did
, term
);
359 Xapian::termpos pos
= *i
;
360 send_message(REPLY_POSITIONLIST
, encode_length(pos
- lastpos
- 1));
364 send_message(REPLY_DONE
, string());
// Handle a positionlist-count request. `message` holds an encoded docid
// followed by the term.
//
// Opens a termlist iterator for the document and skip_to()s the term. It
// replies REPLY_POSITIONLISTCOUNT with the iterator's positionlist_count().
// The count is 0 if the document has no terms or skip_to() reaches the end.
// NOTE(review): skip_to() stops at the first term >= `term`, and there is no
// visible check that it landed exactly on `term`. A term missing from the
// document may therefore report a later term's count — confirm callers only
// ask about terms that are present.
// NOTE(review): the declaration of `did` is not visible in this extract.
368 RemoteServer::msg_positionlistcount(const string
&message
)
370 const char *p
= message
.data();
371 const char *p_end
= p
+ message
.size();
373 decode_length(&p
, p_end
, did
);
375 // This is kind of clumsy, but what the public API requires.
376 Xapian::termcount result
= 0;
377 Xapian::TermIterator termit
= db
->termlist_begin(did
);
378 if (termit
!= db
->termlist_end(did
)) {
379 string
term(p
, p_end
- p
);
380 termit
.skip_to(term
);
381 if (termit
!= db
->termlist_end(did
)) {
382 result
= termit
.positionlist_count();
385 send_message(REPLY_POSITIONLISTCOUNT
, encode_length(result
));
// Handle a postlist request for the term held in `message`.
//
// Sends REPLY_POSTLISTSTART with the encoded term frequency followed by the
// encoded collection frequency. Then, for each posting, sends
// REPLY_POSTLISTITEM containing the encoded (docid - lastdocid - 1) followed
// by the encoded wdf. lastdocid starts at 0, so the first value is
// docid - 1. REPLY_DONE ends the list.
// NOTE(review): the loop condition and increment of the posting iterator are
// not visible in this extract.
389 RemoteServer::msg_postlist(const string
&message
)
391 const string
& term
= message
;
393 Xapian::doccount termfreq
= db
->get_termfreq(term
);
394 Xapian::termcount collfreq
= db
->get_collection_freq(term
);
395 send_message(REPLY_POSTLISTSTART
, encode_length(termfreq
) + encode_length(collfreq
));
397 Xapian::docid lastdocid
= 0;
398 const Xapian::PostingIterator end
= db
->postlist_end(term
);
399 for (Xapian::PostingIterator i
= db
->postlist_begin(term
);
402 Xapian::docid newdocid
= *i
;
403 string reply
= encode_length(newdocid
- lastdocid
- 1);
404 reply
+= encode_length(i
.get_wdf());
406 send_message(REPLY_POSTLISTITEM
, reply
);
407 lastdocid
= newdocid
;
410 send_message(REPLY_DONE
, string());
// Handle a request to upgrade this connection to write access.
//
// Flags start as Xapian::DB_OPEN. Flag bits decoded from `msg` are OR-ed in
// with the DB_ACTION_MASK_ bits cleared, so the client cannot change the
// open action. Presumably the bits are only decoded when present — the guard
// is not visible. Trailing data raises
// NetworkError("Junk at end of MSG_WRITEACCESS"). A WritableDatabase is then
// opened on `context` with these flags and stored in wdb.
// NOTE(review): the read-only check, the declarations of flag_bits and
// `context`, the `p != p_end` guards and any code after opening wdb are not
// visible in this extract.
414 RemoteServer::msg_writeaccess(const string
& msg
)
419 int flags
= Xapian::DB_OPEN
;
420 const char *p
= msg
.c_str();
421 const char *p_end
= p
+ msg
.size();
424 decode_length(&p
, p_end
, flag_bits
);
425 flags
|= flag_bits
&~ Xapian::DB_ACTION_MASK_
;
427 throw Xapian::NetworkError("Junk at end of MSG_WRITEACCESS");
431 wdb
= new Xapian::WritableDatabase(context
, flags
);
// Handle a reopen request. The visible part replies REPLY_DONE with an empty
// payload.
// NOTE(review): the rest of the body (presumably the reopen call and an
// alternative reply when the database changed) is not visible in this
// extract.
438 RemoteServer::msg_reopen(const string
& msg
)
441 send_message(REPLY_DONE
, string());
// Send REPLY_UPDATE describing the current database. The payload is:
//  * two bytes: the major and minor protocol versions;
//  * the encoded document count;
//  * the encoded (lastdocid - document count);
//  * the encoded doclength lower bound;
//  * the encoded (upper bound - lower bound);
//  * '1' or '0' for has_positions();
//  * the encoded total length.
// The argument is ignored. The constructor also calls this with an empty
// string to send the initial greeting.
// NOTE(review): `uuid` is fetched, but the line that appends it to `message`
// is not visible in this extract.
448 RemoteServer::msg_update(const string
&)
450 static const char protocol
[2] = {
451 char(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION
),
452 char(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION
)
454 string
message(protocol
, 2);
455 Xapian::doccount num_docs
= db
->get_doccount();
456 message
+= encode_length(num_docs
);
457 message
+= encode_length(db
->get_lastdocid() - num_docs
);
458 Xapian::termcount doclen_lb
= db
->get_doclength_lower_bound();
459 message
+= encode_length(doclen_lb
);
460 message
+= encode_length(db
->get_doclength_upper_bound() - doclen_lb
);
461 message
+= (db
->has_positions() ? '1' : '0');
462 message
+= encode_length(db
->get_total_length());
463 string uuid
= db
->get_uuid();
465 send_message(REPLY_UPDATE
, message
);
// Run a query for the client in two phases.
//
// Phase 1 decodes `message_in` in this order:
//  * the length-prefixed serialised Query;
//  * qlen, collapse_max and collapse_key (collapse_key defaults to
//    BAD_VALUENO);
//  * docid order ('0'-'2');
//  * sort_key;
//  * sort_by ('0'-'3');
//  * sort_value_forward ('0'/'1');
//  * the time limit;
//  * percent_threshold (one byte, 0-100);
//  * weight_threshold (>= 0);
//  * the weighting scheme name and its length-prefixed parameters;
//  * the RSet;
//  * any MatchSpy name + serialisation pairs.
// It then builds a Matcher and sends its local statistics as REPLY_STATS.
//
// Phase 2 waits (active_timeout) for MSG_GETMSET carrying first, maxitems,
// check_at_least and the combined statistics. It runs the match and replies
// REPLY_RESULTS: each match spy's results (length-prefixed) followed by the
// serialised MSet.
//
// An unregistered weighting scheme or match spy raises InvalidArgumentError.
// A malformed field raises NetworkError.
// NOTE(review): several lines are missing from this extract, including the
// declarations of `len` and `message`, the guard on decoding collapse_key,
// the match-spy loop header and the end of the Matcher constructor call.
469 RemoteServer::msg_query(const string
&message_in
)
471 const char *p
= message_in
.c_str();
472 const char *p_end
= p
+ message_in
.size();
474 // Unserialise the Query.
476 decode_length_and_check(&p
, p_end
, len
);
477 Xapian::Query
query(Xapian::Query::unserialise(string(p
, len
), reg
));
480 // Unserialise assorted Enquire settings.
481 Xapian::termcount qlen
;
482 decode_length(&p
, p_end
, qlen
);
484 Xapian::valueno collapse_max
;
485 decode_length(&p
, p_end
, collapse_max
);
487 Xapian::valueno collapse_key
= Xapian::BAD_VALUENO
;
489 decode_length(&p
, p_end
, collapse_key
);
491 if (p_end
- p
< 4 || *p
< '0' || *p
> '2') {
492 throw Xapian::NetworkError("bad message (docid_order)");
494 Xapian::Enquire::docid_order order
;
495 order
= static_cast<Xapian::Enquire::docid_order
>(*p
++ - '0');
497 Xapian::valueno sort_key
;
498 decode_length(&p
, p_end
, sort_key
);
// NOTE(review): *p is read here without re-checking p against p_end. The
// `p_end - p < 4` check above does not cover the bytes decode_length()
// consumed for sort_key — confirm at least one byte must remain, or add a
// bounds check.
500 if (*p
< '0' || *p
> '3') {
501 throw Xapian::NetworkError("bad message (sort_by)");
503 Xapian::Enquire::Internal::sort_setting sort_by
;
504 sort_by
= static_cast<Xapian::Enquire::Internal::sort_setting
>(*p
++ - '0');
506 if (*p
< '0' || *p
> '1') {
507 throw Xapian::NetworkError("bad message (sort_value_forward)");
509 bool sort_value_forward(*p
++ != '0');
511 double time_limit
= unserialise_double(&p
, p_end
);
// NOTE(review): as above, *p is read after unserialise_double() with no
// visible check that p != p_end.
513 int percent_threshold
= *p
++;
514 if (percent_threshold
< 0 || percent_threshold
> 100) {
515 throw Xapian::NetworkError("bad message (percent_threshold)");
518 double weight_threshold
= unserialise_double(&p
, p_end
);
519 if (weight_threshold
< 0) {
520 throw Xapian::NetworkError("bad message (weight_threshold)");
523 // Unserialise the Weight object.
524 decode_length_and_check(&p
, p_end
, len
);
525 string
wtname(p
, len
);
528 const Xapian::Weight
* wttype
= reg
.get_weighting_scheme(wtname
);
529 if (wttype
== NULL
) {
530 // Note: user weighting schemes should be registered by adding them to
531 // a Registry, and setting the context using
532 // RemoteServer::set_registry().
533 throw Xapian::InvalidArgumentError("Weighting scheme " +
534 wtname
+ " not registered");
537 decode_length_and_check(&p
, p_end
, len
);
538 unique_ptr
<Xapian::Weight
> wt(wttype
->unserialise(string(p
, len
)));
541 // Unserialise the RSet object.
542 decode_length_and_check(&p
, p_end
, len
);
543 Xapian::RSet rset
= unserialise_rset(string(p
, len
));
546 // Unserialise any MatchSpy objects.
547 vector
<Xapian::Internal::opt_intrusive_ptr
<Xapian::MatchSpy
>> matchspies
;
549 decode_length_and_check(&p
, p_end
, len
);
550 string
spytype(p
, len
);
551 const Xapian::MatchSpy
* spyclass
= reg
.get_match_spy(spytype
);
552 if (spyclass
== NULL
) {
553 throw Xapian::InvalidArgumentError("Match spy " + spytype
+
558 decode_length_and_check(&p
, p_end
, len
);
559 matchspies
.push_back(spyclass
->unserialise(string(p
, len
), reg
)->release());
563 Xapian::Weight::Internal local_stats
;
564 Matcher
matcher(*db
, query
, qlen
, &rset
, local_stats
, *wt
,
566 collapse_key
, collapse_max
,
567 percent_threshold
, weight_threshold
,
568 order
, sort_key
, sort_by
, sort_value_forward
, time_limit
,
571 send_message(REPLY_STATS
, serialise_stats(local_stats
));
574 get_message(active_timeout
, message
, MSG_GETMSET
);
// NOTE(review): `p` must point at the start of the new `message` here; that
// assignment is not visible in this extract.
576 p_end
= p
+ message
.size();
578 Xapian::termcount first
;
579 decode_length(&p
, p_end
, first
);
580 Xapian::termcount maxitems
;
581 decode_length(&p
, p_end
, maxitems
);
583 Xapian::termcount check_at_least
;
584 decode_length(&p
, p_end
, check_at_least
);
586 message
.erase(0, message
.size() - (p_end
- p
));
587 unique_ptr
<Xapian::Weight::Internal
> total_stats(new Xapian::Weight::Internal
);
588 unserialise_stats(message
, *total_stats
);
589 total_stats
->set_bounds_from_db(*db
);
591 Xapian::MSet mset
= matcher
.get_mset(first
, maxitems
, check_at_least
,
592 *total_stats
, *wt
, 0, 0,
593 collapse_key
, collapse_max
,
594 percent_threshold
, weight_threshold
,
596 sort_key
, sort_by
, sort_value_forward
,
597 time_limit
, matchspies
);
598 // FIXME: The local side already has these stats, except for the maxpart
600 mset
.internal
->set_stats(total_stats
.release());
// NOTE(review): at this point `message` still holds the statistics decoded
// above, unless it is cleared on a line not visible in this extract — confirm
// before relying on the REPLY_RESULTS layout.
603 for (auto i
: matchspies
) {
604 string spy_results
= i
->serialise_results();
605 message
+= encode_length(spy_results
.size());
606 message
+= spy_results
;
608 message
+= mset
.internal
->serialise();
609 send_message(REPLY_RESULTS
, message
);
// Handle a document request for the docid encoded in `message`.
// Sends, in order:
//  * REPLY_DOCDATA with the document data;
//  * one REPLY_VALUE per value slot, starting with the encoded slot number;
//  * REPLY_DONE.
// NOTE(review): the declaration of `did` and the line that appends the value
// itself to `item` are not visible in this extract.
613 RemoteServer::msg_document(const string
&message
)
615 const char *p
= message
.data();
616 const char *p_end
= p
+ message
.size();
618 decode_length(&p
, p_end
, did
);
620 Xapian::Document doc
= db
->get_document(did
);
622 send_message(REPLY_DOCDATA
, doc
.get_data());
624 Xapian::ValueIterator i
;
625 for (i
= doc
.values_begin(); i
!= doc
.values_end(); ++i
) {
626 string item
= encode_length(i
.get_valueno());
628 send_message(REPLY_VALUE
, item
);
630 send_message(REPLY_DONE
, string());
// Handle a keep-alive request by replying REPLY_DONE with an empty payload.
// NOTE(review): per the visible comment, the body also keeps this server's
// own database alive, since it may contain remote databases. That statement
// is not visible in this extract.
634 RemoteServer::msg_keepalive(const string
&)
636 // Ensure *our* database stays alive, as it may contain remote databases!
638 send_message(REPLY_DONE
, string());
// Reply REPLY_TERMEXISTS if `term` is in the database, otherwise
// REPLY_TERMDOESNTEXIST. Both replies have an empty payload.
642 RemoteServer::msg_termexists(const string
&term
)
644 send_message((db
->term_exists(term
) ? REPLY_TERMEXISTS
: REPLY_TERMDOESNTEXIST
), string());
// Reply REPLY_COLLFREQ with the encoded collection frequency of `term`.
648 RemoteServer::msg_collfreq(const string
&term
)
650 send_message(REPLY_COLLFREQ
, encode_length(db
->get_collection_freq(term
)));
// Reply REPLY_TERMFREQ with the encoded term frequency of `term`.
654 RemoteServer::msg_termfreq(const string
&term
)
656 send_message(REPLY_TERMFREQ
, encode_length(db
->get_termfreq(term
)));
// Reply REPLY_FREQS with the encoded term frequency of `term`, followed by
// its encoded collection frequency.
660 RemoteServer::msg_freqs(const string
&term
)
662 string msg
= encode_length(db
->get_termfreq(term
));
663 msg
+= encode_length(db
->get_collection_freq(term
));
664 send_message(REPLY_FREQS
, msg
);
// Handle a value-statistics request for the slot encoded in `message`.
// Replies REPLY_VALUESTATS containing:
//  * the encoded value frequency;
//  * the lower bound, as its encoded length followed by its bytes;
//  * the upper bound, in the same form.
// NOTE(review): the declaration of message_out, and any loop around the
// per-slot work, are not visible in this extract.
668 RemoteServer::msg_valuestats(const string
& message
)
670 const char *p
= message
.data();
671 const char *p_end
= p
+ message
.size();
673 Xapian::valueno slot
;
674 decode_length(&p
, p_end
, slot
);
676 message_out
+= encode_length(db
->get_value_freq(slot
));
677 string bound
= db
->get_value_lower_bound(slot
);
678 message_out
+= encode_length(bound
.size());
679 message_out
+= bound
;
680 bound
= db
->get_value_upper_bound(slot
);
681 message_out
+= encode_length(bound
.size());
682 message_out
+= bound
;
684 send_message(REPLY_VALUESTATS
, message_out
);
// Reply REPLY_DOCLENGTH with the encoded length of the document whose docid
// is encoded in `message`.
// NOTE(review): the declaration of `did` is not visible in this extract.
689 RemoteServer::msg_doclength(const string
&message
)
691 const char *p
= message
.data();
692 const char *p_end
= p
+ message
.size();
694 decode_length(&p
, p_end
, did
);
695 send_message(REPLY_DOCLENGTH
, encode_length(db
->get_doclength(did
)));
// Reply REPLY_UNIQUETERMS with the encoded number of unique terms in the
// document whose docid is encoded in `message`.
// NOTE(review): the declaration of `did` is not visible in this extract.
699 RemoteServer::msg_uniqueterms(const string
&message
)
701 const char *p
= message
.data();
702 const char *p_end
= p
+ message
.size();
704 decode_length(&p
, p_end
, did
);
705 send_message(REPLY_UNIQUETERMS
, encode_length(db
->get_unique_terms(did
)));
// Handle a commit request. The visible part replies REPLY_DONE with an empty
// payload.
// NOTE(review): the writability check and the commit call are not visible in
// this extract.
709 RemoteServer::msg_commit(const string
&)
716 send_message(REPLY_DONE
, string());
// Handle a cancel request by calling begin_transaction(false) and then
// cancel_transaction() on wdb. Per the visible comment, this is meant to
// have the same effect as the internal cancel method, which cannot be called
// from here.
// NOTE(review): any writability check before wdb is used is not visible in
// this extract.
720 RemoteServer::msg_cancel(const string
&)
725 // We can't call cancel since that's an internal method, but this
726 // has the same effect with minimal additional overhead.
727 wdb
->begin_transaction(false);
728 wdb
->cancel_transaction();
// Add the document unserialised from `message` to wdb, then reply
// REPLY_ADDDOCUMENT with the encoded docid that was assigned.
// NOTE(review): any writability check is not visible in this extract.
732 RemoteServer::msg_adddocument(const string
& message
)
737 Xapian::docid did
= wdb
->add_document(unserialise_document(message
));
739 send_message(REPLY_ADDDOCUMENT
, encode_length(did
));
// Delete the document whose docid is encoded in `message` from wdb, then
// reply REPLY_DONE with an empty payload.
// NOTE(review): the declaration of `did` and any writability check are not
// visible in this extract.
743 RemoteServer::msg_deletedocument(const string
& message
)
748 const char *p
= message
.data();
749 const char *p_end
= p
+ message
.size();
751 decode_length(&p
, p_end
, did
);
753 wdb
->delete_document(did
);
755 send_message(REPLY_DONE
, string());
// Call wdb->delete_document() with the term held in `message`, i.e. the
// term-based overload.
// NOTE(review): no reply is visible in this extract, and neither is any
// writability check.
759 RemoteServer::msg_deletedocumentterm(const string
& message
)
764 wdb
->delete_document(message
);
// `message` holds an encoded docid followed by a serialised document.
// Replaces that docid in wdb with the document unserialised from the rest of
// the message.
// NOTE(review): the declaration of `did` and any writability check are not
// visible in this extract.
768 RemoteServer::msg_replacedocument(const string
& message
)
773 const char *p
= message
.data();
774 const char *p_end
= p
+ message
.size();
776 decode_length(&p
, p_end
, did
);
778 wdb
->replace_document(did
, unserialise_document(string(p
, p_end
)));
// `message` holds a length-prefixed unique term followed by a serialised
// document. Calls wdb->replace_document(unique_term, document), then replies
// REPLY_ADDDOCUMENT with the encoded docid that was returned.
// NOTE(review): the declaration of `len` and the advance of `p` past the
// unique term are not visible. Presumably `p += len` comes before the
// document is unserialised from [p, p_end) — confirm.
782 RemoteServer::msg_replacedocumentterm(const string
& message
)
787 const char *p
= message
.data();
788 const char *p_end
= p
+ message
.size();
790 decode_length_and_check(&p
, p_end
, len
);
791 string
unique_term(p
, len
);
794 Xapian::docid did
= wdb
->replace_document(unique_term
, unserialise_document(string(p
, p_end
)));
796 send_message(REPLY_ADDDOCUMENT
, encode_length(did
));
// Reply REPLY_METADATA with the metadata value stored under the key held in
// `message`.
800 RemoteServer::msg_getmetadata(const string
& message
)
802 send_message(REPLY_METADATA
, db
->get_metadata(message
));
// List the metadata keys starting with the prefix held in `message`.
//
// Each key is sent as REPLY_METADATAKEYLIST containing one byte for how many
// leading bytes the key shares with `prev`, followed by the rest of the key.
// REPLY_DONE ends the list. `prev` starts as the prefix itself.
// NOTE(review): the declaration of `reply`, the body of the
// `prev.size() > 255` guard and the update of `prev` are not visible in this
// extract.
806 RemoteServer::msg_metadatakeylist(const string
& message
)
808 string prev
= message
;
811 const string
& prefix
= message
;
812 const Xapian::TermIterator end
= db
->metadata_keys_end(prefix
);
813 Xapian::TermIterator t
= db
->metadata_keys_begin(prefix
);
814 for (; t
!= end
; ++t
) {
815 if (rare(prev
.size() > 255))
817 const string
& v
= *t
;
818 size_t reuse
= common_prefix_length(prev
, v
);
819 reply
.assign(1, char(reuse
));
820 reply
.append(v
, reuse
, string::npos
);
821 send_message(REPLY_METADATAKEYLIST
, reply
);
824 send_message(REPLY_DONE
, string());
// `message` holds a length-prefixed key followed by the value, which runs to
// the end of the message. Stores the value under that key via
// wdb->set_metadata().
// NOTE(review): the declaration of keylen, the advance of `p` past the key
// (presumably `p += keylen`) and any writability check are not visible in
// this extract.
828 RemoteServer::msg_setmetadata(const string
& message
)
832 const char *p
= message
.data();
833 const char *p_end
= p
+ message
.size();
835 decode_length_and_check(&p
, p_end
, keylen
);
836 string
key(p
, keylen
);
838 string
val(p
, p_end
- p
);
839 wdb
->set_metadata(key
, val
);
// `message` holds an encoded frequency increment followed by the word, which
// runs to the end of the message. Calls wdb->add_spelling(word, freqinc).
// NOTE(review): any writability check is not visible in this extract.
843 RemoteServer::msg_addspelling(const string
& message
)
847 const char *p
= message
.data();
848 const char *p_end
= p
+ message
.size();
849 Xapian::termcount freqinc
;
850 decode_length(&p
, p_end
, freqinc
);
851 wdb
->add_spelling(string(p
, p_end
- p
), freqinc
);
// `message` holds an encoded frequency decrement followed by the word, which
// runs to the end of the message. Calls wdb->remove_spelling(word, freqdec)
// and replies REPLY_REMOVESPELLING with the encoded value it returns.
// NOTE(review): any writability check is not visible in this extract.
855 RemoteServer::msg_removespelling(const string
& message
)
859 const char *p
= message
.data();
860 const char *p_end
= p
+ message
.size();
861 Xapian::termcount freqdec
;
862 decode_length(&p
, p_end
, freqdec
);
863 auto result
= wdb
->remove_spelling(string(p
, p_end
- p
), freqdec
);
864 send_message(REPLY_REMOVESPELLING
, encode_length(result
));