1 #include "Connection.h"
5 #if defined(_WIN32) && ! defined(__CYGWIN__)
10 #include <sys/socket.h>
11 #include <netinet/in.h>
14 #include <boost/bind.hpp>
15 #include <Scorpion/BIO.h>
16 #include <Agelena/Logger.h>
17 #include "Private/ConnectionHandler.h"
18 #include "Handlers/NewDataHandler.h"
19 #include "Exceptions/Connection.h"
21 #define AGELENA_COMPONENT_ID 0x5350493eUL
22 #define AGELENA_SUBCOMPONENT_ID 0x434e584eUL
// Destructor: emits a debug trace and then calls clearNewDataHandler().
// NOTE(review): clearNewDataHandler() contains a throw of UnusableConnection whose
// guard is not visible in this listing — confirm it cannot fire from the destructor.
26 Connection::~Connection()
28 AGELENA_DEBUG_1("Connection(%1%)::~Connection()", this);
29 clearNewDataHandler();
// Writes `data` through bio_ while holding bio_lock_.
//
// Returns a pair of (value of `written`, reason), where `reason` starts as
// no_error__ and may have should_retry__, should_read__ and should_write__ OR-ed in.
//
// Throws Exceptions::Connection::UnusableConnection, MethodNotImplemented or
// ConnectionClosed. NOTE(review): several guard conditions and the declaration of
// `written` are not visible in this listing (its embedded line numbers skip),
// so the exact conditions for the first two throws cannot be stated from here.
32 std::pair
< std::size_t, int > Connection::write(const std::vector
< char > & data
)
34 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::write(const std::vector< char > & data)", this);
// Lock is held for the remainder of the function.
35 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
37 throw Exceptions::Connection::UnusableConnection();
// The size is later compared against `written` after a cast to int, so it must fit in an int.
41 assert(data
.size() < INT_MAX
);
43 int reason(no_error__
);
// NOTE(review): &(data[0]) is taken unconditionally in the visible code —
// presumably callers never pass an empty vector; confirm.
47 written
= bio_
->write(&(data
[0]), data
.size());
49 throw Exceptions::Connection::MethodNotImplemented();
// Fewer bytes were written than were requested.
50 else if (written
< static_cast< int >(data
.size()))
// A short write for which the BIO does not report "retry" is reported as a closed connection.
52 if (!bio_
->shouldRetry())
54 throw Exceptions::Connection::ConnectionClosed();
57 { /* non-permanent error */ }
// Tell the caller to retry, and which direction(s) the BIO reports it is waiting on.
58 reason
|= should_retry__
;
59 if (bio_
->shouldRead())
60 reason
|= should_read__
;
62 { /* no reading requested */ }
63 if (bio_
->shouldWrite())
64 reason
|= should_write__
;
66 { /* no write requested */ }
72 { /* nothing to do */ }
// NOTE(review): `written` is compared as an int above but returned in a
// std::size_t slot; if it can be negative on the retry path the returned count
// would wrap — confirm.
74 return std::make_pair(written
, reason
);
// Reads from bio_ into `buffer` while holding bio_lock_.
//
// Returns a pair of (bytes_read_into_buffer, reason), where `reason` starts as
// no_error__ and may have should_retry__, should_read__ and should_write__ OR-ed in.
//
// Throws Exceptions::Connection::UnusableConnection or ConnectionClosed.
// NOTE(review): several guard conditions and the `read_entry_point` label are not
// visible in this listing (its embedded line numbers skip); comments below that
// depend on them are marked as assumptions.
77 std::pair
< std::size_t, int > Connection::read(std::vector
< char > & buffer
)
79 AGELENA_DEBUG_1("std::pair< std::size_t, int > Connection(%1%)::read(std::vector< char > & buffer)", this);
// Lock is held for the remainder of the function.
80 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
82 throw Exceptions::Connection::UnusableConnection();
86 std::size_t bytes_read_into_buffer(0);
87 int reason(no_error__
);
// When set, a completely filled buffer is grown and another read is attempted (see the goto below).
88 bool continue_until_retry(false);
// Index in `buffer` at which the next read() call stores its data.
89 std::vector
< char >::size_type
start_offset(0);
// NOTE(review): the condition guarding the next two statements is not visible —
// presumably they run only when the caller passed an empty buffer; confirm.
92 continue_until_retry
= true;
93 buffer
.resize(default_read_block_size__
);
96 { /* no special handling here */ }
// Ask for as many bytes as fit between start_offset and the end of the buffer.
98 const std::size_t bytes_requested(buffer
.size() - start_offset
);
// The count is later compared against an int result, so it must fit in an int.
99 assert(bytes_requested
< INT_MAX
);
100 int bytes_read(bio_
->read(&(buffer
[start_offset
]), bytes_requested
));
// Short read: fewer bytes arrived than were requested.
101 if (bytes_read
< static_cast< int >(bytes_requested
))
// NOTE(review): bytes_read is an int added to a std::size_t; if read() can return a
// negative value while shouldRetry() is true, this addition wraps — confirm.
103 bytes_read_into_buffer
+= bytes_read
;
// Nothing was read and the BIO does not report "retry": reported as a closed connection.
104 if (!bio_
->shouldRetry() && bytes_read
<= 0)
106 throw Exceptions::Connection::ConnectionClosed();
// Tell the caller to retry, and which direction(s) the BIO reports it is waiting on.
110 reason
|= should_retry__
;
111 if (bio_
->shouldRead())
112 reason
|= should_read__
;
114 { /* no reading requested */ }
115 if (bio_
->shouldWrite())
116 reason
|= should_write__
;
118 { /* no write requested */ }
// NOTE(review): presumably this `else` pairs with the short-read test above, i.e.
// it handles a completely filled buffer — braces are not visible; confirm.
121 else if (continue_until_retry
)
// Count the whole buffer as read, grow it by one more block, and read again
// starting at the old end.
123 bytes_read_into_buffer
= buffer
.size();
124 start_offset
= buffer
.size();
125 buffer
.resize(buffer
.size() + default_read_block_size__
);
126 goto read_entry_point
;
// Otherwise the whole buffer is counted as read and no further read is attempted.
130 bytes_read_into_buffer
= buffer
.size();
133 return std::make_pair(bytes_read_into_buffer
, reason
);
// Takes bio_lock_ and contains a throw of Exceptions::Connection::UnusableConnection.
// NOTE(review): the guard for the throw and the statement producing the bool
// result are not visible in this listing — consult the full source for the
// actual polling logic.
136 bool Connection::poll() const
138 AGELENA_DEBUG_1("bool Connection(%1%)::poll() const", this);
139 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
141 throw Exceptions::Connection::UnusableConnection();
143 { /* all is well */ }
// Emits a debug trace and takes bio_lock_.
// NOTE(review): the statement(s) that actually close the connection are not
// visible in this listing — consult the full source.
147 void Connection::close()
149 AGELENA_DEBUG_1("void Connection(%1%)::close()", this);
150 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
// Returns the result of bio_->usesSSL(), evaluated while holding bio_lock_.
// Contains a throw of Exceptions::Connection::UnusableConnection; its guard is
// not visible in this listing.
154 bool Connection::usesSSL() const
156 AGELENA_DEBUG_1("bool Connection(%1%)::usesSSL() const", this);
157 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
159 throw Exceptions::Connection::UnusableConnection();
161 { /* all is well */ }
162 return bio_
->usesSSL();
// Registers `handler` as the new-data handler and attaches fd_ to the
// Private::ConnectionHandler singleton.
//
// handler:           its address is stored in data_handler_; onDataReady_() later
//                    dereferences that pointer, so the handler presumably has to
//                    outlive the registration — confirm against callers.
// on_error_callback: passed to attach(); when empty, a binding to
//                    Connection::onError_ on this object is used instead.
//
// Contains a throw of Exceptions::Connection::UnusableConnection; its guard is
// not visible in this listing.
165 void Connection::setNewDataHandler(Handlers::NewDataHandler
& handler
, OnErrorCallback on_error_callback
/* = OnErrorCallback()*/)
167 AGELENA_DEBUG_1("void Connection(%1%)::setNewDataHandler(Handlers::NewDataHandler & handler, OnErrorCallback on_error_callback/* = OnErrorCallback()*/)", this);
// No callback supplied: fall back to this object's own onError_.
168 if (on_error_callback
.empty())
170 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);
// NOTE(review): binds a raw `this`, not a shared pointer.
171 on_error_callback
= boost::bind(&Connection::onError_
, this);
174 { /* use the one provided */ }
175 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
// In the visible statement order, data_handler_ is assigned before the throw below.
176 data_handler_
= &handler
;
178 throw Exceptions::Connection::UnusableConnection();
180 { /* all is well */ }
// attach() receives fd_, a binding to onDataReady_ on this object, and the error callback.
181 Private::ConnectionHandler::getInstance().attach(fd_
, boost::bind(&Connection::onDataReady_
, this), on_error_callback
);
// Detaches fd_ from the Private::ConnectionHandler singleton while holding bio_lock_.
// Contains a throw of Exceptions::Connection::UnusableConnection.
// NOTE(review): the conditions selecting between the detach path, the throw and
// the "nothing to clear" branch are not visible in this listing.
184 void Connection::clearNewDataHandler()
186 AGELENA_DEBUG_1("void Connection(%1%)::clearNewDataHandler()", this);
187 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
191 throw Exceptions::Connection::UnusableConnection();
193 { /* all is well */ }
194 Private::ConnectionHandler::getInstance().detach(fd_
);
198 { /* nothing to clear */ }
// Stores a copy of `on_error_callback` in error_handler_ while holding error_handler_lock_.
201 void Connection::setErrorHandler(const OnErrorCallback
& on_error_callback
)
203 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);
204 error_handler_
= on_error_callback
;
// Empties error_handler_ (via its clear()) while holding error_handler_lock_.
207 void Connection::clearErrorHandler()
209 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);
210 error_handler_
.clear();
// Queries the peer of `socket_fd` with ::getpeername() and returns a
// Details::Address constructed from the resulting sin_addr.s_addr.
// Runs while holding bio_lock_; contains a throw of
// Exceptions::Connection::UnusableConnection whose guard is not visible here.
// NOTE(review): the declaration of `socket_fd` is not visible in this listing.
213 Details::Address
Connection::getPeerAddress() const
215 AGELENA_DEBUG_1("Details::Address Connection(%1%)::getPeerAddress() const", this);
216 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
218 throw Exceptions::Connection::UnusableConnection();
220 { /* all is well */ }
// The address is read into a sockaddr_in, so only that layout is handled here.
222 ::sockaddr_in peer_addr
;
223 socklen_t
peer_addr_size(sizeof(peer_addr
));
// NOTE(review): the return value of getpeername() is not checked and peer_addr is
// not initialised beforehand, so on failure the returned address would be built
// from indeterminate data — confirm whether failure is possible here.
224 ::getpeername(socket_fd
, (::sockaddr
*)&peer_addr
, &peer_addr_size
);
225 return Details::Address(peer_addr
.sin_addr
.s_addr
);
// Sets the SO_RCVTIMEO socket option on `socket_fd` from `seconds`, while holding
// bio_lock_. Contains a throw of Exceptions::Connection::UnusableConnection whose
// guard is not visible here.
// NOTE(review): the declaration of `socket_fd` is not visible in this listing.
228 void Connection::setReadTimeout(unsigned int seconds
)
230 AGELENA_DEBUG_2("void Connection(%1%)::setReadTimeout(unsigned int %2%)", this, seconds
);
231 boost::recursive_mutex::scoped_lock
sentinel(bio_lock_
);
233 throw Exceptions::Connection::UnusableConnection();
235 { /* all is well */ }
// NOTE(review): the option value is passed as a raw unsigned int of
// sizeof(unsigned int) bytes. POSIX documents SO_RCVTIMEO as taking a struct
// timeval, and Winsock documents a DWORD in milliseconds — verify the value and
// unit for each target platform. The return value of setsockopt() is also not checked.
237 ::setsockopt(socket_fd
, SOL_SOCKET
, SO_RCVTIMEO
, (const char *)&seconds
, sizeof(unsigned int));
// Constructor taking a Scorpion::BIO pointer; the visible body only emits a debug trace.
// NOTE(review): the member initialiser list is not visible in this listing, so how
// `bio` is stored (and whether ownership is taken) cannot be stated from here.
240 Connection::Connection(Scorpion::BIO
* bio
)
245 AGELENA_DEBUG_1("Connection(%1%)::Connection(Scorpion::BIO * bio)", this);
// Callback bound in setNewDataHandler(): invokes the handler pointed to by
// data_handler_, passing shared_from_this().
// NOTE(review): no lock or null check on data_handler_ is visible around the call
// in this listing (its embedded line numbers skip) — confirm against the full source.
248 void Connection::onDataReady_()
250 AGELENA_DEBUG_1("void Connection(%1%)::onDataReady_()", this);
252 (*data_handler_
)(shared_from_this());
257 void Connection::onError_()
259 boost::recursive_mutex::scoped_lock
sentinel(error_handler_lock_
);