1 /** @file remoteconnection.cc
2 * @brief RemoteConnection class used by the remote backend.
4 /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "remoteconnection.h"
25 #include <xapian/error.h>
27 #include "safeerrno.h"
28 #include "safefcntl.h"
29 #include "safesysselect.h"
30 #include "safeunistd.h"
37 # include <type_traits>
42 #include "filetests.h"
44 #include "posixy_wrapper.h"
47 #include "socket_utils.h"
51 #define CHUNKSIZE 4096
55 throw_database_closed()
57 throw Xapian::DatabaseError("Database has been closed");
62 throw_network_error_insane_message_length()
64 throw Xapian::NetworkError("Insane message length specified!");
69 update_overlapped_offset(WSAOVERLAPPED
& overlapped
, DWORD n
)
71 // Signed overflow is undefined so check DWORD is unsigned.
72 static_assert(std::is_unsigned
<DWORD
>::value
, "Type DWORD should be unsigned");
73 overlapped
.Offset
+= n
;
74 if (overlapped
.Offset
< n
) ++overlapped
.OffsetHigh
;
78 RemoteConnection::RemoteConnection(int fdin_
, int fdout_
,
79 const string
& context_
)
80 : fdin(fdin_
), fdout(fdout_
), context(context_
)
83 memset(&overlapped
, 0, sizeof(overlapped
));
84 overlapped
.hEvent
= CreateEvent(NULL
, FALSE
, FALSE
, NULL
);
85 if (!overlapped
.hEvent
)
86 throw Xapian::NetworkError("Failed to setup OVERLAPPED",
87 context
, -int(GetLastError()));
93 RemoteConnection::~RemoteConnection()
95 if (overlapped
.hEvent
)
96 CloseHandle(overlapped
.hEvent
);
101 RemoteConnection::read_at_least(size_t min_len
, double end_time
)
103 LOGCALL(REMOTE
, bool, "RemoteConnection::read_at_least", min_len
| end_time
);
105 if (buffer
.length() >= min_len
) return true;
108 HANDLE hin
= fd_to_handle(fdin
);
112 BOOL ok
= ReadFile(hin
, buf
, sizeof(buf
), &received
, &overlapped
);
114 int errcode
= GetLastError();
115 if (errcode
!= ERROR_IO_PENDING
)
116 throw Xapian::NetworkError("read failed", context
, -errcode
);
117 // Is asynch - just wait for the data to be received or a timeout.
119 waitrc
= WaitForSingleObject(overlapped
.hEvent
, calc_read_wait_msecs(end_time
));
120 if (waitrc
!= WAIT_OBJECT_0
) {
121 LOGLINE(REMOTE
, "read: timeout has expired");
122 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context
);
124 // Get the final result of the read.
125 if (!GetOverlappedResult(hin
, &overlapped
, &received
, FALSE
))
126 throw Xapian::NetworkError("Failed to get overlapped result",
127 context
, -int(GetLastError()));
135 buffer
.append(buf
, received
);
137 // We must update the offset in the OVERLAPPED structure manually.
138 update_overlapped_offset(overlapped
, received
);
139 } while (buffer
.length() < min_len
);
141 // If there's no end_time, just use blocking I/O.
142 if (fcntl(fdin
, F_SETFL
, (end_time
!= 0.0) ? O_NONBLOCK
: 0) < 0) {
143 throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
149 ssize_t received
= read(fdin
, buf
, sizeof(buf
));
152 buffer
.append(buf
, received
);
153 if (buffer
.length() >= min_len
) return true;
162 LOGLINE(REMOTE
, "read gave errno = " << errno
);
163 if (errno
== EINTR
) continue;
166 throw Xapian::NetworkError("read failed", context
, errno
);
168 Assert(end_time
!= 0.0);
170 // Calculate how far in the future end_time is.
171 double time_diff
= end_time
- RealTime::now();
172 // Check if the timeout has expired.
174 LOGLINE(REMOTE
, "read: timeout has expired");
175 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context
);
178 // Use select to wait until there is data or the timeout is reached.
181 FD_SET(fdin
, &fdset
);
184 RealTime::to_timeval(time_diff
, &tv
);
185 int select_result
= select(fdin
+ 1, &fdset
, 0, &fdset
, &tv
);
186 if (select_result
> 0) break;
188 if (select_result
== 0)
189 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context
);
191 // EINTR means select was interrupted by a signal. The Linux
192 // select(2) man page says: "Portable programs may wish to check
193 // for EAGAIN and loop, just as with EINTR" and that seems to be
194 // necessary for cygwin at least.
195 if (errno
!= EINTR
&& errno
!= EAGAIN
)
196 throw Xapian::NetworkError("select failed during read", context
, errno
);
204 RemoteConnection::ready_to_read() const
206 LOGCALL(REMOTE
, bool, "RemoteConnection::ready_to_read", NO_ARGS
);
208 throw_database_closed();
210 if (!buffer
.empty()) RETURN(true);
212 // Use select to see if there's data available to be read.
215 FD_SET(fdin
, &fdset
);
217 // Set a 0.1 second timeout to avoid a busy loop.
218 // FIXME: this would be much better done by exposing the fd so that the
219 // matcher can call select on all the fds involved...
223 RETURN(select(fdin
+ 1, &fdset
, 0, &fdset
, &tv
) > 0);
227 RemoteConnection::send_message(char type
, const string
&message
,
230 LOGCALL_VOID(REMOTE
, "RemoteConnection::send_message", type
| message
| end_time
);
232 throw_database_closed();
236 header
+= encode_length(message
.size());
239 HANDLE hout
= fd_to_handle(fdout
);
240 const string
* str
= &header
;
245 BOOL ok
= WriteFile(hout
, str
->data() + count
, str
->size() - count
, &n
, &overlapped
);
247 int errcode
= GetLastError();
248 if (errcode
!= ERROR_IO_PENDING
)
249 throw Xapian::NetworkError("write failed", context
, -errcode
);
250 // Just wait for the data to be sent, or a timeout.
252 waitrc
= WaitForSingleObject(overlapped
.hEvent
, calc_read_wait_msecs(end_time
));
253 if (waitrc
!= WAIT_OBJECT_0
) {
254 LOGLINE(REMOTE
, "write: timeout has expired");
255 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context
);
257 // Get the final result.
258 if (!GetOverlappedResult(hout
, &overlapped
, &n
, FALSE
))
259 throw Xapian::NetworkError("Failed to get overlapped result",
260 context
, -int(GetLastError()));
265 // We must update the offset in the OVERLAPPED structure manually.
266 update_overlapped_offset(overlapped
, n
);
268 if (count
== str
->size()) {
269 if (str
== &message
|| message
.empty()) return;
275 // If there's no end_time, just use blocking I/O.
276 if (fcntl(fdout
, F_SETFL
, (end_time
!= 0.0) ? O_NONBLOCK
: 0) < 0) {
277 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
281 const string
* str
= &header
;
286 // We've set write to non-blocking, so just try writing as there
287 // will usually be space.
288 ssize_t n
= write(fdout
, str
->data() + count
, str
->size() - count
);
292 if (count
== str
->size()) {
293 if (str
== &message
|| message
.empty()) return;
300 LOGLINE(REMOTE
, "write gave errno = " << errno
);
301 if (errno
== EINTR
) continue;
304 throw Xapian::NetworkError("write failed", context
, errno
);
306 // Use select to wait until there is space or the timeout is reached.
308 FD_SET(fdout
, &fdset
);
310 double time_diff
= end_time
- RealTime::now();
312 LOGLINE(REMOTE
, "write: timeout has expired");
313 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context
);
317 RealTime::to_timeval(time_diff
, &tv
);
318 int select_result
= select(fdout
+ 1, 0, &fdset
, &fdset
, &tv
);
320 if (select_result
< 0) {
321 if (errno
== EINTR
) {
322 // EINTR means select was interrupted by a signal.
323 // We could just retry the select, but it's easier to just
327 throw Xapian::NetworkError("select failed during write", context
, errno
);
330 if (select_result
== 0)
331 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context
);
337 RemoteConnection::send_file(char type
, int fd
, double end_time
)
339 LOGCALL_VOID(REMOTE
, "RemoteConnection::send_file", type
| fd
| end_time
);
341 throw_database_closed();
343 off_t size
= file_size(fd
);
345 throw Xapian::NetworkError("Couldn't stat file to send", errno
);
346 // FIXME: Use sendfile() or similar if available?
352 string enc_size
= encode_length(size
);
353 c
+= enc_size
.size();
354 // An encoded length should be just a few bytes.
355 AssertRel(c
, <=, sizeof(buf
));
356 memcpy(buf
+ 1, enc_size
.data(), enc_size
.size());
360 HANDLE hout
= fd_to_handle(fdout
);
364 BOOL ok
= WriteFile(hout
, buf
+ count
, c
- count
, &n
, &overlapped
);
366 int errcode
= GetLastError();
367 if (errcode
!= ERROR_IO_PENDING
)
368 throw Xapian::NetworkError("write failed", context
, -errcode
);
369 // Just wait for the data to be sent, or a timeout.
371 waitrc
= WaitForSingleObject(overlapped
.hEvent
, calc_read_wait_msecs(end_time
));
372 if (waitrc
!= WAIT_OBJECT_0
) {
373 LOGLINE(REMOTE
, "write: timeout has expired");
374 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context
);
376 // Get the final result.
377 if (!GetOverlappedResult(hout
, &overlapped
, &n
, FALSE
))
378 throw Xapian::NetworkError("Failed to get overlapped result",
379 context
, -int(GetLastError()));
384 // We must update the offset in the OVERLAPPED structure manually.
385 update_overlapped_offset(overlapped
, n
);
388 if (size
== 0) return;
392 res
= read(fd
, buf
, sizeof(buf
));
393 } while (res
< 0 && errno
== EINTR
);
394 if (res
< 0) throw Xapian::NetworkError("read failed", errno
);
402 // If there's no end_time, just use blocking I/O.
403 if (fcntl(fdout
, F_SETFL
, (end_time
!= 0.0) ? O_NONBLOCK
: 0) < 0) {
404 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
411 // We've set write to non-blocking, so just try writing as there
412 // will usually be space.
413 ssize_t n
= write(fdout
, buf
+ count
, c
- count
);
418 if (size
== 0) return;
422 res
= read(fd
, buf
, sizeof(buf
));
423 } while (res
< 0 && errno
== EINTR
);
424 if (res
< 0) throw Xapian::NetworkError("read failed", errno
);
433 LOGLINE(REMOTE
, "write gave errno = " << errno
);
434 if (errno
== EINTR
) continue;
437 throw Xapian::NetworkError("write failed", context
, errno
);
439 // Use select to wait until there is space or the timeout is reached.
441 FD_SET(fdout
, &fdset
);
443 double time_diff
= end_time
- RealTime::now();
445 LOGLINE(REMOTE
, "write: timeout has expired");
446 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context
);
450 RealTime::to_timeval(time_diff
, &tv
);
451 int select_result
= select(fdout
+ 1, 0, &fdset
, &fdset
, &tv
);
453 if (select_result
< 0) {
454 if (errno
== EINTR
) {
455 // EINTR means select was interrupted by a signal.
456 // We could just retry the select, but it's easier to just
460 throw Xapian::NetworkError("select failed during write", context
, errno
);
463 if (select_result
== 0)
464 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context
);
470 RemoteConnection::sniff_next_message_type(double end_time
)
472 LOGCALL(REMOTE
, int, "RemoteConnection::sniff_next_message_type", end_time
);
474 throw_database_closed();
476 if (!read_at_least(1, end_time
))
478 unsigned char type
= buffer
[0];
483 RemoteConnection::get_message(string
&result
, double end_time
)
485 LOGCALL(REMOTE
, int, "RemoteConnection::get_message", result
| end_time
);
487 throw_database_closed();
489 if (!read_at_least(2, end_time
))
491 size_t len
= static_cast<unsigned char>(buffer
[1]);
492 if (!read_at_least(len
+ 2, end_time
))
495 result
.assign(buffer
.data() + 2, len
);
496 unsigned char type
= buffer
[0];
497 buffer
.erase(0, len
+ 2);
501 string::const_iterator i
= buffer
.begin() + 2;
505 if (i
== buffer
.end() || shift
> 28) {
506 // Something is very wrong...
507 throw_network_error_insane_message_length();
510 len
|= size_t(ch
& 0x7f) << shift
;
512 } while ((ch
& 0x80) == 0);
514 size_t header_len
= (i
- buffer
.begin());
515 if (!read_at_least(header_len
+ len
, end_time
))
517 result
.assign(buffer
.data() + header_len
, len
);
518 unsigned char type
= buffer
[0];
519 buffer
.erase(0, header_len
+ len
);
524 RemoteConnection::get_message_chunked(double end_time
)
526 LOGCALL(REMOTE
, int, "RemoteConnection::get_message_chunked", end_time
);
529 throw_database_closed();
531 if (!read_at_least(2, end_time
))
533 uint_least64_t len
= static_cast<unsigned char>(buffer
[1]);
535 chunked_data_left
= off_t(len
);
536 char type
= buffer
[0];
540 if (!read_at_least(len
+ 2, end_time
))
543 string::const_iterator i
= buffer
.begin() + 2;
547 // Allow at most 63 bits for message lengths - it's neatly a multiple
548 // of 7 bits and anything longer than this is almost certainly a
550 // The value also needs to be representable as an
551 // off_t (which is a signed type), so use that size if it is smaller.
552 const int SHIFT_LIMIT
= 63;
553 if (rare(i
== buffer
.end() || shift
>= SHIFT_LIMIT
)) {
554 // Something is very wrong...
555 throw_network_error_insane_message_length();
558 uint_least64_t bits
= ch
& 0x7f;
559 len
|= bits
<< shift
;
561 } while ((ch
& 0x80) == 0);
564 chunked_data_left
= off_t(len
);
565 if (sizeof(off_t
) * CHAR_BIT
< 63) {
566 // Check that the value of len fits in an off_t without loss.
567 if (rare(uint_least64_t(chunked_data_left
) != len
)) {
568 throw_network_error_insane_message_length();
572 unsigned char type
= buffer
[0];
573 size_t header_len
= (i
- buffer
.begin());
574 buffer
.erase(0, header_len
);
579 RemoteConnection::get_message_chunk(string
&result
, size_t at_least
,
582 LOGCALL(REMOTE
, int, "RemoteConnection::get_message_chunk", result
| at_least
| end_time
);
584 throw_database_closed();
586 if (at_least
<= result
.size()) RETURN(true);
587 at_least
-= result
.size();
589 bool read_enough
= (off_t(at_least
) <= chunked_data_left
);
590 if (!read_enough
) at_least
= size_t(chunked_data_left
);
592 if (!read_at_least(at_least
, end_time
))
595 size_t retlen
= min(off_t(buffer
.size()), chunked_data_left
);
596 result
.append(buffer
, 0, retlen
);
597 buffer
.erase(0, retlen
);
598 chunked_data_left
-= retlen
;
600 RETURN(int(read_enough
));
603 /** Write n bytes from block pointed to by p to file descriptor fd. */
605 write_all(int fd
, const char * p
, size_t n
)
608 ssize_t c
= write(fd
, p
, n
);
610 if (errno
== EINTR
) continue;
611 throw Xapian::NetworkError("Error writing to file", errno
);
619 RemoteConnection::receive_file(const string
&file
, double end_time
)
621 LOGCALL(REMOTE
, int, "RemoteConnection::receive_file", file
| end_time
);
623 throw_database_closed();
625 // FIXME: Do we want to be able to delete the file during writing?
626 FD
fd(posixy_open(file
.c_str(), O_WRONLY
|O_CREAT
|O_TRUNC
|O_CLOEXEC
, 0666));
628 throw Xapian::NetworkError("Couldn't open file for writing: " + file
, errno
);
630 int type
= get_message_chunked(end_time
);
632 off_t min_read
= min(chunked_data_left
, off_t(CHUNKSIZE
));
633 if (!read_at_least(min_read
, end_time
))
635 write_all(fd
, buffer
.data(), min_read
);
636 chunked_data_left
-= min_read
;
637 buffer
.erase(0, min_read
);
638 } while (chunked_data_left
);
643 RemoteConnection::do_close(bool wait
)
645 LOGCALL_VOID(REMOTE
, "RemoteConnection::do_close", wait
);
649 // We can be called from a destructor, so we can't throw an
652 send_message(MSG_SHUTDOWN
, string(), 0.0);
656 HANDLE hin
= fd_to_handle(fdin
);
659 BOOL ok
= ReadFile(hin
, &dummy
, 1, &received
, &overlapped
);
660 if (!ok
&& GetLastError() == ERROR_IO_PENDING
) {
661 // Wait for asynchronous read to complete.
662 (void)WaitForSingleObject(overlapped
.hEvent
, INFINITE
);
665 // Wait for the connection to be closed - when this happens
666 // select() will report that a read won't block.
669 FD_SET(fdin
, &fdset
);
672 res
= select(fdin
+ 1, &fdset
, 0, &fdset
, NULL
);
673 } while (res
< 0 && errno
== EINTR
);
676 close_fd_or_socket(fdin
);
678 // If the same fd is used in both directions, don't close it twice.
679 if (fdin
== fdout
) fdout
= -1;
685 close_fd_or_socket(fdout
);
692 RemoteConnection::calc_read_wait_msecs(double end_time
)
697 // Calculate how far in the future end_time is.
698 double time_diff
= end_time
- RealTime::now();
700 // DWORD is unsigned, so we mustn't try and return a negative value.
701 if (time_diff
< 0.0) {
702 throw Xapian::NetworkTimeoutError("Timeout expired before starting read", context
);
704 return static_cast<DWORD
>(time_diff
* 1000.0);