[honey] Fix portability to systems without pread()
[xapian.git] / xapian-core / net / remoteconnection.cc
blobf1c72b4dcd9c42be3af7ae6f58d61d7fa1fb1ed0
1 /** @file remoteconnection.cc
2 * @brief RemoteConnection class used by the remote backend.
3 */
4 /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <config.h>
23 #include "remoteconnection.h"
25 #include <xapian/error.h>
27 #include "safeerrno.h"
28 #include "safefcntl.h"
29 #include "safesysselect.h"
30 #include "safeunistd.h"
32 #include <algorithm>
33 #include <climits>
34 #include <cstdint>
35 #include <string>
36 #ifdef __WIN32__
37 # include <type_traits>
38 #endif
40 #include "debuglog.h"
41 #include "fd.h"
42 #include "filetests.h"
43 #include "omassert.h"
44 #include "posixy_wrapper.h"
45 #include "realtime.h"
46 #include "length.h"
47 #include "socket_utils.h"
49 using namespace std;
51 #define CHUNKSIZE 4096
53 [[noreturn]]
54 static void
55 throw_database_closed()
57 throw Xapian::DatabaseError("Database has been closed");
60 [[noreturn]]
61 static void
62 throw_network_error_insane_message_length()
64 throw Xapian::NetworkError("Insane message length specified!");
67 #ifdef __WIN32__
68 inline void
69 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
71 // Signed overflow is undefined so check DWORD is unsigned.
72 static_assert(std::is_unsigned<DWORD>::value, "Type DWORD should be unsigned");
73 overlapped.Offset += n;
74 if (overlapped.Offset < n) ++overlapped.OffsetHigh;
76 #endif
78 RemoteConnection::RemoteConnection(int fdin_, int fdout_,
79 const string & context_)
80 : fdin(fdin_), fdout(fdout_), context(context_)
82 #ifdef __WIN32__
83 memset(&overlapped, 0, sizeof(overlapped));
84 overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
85 if (!overlapped.hEvent)
86 throw Xapian::NetworkError("Failed to setup OVERLAPPED",
87 context, -int(GetLastError()));
89 #endif
92 #ifdef __WIN32__
93 RemoteConnection::~RemoteConnection()
95 if (overlapped.hEvent)
96 CloseHandle(overlapped.hEvent);
98 #endif
100 bool
101 RemoteConnection::read_at_least(size_t min_len, double end_time)
103 LOGCALL(REMOTE, bool, "RemoteConnection::read_at_least", min_len | end_time);
105 if (buffer.length() >= min_len) return true;
107 #ifdef __WIN32__
108 HANDLE hin = fd_to_handle(fdin);
109 do {
110 char buf[CHUNKSIZE];
111 DWORD received;
112 BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
113 if (!ok) {
114 int errcode = GetLastError();
115 if (errcode != ERROR_IO_PENDING)
116 throw Xapian::NetworkError("read failed", context, -errcode);
117 // Is asynch - just wait for the data to be received or a timeout.
118 DWORD waitrc;
119 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
120 if (waitrc != WAIT_OBJECT_0) {
121 LOGLINE(REMOTE, "read: timeout has expired");
122 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
124 // Get the final result of the read.
125 if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
126 throw Xapian::NetworkError("Failed to get overlapped result",
127 context, -int(GetLastError()));
130 if (received == 0) {
131 do_close(false);
132 return false;
135 buffer.append(buf, received);
137 // We must update the offset in the OVERLAPPED structure manually.
138 update_overlapped_offset(overlapped, received);
139 } while (buffer.length() < min_len);
140 #else
141 // If there's no end_time, just use blocking I/O.
142 if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
143 throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
144 context, errno);
147 while (true) {
148 char buf[CHUNKSIZE];
149 ssize_t received = read(fdin, buf, sizeof(buf));
151 if (received > 0) {
152 buffer.append(buf, received);
153 if (buffer.length() >= min_len) return true;
154 continue;
157 if (received == 0) {
158 do_close(false);
159 return false;
162 LOGLINE(REMOTE, "read gave errno = " << errno);
163 if (errno == EINTR) continue;
165 if (errno != EAGAIN)
166 throw Xapian::NetworkError("read failed", context, errno);
168 Assert(end_time != 0.0);
169 while (true) {
170 // Calculate how far in the future end_time is.
171 double time_diff = end_time - RealTime::now();
172 // Check if the timeout has expired.
173 if (time_diff < 0) {
174 LOGLINE(REMOTE, "read: timeout has expired");
175 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
178 // Use select to wait until there is data or the timeout is reached.
179 fd_set fdset;
180 FD_ZERO(&fdset);
181 FD_SET(fdin, &fdset);
183 struct timeval tv;
184 RealTime::to_timeval(time_diff, &tv);
185 int select_result = select(fdin + 1, &fdset, 0, &fdset, &tv);
186 if (select_result > 0) break;
188 if (select_result == 0)
189 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
191 // EINTR means select was interrupted by a signal. The Linux
192 // select(2) man page says: "Portable programs may wish to check
193 // for EAGAIN and loop, just as with EINTR" and that seems to be
194 // necessary for cygwin at least.
195 if (errno != EINTR && errno != EAGAIN)
196 throw Xapian::NetworkError("select failed during read", context, errno);
199 #endif
200 return true;
203 bool
204 RemoteConnection::ready_to_read() const
206 LOGCALL(REMOTE, bool, "RemoteConnection::ready_to_read", NO_ARGS);
207 if (fdin == -1)
208 throw_database_closed();
210 if (!buffer.empty()) RETURN(true);
212 // Use select to see if there's data available to be read.
213 fd_set fdset;
214 FD_ZERO(&fdset);
215 FD_SET(fdin, &fdset);
217 // Set a 0.1 second timeout to avoid a busy loop.
218 // FIXME: this would be much better done by exposing the fd so that the
219 // matcher can call select on all the fds involved...
220 struct timeval tv;
221 tv.tv_sec = 0;
222 tv.tv_usec = 100000;
223 RETURN(select(fdin + 1, &fdset, 0, &fdset, &tv) > 0);
226 void
227 RemoteConnection::send_message(char type, const string &message,
228 double end_time)
230 LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
231 if (fdout == -1)
232 throw_database_closed();
234 string header;
235 header += type;
236 header += encode_length(message.size());
238 #ifdef __WIN32__
239 HANDLE hout = fd_to_handle(fdout);
240 const string * str = &header;
242 size_t count = 0;
243 while (true) {
244 DWORD n;
245 BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
246 if (!ok) {
247 int errcode = GetLastError();
248 if (errcode != ERROR_IO_PENDING)
249 throw Xapian::NetworkError("write failed", context, -errcode);
250 // Just wait for the data to be sent, or a timeout.
251 DWORD waitrc;
252 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
253 if (waitrc != WAIT_OBJECT_0) {
254 LOGLINE(REMOTE, "write: timeout has expired");
255 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
257 // Get the final result.
258 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
259 throw Xapian::NetworkError("Failed to get overlapped result",
260 context, -int(GetLastError()));
263 count += n;
265 // We must update the offset in the OVERLAPPED structure manually.
266 update_overlapped_offset(overlapped, n);
268 if (count == str->size()) {
269 if (str == &message || message.empty()) return;
270 str = &message;
271 count = 0;
274 #else
275 // If there's no end_time, just use blocking I/O.
276 if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
277 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
278 context, errno);
281 const string * str = &header;
283 fd_set fdset;
284 size_t count = 0;
285 while (true) {
286 // We've set write to non-blocking, so just try writing as there
287 // will usually be space.
288 ssize_t n = write(fdout, str->data() + count, str->size() - count);
290 if (n >= 0) {
291 count += n;
292 if (count == str->size()) {
293 if (str == &message || message.empty()) return;
294 str = &message;
295 count = 0;
297 continue;
300 LOGLINE(REMOTE, "write gave errno = " << errno);
301 if (errno == EINTR) continue;
303 if (errno != EAGAIN)
304 throw Xapian::NetworkError("write failed", context, errno);
306 // Use select to wait until there is space or the timeout is reached.
307 FD_ZERO(&fdset);
308 FD_SET(fdout, &fdset);
310 double time_diff = end_time - RealTime::now();
311 if (time_diff < 0) {
312 LOGLINE(REMOTE, "write: timeout has expired");
313 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
316 struct timeval tv;
317 RealTime::to_timeval(time_diff, &tv);
318 int select_result = select(fdout + 1, 0, &fdset, &fdset, &tv);
320 if (select_result < 0) {
321 if (errno == EINTR) {
322 // EINTR means select was interrupted by a signal.
323 // We could just retry the select, but it's easier to just
324 // retry the write.
325 continue;
327 throw Xapian::NetworkError("select failed during write", context, errno);
330 if (select_result == 0)
331 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
333 #endif
336 void
337 RemoteConnection::send_file(char type, int fd, double end_time)
339 LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
340 if (fdout == -1)
341 throw_database_closed();
343 off_t size = file_size(fd);
344 if (errno)
345 throw Xapian::NetworkError("Couldn't stat file to send", errno);
346 // FIXME: Use sendfile() or similar if available?
348 char buf[CHUNKSIZE];
349 buf[0] = type;
350 size_t c = 1;
352 string enc_size = encode_length(size);
353 c += enc_size.size();
354 // An encoded length should be just a few bytes.
355 AssertRel(c, <=, sizeof(buf));
356 memcpy(buf + 1, enc_size.data(), enc_size.size());
359 #ifdef __WIN32__
360 HANDLE hout = fd_to_handle(fdout);
361 size_t count = 0;
362 while (true) {
363 DWORD n;
364 BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
365 if (!ok) {
366 int errcode = GetLastError();
367 if (errcode != ERROR_IO_PENDING)
368 throw Xapian::NetworkError("write failed", context, -errcode);
369 // Just wait for the data to be sent, or a timeout.
370 DWORD waitrc;
371 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
372 if (waitrc != WAIT_OBJECT_0) {
373 LOGLINE(REMOTE, "write: timeout has expired");
374 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
376 // Get the final result.
377 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
378 throw Xapian::NetworkError("Failed to get overlapped result",
379 context, -int(GetLastError()));
382 count += n;
384 // We must update the offset in the OVERLAPPED structure manually.
385 update_overlapped_offset(overlapped, n);
387 if (count == c) {
388 if (size == 0) return;
390 ssize_t res;
391 do {
392 res = read(fd, buf, sizeof(buf));
393 } while (res < 0 && errno == EINTR);
394 if (res < 0) throw Xapian::NetworkError("read failed", errno);
395 c = size_t(res);
397 size -= c;
398 count = 0;
401 #else
402 // If there's no end_time, just use blocking I/O.
403 if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
404 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
405 context, errno);
408 fd_set fdset;
409 size_t count = 0;
410 while (true) {
411 // We've set write to non-blocking, so just try writing as there
412 // will usually be space.
413 ssize_t n = write(fdout, buf + count, c - count);
415 if (n >= 0) {
416 count += n;
417 if (count == c) {
418 if (size == 0) return;
420 ssize_t res;
421 do {
422 res = read(fd, buf, sizeof(buf));
423 } while (res < 0 && errno == EINTR);
424 if (res < 0) throw Xapian::NetworkError("read failed", errno);
425 c = size_t(res);
427 size -= c;
428 count = 0;
430 continue;
433 LOGLINE(REMOTE, "write gave errno = " << errno);
434 if (errno == EINTR) continue;
436 if (errno != EAGAIN)
437 throw Xapian::NetworkError("write failed", context, errno);
439 // Use select to wait until there is space or the timeout is reached.
440 FD_ZERO(&fdset);
441 FD_SET(fdout, &fdset);
443 double time_diff = end_time - RealTime::now();
444 if (time_diff < 0) {
445 LOGLINE(REMOTE, "write: timeout has expired");
446 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
449 struct timeval tv;
450 RealTime::to_timeval(time_diff, &tv);
451 int select_result = select(fdout + 1, 0, &fdset, &fdset, &tv);
453 if (select_result < 0) {
454 if (errno == EINTR) {
455 // EINTR means select was interrupted by a signal.
456 // We could just retry the select, but it's easier to just
457 // retry the write.
458 continue;
460 throw Xapian::NetworkError("select failed during write", context, errno);
463 if (select_result == 0)
464 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
466 #endif
470 RemoteConnection::sniff_next_message_type(double end_time)
472 LOGCALL(REMOTE, int, "RemoteConnection::sniff_next_message_type", end_time);
473 if (fdin == -1)
474 throw_database_closed();
476 if (!read_at_least(1, end_time))
477 RETURN(-1);
478 unsigned char type = buffer[0];
479 RETURN(type);
483 RemoteConnection::get_message(string &result, double end_time)
485 LOGCALL(REMOTE, int, "RemoteConnection::get_message", result | end_time);
486 if (fdin == -1)
487 throw_database_closed();
489 if (!read_at_least(2, end_time))
490 RETURN(-1);
491 size_t len = static_cast<unsigned char>(buffer[1]);
492 if (!read_at_least(len + 2, end_time))
493 RETURN(-1);
494 if (len != 0xff) {
495 result.assign(buffer.data() + 2, len);
496 unsigned char type = buffer[0];
497 buffer.erase(0, len + 2);
498 RETURN(type);
500 len = 0;
501 string::const_iterator i = buffer.begin() + 2;
502 unsigned char ch;
503 int shift = 0;
504 do {
505 if (i == buffer.end() || shift > 28) {
506 // Something is very wrong...
507 throw_network_error_insane_message_length();
509 ch = *i++;
510 len |= size_t(ch & 0x7f) << shift;
511 shift += 7;
512 } while ((ch & 0x80) == 0);
513 len += 255;
514 size_t header_len = (i - buffer.begin());
515 if (!read_at_least(header_len + len, end_time))
516 RETURN(-1);
517 result.assign(buffer.data() + header_len, len);
518 unsigned char type = buffer[0];
519 buffer.erase(0, header_len + len);
520 RETURN(type);
524 RemoteConnection::get_message_chunked(double end_time)
526 LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunked", end_time);
528 if (fdin == -1)
529 throw_database_closed();
531 if (!read_at_least(2, end_time))
532 RETURN(-1);
533 uint_least64_t len = static_cast<unsigned char>(buffer[1]);
534 if (len != 0xff) {
535 chunked_data_left = off_t(len);
536 char type = buffer[0];
537 buffer.erase(0, 2);
538 RETURN(type);
540 if (!read_at_least(len + 2, end_time))
541 RETURN(-1);
542 len = 0;
543 string::const_iterator i = buffer.begin() + 2;
544 unsigned char ch;
545 int shift = 0;
546 do {
547 // Allow at most 63 bits for message lengths - it's neatly a multiple
548 // of 7 bits and anything longer than this is almost certainly a
549 // corrupt value.
550 // The value also needs to be representable as an
551 // off_t (which is a signed type), so use that size if it is smaller.
552 const int SHIFT_LIMIT = 63;
553 if (rare(i == buffer.end() || shift >= SHIFT_LIMIT)) {
554 // Something is very wrong...
555 throw_network_error_insane_message_length();
557 ch = *i++;
558 uint_least64_t bits = ch & 0x7f;
559 len |= bits << shift;
560 shift += 7;
561 } while ((ch & 0x80) == 0);
562 len += 255;
564 chunked_data_left = off_t(len);
565 if (sizeof(off_t) * CHAR_BIT < 63) {
566 // Check that the value of len fits in an off_t without loss.
567 if (rare(uint_least64_t(chunked_data_left) != len)) {
568 throw_network_error_insane_message_length();
572 unsigned char type = buffer[0];
573 size_t header_len = (i - buffer.begin());
574 buffer.erase(0, header_len);
575 RETURN(type);
579 RemoteConnection::get_message_chunk(string &result, size_t at_least,
580 double end_time)
582 LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunk", result | at_least | end_time);
583 if (fdin == -1)
584 throw_database_closed();
586 if (at_least <= result.size()) RETURN(true);
587 at_least -= result.size();
589 bool read_enough = (off_t(at_least) <= chunked_data_left);
590 if (!read_enough) at_least = size_t(chunked_data_left);
592 if (!read_at_least(at_least, end_time))
593 RETURN(-1);
595 size_t retlen = min(off_t(buffer.size()), chunked_data_left);
596 result.append(buffer, 0, retlen);
597 buffer.erase(0, retlen);
598 chunked_data_left -= retlen;
600 RETURN(int(read_enough));
603 /** Write n bytes from block pointed to by p to file descriptor fd. */
604 static void
605 write_all(int fd, const char * p, size_t n)
607 while (n) {
608 ssize_t c = write(fd, p, n);
609 if (c < 0) {
610 if (errno == EINTR) continue;
611 throw Xapian::NetworkError("Error writing to file", errno);
613 p += c;
614 n -= c;
619 RemoteConnection::receive_file(const string &file, double end_time)
621 LOGCALL(REMOTE, int, "RemoteConnection::receive_file", file | end_time);
622 if (fdin == -1)
623 throw_database_closed();
625 // FIXME: Do we want to be able to delete the file during writing?
626 FD fd(posixy_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666));
627 if (fd == -1)
628 throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
630 int type = get_message_chunked(end_time);
631 do {
632 off_t min_read = min(chunked_data_left, off_t(CHUNKSIZE));
633 if (!read_at_least(min_read, end_time))
634 RETURN(-1);
635 write_all(fd, buffer.data(), min_read);
636 chunked_data_left -= min_read;
637 buffer.erase(0, min_read);
638 } while (chunked_data_left);
639 RETURN(type);
642 void
643 RemoteConnection::do_close(bool wait)
645 LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", wait);
647 if (fdin >= 0) {
648 if (wait) {
649 // We can be called from a destructor, so we can't throw an
650 // exception.
651 try {
652 send_message(MSG_SHUTDOWN, string(), 0.0);
653 } catch (...) {
655 #ifdef __WIN32__
656 HANDLE hin = fd_to_handle(fdin);
657 char dummy;
658 DWORD received;
659 BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
660 if (!ok && GetLastError() == ERROR_IO_PENDING) {
661 // Wait for asynchronous read to complete.
662 (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
664 #else
665 // Wait for the connection to be closed - when this happens
666 // select() will report that a read won't block.
667 fd_set fdset;
668 FD_ZERO(&fdset);
669 FD_SET(fdin, &fdset);
670 int res;
671 do {
672 res = select(fdin + 1, &fdset, 0, &fdset, NULL);
673 } while (res < 0 && errno == EINTR);
674 #endif
676 close_fd_or_socket(fdin);
678 // If the same fd is used in both directions, don't close it twice.
679 if (fdin == fdout) fdout = -1;
681 fdin = -1;
684 if (fdout >= 0) {
685 close_fd_or_socket(fdout);
686 fdout = -1;
690 #ifdef __WIN32__
691 DWORD
692 RemoteConnection::calc_read_wait_msecs(double end_time)
694 if (end_time == 0.0)
695 return INFINITE;
697 // Calculate how far in the future end_time is.
698 double time_diff = end_time - RealTime::now();
700 // DWORD is unsigned, so we mustn't try and return a negative value.
701 if (time_diff < 0.0) {
702 throw Xapian::NetworkTimeoutError("Timeout expired before starting read", context);
704 return static_cast<DWORD>(time_diff * 1000.0);
706 #endif