2 // This file is part of the aMule Project.
4 // Copyright (c) 2011-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2011-2011 Stu Redman ( admin@amule.org / http://www.amule.org )
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
27 # include "config.h" // Needed for HAVE_BOOST_SOURCES
31 #define _WIN32_WINNT 0x0501 // Boost complains otherwise
34 // Windows requires that Boost headers are included before wx headers.
35 // This works if precompiled headers are disabled for this file.
37 #define BOOST_ALL_NO_LIB
39 // Suppress warning caused by faulty boost/preprocessor/config/config.hpp in Boost 1.49
40 #if defined __GNUC__ && ! defined __GXX_EXPERIMENTAL_CXX0X__ && __cplusplus < 201103L
41 #define BOOST_PP_VARIADICS 0
44 #include <algorithm> // Needed for std::min - Boost up to 1.54 fails to compile with MSVC 2013 otherwise
46 #include <boost/asio.hpp>
47 #include <boost/bind.hpp>
48 #include <boost/version.hpp>
51 // Do away with building Boost.System, adding lib paths...
52 // Just include the single file and be done.
54 #ifdef HAVE_BOOST_SOURCES
55 # include <boost/../libs/system/src/error_code.cpp>
57 # include <boost/system/error_code.hpp>
60 #include "LibSocket.h"
61 #include <wx/thread.h> // wxMutex
62 #include <wx/intl.h> // _()
63 #include <common/Format.h> // Needed for CFormat
65 #include "GuiEvents.h"
66 #include "amuleIPV4Address.h"
67 #include "MuleUDPSocket.h"
68 #include "OtherFunctions.h" // DeleteContents, MuleBoostVersion
69 #include "ScopedPtr.h"
70 #include <common/Macros.h>
72 using namespace boost::asio
;
73 using namespace boost::system
; // for error_code
74 static io_service s_io_service
;
76 // Number of threads in the Asio thread pool
77 const int CAsioService::m_numberOfThreads
= 4;
80 * ASIO Client TCP socket implementation
83 class CamuleIPV4Endpoint
: public ip::tcp::endpoint
{
85 CamuleIPV4Endpoint() {}
87 CamuleIPV4Endpoint(const CamuleIPV4Endpoint
& impl
) : ip::tcp::endpoint(impl
) {}
88 CamuleIPV4Endpoint(const ip::tcp::endpoint
& ep
) { * this = ep
; }
89 CamuleIPV4Endpoint(const ip::udp::endpoint
& ep
) { address(ep
.address()); port(ep
.port()); }
91 const CamuleIPV4Endpoint
& operator = (const ip::tcp::endpoint
& ep
)
93 * (ip::tcp::endpoint
*) this = ep
;
101 // cppcheck-suppress uninitMemberVar m_readBufferPtr
102 CAsioSocketImpl(CLibSocket
* libSocket
) :
103 m_libSocket(libSocket
),
104 m_strand(s_io_service
),
105 m_timer(s_io_service
)
108 m_blocksRead
= false;
109 m_blocksWrite
= false;
112 m_readBufferSize
= 0;
113 m_readPending
= false;
114 m_readBufferContent
= 0;
115 m_eventPending
= false;
120 m_isDestroying
= false;
121 m_proxyState
= false;
126 m_socket
= new ip::tcp::socket(s_io_service
);
128 // Set socket to non blocking
129 m_socket
->non_blocking();
134 delete[] m_readBuffer
;
135 delete[] m_sendBuffer
;
138 void Notify(bool notify
)
143 bool Connect(const amuleIPV4Address
& adr
, bool wait
)
148 m_port
= adr
.Service();
151 m_sync
= !m_notify
; // set this once for the whole lifetime of the socket
152 AddDebugLogLineF(logAsio
, CFormat(wxT("Connect %s %p")) % m_IP
% this);
154 if (wait
|| m_sync
) {
156 m_socket
->connect(adr
.GetEndpoint(), ec
);
161 m_socket
->async_connect(adr
.GetEndpoint(),
162 m_strand
.wrap(boost::bind(& CAsioSocketImpl::HandleConnect
, this, placeholders::error
)));
163 // m_OK and return are false because we are not connected yet
168 bool IsConnected() const
173 // For wxSocketClient, Ok won't return true unless the client is connected to a server.
179 bool IsDestroying() const
181 return m_isDestroying
;
184 // Returns the actual error code
185 int LastError() const
190 // Is reading blocked?
191 bool BlocksRead() const
196 // Is writing blocked?
197 bool BlocksWrite() const
199 return m_blocksWrite
;
202 // Problem: wx sends an event when data gets available, so first there is an event, then Read() is called
203 // Asio can read async with callback, so you first read, then you get an event.
205 // - Read some data in background into a buffer
206 // - Callback posts event when something is there
207 // - Read data from buffer
208 // - If data is exhausted, start reading more in background
209 // - If not, post another event (making sure events don't pile up though)
210 uint32
Read(char * buf
, uint32 bytesToRead
)
212 if (bytesToRead
== 0) { // huh?
217 return ReadSync(buf
, bytesToRead
);
221 AddDebugLogLineF(logAsio
, CFormat(wxT("Read1 %s %d - Error")) % m_IP
% bytesToRead
);
225 if (m_readPending
// Background read hasn't completed.
226 || m_readBufferContent
== 0) { // shouldn't be if it's not pending
229 AddDebugLogLineF(logAsio
, CFormat(wxT("Read1 %s %d - Block")) % m_IP
% bytesToRead
);
233 m_blocksRead
= false; // shouldn't be needed
235 // Read from our buffer
236 uint32 readCache
= std::min(m_readBufferContent
, bytesToRead
);
237 memcpy(buf
, m_readBufferPtr
, readCache
);
238 m_readBufferContent
-= readCache
;
239 m_readBufferPtr
+= readCache
;
241 AddDebugLogLineF(logAsio
, CFormat(wxT("Read2 %s %d - %d")) % m_IP
% bytesToRead
% readCache
);
242 if (m_readBufferContent
) {
243 // Data left, post another event
246 // Nothing left, read more
247 StartBackgroundRead();
253 // Make a copy of the data and send it in background
254 // - unless a background send is already going on
255 uint32
Write(const void * buf
, uint32 nbytes
)
258 return WriteSync(buf
, nbytes
);
262 m_blocksWrite
= true;
263 AddDebugLogLineF(logAsio
, CFormat(wxT("Write blocks %d %p %s")) % nbytes
% m_sendBuffer
% m_IP
);
266 AddDebugLogLineF(logAsio
, CFormat(wxT("Write %d %s")) % nbytes
% m_IP
);
267 m_sendBuffer
= new char[nbytes
];
268 memcpy(m_sendBuffer
, buf
, nbytes
);
269 m_strand
.dispatch(boost::bind(& CAsioSocketImpl::DispatchWrite
, this, nbytes
));
280 if (m_sync
|| s_io_service
.stopped()) {
283 m_strand
.dispatch(boost::bind(& CAsioSocketImpl::DispatchClose
, this));
291 if (m_isDestroying
) {
292 AddDebugLogLineC(logAsio
, CFormat(wxT("Destroy() already dying socket %p %p %s")) % m_libSocket
% this % m_IP
);
295 m_isDestroying
= true;
296 AddDebugLogLineF(logAsio
, CFormat(wxT("Destroy() %p %p %s")) % m_libSocket
% this % m_IP
);
298 if (m_sync
|| s_io_service
.stopped()) {
301 // Close prevents creation of any more callbacks, but does not clear any callbacks already
302 // sitting in Asio's event queue (I have seen such a crash).
303 // So create a delay timer so they can be called until core is notified.
304 m_timer
.expires_from_now(boost::posix_time::seconds(1));
305 m_timer
.async_wait(m_strand
.wrap(boost::bind(& CAsioSocketImpl::HandleDestroy
, this)));
321 // Bind socket to local endpoint if user wants to choose the local address
323 void SetLocal(const amuleIPV4Address
& local
)
326 if (!m_socket
->is_open()) {
327 // Socket is usually still closed when this is called
328 m_socket
->open(boost::asio::ip::tcp::v4(), ec
);
330 AddDebugLogLineC(logAsio
, CFormat(wxT("Can't open socket : %s")) % ec
.message());
334 // We are using random (OS-defined) local ports.
335 // To set a constant output port, first call
336 // m_socket->set_option(socket_base::reuse_address(true));
337 // and then set the endpoint's port to it.
339 CamuleIPV4Endpoint
endpoint(local
.GetEndpoint());
341 m_socket
->bind(endpoint
, ec
);
343 AddDebugLogLineC(logAsio
, CFormat(wxT("Can't bind socket to local endpoint %s : %s"))
344 % local
.IPAddress() % ec
.message());
346 AddDebugLogLineF(logAsio
, CFormat(wxT("Bound socket to local endpoint %s")) % local
.IPAddress());
351 void EventProcessed()
353 m_eventPending
= false;
356 void SetWrapSocket(CLibSocket
* socket
)
358 m_libSocket
= socket
;
359 // Also do some setting up
363 StartBackgroundRead();
369 amuleIPV4Address addr
= CamuleIPV4Endpoint(m_socket
->remote_endpoint(ec
));
371 AddDebugLogLineN(logAsio
, CFormat(wxT("UpdateIP failed %p %s")) % this % ec
.message());
375 m_port
= addr
.Service();
376 AddDebugLogLineF(logAsio
, CFormat(wxT("UpdateIP %s %d %p")) % m_IP
% m_port
% this);
380 const wxChar
* GetIP() const { return m_IP
; }
381 uint16
GetPort() const { return m_port
; }
383 ip::tcp::socket
& GetAsioSocket()
388 bool GetProxyState() const { return m_proxyState
; }
390 void SetProxyState(bool state
, const amuleIPV4Address
* adr
)
392 m_proxyState
= state
;
394 // Start. Get the true IP for logging.
397 AddDebugLogLineF(logAsio
, CFormat(wxT("SetProxyState to proxy %s")) % m_IP
);
399 // Transition from proxy to normal mode
400 AddDebugLogLineF(logAsio
, CFormat(wxT("SetProxyState to normal %s")) % m_IP
);
408 // Access to m_socket is all bundled in the thread running s_io_service to avoid
409 // concurrent access to the socket from several threads.
410 // So once things are running (after connect), all access goes through one of these handlers.
417 AddDebugLogLineC(logAsio
, CFormat(wxT("Close error %s %s")) % m_IP
% ec
.message());
419 AddDebugLogLineF(logAsio
, CFormat(wxT("Closed %s")) % m_IP
);
423 void DispatchBackgroundRead()
425 AddDebugLogLineF(logAsio
, CFormat(wxT("DispatchBackgroundRead %s")) % m_IP
);
426 m_socket
->async_read_some(null_buffers(),
427 m_strand
.wrap(boost::bind(& CAsioSocketImpl::HandleRead
, this, placeholders::error
)));
430 void DispatchWrite(uint32 nbytes
)
432 async_write(*m_socket
, buffer(m_sendBuffer
, nbytes
),
433 m_strand
.wrap(boost::bind(& CAsioSocketImpl::HandleSend
, this, placeholders::error
, placeholders::bytes_transferred
)));
437 // Completion handlers for async requests
440 void HandleConnect(const error_code
& err
)
443 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleConnect %d %s")) % m_OK
% m_IP
);
444 if (m_isDestroying
) {
445 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleConnect: socket pending for deletion %s")) % m_IP
);
447 CoreNotify_LibSocketConnect(m_libSocket
, err
.value());
449 // After connect also send an OUTPUT event to show data is available
450 CoreNotify_LibSocketSend(m_libSocket
, 0);
452 StartBackgroundRead();
458 void HandleSend(const error_code
& err
, size_t DEBUG_ONLY(bytes_transferred
) )
460 delete[] m_sendBuffer
;
463 if (m_isDestroying
) {
464 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleSend: socket pending for deletion %s")) % m_IP
);
467 AddDebugLogLineN(logAsio
, CFormat(wxT("HandleSend Error %d %s")) % bytes_transferred
% m_IP
);
470 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleSend %d %s")) % bytes_transferred
% m_IP
);
471 m_blocksWrite
= false;
472 CoreNotify_LibSocketSend(m_libSocket
, m_ErrorCode
);
477 void HandleRead(const error_code
& ec
)
479 if (m_isDestroying
) {
480 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleRead: socket pending for deletion %s")) % m_IP
);
484 // This is what we get in Windows when a connection gets closed from remote.
485 AddDebugLogLineN(logAsio
, CFormat(wxT("HandleReadError %s %s")) % m_IP
% ec
.message());
491 uint32 avail
= m_socket
->available(ec2
);
493 AddDebugLogLineN(logAsio
, CFormat(wxT("HandleReadError available %d %s %s")) % avail
% m_IP
% ec2
.message());
498 // This is what we get in Linux when a connection gets closed from remote.
499 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleReadError nothing available %s")) % m_IP
);
504 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleRead %d %s")) % avail
% m_IP
);
506 // adjust (or create) our read buffer
507 if (m_readBufferSize
< avail
) {
508 delete[] m_readBuffer
;
509 m_readBuffer
= new char[avail
];
510 m_readBufferSize
= avail
;
512 m_readBufferPtr
= m_readBuffer
;
514 // read available data
515 m_readBufferContent
= m_socket
->read_some(buffer(m_readBuffer
, avail
), ec2
);
516 if (SetError(ec2
) || m_readBufferContent
== 0) {
517 AddDebugLogLineN(logAsio
, CFormat(wxT("HandleReadError read %d %s %s")) % m_readBufferContent
% m_IP
% ec2
.message());
522 m_readPending
= false;
523 m_blocksRead
= false;
529 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleDestroy() %p %p %s")) % m_libSocket
% this % m_IP
);
530 CoreNotify_LibSocketDestroy(m_libSocket
);
538 void StartBackgroundRead()
540 m_readPending
= true;
541 m_readBufferContent
= 0;
542 m_strand
.dispatch(boost::bind(& CAsioSocketImpl::DispatchBackgroundRead
, this));
545 void PostReadEvent(int DEBUG_ONLY(from
) )
547 if (!m_eventPending
) {
548 AddDebugLogLineF(logAsio
, CFormat(wxT("Post read event %d %s")) % from
% m_IP
);
549 m_eventPending
= true;
550 CoreNotify_LibSocketReceive(m_libSocket
, m_ErrorCode
);
556 if (!m_isDestroying
&& !m_closed
) {
557 CoreNotify_LibSocketLost(m_libSocket
);
566 bool SetError(const error_code
& err
)
568 m_ErrorCode
= err
.value();
569 return m_ErrorCode
!= errc::success
;
573 // Synchronous sockets (amulecmd)
575 uint32
ReadSync(char * buf
, uint32 bytesToRead
)
578 uint32 received
= read(*m_socket
, buffer(buf
, bytesToRead
), ec
);
583 uint32
WriteSync(const void * buf
, uint32 nbytes
)
586 uint32 sent
= write(*m_socket
, buffer(buf
, nbytes
), ec
);
592 // Access to even const & wxString is apparently not thread-safe.
593 // Locks are set/removed in wx and reference counts can go astray.
594 // So store our IP string in a wxString which is used nowhere.
595 // Store a pointer to its string buffer as well and use THAT everywhere.
597 void SetIp(const amuleIPV4Address
& adr
)
599 m_IPstring
= adr
.IPAddress();
600 m_IP
= m_IPstring
.c_str();
601 m_IPint
= StringIPtoUint32(m_IPstring
);
604 CLibSocket
* m_libSocket
;
605 ip::tcp::socket
* m_socket
;
607 wxString m_IPstring
; // as String (use nowhere because of threading!)
608 const wxChar
* m_IP
; // as char* (use in debug logs)
609 uint32 m_IPint
; // as int
610 uint16 m_port
; // remote port
616 uint32 m_readBufferSize
;
617 char * m_readBufferPtr
;
619 uint32 m_readBufferContent
;
622 io_service::strand m_strand
; // handle synchronisation in io_service thread pool
623 deadline_timer m_timer
;
626 bool m_isDestroying
; // true if Destroy() was called
628 bool m_notify
; // set by Notify()
629 bool m_sync
; // copied from !m_notify on Connect()
634 * Library socket wrapper
637 CLibSocket::CLibSocket(int /* flags */)
639 m_aSocket
= new CAsioSocketImpl(this);
643 CLibSocket::~CLibSocket()
645 AddDebugLogLineF(logAsio
, CFormat(wxT("~CLibSocket() %p %p %s")) % this % m_aSocket
% m_aSocket
->GetIP());
650 bool CLibSocket::Connect(const amuleIPV4Address
& adr
, bool wait
)
652 return m_aSocket
->Connect(adr
, wait
);
656 bool CLibSocket::IsConnected() const
658 return m_aSocket
->IsConnected();
662 bool CLibSocket::IsOk() const
664 return m_aSocket
->IsOk();
668 wxString
CLibSocket::GetPeer()
670 return m_aSocket
->GetPeer();
674 uint32
CLibSocket::GetPeerInt()
676 return m_aSocket
->GetPeerInt();
680 void CLibSocket::Destroy()
682 m_aSocket
->Destroy();
686 bool CLibSocket::IsDestroying() const
688 return m_aSocket
->IsDestroying();
692 void CLibSocket::Notify(bool notify
)
694 m_aSocket
->Notify(notify
);
698 uint32
CLibSocket::Read(void * buffer
, uint32 nbytes
)
700 return m_aSocket
->Read((char *) buffer
, nbytes
);
704 uint32
CLibSocket::Write(const void * buffer
, uint32 nbytes
)
706 return m_aSocket
->Write(buffer
, nbytes
);
710 void CLibSocket::Close()
716 int CLibSocket::LastError() const
718 return m_aSocket
->LastError();
722 void CLibSocket::SetLocal(const amuleIPV4Address
& local
)
724 m_aSocket
->SetLocal(local
);
731 bool CLibSocket::BlocksRead() const
733 return m_aSocket
->BlocksRead();
737 bool CLibSocket::BlocksWrite() const
739 return m_aSocket
->BlocksWrite();
743 void CLibSocket::EventProcessed()
745 m_aSocket
->EventProcessed();
749 void CLibSocket::LinkSocketImpl(class CAsioSocketImpl
* socket
)
753 m_aSocket
->SetWrapSocket(this);
757 const wxChar
* CLibSocket::GetIP() const
759 return m_aSocket
->GetIP();
763 bool CLibSocket::GetProxyState() const
765 return m_aSocket
->GetProxyState();
769 void CLibSocket::SetProxyState(bool state
, const amuleIPV4Address
* adr
)
771 m_aSocket
->SetProxyState(state
, adr
);
776 * ASIO TCP socket server
779 class CAsioSocketServerImpl
: public ip::tcp::acceptor
782 CAsioSocketServerImpl(const amuleIPV4Address
& adr
, CLibSocketServer
* libSocketServer
)
783 : ip::tcp::acceptor(s_io_service
),
784 m_libSocketServer(libSocketServer
),
785 m_currentSocket(NULL
),
786 m_strand(s_io_service
)
789 m_socketAvailable
= false;
792 open(adr
.GetEndpoint().protocol());
793 set_option(ip::tcp::acceptor::reuse_address(true));
794 bind(adr
.GetEndpoint());
798 AddDebugLogLineN(logAsio
, CFormat(wxT("CAsioSocketServerImpl bind to %s %d")) % adr
.IPAddress() % adr
.Service());
799 } catch (const system_error
& err
) {
800 AddDebugLogLineC(logAsio
, CFormat(wxT("CAsioSocketServerImpl bind to %s %d failed - %s")) % adr
.IPAddress() % adr
.Service() % err
.code().message());
804 ~CAsioSocketServerImpl()
808 // For wxSocketServer, Ok will return true if the server could bind to the specified address and is already listening for new connections.
809 bool IsOk() const { return m_ok
; }
811 void Close() { close(); }
813 bool AcceptWith(CLibSocket
& socket
)
815 if (!m_socketAvailable
) {
816 AddDebugLogLineF(logAsio
, wxT("AcceptWith: nothing there"));
820 // return the socket we received
821 socket
.LinkSocketImpl(m_currentSocket
.release());
823 // check if we have another socket ready for reception
824 m_currentSocket
.reset(new CAsioSocketImpl(NULL
));
826 // async_accept does not work if server is non-blocking
827 // temporarily switch it to non-blocking
829 // we are set to non-blocking, so this returns right away
830 accept(m_currentSocket
->GetAsioSocket(), ec
);
833 if (ec
|| !m_currentSocket
->UpdateIP()) {
835 m_socketAvailable
= false;
836 // start getting another one
838 AddDebugLogLineF(logAsio
, wxT("AcceptWith: ok, getting another socket in background"));
840 // we got another socket right away
841 m_socketAvailable
= true; // it is already true, but this improves readability
842 AddDebugLogLineF(logAsio
, wxT("AcceptWith: ok, another socket is available"));
843 // aMule actually doesn't need a notification as it polls the listen socket.
844 // amuleweb does need it though
845 CoreNotify_ServerTCPAccept(m_libSocketServer
);
851 bool SocketAvailable() const { return m_socketAvailable
; }
857 m_currentSocket
.reset(new CAsioSocketImpl(NULL
));
858 async_accept(m_currentSocket
->GetAsioSocket(),
859 m_strand
.wrap(boost::bind(& CAsioSocketServerImpl::HandleAccept
, this, placeholders::error
)));
862 void HandleAccept(const error_code
& error
)
865 AddDebugLogLineC(logAsio
, CFormat(wxT("Error in HandleAccept: %s")) % error
.message());
867 if (m_currentSocket
->UpdateIP()) {
868 AddDebugLogLineN(logAsio
, CFormat(wxT("HandleAccept received a connection from %s:%d"))
869 % m_currentSocket
->GetIP() % m_currentSocket
->GetPort());
870 m_socketAvailable
= true;
871 CoreNotify_ServerTCPAccept(m_libSocketServer
);
874 AddDebugLogLineN(logAsio
, wxT("Error in HandleAccept: invalid socket"));
877 // We were not successful. Try again.
878 // Post the request to the event queue to make sure it doesn't get called immediately.
879 m_strand
.post(boost::bind(& CAsioSocketServerImpl::StartAccept
, this));
882 // The wrapper object
883 CLibSocketServer
* m_libSocketServer
;
886 // The last socket that connected to us
887 CScopedPtr
<CAsioSocketImpl
> m_currentSocket
;
888 // Is there a socket available?
889 bool m_socketAvailable
;
890 io_service::strand m_strand
; // handle synchronisation in io_service thread pool
894 CLibSocketServer::CLibSocketServer(const amuleIPV4Address
& adr
, int /* flags */)
896 m_aServer
= new CAsioSocketServerImpl(adr
, this);
900 CLibSocketServer::~CLibSocketServer()
906 // Accepts an incoming connection request, and creates a new CLibSocket object which represents the server-side of the connection.
907 // Only used in CamuleApp::ListenSocketHandler() and we don't get there.
908 CLibSocket
* CLibSocketServer::Accept(bool /* wait */)
915 // Accept an incoming connection using the specified socket object.
916 bool CLibSocketServer::AcceptWith(CLibSocket
& socket
, bool WXUNUSED_UNLESS_DEBUG(wait
) )
919 return m_aServer
->AcceptWith(socket
);
923 bool CLibSocketServer::IsOk() const
925 return m_aServer
->IsOk();
929 void CLibSocketServer::Close()
935 bool CLibSocketServer::SocketAvailable()
937 return m_aServer
->SocketAvailable();
942 * ASIO UDP socket implementation
945 class CAsioUDPSocketImpl
953 amuleIPV4Address ipadr
;
955 CUDPData(const void * src
, uint32 _size
, amuleIPV4Address adr
) :
956 size(_size
), ipadr(adr
)
958 buffer
= new char[size
];
959 memcpy(buffer
, src
, size
);
969 CAsioUDPSocketImpl(const amuleIPV4Address
&address
, int /* flags */, CLibUDPSocket
* libSocket
) :
970 m_libSocket(libSocket
),
971 m_strand(s_io_service
),
972 m_timer(s_io_service
),
977 m_readBuffer
= new char[CMuleUDPSocket::UDP_BUFFER_SIZE
];
982 ~CAsioUDPSocketImpl()
984 AddDebugLogLineF(logAsio
, wxT("UDP ~CAsioUDPSocketImpl"));
986 delete[] m_readBuffer
;
987 DeleteContents(m_receiveBuffers
);
990 void SetClientData(CMuleUDPSocket
* muleSocket
)
992 AddDebugLogLineF(logAsio
, wxT("UDP SetClientData"));
993 m_muleSocket
= muleSocket
;
996 uint32
RecvFrom(amuleIPV4Address
& addr
, void* buf
, uint32 nBytes
)
1000 wxMutexLocker
lock(m_receiveBuffersLock
);
1001 if (m_receiveBuffers
.empty()) {
1002 AddDebugLogLineN(logAsio
, wxT("UDP RecvFromError no data"));
1005 recdata
= * m_receiveBuffers
.begin();
1006 m_receiveBuffers
.pop_front();
1008 uint32 read
= recdata
->size
;
1009 if (read
> nBytes
) {
1010 // should not happen
1011 AddDebugLogLineN(logAsio
, CFormat(wxT("UDP RecvFromError too much data %d")) % read
);
1014 memcpy(buf
, recdata
->buffer
, read
);
1015 addr
= recdata
->ipadr
;
1020 uint32
SendTo(const amuleIPV4Address
& addr
, const void* buf
, uint32 nBytes
)
1022 // Collect data, make a copy of the buffer's content
1023 CUDPData
* recdata
= new CUDPData(buf
, nBytes
, addr
);
1024 AddDebugLogLineF(logAsio
, CFormat(wxT("UDP SendTo %d to %s")) % nBytes
% addr
.IPAddress());
1025 m_strand
.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchSendTo
, this, recdata
));
1036 if (s_io_service
.stopped()) {
1039 m_strand
.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchClose
, this));
1045 AddDebugLogLineF(logAsio
, CFormat(wxT("Destroy() %p %p")) % m_libSocket
% this);
1047 if (s_io_service
.stopped()) {
1050 // Close prevents creation of any more callbacks, but does not clear any callbacks already
1051 // sitting in Asio's event queue (I have seen such a crash).
1052 // So create a delay timer so they can be called until core is notified.
1053 m_timer
.expires_from_now(boost::posix_time::seconds(1));
1054 m_timer
.async_wait(m_strand
.wrap(boost::bind(& CAsioUDPSocketImpl::HandleDestroy
, this)));
1061 // Dispatch handlers
1062 // Access to m_socket is all bundled in the thread running s_io_service to avoid
1063 // concurrent access to the socket from several threads.
1064 // So once things are running (after connect), all access goes through one of these handlers.
1066 void DispatchClose()
1069 m_socket
->close(ec
);
1071 AddDebugLogLineC(logAsio
, CFormat(wxT("UDP Close error %s")) % ec
.message());
1073 AddDebugLogLineF(logAsio
, wxT("UDP Closed"));
1077 void DispatchSendTo(CUDPData
* recdata
)
1079 ip::udp::endpoint
endpoint(recdata
->ipadr
.GetEndpoint().address(), recdata
->ipadr
.Service());
1081 AddDebugLogLineF(logAsio
, CFormat(wxT("UDP DispatchSendTo %d to %s:%d")) % recdata
->size
1082 % endpoint
.address().to_string() % endpoint
.port());
1083 m_socket
->async_send_to(buffer(recdata
->buffer
, recdata
->size
), endpoint
,
1084 m_strand
.wrap(boost::bind(& CAsioUDPSocketImpl::HandleSendTo
, this, placeholders::error
, placeholders::bytes_transferred
, recdata
)));
1088 // Completion handlers for async requests
1091 void HandleRead(const error_code
& ec
, size_t received
)
1094 AddDebugLogLineN(logAsio
, CFormat(wxT("UDP HandleReadError %s")) % ec
.message());
1095 } else if (received
== 0) {
1096 AddDebugLogLineF(logAsio
, wxT("UDP HandleReadError nothing available"));
1097 } else if (m_muleSocket
== NULL
) {
1098 AddDebugLogLineN(logAsio
, wxT("UDP HandleReadError no handler"));
1101 amuleIPV4Address ipadr
= amuleIPV4Address(CamuleIPV4Endpoint(m_receiveEndpoint
));
1102 AddDebugLogLineF(logAsio
, CFormat(wxT("UDP HandleRead %d %s:%d")) % received
% ipadr
.IPAddress() % ipadr
.Service());
1104 // create our read buffer
1105 CUDPData
* recdata
= new CUDPData(m_readBuffer
, received
, ipadr
);
1107 wxMutexLocker
lock(m_receiveBuffersLock
);
1108 m_receiveBuffers
.push_back(recdata
);
1110 CoreNotify_UDPSocketReceive(m_muleSocket
);
1112 StartBackgroundRead();
1115 void HandleSendTo(const error_code
& ec
, size_t sent
, CUDPData
* recdata
)
1118 AddDebugLogLineN(logAsio
, CFormat(wxT("UDP HandleSendToError %s")) % ec
.message());
1119 } else if (sent
!= recdata
->size
) {
1120 AddDebugLogLineN(logAsio
, CFormat(wxT("UDP HandleSendToError tosend: %d sent %d")) % recdata
->size
% sent
);
1122 if (m_muleSocket
== NULL
) {
1123 AddDebugLogLineN(logAsio
, wxT("UDP HandleSendToError no handler"));
1125 AddDebugLogLineF(logAsio
, CFormat(wxT("UDP HandleSendTo %d to %s")) % sent
% recdata
->ipadr
.IPAddress());
1126 CoreNotify_UDPSocketSend(m_muleSocket
);
1131 void HandleDestroy()
1133 AddDebugLogLineF(logAsio
, CFormat(wxT("HandleDestroy() %p %p")) % m_libSocket
% this);
1145 ip::udp::endpoint
endpoint(m_address
.GetEndpoint().address(), m_address
.Service());
1146 m_socket
= new ip::udp::socket(s_io_service
, endpoint
);
1147 AddDebugLogLineN(logAsio
, CFormat(wxT("Created UDP socket %s %d")) % m_address
.IPAddress() % m_address
.Service());
1148 StartBackgroundRead();
1149 } catch (const system_error
& err
) {
1150 AddLogLineC(CFormat(wxT("Error creating UDP socket %s %d : %s")) % m_address
.IPAddress() % m_address
.Service() % err
.code().message());
1156 void StartBackgroundRead()
1158 m_socket
->async_receive_from(buffer(m_readBuffer
, CMuleUDPSocket::UDP_BUFFER_SIZE
), m_receiveEndpoint
,
1159 m_strand
.wrap(boost::bind(& CAsioUDPSocketImpl::HandleRead
, this, placeholders::error
, placeholders::bytes_transferred
)));
1162 CLibUDPSocket
* m_libSocket
;
1163 ip::udp::socket
* m_socket
;
1164 CMuleUDPSocket
* m_muleSocket
;
1166 io_service::strand m_strand
; // handle synchronisation in io_service thread pool
1167 deadline_timer m_timer
;
1168 amuleIPV4Address m_address
;
1170 // One fixed receive buffer
1171 char * m_readBuffer
;
1172 // and a list of dynamic buffers. UDP data may be coming in faster
1173 // than the main loop can handle it.
1174 std::list
<CUDPData
*> m_receiveBuffers
;
1175 wxMutex m_receiveBuffersLock
;
1177 // Address of last reception
1178 ip::udp::endpoint m_receiveEndpoint
;
1183 * Library UDP socket wrapper
1186 CLibUDPSocket::CLibUDPSocket(amuleIPV4Address
&address
, int flags
)
1188 m_aSocket
= new CAsioUDPSocketImpl(address
, flags
, this);
1192 CLibUDPSocket::~CLibUDPSocket()
1194 AddDebugLogLineF(logAsio
, CFormat(wxT("~CLibUDPSocket() %p %p")) % this % m_aSocket
);
1199 bool CLibUDPSocket::IsOk() const
1201 return m_aSocket
->IsOk();
1205 uint32
CLibUDPSocket::RecvFrom(amuleIPV4Address
& addr
, void* buf
, uint32 nBytes
)
1207 return m_aSocket
->RecvFrom(addr
, buf
, nBytes
);
1211 uint32
CLibUDPSocket::SendTo(const amuleIPV4Address
& addr
, const void* buf
, uint32 nBytes
)
1213 return m_aSocket
->SendTo(addr
, buf
, nBytes
);
1217 void CLibUDPSocket::SetClientData(CMuleUDPSocket
* muleSocket
)
1219 m_aSocket
->SetClientData(muleSocket
);
1223 int CLibUDPSocket::LastError() const
1229 void CLibUDPSocket::Close()
1235 void CLibUDPSocket::Destroy()
1237 m_aSocket
->Destroy();
1242 * CAsioService - ASIO event loop thread
1245 class CAsioServiceThread
: public wxThread
{
1247 CAsioServiceThread() : wxThread(wxTHREAD_JOINABLE
)
1249 static int count
= 0;
1250 m_threadNumber
= ++count
;
1257 AddLogLineNS(CFormat(_("Asio thread %d started")) % m_threadNumber
);
1258 io_service::work
worker(s_io_service
); // keep io_service running
1260 AddDebugLogLineN(logAsio
, CFormat(wxT("Asio thread %d stopped")) % m_threadNumber
);
1270 * The constructor starts the thread.
1272 CAsioService::CAsioService()
1274 m_threads
= new CAsioServiceThread
[m_numberOfThreads
];
1278 CAsioService::~CAsioService()
1283 void CAsioService::Stop()
1288 s_io_service
.stop();
1289 // Wait for threads to exit
1290 for (int i
= 0; i
< m_numberOfThreads
; i
++) {
1291 CAsioServiceThread
* t
= m_threads
+ i
;
1306 amuleIPV4Address::amuleIPV4Address()
1308 m_endpoint
= new CamuleIPV4Endpoint();
1311 amuleIPV4Address::amuleIPV4Address(const amuleIPV4Address
&a
)
1316 amuleIPV4Address::amuleIPV4Address(const CamuleIPV4Endpoint
&ep
)
1321 amuleIPV4Address::~amuleIPV4Address()
1326 amuleIPV4Address
& amuleIPV4Address::operator=(const amuleIPV4Address
&a
)
1328 m_endpoint
= new CamuleIPV4Endpoint(* a
.m_endpoint
);
1332 amuleIPV4Address
& amuleIPV4Address::operator=(const CamuleIPV4Endpoint
&ep
)
1334 m_endpoint
= new CamuleIPV4Endpoint(ep
);
1338 bool amuleIPV4Address::Hostname(const wxString
& name
)
1340 if (name
.IsEmpty()) {
1343 // This is usually just an IP.
1344 std::string
sname(unicode2char(name
));
1346 ip::address_v4 adr
= ip::address_v4::from_string(sname
, ec
);
1348 m_endpoint
->address(adr
);
1351 AddDebugLogLineN(logAsio
, CFormat(wxT("Hostname(\"%s\") failed, not an IP address %s")) % name
% ec
.message());
1353 // Try to resolve (sync). Normally not required. Unless you type in your hostname as "local IP address" or something.
1355 ip::tcp::resolver
res(s_io_service
);
1356 // We only want to get IPV4 addresses.
1357 ip::tcp::resolver::query
query(ip::tcp::v4(), sname
, "");
1358 ip::tcp::resolver::iterator endpoint_iterator
= res
.resolve(query
, ec2
);
1360 AddDebugLogLineN(logAsio
, CFormat(wxT("Hostname(\"%s\") resolve failed: %s")) % name
% ec2
.message());
1363 if (endpoint_iterator
== ip::tcp::resolver::iterator()) {
1364 AddDebugLogLineN(logAsio
, CFormat(wxT("Hostname(\"%s\") resolve failed: no address found")) % name
);
1367 m_endpoint
->address(endpoint_iterator
->endpoint().address());
1368 AddDebugLogLineN(logAsio
, CFormat(wxT("Hostname(\"%s\") resolved to %s")) % name
% IPAddress());
1372 bool amuleIPV4Address::Service(uint16 service
)
1377 m_endpoint
->port(service
);
1381 uint16
amuleIPV4Address::Service() const
1383 return m_endpoint
->port();
1386 bool amuleIPV4Address::IsLocalHost() const
1388 return m_endpoint
->address().is_loopback();
1391 wxString
amuleIPV4Address::IPAddress() const
1393 return CFormat(wxT("%s")) % m_endpoint
->address().to_string();
1396 // "Set address to any of the addresses of the current machine."
1397 // This just sets the address to 0.0.0.0 .
1398 // wx does the same.
1399 bool amuleIPV4Address::AnyAddress()
1401 m_endpoint
->address(ip::address_v4::any());
1402 AddDebugLogLineN(logAsio
, CFormat(wxT("AnyAddress: set to %s")) % IPAddress());
1406 const CamuleIPV4Endpoint
& amuleIPV4Address::GetEndpoint() const
1408 return * m_endpoint
;
1411 CamuleIPV4Endpoint
& amuleIPV4Address::GetEndpoint()
1413 return * m_endpoint
;
1418 // Notification stuff
1420 namespace MuleNotify
1423 void LibSocketConnect(CLibSocket
* socket
, int error
)
1425 if (socket
->IsDestroying()) {
1426 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketConnect Destroying %s %d")) % socket
->GetIP() % error
);
1427 } else if (socket
->GetProxyState()) {
1428 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketConnect Proxy %s %d")) % socket
->GetIP() % error
);
1429 socket
->OnProxyEvent(MULE_SOCKET_CONNECTION
);
1431 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketConnect %s %d")) %socket
->GetIP() % error
);
1432 socket
->OnConnect(error
);
1436 void LibSocketSend(CLibSocket
* socket
, int error
)
1438 if (socket
->IsDestroying()) {
1439 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketSend Destroying %s %d")) % socket
->GetIP() % error
);
1440 } else if (socket
->GetProxyState()) {
1441 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketSend Proxy %s %d")) % socket
->GetIP() % error
);
1442 socket
->OnProxyEvent(MULE_SOCKET_OUTPUT
);
1444 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketSend %s %d")) % socket
->GetIP() % error
);
1445 socket
->OnSend(error
);
1449 void LibSocketReceive(CLibSocket
* socket
, int error
)
1451 socket
->EventProcessed();
1452 if (socket
->IsDestroying()) {
1453 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketReceive Destroying %s %d")) % socket
->GetIP() % error
);
1454 } else if (socket
->GetProxyState()) {
1455 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketReceive Proxy %s %d")) % socket
->GetIP() % error
);
1456 socket
->OnProxyEvent(MULE_SOCKET_INPUT
);
1458 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketReceive %s %d")) % socket
->GetIP() % error
);
1459 socket
->OnReceive(error
);
1463 void LibSocketLost(CLibSocket
* socket
)
1465 if (socket
->IsDestroying()) {
1466 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketLost Destroying %s")) % socket
->GetIP());
1467 } else if (socket
->GetProxyState()) {
1468 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketLost Proxy %s")) % socket
->GetIP());
1469 socket
->OnProxyEvent(MULE_SOCKET_LOST
);
1471 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocketLost %s")) % socket
->GetIP());
1476 void LibSocketDestroy(CLibSocket
* socket
)
1478 AddDebugLogLineF(logAsio
, CFormat(wxT("LibSocket_Destroy %s")) % socket
->GetIP());
1482 void ProxySocketEvent(CLibSocket
* socket
, int evt
)
1484 AddDebugLogLineF(logAsio
, CFormat(wxT("ProxySocketEvent %s %d")) % socket
->GetIP() % evt
);
1485 socket
->OnProxyEvent(evt
);
1488 void ServerTCPAccept(CLibSocketServer
* socketServer
)
1490 AddDebugLogLineF(logAsio
, wxT("ServerTCP_Accept"));
1491 socketServer
->OnAccept();
1494 void UDPSocketSend(CMuleUDPSocket
* socket
)
1496 AddDebugLogLineF(logAsio
, wxT("UDPSocketSend"));
1500 void UDPSocketReceive(CMuleUDPSocket
* socket
)
1502 AddDebugLogLineF(logAsio
, wxT("UDPSocketReceive"));
1503 socket
->OnReceive(0);
1507 } // namespace MuleNotify
1510 // Initialize MuleBoostVersion
1512 wxString MuleBoostVersion
= CFormat(wxT("%d.%d")) % (BOOST_VERSION
/ 100000) % (BOOST_VERSION
/ 100 % 1000);