Add setReadTimeout
[Arachnida.git] / lib / Spin / Connection.cpp
blob03fda646f654fd31e279a63081b42529cf2ac72e
#include "Connection.h"
#include <stdexcept>
#include <cassert>
#include <climits>
#if defined(_WIN32) && ! defined(__CYGWIN__)
#include <Windows.h>
#define socklen_t int
#else
extern "C" {
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
}
#endif
#include <boost/bind.hpp>
#include <Acari/atomicPrimitives.h>
#include <Scorpion/BIO.h>
#include <Agelena/Logger.h>
#include "Private/ConnectionHandler.h"
#include "Handlers/NewDataHandler.h"
#include "Exceptions/Connection.h"
22 namespace Spin
24 Connection::~Connection()
26 AGELENA_DEBUG_1("Connection(%1%)::~Connection()", this);
27 clearNewDataHandler();
30 std::pair< std::size_t, int > Connection::write(const std::vector< char > & data)
32 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::write(const std::vector< char > & data)", this);
33 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
34 if (!bio_)
35 throw Exceptions::Connection::UnusableConnection();
36 else
37 { /* all is well */ }
39 assert(data.size() < INT_MAX);
40 int written(0);
41 int reason(no_error__);
43 if (!data.empty())
45 written = bio_->write(&(data[0]), data.size());
46 if (written == -2)
47 throw Exceptions::Connection::MethodNotImplemented();
48 else if (written < static_cast< int >(data.size()))
50 if (!bio_->shouldRetry())
52 throw Exceptions::Connection::ConnectionClosed();
54 else
55 { /* non-permanent error */ }
56 reason |= should_retry__;
57 if (bio_->shouldRead())
58 reason |= should_read__;
59 else
60 { /* no reading requested */ }
61 if (bio_->shouldWrite())
62 reason |= should_write__;
63 else
64 { /* no write requested */ }
66 else
67 { /* all is well */ }
69 else
70 { /* nothing to do */ }
72 return std::make_pair(written, reason);
75 std::pair< std::size_t, int > Connection::read(std::vector< char > & buffer)
77 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::read(std::vector< char > & buffer)", this);
78 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
79 if (!bio_)
80 throw Exceptions::Connection::UnusableConnection();
81 else
82 { /* all is well */ }
84 std::size_t bytes_read_into_buffer(0);
85 int reason(no_error__);
86 bool continue_until_retry(false);
87 std::vector< char >::size_type start_offset(0);
88 if (buffer.empty())
90 continue_until_retry = true;
91 buffer.resize(default_read_block_size__);
93 else
94 { /* no special handling here */ }
95 read_entry_point:
96 const std::size_t bytes_requested(buffer.size() - start_offset);
97 assert(bytes_requested < INT_MAX);
98 int bytes_read(bio_->read(&(buffer[start_offset]), bytes_requested));
99 if (bytes_read < static_cast< int >(bytes_requested))
101 bytes_read_into_buffer += bytes_read;
102 if (!bio_->shouldRetry() && bytes_read <= 0)
104 throw Exceptions::Connection::ConnectionClosed();
106 else
108 reason |= should_retry__;
109 if (bio_->shouldRead())
110 reason |= should_read__;
111 else
112 { /* no reading requested */ }
113 if (bio_->shouldWrite())
114 reason |= should_write__;
115 else
116 { /* no write requested */ }
119 else if (continue_until_retry)
121 bytes_read_into_buffer = buffer.size();
122 start_offset = buffer.size();
123 buffer.resize(buffer.size() + default_read_block_size__);
124 goto read_entry_point;
126 else
128 bytes_read_into_buffer = buffer.size();
131 return std::make_pair(bytes_read_into_buffer, reason);
134 bool Connection::poll() const
136 AGELENA_DEBUG_1("bool Connection(%1%)::poll() const", this);
137 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
138 if (!bio_)
139 throw Exceptions::Connection::UnusableConnection();
140 else
141 { /* all is well */ }
142 return bio_->poll();
145 void Connection::close()
147 AGELENA_DEBUG_1("void Connection(%1%)::close()", this);
148 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
149 bio_.reset();
152 bool Connection::usesSSL() const
154 AGELENA_DEBUG_1("bool Connection(%1%)::usesSSL() const", this);
155 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
156 if (!bio_)
157 throw Exceptions::Connection::UnusableConnection();
158 else
159 { /* all is well */ }
160 return bio_->usesSSL();
163 void Connection::setNewDataHandler(Handlers::NewDataHandler & handler, OnErrorCallback on_error_callback/* = OnErrorCallback()*/)
165 AGELENA_DEBUG_1("void Connection(%1%)::setNewDataHandler(Handlers::NewDataHandler & handler, OnErrorCallback on_error_callback/* = OnErrorCallback()*/)", this);
166 if (on_error_callback.empty())
168 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
169 on_error_callback = boost::bind(&Connection::onError_, this);
171 else
172 { /* use the one provided */ }
173 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
174 data_handler_ = &handler;
175 if (fd_ == -1)
176 throw Exceptions::Connection::UnusableConnection();
177 else
178 { /* all is well */ }
179 Private::ConnectionHandler::getInstance().attach(fd_, boost::bind(&Connection::onDataReady_, this), on_error_callback);
182 void Connection::clearNewDataHandler()
184 AGELENA_DEBUG_1("void Connection(%1%)::clearNewDataHandler()", this);
185 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
186 if (data_handler_)
188 if (fd_ == -1)
189 throw Exceptions::Connection::UnusableConnection();
190 else
191 { /* all is well */ }
192 Private::ConnectionHandler::getInstance().detach(fd_);
193 data_handler_ = 0;
195 else
196 { /* nothing to clear */ }
199 void Connection::setErrorHandler(const OnErrorCallback & on_error_callback)
201 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
202 error_handler_ = on_error_callback;
205 void Connection::clearErrorHandler()
207 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
208 error_handler_.clear();
211 Details::Address Connection::getPeerAddress() const
213 AGELENA_DEBUG_1("Details::Address Connection(%1%)::getPeerAddress() const", this);
214 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
215 if (!bio_)
216 throw Exceptions::Connection::UnusableConnection();
217 else
218 { /* all is well */ }
219 int socket_fd(fd_);
220 ::sockaddr_in peer_addr;
221 socklen_t peer_addr_size(sizeof(peer_addr));
222 ::getpeername(socket_fd, (::sockaddr*)&peer_addr, &peer_addr_size);
223 return Details::Address(peer_addr.sin_addr.s_addr);
226 void Connection::setReadTimeout(unsigned int seconds)
228 AGELENA_DEBUG_2("void Connection(%1%)::setReadTimeout(unsigned int %2%)", this, seconds);
229 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
230 if (!bio_)
231 throw Exceptions::Connection::UnusableConnection();
232 else
233 { /* all is well */ }
234 int socket_fd(fd_);
235 ::setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&seconds, sizeof(unsigned int));
238 Connection::Connection(Scorpion::BIO * bio)
239 : bio_(bio),
240 data_handler_(0),
241 fd_(bio->getFD())
243 AGELENA_DEBUG_1("Connection(%1%)::Connection(Scorpion::BIO * bio)", this);
246 void Connection::onDataReady_()
248 AGELENA_DEBUG_1("void Connection(%1%)::onDataReady_()", this);
249 if (data_handler_)
250 (*data_handler_)(shared_from_this());
251 else
252 { /* no-op */ }
255 void Connection::onError_()
257 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
258 if (error_handler_)
260 error_handler_();
262 else
263 { /* no-op */ }