1 #include "Connection.h"
5 #if defined(_WIN32) && ! defined(__CYGWIN__)
10 #include <sys/socket.h>
11 #include <netinet/in.h>
14 #include <boost/bind.hpp>
15 #include <Acari/atomicPrimitives.h>
16 #include <Scorpion/BIO.h>
17 #include <Agelena/Logger.h>
18 #include "Private/ConnectionHandler.h"
19 #include "Handlers/NewDataHandler.h"
20 #include "Exceptions/Connection.h"
24 Connection::~Connection()
26 AGELENA_DEBUG_1("Connection(%1%)::~Connection()", this);
27 clearNewDataHandler();
30 std::pair
< std::size_t, int > Connection::write(const std::vector
< char > & data
)
32 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::write(const std::vector< char > & data)", this);
33 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
35 throw Exceptions::Connection::UnusableConnection();
39 assert(data
.size() < INT_MAX
);
41 int reason(no_error__
);
45 written
= bio_
->write(&(data
[0]), data
.size());
47 throw Exceptions::Connection::MethodNotImplemented();
48 else if (written
< static_cast< int >(data
.size()))
50 if (!bio_
->shouldRetry())
52 throw Exceptions::Connection::ConnectionClosed();
55 { /* non-permanent error */ }
56 reason
|= should_retry__
;
57 if (bio_
->shouldRead())
58 reason
|= should_read__
;
60 { /* no reading requested */ }
61 if (bio_
->shouldWrite())
62 reason
|= should_write__
;
64 { /* no write requested */ }
70 { /* nothing to do */ }
72 return std::make_pair(written
, reason
);
75 std::pair
< std::size_t, int > Connection::read(std::vector
< char > & buffer
)
77 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::read(std::vector< char > & buffer)", this);
78 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
80 throw Exceptions::Connection::UnusableConnection();
84 std::size_t bytes_read_into_buffer(0);
85 int reason(no_error__
);
86 bool continue_until_retry(false);
87 std::vector
< char >::size_type
start_offset(0);
90 continue_until_retry
= true;
91 buffer
.resize(default_read_block_size__
);
94 { /* no special handling here */ }
96 const std::size_t bytes_requested(buffer
.size() - start_offset
);
97 assert(bytes_requested
< INT_MAX
);
98 int bytes_read(bio_
->read(&(buffer
[start_offset
]), bytes_requested
));
99 if (bytes_read
< static_cast< int >(bytes_requested
))
101 bytes_read_into_buffer
+= bytes_read
;
102 if (!bio_
->shouldRetry() && bytes_read
<= 0)
104 throw Exceptions::Connection::ConnectionClosed();
108 reason
|= should_retry__
;
109 if (bio_
->shouldRead())
110 reason
|= should_read__
;
112 { /* no reading requested */ }
113 if (bio_
->shouldWrite())
114 reason
|= should_write__
;
116 { /* no write requested */ }
119 else if (continue_until_retry
)
121 bytes_read_into_buffer
= buffer
.size();
122 start_offset
= buffer
.size();
123 buffer
.resize(buffer
.size() + default_read_block_size__
);
124 goto read_entry_point
;
128 bytes_read_into_buffer
= buffer
.size();
131 return std::make_pair(bytes_read_into_buffer
, reason
);
134 bool Connection::poll() const
136 AGELENA_DEBUG_1("bool Connection(%1%)::poll() const", this);
137 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
139 throw Exceptions::Connection::UnusableConnection();
141 { /* all is well */ }
145 void Connection::close()
147 AGELENA_DEBUG_1("void Connection(%1%)::close()", this);
148 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
152 bool Connection::usesSSL() const
154 AGELENA_DEBUG_1("bool Connection(%1%)::usesSSL() const", this);
155 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
157 throw Exceptions::Connection::UnusableConnection();
159 { /* all is well */ }
160 return bio_
->usesSSL();
163 void Connection::setNewDataHandler(Handlers::NewDataHandler
& handler
, OnErrorCallback on_error_callback
/* = OnErrorCallback()*/)
165 AGELENA_DEBUG_1("void Connection(%1%)::setNewDataHandler(Handlers::NewDataHandler & handler, OnErrorCallback on_error_callback/* = OnErrorCallback()*/)", this);
166 if (on_error_callback
.empty())
168 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);
169 on_error_callback
= boost::bind(&Connection::onError_
, this);
172 { /* use the one provided */ }
173 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
174 data_handler_
= &handler
;
176 throw Exceptions::Connection::UnusableConnection();
178 { /* all is well */ }
179 Private::ConnectionHandler::getInstance().attach(fd_
, boost::bind(&Connection::onDataReady_
, this), on_error_callback
);
182 void Connection::clearNewDataHandler()
184 AGELENA_DEBUG_1("void Connection(%1%)::clearNewDataHandler()", this);
185 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
189 throw Exceptions::Connection::UnusableConnection();
191 { /* all is well */ }
192 Private::ConnectionHandler::getInstance().detach(fd_
);
196 { /* nothing to clear */ }
199 void Connection::setErrorHandler(const OnErrorCallback
& on_error_callback
)
201 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);
202 error_handler_
= on_error_callback
;
205 void Connection::clearErrorHandler()
207 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);
208 error_handler_
.clear();
211 Details::Address
Connection::getPeerAddress() const
213 AGELENA_DEBUG_1("Details::Address Connection(%1%)::getPeerAddress() const", this);
214 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
216 throw Exceptions::Connection::UnusableConnection();
218 { /* all is well */ }
220 ::sockaddr_in peer_addr
;
221 socklen_t
peer_addr_size(sizeof(peer_addr
));
222 ::getpeername(socket_fd
, (::sockaddr
*)&peer_addr
, &peer_addr_size
);
223 return Details::Address(peer_addr
.sin_addr
.s_addr
);
226 void Connection::setReadTimeout(unsigned int seconds
)
228 AGELENA_DEBUG_2("void Connection(%1%)::setReadTimeout(unsigned int %2%)", this, seconds
);
229 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
231 throw Exceptions::Connection::UnusableConnection();
233 { /* all is well */ }
235 ::setsockopt(fd_
, SOL_SOCKET
, SO_RCVTIMEO
, (const char *)&seconds
, sizeof(unsigned int));
238 Connection::Connection(Scorpion::BIO
* bio
)
243 AGELENA_DEBUG_1("Connection(%1%)::Connection(Scorpion::BIO * bio)", this);
246 void Connection::onDataReady_()
248 AGELENA_DEBUG_1("void Connection(%1%)::onDataReady_()", this);
250 (*data_handler_
)(shared_from_this());
255 void Connection::onError_()
257 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);