Use Vlinder::Atomics
[Arachnida.git] / lib / Spin / Connection.cpp
blobbe84094b036ec949e1fe8a152883f4b166ec5256
1 #include "Connection.h"
2 #include <stdexcept>
3 #include <cassert>
4 #include <climits>
5 #if defined(_WIN32) && ! defined(__CYGWIN__)
6 #include <Windows.h>
7 #define socklen_t int
8 #else
9 extern "C" {
10 #include <sys/socket.h>
11 #include <netinet/in.h>
13 #endif
14 #include <boost/bind.hpp>
15 #include <Scorpion/BIO.h>
16 #include <Agelena/Logger.h>
17 #include "Private/ConnectionHandler.h"
18 #include "Handlers/NewDataHandler.h"
19 #include "Exceptions/Connection.h"
21 #define AGELENA_COMPONENT_ID 0x5350493eUL
22 #define AGELENA_SUBCOMPONENT_ID 0x434e584eUL
24 namespace Spin
26 Connection::~Connection()
28 AGELENA_DEBUG_1("Connection(%1%)::~Connection()", this);
29 clearNewDataHandler();
32 std::pair< std::size_t, int > Connection::write(const std::vector< char > & data)
34 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::write(const std::vector< char > & data)", this);
35 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
36 if (!bio_)
37 throw Exceptions::Connection::UnusableConnection();
38 else
39 { /* all is well */ }
41 assert(data.size() < INT_MAX);
42 int written(0);
43 int reason(no_error__);
45 if (!data.empty())
47 written = bio_->write(&(data[0]), data.size());
48 if (written == -2)
49 throw Exceptions::Connection::MethodNotImplemented();
50 else if (written < static_cast< int >(data.size()))
52 if (!bio_->shouldRetry())
54 throw Exceptions::Connection::ConnectionClosed();
56 else
57 { /* non-permanent error */ }
58 reason |= should_retry__;
59 if (bio_->shouldRead())
60 reason |= should_read__;
61 else
62 { /* no reading requested */ }
63 if (bio_->shouldWrite())
64 reason |= should_write__;
65 else
66 { /* no write requested */ }
68 else
69 { /* all is well */ }
71 else
72 { /* nothing to do */ }
74 return std::make_pair(written, reason);
77 std::pair< std::size_t, int > Connection::read(std::vector< char > & buffer)
79 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::read(std::vector< char > & buffer)", this);
80 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
81 if (!bio_)
82 throw Exceptions::Connection::UnusableConnection();
83 else
84 { /* all is well */ }
86 std::size_t bytes_read_into_buffer(0);
87 int reason(no_error__);
88 bool continue_until_retry(false);
89 std::vector< char >::size_type start_offset(0);
90 if (buffer.empty())
92 continue_until_retry = true;
93 buffer.resize(default_read_block_size__);
95 else
96 { /* no special handling here */ }
97 read_entry_point:
98 const std::size_t bytes_requested(buffer.size() - start_offset);
99 assert(bytes_requested < INT_MAX);
100 int bytes_read(bio_->read(&(buffer[start_offset]), bytes_requested));
101 if (bytes_read < static_cast< int >(bytes_requested))
103 bytes_read_into_buffer += bytes_read;
104 if (!bio_->shouldRetry() && bytes_read <= 0)
106 throw Exceptions::Connection::ConnectionClosed();
108 else
110 reason |= should_retry__;
111 if (bio_->shouldRead())
112 reason |= should_read__;
113 else
114 { /* no reading requested */ }
115 if (bio_->shouldWrite())
116 reason |= should_write__;
117 else
118 { /* no write requested */ }
121 else if (continue_until_retry)
123 bytes_read_into_buffer = buffer.size();
124 start_offset = buffer.size();
125 buffer.resize(buffer.size() + default_read_block_size__);
126 goto read_entry_point;
128 else
130 bytes_read_into_buffer = buffer.size();
133 return std::make_pair(bytes_read_into_buffer, reason);
136 bool Connection::poll() const
138 AGELENA_DEBUG_1("bool Connection(%1%)::poll() const", this);
139 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
140 if (!bio_)
141 throw Exceptions::Connection::UnusableConnection();
142 else
143 { /* all is well */ }
144 return bio_->poll();
147 void Connection::close()
149 AGELENA_DEBUG_1("void Connection(%1%)::close()", this);
150 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
151 bio_.reset();
154 bool Connection::usesSSL() const
156 AGELENA_DEBUG_1("bool Connection(%1%)::usesSSL() const", this);
157 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
158 if (!bio_)
159 throw Exceptions::Connection::UnusableConnection();
160 else
161 { /* all is well */ }
162 return bio_->usesSSL();
165 void Connection::setNewDataHandler(Handlers::NewDataHandler & handler, OnErrorCallback on_error_callback/* = OnErrorCallback()*/)
167 AGELENA_DEBUG_1("void Connection(%1%)::setNewDataHandler(Handlers::NewDataHandler & handler, OnErrorCallback on_error_callback/* = OnErrorCallback()*/)", this);
168 if (on_error_callback.empty())
170 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
171 on_error_callback = boost::bind(&Connection::onError_, this);
173 else
174 { /* use the one provided */ }
175 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
176 data_handler_ = &handler;
177 if (fd_ == -1)
178 throw Exceptions::Connection::UnusableConnection();
179 else
180 { /* all is well */ }
181 Private::ConnectionHandler::getInstance().attach(fd_, boost::bind(&Connection::onDataReady_, this), on_error_callback);
184 void Connection::clearNewDataHandler()
186 AGELENA_DEBUG_1("void Connection(%1%)::clearNewDataHandler()", this);
187 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
188 if (data_handler_)
190 if (fd_ == -1)
191 throw Exceptions::Connection::UnusableConnection();
192 else
193 { /* all is well */ }
194 Private::ConnectionHandler::getInstance().detach(fd_);
195 data_handler_ = 0;
197 else
198 { /* nothing to clear */ }
201 void Connection::setErrorHandler(const OnErrorCallback & on_error_callback)
203 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
204 error_handler_ = on_error_callback;
207 void Connection::clearErrorHandler()
209 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
210 error_handler_.clear();
213 Details::Address Connection::getPeerAddress() const
215 AGELENA_DEBUG_1("Details::Address Connection(%1%)::getPeerAddress() const", this);
216 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
217 if (!bio_)
218 throw Exceptions::Connection::UnusableConnection();
219 else
220 { /* all is well */ }
221 int socket_fd(fd_);
222 ::sockaddr_in peer_addr;
223 socklen_t peer_addr_size(sizeof(peer_addr));
224 ::getpeername(socket_fd, (::sockaddr*)&peer_addr, &peer_addr_size);
225 return Details::Address(peer_addr.sin_addr.s_addr);
228 void Connection::setReadTimeout(unsigned int seconds)
230 AGELENA_DEBUG_2("void Connection(%1%)::setReadTimeout(unsigned int %2%)", this, seconds);
231 boost::recursive_mutex::scoped_lock sentinel(bio_lock_);
232 if (!bio_)
233 throw Exceptions::Connection::UnusableConnection();
234 else
235 { /* all is well */ }
236 int socket_fd(fd_);
237 ::setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&seconds, sizeof(unsigned int));
240 Connection::Connection(Scorpion::BIO * bio)
241 : bio_(bio),
242 data_handler_(0),
243 fd_(bio->getFD())
245 AGELENA_DEBUG_1("Connection(%1%)::Connection(Scorpion::BIO * bio)", this);
248 void Connection::onDataReady_()
250 AGELENA_DEBUG_1("void Connection(%1%)::onDataReady_()", this);
251 if (data_handler_)
252 (*data_handler_)(shared_from_this());
253 else
254 { /* no-op */ }
257 void Connection::onError_()
259 boost::recursive_mutex::scoped_lock sentinel(error_handler_lock_);
260 if (error_handler_)
262 error_handler_();
264 else
265 { /* no-op */ }