Whitespace
[amule.git] / src / LibSocketAsio.cpp
blob4bbd8e257c4abc24d8c35b2b4beca4d48d8eaf6a
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2011-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2011-2011 Stu Redman ( admin@amule.org / http://www.amule.org )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #ifdef HAVE_CONFIG_H
27 # include "config.h" // Needed for HAVE_BOOST_SOURCES
28 #endif
30 #ifdef _MSC_VER
31 #define _WIN32_WINNT 0x0501 // Boost complains otherwise
32 #endif
34 // Windows requires that Boost headers are included before wx headers.
35 // This works if precompiled headers are disabled for this file.
37 #define BOOST_ALL_NO_LIB
39 // Suppress warning caused by faulty boost/preprocessor/config/config.hpp in Boost 1.49
40 #if defined __GNUC__ && ! defined __GXX_EXPERIMENTAL_CXX0X__ && __cplusplus < 201103L
41 #define BOOST_PP_VARIADICS 0
42 #endif
44 #include <algorithm> // Needed for std::min - Boost up to 1.54 fails to compile with MSVC 2013 otherwise
46 #include <boost/asio.hpp>
47 #include <boost/bind.hpp>
48 #include <boost/version.hpp>
51 // Do away with building Boost.System, adding lib paths...
52 // Just include the single file and be done.
54 #ifdef HAVE_BOOST_SOURCES
55 # include <boost/../libs/system/src/error_code.cpp>
56 #else
57 # include <boost/system/error_code.hpp>
58 #endif
60 #include "LibSocket.h"
61 #include <wx/thread.h> // wxMutex
62 #include <wx/intl.h> // _()
63 #include <common/Format.h> // Needed for CFormat
64 #include "Logger.h"
65 #include "GuiEvents.h"
66 #include "amuleIPV4Address.h"
67 #include "MuleUDPSocket.h"
68 #include "OtherFunctions.h" // DeleteContents, MuleBoostVersion
69 #include "ScopedPtr.h"
70 #include <common/Macros.h>
72 using namespace boost::asio;
73 using namespace boost::system; // for error_code
74 static io_service s_io_service;
76 // Number of threads in the Asio thread pool
77 const int CAsioService::m_numberOfThreads = 4;
79 /**
80 * ASIO Client TCP socket implementation
83 class CamuleIPV4Endpoint : public ip::tcp::endpoint {
84 public:
85 CamuleIPV4Endpoint() {}
87 CamuleIPV4Endpoint(const CamuleIPV4Endpoint & impl) : ip::tcp::endpoint(impl) {}
88 CamuleIPV4Endpoint(const ip::tcp::endpoint & ep) { * this = ep; }
89 CamuleIPV4Endpoint(const ip::udp::endpoint & ep) { address(ep.address()); port(ep.port()); }
91 const CamuleIPV4Endpoint& operator = (const ip::tcp::endpoint & ep)
93 * (ip::tcp::endpoint *) this = ep;
94 return *this;
98 class CAsioSocketImpl
100 public:
101 // cppcheck-suppress uninitMemberVar m_readBufferPtr
102 CAsioSocketImpl(CLibSocket * libSocket) :
103 m_libSocket(libSocket),
104 m_strand(s_io_service),
105 m_timer(s_io_service)
107 m_OK = false;
108 m_blocksRead = false;
109 m_blocksWrite = false;
110 m_ErrorCode = 0;
111 m_readBuffer = NULL;
112 m_readBufferSize = 0;
113 m_readPending = false;
114 m_readBufferContent = 0;
115 m_eventPending = false;
116 m_port = 0;
117 m_sendBuffer = NULL;
118 m_connected = false;
119 m_closed = false;
120 m_isDestroying = false;
121 m_proxyState = false;
122 m_notify = true;
123 m_sync = false;
124 m_IP = wxT("?");
125 m_IPint = 0;
126 m_socket = new ip::tcp::socket(s_io_service);
128 // Set socket to non blocking
129 m_socket->non_blocking();
132 ~CAsioSocketImpl()
134 delete[] m_readBuffer;
135 delete[] m_sendBuffer;
138 void Notify(bool notify)
140 m_notify = notify;
143 bool Connect(const amuleIPV4Address& adr, bool wait)
145 if (!m_proxyState) {
146 SetIp(adr);
148 m_port = adr.Service();
149 m_closed = false;
150 m_OK = false;
151 m_sync = !m_notify; // set this once for the whole lifetime of the socket
152 AddDebugLogLineF(logAsio, CFormat(wxT("Connect %s %p")) % m_IP % this);
154 if (wait || m_sync) {
155 error_code ec;
156 m_socket->connect(adr.GetEndpoint(), ec);
157 m_OK = !ec;
158 m_connected = m_OK;
159 return m_OK;
160 } else {
161 m_socket->async_connect(adr.GetEndpoint(),
162 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleConnect, this, placeholders::error)));
163 // m_OK and return are false because we are not connected yet
164 return false;
168 bool IsConnected() const
170 return m_connected;
173 // For wxSocketClient, Ok won't return true unless the client is connected to a server.
174 bool IsOk() const
176 return m_OK;
179 bool IsDestroying() const
181 return m_isDestroying;
184 // Returns the actual error code
185 int LastError() const
187 return m_ErrorCode;
190 // Is reading blocked?
191 bool BlocksRead() const
193 return m_blocksRead;
196 // Is writing blocked?
197 bool BlocksWrite() const
199 return m_blocksWrite;
202 // Problem: wx sends an event when data gets available, so first there is an event, then Read() is called
203 // Asio can read async with callback, so you first read, then you get an event.
204 // Strategy:
205 // - Read some data in background into a buffer
206 // - Callback posts event when something is there
207 // - Read data from buffer
208 // - If data is exhausted, start reading more in background
209 // - If not, post another event (making sure events don't pile up though)
210 uint32 Read(char * buf, uint32 bytesToRead)
212 if (bytesToRead == 0) { // huh?
213 return 0;
216 if (m_sync) {
217 return ReadSync(buf, bytesToRead);
220 if (m_ErrorCode) {
221 AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Error")) % m_IP % bytesToRead);
222 return 0;
225 if (m_readPending // Background read hasn't completed.
226 || m_readBufferContent == 0) { // shouldn't be if it's not pending
228 m_blocksRead = true;
229 AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Block")) % m_IP % bytesToRead);
230 return 0;
233 m_blocksRead = false; // shouldn't be needed
235 // Read from our buffer
236 uint32 readCache = std::min(m_readBufferContent, bytesToRead);
237 memcpy(buf, m_readBufferPtr, readCache);
238 m_readBufferContent -= readCache;
239 m_readBufferPtr += readCache;
241 AddDebugLogLineF(logAsio, CFormat(wxT("Read2 %s %d - %d")) % m_IP % bytesToRead % readCache);
242 if (m_readBufferContent) {
243 // Data left, post another event
244 PostReadEvent(1);
245 } else {
246 // Nothing left, read more
247 StartBackgroundRead();
249 return readCache;
253 // Make a copy of the data and send it in background
254 // - unless a background send is already going on
255 uint32 Write(const void * buf, uint32 nbytes)
257 if (m_sync) {
258 return WriteSync(buf, nbytes);
261 if (m_sendBuffer) {
262 m_blocksWrite = true;
263 AddDebugLogLineF(logAsio, CFormat(wxT("Write blocks %d %p %s")) % nbytes % m_sendBuffer % m_IP);
264 return 0;
266 AddDebugLogLineF(logAsio, CFormat(wxT("Write %d %s")) % nbytes % m_IP);
267 m_sendBuffer = new char[nbytes];
268 memcpy(m_sendBuffer, buf, nbytes);
269 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchWrite, this, nbytes));
270 m_ErrorCode = 0;
271 return nbytes;
275 void Close()
277 if (!m_closed) {
278 m_closed = true;
279 m_connected = false;
280 if (m_sync || s_io_service.stopped()) {
281 DispatchClose();
282 } else {
283 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchClose, this));
289 void Destroy()
291 if (m_isDestroying) {
292 AddDebugLogLineC(logAsio, CFormat(wxT("Destroy() already dying socket %p %p %s")) % m_libSocket % this % m_IP);
293 return;
295 m_isDestroying = true;
296 AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p %s")) % m_libSocket % this % m_IP);
297 Close();
298 if (m_sync || s_io_service.stopped()) {
299 HandleDestroy();
300 } else {
301 // Close prevents creation of any more callbacks, but does not clear any callbacks already
302 // sitting in Asio's event queue (I have seen such a crash).
303 // So create a delay timer so they can be called until core is notified.
304 m_timer.expires_from_now(boost::posix_time::seconds(1));
305 m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleDestroy, this)));
310 wxString GetPeer()
312 return m_IP;
315 uint32 GetPeerInt()
317 return m_IPint;
321 // Bind socket to local endpoint if user wants to choose the local address
323 void SetLocal(const amuleIPV4Address& local)
325 error_code ec;
326 if (!m_socket->is_open()) {
327 // Socket is usually still closed when this is called
328 m_socket->open(boost::asio::ip::tcp::v4(), ec);
329 if (ec) {
330 AddDebugLogLineC(logAsio, CFormat(wxT("Can't open socket : %s")) % ec.message());
334 // We are using random (OS-defined) local ports.
335 // To set a constant output port, first call
336 // m_socket->set_option(socket_base::reuse_address(true));
337 // and then set the endpoint's port to it.
339 CamuleIPV4Endpoint endpoint(local.GetEndpoint());
340 endpoint.port(0);
341 m_socket->bind(endpoint, ec);
342 if (ec) {
343 AddDebugLogLineC(logAsio, CFormat(wxT("Can't bind socket to local endpoint %s : %s"))
344 % local.IPAddress() % ec.message());
345 } else {
346 AddDebugLogLineF(logAsio, CFormat(wxT("Bound socket to local endpoint %s")) % local.IPAddress());
351 void EventProcessed()
353 m_eventPending = false;
356 void SetWrapSocket(CLibSocket * socket)
358 m_libSocket = socket;
359 // Also do some setting up
360 m_OK = true;
361 m_connected = true;
362 // Start reading
363 StartBackgroundRead();
366 bool UpdateIP()
368 error_code ec;
369 amuleIPV4Address addr = CamuleIPV4Endpoint(m_socket->remote_endpoint(ec));
370 if (SetError(ec)) {
371 AddDebugLogLineN(logAsio, CFormat(wxT("UpdateIP failed %p %s")) % this % ec.message());
372 return false;
374 SetIp(addr);
375 m_port = addr.Service();
376 AddDebugLogLineF(logAsio, CFormat(wxT("UpdateIP %s %d %p")) % m_IP % m_port % this);
377 return true;
380 const wxChar * GetIP() const { return m_IP; }
381 uint16 GetPort() const { return m_port; }
383 ip::tcp::socket & GetAsioSocket()
385 return * m_socket;
388 bool GetProxyState() const { return m_proxyState; }
390 void SetProxyState(bool state, const amuleIPV4Address * adr)
392 m_proxyState = state;
393 if (state) {
394 // Start. Get the true IP for logging.
395 wxASSERT(adr);
396 SetIp(*adr);
397 AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to proxy %s")) % m_IP);
398 } else {
399 // Transition from proxy to normal mode
400 AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to normal %s")) % m_IP);
401 m_ErrorCode = 0;
405 private:
407 // Dispatch handlers
408 // Access to m_socket is all bundled in the thread running s_io_service to avoid
409 // concurrent access to the socket from several threads.
410 // So once things are running (after connect), all access goes through one of these handlers.
412 void DispatchClose()
414 error_code ec;
415 m_socket->close(ec);
416 if (ec) {
417 AddDebugLogLineC(logAsio, CFormat(wxT("Close error %s %s")) % m_IP % ec.message());
418 } else {
419 AddDebugLogLineF(logAsio, CFormat(wxT("Closed %s")) % m_IP);
423 void DispatchBackgroundRead()
425 AddDebugLogLineF(logAsio, CFormat(wxT("DispatchBackgroundRead %s")) % m_IP);
426 m_socket->async_read_some(null_buffers(),
427 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleRead, this, placeholders::error)));
430 void DispatchWrite(uint32 nbytes)
432 async_write(*m_socket, buffer(m_sendBuffer, nbytes),
433 m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleSend, this, placeholders::error, placeholders::bytes_transferred)));
437 // Completion handlers for async requests
440 void HandleConnect(const error_code& err)
442 m_OK = !err;
443 AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect %d %s")) % m_OK % m_IP);
444 if (m_isDestroying) {
445 AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect: socket pending for deletion %s")) % m_IP);
446 } else {
447 CoreNotify_LibSocketConnect(m_libSocket, err.value());
448 if (m_OK) {
449 // After connect also send a OUTPUT event to show data is available
450 CoreNotify_LibSocketSend(m_libSocket, 0);
451 // Start reading
452 StartBackgroundRead();
453 m_connected = true;
458 void HandleSend(const error_code& err, size_t DEBUG_ONLY(bytes_transferred) )
460 delete[] m_sendBuffer;
461 m_sendBuffer = NULL;
463 if (m_isDestroying) {
464 AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend: socket pending for deletion %s")) % m_IP);
465 } else {
466 if (SetError(err)) {
467 AddDebugLogLineN(logAsio, CFormat(wxT("HandleSend Error %d %s")) % bytes_transferred % m_IP);
468 PostLostEvent();
469 } else {
470 AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend %d %s")) % bytes_transferred % m_IP);
471 m_blocksWrite = false;
472 CoreNotify_LibSocketSend(m_libSocket, m_ErrorCode);
477 void HandleRead(const error_code & ec)
479 if (m_isDestroying) {
480 AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead: socket pending for deletion %s")) % m_IP);
483 if (SetError(ec)) {
484 // This is what we get in Windows when a connection gets closed from remote.
485 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError %s %s")) % m_IP % ec.message());
486 PostLostEvent();
487 return;
490 error_code ec2;
491 uint32 avail = m_socket->available(ec2);
492 if (SetError(ec2)) {
493 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError available %d %s %s")) % avail % m_IP % ec2.message());
494 PostLostEvent();
495 return;
497 if (avail == 0) {
498 // This is what we get in Linux when a connection gets closed from remote.
499 AddDebugLogLineF(logAsio, CFormat(wxT("HandleReadError nothing available %s")) % m_IP);
500 SetError();
501 PostLostEvent();
502 return;
504 AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead %d %s")) % avail % m_IP);
506 // adjust (or create) our read buffer
507 if (m_readBufferSize < avail) {
508 delete[] m_readBuffer;
509 m_readBuffer = new char[avail];
510 m_readBufferSize = avail;
512 m_readBufferPtr = m_readBuffer;
514 // read available data
515 m_readBufferContent = m_socket->read_some(buffer(m_readBuffer, avail), ec2);
516 if (SetError(ec2) || m_readBufferContent == 0) {
517 AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError read %d %s %s")) % m_readBufferContent % m_IP % ec2.message());
518 PostLostEvent();
519 return;
522 m_readPending = false;
523 m_blocksRead = false;
524 PostReadEvent(2);
527 void HandleDestroy()
529 AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p %s")) % m_libSocket % this % m_IP);
530 CoreNotify_LibSocketDestroy(m_libSocket);
535 // Other functions
538 void StartBackgroundRead()
540 m_readPending = true;
541 m_readBufferContent = 0;
542 m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchBackgroundRead, this));
545 void PostReadEvent(int DEBUG_ONLY(from) )
547 if (!m_eventPending) {
548 AddDebugLogLineF(logAsio, CFormat(wxT("Post read event %d %s")) % from % m_IP);
549 m_eventPending = true;
550 CoreNotify_LibSocketReceive(m_libSocket, m_ErrorCode);
554 void PostLostEvent()
556 if (!m_isDestroying && !m_closed) {
557 CoreNotify_LibSocketLost(m_libSocket);
561 void SetError()
563 m_ErrorCode = 2;
566 bool SetError(const error_code & err)
568 m_ErrorCode = err.value();
569 return m_ErrorCode != errc::success;
573 // Synchronous sockets (amulecmd)
575 uint32 ReadSync(char * buf, uint32 bytesToRead)
577 error_code ec;
578 uint32 received = read(*m_socket, buffer(buf, bytesToRead), ec);
579 SetError(ec);
580 return received;
583 uint32 WriteSync(const void * buf, uint32 nbytes)
585 error_code ec;
586 uint32 sent = write(*m_socket, buffer(buf, nbytes), ec);
587 SetError(ec);
588 return sent;
592 // Access to even const & wxString is apparently not thread-safe.
593 // Locks are set/removed in wx and reference counts can go astray.
594 // So store our IP string in a wxString which is used nowhere.
595 // Store a pointer to its string buffer as well and use THAT everywhere.
597 void SetIp(const amuleIPV4Address& adr)
599 m_IPstring = adr.IPAddress();
600 m_IP = m_IPstring.c_str();
601 m_IPint = StringIPtoUint32(m_IPstring);
604 CLibSocket * m_libSocket;
605 ip::tcp::socket * m_socket;
606 // remote IP
607 wxString m_IPstring; // as String (use nowhere because of threading!)
608 const wxChar * m_IP; // as char* (use in debug logs)
609 uint32 m_IPint; // as int
610 uint16 m_port; // remote port
611 bool m_OK;
612 int m_ErrorCode;
613 bool m_blocksRead;
614 bool m_blocksWrite;
615 char * m_readBuffer;
616 uint32 m_readBufferSize;
617 char * m_readBufferPtr;
618 bool m_readPending;
619 uint32 m_readBufferContent;
620 bool m_eventPending;
621 char * m_sendBuffer;
622 io_service::strand m_strand; // handle synchronisation in io_service thread pool
623 deadline_timer m_timer;
624 bool m_connected;
625 bool m_closed;
626 bool m_isDestroying; // true if Destroy() was called
627 bool m_proxyState;
628 bool m_notify; // set by Notify()
629 bool m_sync; // copied from !m_notify on Connect()
634 * Library socket wrapper
637 CLibSocket::CLibSocket(int /* flags */)
639 m_aSocket = new CAsioSocketImpl(this);
643 CLibSocket::~CLibSocket()
645 AddDebugLogLineF(logAsio, CFormat(wxT("~CLibSocket() %p %p %s")) % this % m_aSocket % m_aSocket->GetIP());
646 delete m_aSocket;
650 bool CLibSocket::Connect(const amuleIPV4Address& adr, bool wait)
652 return m_aSocket->Connect(adr, wait);
656 bool CLibSocket::IsConnected() const
658 return m_aSocket->IsConnected();
662 bool CLibSocket::IsOk() const
664 return m_aSocket->IsOk();
668 wxString CLibSocket::GetPeer()
670 return m_aSocket->GetPeer();
674 uint32 CLibSocket::GetPeerInt()
676 return m_aSocket->GetPeerInt();
680 void CLibSocket::Destroy()
682 m_aSocket->Destroy();
686 bool CLibSocket::IsDestroying() const
688 return m_aSocket->IsDestroying();
692 void CLibSocket::Notify(bool notify)
694 m_aSocket->Notify(notify);
698 uint32 CLibSocket::Read(void * buffer, uint32 nbytes)
700 return m_aSocket->Read((char *) buffer, nbytes);
704 uint32 CLibSocket::Write(const void * buffer, uint32 nbytes)
706 return m_aSocket->Write(buffer, nbytes);
710 void CLibSocket::Close()
712 m_aSocket->Close();
716 int CLibSocket::LastError() const
718 return m_aSocket->LastError();
722 void CLibSocket::SetLocal(const amuleIPV4Address& local)
724 m_aSocket->SetLocal(local);
729 // new Stuff
731 bool CLibSocket::BlocksRead() const
733 return m_aSocket->BlocksRead();
737 bool CLibSocket::BlocksWrite() const
739 return m_aSocket->BlocksWrite();
743 void CLibSocket::EventProcessed()
745 m_aSocket->EventProcessed();
749 void CLibSocket::LinkSocketImpl(class CAsioSocketImpl * socket)
751 delete m_aSocket;
752 m_aSocket = socket;
753 m_aSocket->SetWrapSocket(this);
757 const wxChar * CLibSocket::GetIP() const
759 return m_aSocket->GetIP();
763 bool CLibSocket::GetProxyState() const
765 return m_aSocket->GetProxyState();
769 void CLibSocket::SetProxyState(bool state, const amuleIPV4Address * adr)
771 m_aSocket->SetProxyState(state, adr);
776 * ASIO TCP socket server
779 class CAsioSocketServerImpl : public ip::tcp::acceptor
781 public:
782 CAsioSocketServerImpl(const amuleIPV4Address & adr, CLibSocketServer * libSocketServer)
783 : ip::tcp::acceptor(s_io_service),
784 m_libSocketServer(libSocketServer),
785 m_currentSocket(NULL),
786 m_strand(s_io_service)
788 m_ok = false;
789 m_socketAvailable = false;
791 try {
792 open(adr.GetEndpoint().protocol());
793 set_option(ip::tcp::acceptor::reuse_address(true));
794 bind(adr.GetEndpoint());
795 listen();
796 StartAccept();
797 m_ok = true;
798 AddDebugLogLineN(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d")) % adr.IPAddress() % adr.Service());
799 } catch (const system_error& err) {
800 AddDebugLogLineC(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d failed - %s")) % adr.IPAddress() % adr.Service() % err.code().message());
804 ~CAsioSocketServerImpl()
808 // For wxSocketServer, Ok will return true if the server could bind to the specified address and is already listening for new connections.
809 bool IsOk() const { return m_ok; }
811 void Close() { close(); }
813 bool AcceptWith(CLibSocket & socket)
815 if (!m_socketAvailable) {
816 AddDebugLogLineF(logAsio, wxT("AcceptWith: nothing there"));
817 return false;
820 // return the socket we received
821 socket.LinkSocketImpl(m_currentSocket.release());
823 // check if we have another socket ready for reception
824 m_currentSocket.reset(new CAsioSocketImpl(NULL));
825 error_code ec;
826 // async_accept does not work if server is non-blocking
827 // temporarily switch it to non-blocking
828 non_blocking(true);
829 // we are set to non-blocking, so this returns right away
830 accept(m_currentSocket->GetAsioSocket(), ec);
831 // back to blocking
832 non_blocking(false);
833 if (ec || !m_currentSocket->UpdateIP()) {
834 // nothing there
835 m_socketAvailable = false;
836 // start getting another one
837 StartAccept();
838 AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, getting another socket in background"));
839 } else {
840 // we got another socket right away
841 m_socketAvailable = true; // it is already true, but this improves readability
842 AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, another socket is available"));
843 // aMule actually doesn't need a notification as it polls the listen socket.
844 // amuleweb does need it though
845 CoreNotify_ServerTCPAccept(m_libSocketServer);
848 return true;
851 bool SocketAvailable() const { return m_socketAvailable; }
853 private:
855 void StartAccept()
857 m_currentSocket.reset(new CAsioSocketImpl(NULL));
858 async_accept(m_currentSocket->GetAsioSocket(),
859 m_strand.wrap(boost::bind(& CAsioSocketServerImpl::HandleAccept, this, placeholders::error)));
862 void HandleAccept(const error_code& error)
864 if (error) {
865 AddDebugLogLineC(logAsio, CFormat(wxT("Error in HandleAccept: %s")) % error.message());
866 } else {
867 if (m_currentSocket->UpdateIP()) {
868 AddDebugLogLineN(logAsio, CFormat(wxT("HandleAccept received a connection from %s:%d"))
869 % m_currentSocket->GetIP() % m_currentSocket->GetPort());
870 m_socketAvailable = true;
871 CoreNotify_ServerTCPAccept(m_libSocketServer);
872 return;
873 } else {
874 AddDebugLogLineN(logAsio, wxT("Error in HandleAccept: invalid socket"));
877 // We were not successful. Try again.
878 // Post the request to the event queue to make sure it doesn't get called immediately.
879 m_strand.post(boost::bind(& CAsioSocketServerImpl::StartAccept, this));
882 // The wrapper object
883 CLibSocketServer * m_libSocketServer;
884 // Startup ok
885 bool m_ok;
886 // The last socket that connected to us
887 CScopedPtr<CAsioSocketImpl> m_currentSocket;
888 // Is there a socket available?
889 bool m_socketAvailable;
890 io_service::strand m_strand; // handle synchronisation in io_service thread pool
894 CLibSocketServer::CLibSocketServer(const amuleIPV4Address& adr, int /* flags */)
896 m_aServer = new CAsioSocketServerImpl(adr, this);
900 CLibSocketServer::~CLibSocketServer()
902 delete m_aServer;
906 // Accepts an incoming connection request, and creates a new CLibSocket object which represents the server-side of the connection.
907 // Only used in CamuleApp::ListenSocketHandler() and we don't get there.
908 CLibSocket * CLibSocketServer::Accept(bool /* wait */)
910 wxFAIL;
911 return NULL;
915 // Accept an incoming connection using the specified socket object.
916 bool CLibSocketServer::AcceptWith(CLibSocket & socket, bool WXUNUSED_UNLESS_DEBUG(wait) )
918 wxASSERT(!wait);
919 return m_aServer->AcceptWith(socket);
923 bool CLibSocketServer::IsOk() const
925 return m_aServer->IsOk();
929 void CLibSocketServer::Close()
931 m_aServer->Close();
935 bool CLibSocketServer::SocketAvailable()
937 return m_aServer->SocketAvailable();
942 * ASIO UDP socket implementation
945 class CAsioUDPSocketImpl
947 private:
948 // UDP data block
949 class CUDPData {
950 public:
951 char * buffer;
952 uint32 size;
953 amuleIPV4Address ipadr;
955 CUDPData(const void * src, uint32 _size, amuleIPV4Address adr) :
956 size(_size), ipadr(adr)
958 buffer = new char[size];
959 memcpy(buffer, src, size);
962 ~CUDPData()
964 delete[] buffer;
968 public:
969 CAsioUDPSocketImpl(const amuleIPV4Address &address, int /* flags */, CLibUDPSocket * libSocket) :
970 m_libSocket(libSocket),
971 m_strand(s_io_service),
972 m_timer(s_io_service),
973 m_address(address)
975 m_muleSocket = NULL;
976 m_socket = NULL;
977 m_readBuffer = new char[CMuleUDPSocket::UDP_BUFFER_SIZE];
978 m_OK = true;
979 CreateSocket();
982 ~CAsioUDPSocketImpl()
984 AddDebugLogLineF(logAsio, wxT("UDP ~CAsioUDPSocketImpl"));
985 delete m_socket;
986 delete[] m_readBuffer;
987 DeleteContents(m_receiveBuffers);
990 void SetClientData(CMuleUDPSocket * muleSocket)
992 AddDebugLogLineF(logAsio, wxT("UDP SetClientData"));
993 m_muleSocket = muleSocket;
996 uint32 RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
998 CUDPData * recdata;
1000 wxMutexLocker lock(m_receiveBuffersLock);
1001 if (m_receiveBuffers.empty()) {
1002 AddDebugLogLineN(logAsio, wxT("UDP RecvFromError no data"));
1003 return 0;
1005 recdata = * m_receiveBuffers.begin();
1006 m_receiveBuffers.pop_front();
1008 uint32 read = recdata->size;
1009 if (read > nBytes) {
1010 // should not happen
1011 AddDebugLogLineN(logAsio, CFormat(wxT("UDP RecvFromError too much data %d")) % read);
1012 read = nBytes;
1014 memcpy(buf, recdata->buffer, read);
1015 addr = recdata->ipadr;
1016 delete recdata;
1017 return read;
1020 uint32 SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1022 // Collect data, make a copy of the buffer's content
1023 CUDPData * recdata = new CUDPData(buf, nBytes, addr);
1024 AddDebugLogLineF(logAsio, CFormat(wxT("UDP SendTo %d to %s")) % nBytes % addr.IPAddress());
1025 m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchSendTo, this, recdata));
1026 return nBytes;
1029 bool IsOk() const
1031 return m_OK;
1034 void Close()
1036 if (s_io_service.stopped()) {
1037 DispatchClose();
1038 } else {
1039 m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchClose, this));
1043 void Destroy()
1045 AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p")) % m_libSocket % this);
1046 Close();
1047 if (s_io_service.stopped()) {
1048 HandleDestroy();
1049 } else {
1050 // Close prevents creation of any more callbacks, but does not clear any callbacks already
1051 // sitting in Asio's event queue (I have seen such a crash).
1052 // So create a delay timer so they can be called until core is notified.
1053 m_timer.expires_from_now(boost::posix_time::seconds(1));
1054 m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleDestroy, this)));
1059 private:
1061 // Dispatch handlers
1062 // Access to m_socket is all bundled in the thread running s_io_service to avoid
1063 // concurrent access to the socket from several threads.
1064 // So once things are running (after connect), all access goes through one of these handlers.
1066 void DispatchClose()
1068 error_code ec;
1069 m_socket->close(ec);
1070 if (ec) {
1071 AddDebugLogLineC(logAsio, CFormat(wxT("UDP Close error %s")) % ec.message());
1072 } else {
1073 AddDebugLogLineF(logAsio, wxT("UDP Closed"));
1077 void DispatchSendTo(CUDPData * recdata)
1079 ip::udp::endpoint endpoint(recdata->ipadr.GetEndpoint().address(), recdata->ipadr.Service());
1081 AddDebugLogLineF(logAsio, CFormat(wxT("UDP DispatchSendTo %d to %s:%d")) % recdata->size
1082 % endpoint.address().to_string() % endpoint.port());
1083 m_socket->async_send_to(buffer(recdata->buffer, recdata->size), endpoint,
1084 m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleSendTo, this, placeholders::error, placeholders::bytes_transferred, recdata)));
1088 // Completion handlers for async requests
1091 void HandleRead(const error_code & ec, size_t received)
1093 if (ec) {
1094 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleReadError %s")) % ec.message());
1095 } else if (received == 0) {
1096 AddDebugLogLineF(logAsio, wxT("UDP HandleReadError nothing available"));
1097 } else if (m_muleSocket == NULL) {
1098 AddDebugLogLineN(logAsio, wxT("UDP HandleReadError no handler"));
1099 } else {
1101 amuleIPV4Address ipadr = amuleIPV4Address(CamuleIPV4Endpoint(m_receiveEndpoint));
1102 AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleRead %d %s:%d")) % received % ipadr.IPAddress() % ipadr.Service());
1104 // create our read buffer
1105 CUDPData * recdata = new CUDPData(m_readBuffer, received, ipadr);
1107 wxMutexLocker lock(m_receiveBuffersLock);
1108 m_receiveBuffers.push_back(recdata);
1110 CoreNotify_UDPSocketReceive(m_muleSocket);
1112 StartBackgroundRead();
1115 void HandleSendTo(const error_code & ec, size_t sent, CUDPData * recdata)
1117 if (ec) {
1118 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError %s")) % ec.message());
1119 } else if (sent != recdata->size) {
1120 AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError tosend: %d sent %d")) % recdata->size % sent);
1122 if (m_muleSocket == NULL) {
1123 AddDebugLogLineN(logAsio, wxT("UDP HandleSendToError no handler"));
1124 } else {
1125 AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleSendTo %d to %s")) % sent % recdata->ipadr.IPAddress());
1126 CoreNotify_UDPSocketSend(m_muleSocket);
1128 delete recdata;
1131 void HandleDestroy()
1133 AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p")) % m_libSocket % this);
1134 delete m_libSocket;
1138 // Other functions
1141 void CreateSocket()
1143 try {
1144 delete m_socket;
1145 ip::udp::endpoint endpoint(m_address.GetEndpoint().address(), m_address.Service());
1146 m_socket = new ip::udp::socket(s_io_service, endpoint);
1147 AddDebugLogLineN(logAsio, CFormat(wxT("Created UDP socket %s %d")) % m_address.IPAddress() % m_address.Service());
1148 StartBackgroundRead();
1149 } catch (const system_error& err) {
1150 AddLogLineC(CFormat(wxT("Error creating UDP socket %s %d : %s")) % m_address.IPAddress() % m_address.Service() % err.code().message());
1151 m_socket = NULL;
1152 m_OK = false;
1156 void StartBackgroundRead()
1158 m_socket->async_receive_from(buffer(m_readBuffer, CMuleUDPSocket::UDP_BUFFER_SIZE), m_receiveEndpoint,
1159 m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleRead, this, placeholders::error, placeholders::bytes_transferred)));
1162 CLibUDPSocket * m_libSocket;
1163 ip::udp::socket * m_socket;
1164 CMuleUDPSocket * m_muleSocket;
1165 bool m_OK;
1166 io_service::strand m_strand; // handle synchronisation in io_service thread pool
1167 deadline_timer m_timer;
1168 amuleIPV4Address m_address;
1170 // One fix receive buffer
1171 char * m_readBuffer;
1172 // and a list of dynamic buffers. UDP data may be coming in faster
1173 // than the main loop can handle it.
1174 std::list<CUDPData *> m_receiveBuffers;
1175 wxMutex m_receiveBuffersLock;
1177 // Address of last reception
1178 ip::udp::endpoint m_receiveEndpoint;
1183 * Library UDP socket wrapper
1186 CLibUDPSocket::CLibUDPSocket(amuleIPV4Address &address, int flags)
1188 m_aSocket = new CAsioUDPSocketImpl(address, flags, this);
1192 CLibUDPSocket::~CLibUDPSocket()
1194 AddDebugLogLineF(logAsio, CFormat(wxT("~CLibUDPSocket() %p %p")) % this % m_aSocket);
1195 delete m_aSocket;
1199 bool CLibUDPSocket::IsOk() const
1201 return m_aSocket->IsOk();
1205 uint32 CLibUDPSocket::RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
1207 return m_aSocket->RecvFrom(addr, buf, nBytes);
1211 uint32 CLibUDPSocket::SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1213 return m_aSocket->SendTo(addr, buf, nBytes);
1217 void CLibUDPSocket::SetClientData(CMuleUDPSocket * muleSocket)
1219 m_aSocket->SetClientData(muleSocket);
1223 int CLibUDPSocket::LastError() const
1225 return !IsOk();
1229 void CLibUDPSocket::Close()
1231 m_aSocket->Close();
1235 void CLibUDPSocket::Destroy()
1237 m_aSocket->Destroy();
1242 * CAsioService - ASIO event loop thread
1245 class CAsioServiceThread : public wxThread {
1246 public:
1247 CAsioServiceThread() : wxThread(wxTHREAD_JOINABLE)
1249 static int count = 0;
1250 m_threadNumber = ++count;
1251 Create();
1252 Run();
1255 void * Entry()
1257 AddLogLineNS(CFormat(_("Asio thread %d started")) % m_threadNumber);
1258 io_service::work worker(s_io_service); // keep io_service running
1259 s_io_service.run();
1260 AddDebugLogLineN(logAsio, CFormat(wxT("Asio thread %d stopped")) % m_threadNumber);
1262 return NULL;
1265 private:
1266 int m_threadNumber;
1270 * The constructor starts the thread.
1272 CAsioService::CAsioService()
1274 m_threads = new CAsioServiceThread[m_numberOfThreads];
1278 CAsioService::~CAsioService()
1283 void CAsioService::Stop()
1285 if (!m_threads) {
1286 return;
1288 s_io_service.stop();
1289 // Wait for threads to exit
1290 for (int i = 0; i < m_numberOfThreads; i++) {
1291 CAsioServiceThread * t = m_threads + i;
1292 t->Wait();
1294 delete[] m_threads;
1295 m_threads = 0;
1303 * amuleIPV4Address
1306 amuleIPV4Address::amuleIPV4Address()
1308 m_endpoint = new CamuleIPV4Endpoint();
1311 amuleIPV4Address::amuleIPV4Address(const amuleIPV4Address &a)
1313 *this = a;
1316 amuleIPV4Address::amuleIPV4Address(const CamuleIPV4Endpoint &ep)
1318 *this = ep;
1321 amuleIPV4Address::~amuleIPV4Address()
1323 delete m_endpoint;
1326 amuleIPV4Address& amuleIPV4Address::operator=(const amuleIPV4Address &a)
1328 m_endpoint = new CamuleIPV4Endpoint(* a.m_endpoint);
1329 return *this;
1332 amuleIPV4Address& amuleIPV4Address::operator=(const CamuleIPV4Endpoint &ep)
1334 m_endpoint = new CamuleIPV4Endpoint(ep);
1335 return *this;
1338 bool amuleIPV4Address::Hostname(const wxString& name)
1340 if (name.IsEmpty()) {
1341 return false;
1343 // This is usually just an IP.
1344 std::string sname(unicode2char(name));
1345 error_code ec;
1346 ip::address_v4 adr = ip::address_v4::from_string(sname, ec);
1347 if (!ec) {
1348 m_endpoint->address(adr);
1349 return true;
1351 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") failed, not an IP address %s")) % name % ec.message());
1353 // Try to resolve (sync). Normally not required. Unless you type in your hostname as "local IP address" or something.
1354 error_code ec2;
1355 ip::tcp::resolver res(s_io_service);
1356 // We only want to get IPV4 addresses.
1357 ip::tcp::resolver::query query(ip::tcp::v4(), sname, "");
1358 ip::tcp::resolver::iterator endpoint_iterator = res.resolve(query, ec2);
1359 if (ec2) {
1360 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: %s")) % name % ec2.message());
1361 return false;
1363 if (endpoint_iterator == ip::tcp::resolver::iterator()) {
1364 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: no address found")) % name);
1365 return false;
1367 m_endpoint->address(endpoint_iterator->endpoint().address());
1368 AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolved to %s")) % name % IPAddress());
1369 return true;
1372 bool amuleIPV4Address::Service(uint16 service)
1374 if (service == 0) {
1375 return false;
1377 m_endpoint->port(service);
1378 return true;
1381 uint16 amuleIPV4Address::Service() const
1383 return m_endpoint->port();
1386 bool amuleIPV4Address::IsLocalHost() const
1388 return m_endpoint->address().is_loopback();
1391 wxString amuleIPV4Address::IPAddress() const
1393 return CFormat(wxT("%s")) % m_endpoint->address().to_string();
1396 // "Set address to any of the addresses of the current machine."
1397 // This just sets the address to 0.0.0.0 .
1398 // wx does the same.
1399 bool amuleIPV4Address::AnyAddress()
1401 m_endpoint->address(ip::address_v4::any());
1402 AddDebugLogLineN(logAsio, CFormat(wxT("AnyAddress: set to %s")) % IPAddress());
1403 return true;
1406 const CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint() const
1408 return * m_endpoint;
1411 CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint()
1413 return * m_endpoint;
1418 // Notification stuff
1420 namespace MuleNotify
1423 void LibSocketConnect(CLibSocket * socket, int error)
1425 if (socket->IsDestroying()) {
1426 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Destroying %s %d")) % socket->GetIP() % error);
1427 } else if (socket->GetProxyState()) {
1428 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Proxy %s %d")) % socket->GetIP() % error);
1429 socket->OnProxyEvent(MULE_SOCKET_CONNECTION);
1430 } else {
1431 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect %s %d")) %socket->GetIP() % error);
1432 socket->OnConnect(error);
1436 void LibSocketSend(CLibSocket * socket, int error)
1438 if (socket->IsDestroying()) {
1439 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Destroying %s %d")) % socket->GetIP() % error);
1440 } else if (socket->GetProxyState()) {
1441 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Proxy %s %d")) % socket->GetIP() % error);
1442 socket->OnProxyEvent(MULE_SOCKET_OUTPUT);
1443 } else {
1444 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend %s %d")) % socket->GetIP() % error);
1445 socket->OnSend(error);
1449 void LibSocketReceive(CLibSocket * socket, int error)
1451 socket->EventProcessed();
1452 if (socket->IsDestroying()) {
1453 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Destroying %s %d")) % socket->GetIP() % error);
1454 } else if (socket->GetProxyState()) {
1455 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Proxy %s %d")) % socket->GetIP() % error);
1456 socket->OnProxyEvent(MULE_SOCKET_INPUT);
1457 } else {
1458 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive %s %d")) % socket->GetIP() % error);
1459 socket->OnReceive(error);
1463 void LibSocketLost(CLibSocket * socket)
1465 if (socket->IsDestroying()) {
1466 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Destroying %s")) % socket->GetIP());
1467 } else if (socket->GetProxyState()) {
1468 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Proxy %s")) % socket->GetIP());
1469 socket->OnProxyEvent(MULE_SOCKET_LOST);
1470 } else {
1471 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost %s")) % socket->GetIP());
1472 socket->OnLost();
1476 void LibSocketDestroy(CLibSocket * socket)
1478 AddDebugLogLineF(logAsio, CFormat(wxT("LibSocket_Destroy %s")) % socket->GetIP());
1479 delete socket;
1482 void ProxySocketEvent(CLibSocket * socket, int evt)
1484 AddDebugLogLineF(logAsio, CFormat(wxT("ProxySocketEvent %s %d")) % socket->GetIP() % evt);
1485 socket->OnProxyEvent(evt);
1488 void ServerTCPAccept(CLibSocketServer * socketServer)
1490 AddDebugLogLineF(logAsio, wxT("ServerTCP_Accept"));
1491 socketServer->OnAccept();
1494 void UDPSocketSend(CMuleUDPSocket * socket)
1496 AddDebugLogLineF(logAsio, wxT("UDPSocketSend"));
1497 socket->OnSend(0);
1500 void UDPSocketReceive(CMuleUDPSocket * socket)
1502 AddDebugLogLineF(logAsio, wxT("UDPSocketReceive"));
1503 socket->OnReceive(0);
1507 } // namespace MuleNotify
1510 // Initialize MuleBoostVersion
1512 wxString MuleBoostVersion = CFormat(wxT("%d.%d")) % (BOOST_VERSION / 100000) % (BOOST_VERSION / 100 % 1000);