2 // This file is part of the aMule Project.
4 // Copyright (c) 2003-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2011 Merkur ( devs@emule-project.net / http://www.emule-project.net )
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
27 #include "EMSocket.h" // Interface declarations.
29 #include <protocol/Protocols.h>
30 #include <protocol/ed2k/Constants.h>
32 #include "Packet.h" // Needed for CPacket
34 #include "GetTickCount.h"
35 #include "UploadBandwidthThrottler.h"
37 #include "Preferences.h"
38 #include "ScopedPtr.h"
// Largest payload size (as announced by a packet header) that OnReceive()
// accepts; a header announcing more than this makes OnReceive() discard the
// pending header.
41 const uint32 MAX_PACKET_SIZE
= 2000000;
// Constructor. Builds the CEncryptedStreamSocket base with MULE_SOCKET_NOWAIT
// and the caller's proxy data, then initialises the connection state, the
// download-side pending sizes and the upload bookkeeping members.
43 // cppcheck-suppress uninitMemberVar CEMSocket::pendingHeader
44 CEMSocket::CEMSocket(const CProxyData
*ProxyData
)
45 : CEncryptedStreamSocket(MULE_SOCKET_NOWAIT
, ProxyData
)
47 // If an interface has been specified,
48 // then we need to bind to it.
49 if (!thePrefs::GetAddress().IsEmpty()) {
50 amuleIPV4Address host
;
52 // No need to warn here, in case of failure to
53 // assign the hostname. That is already done
54 // in amule.cpp when starting ...
55 if (host
.Hostname(thePrefs::GetAddress())) {
// A new socket starts in the "not connected" state (distinct from
// ES_DISCONNECTED, which the other methods treat as "do nothing").
60 byConnected
= ES_NOTCONNECTED
;
61 m_uTimeOut
= CONNECTION_TIMEOUT
; // default timeout for ed2k sockets
63 // Download (pseudo) rate control
65 downloadLimitEnable
= false;
66 pendingOnReceive
= false;
68 // Download partial header
69 pendingHeaderSize
= 0;
71 // Download partial packet
73 pendingPacketSize
= 0;
// Upload side: nothing is in flight and all per-class byte counters are zero.
80 m_currentPacket_is_controlpacket
= false;
81 m_currentPackageIsFromPartFile
= false;
83 m_numberOfSentBytesCompleteFile
= 0;
84 m_numberOfSentBytesPartFile
= 0;
85 m_numberOfSentBytesControlPacket
= 0;
87 lastCalledSend
= ::GetTickCount();
// lastSent is backdated by 1000 ticks; Send() later tests
// (::GetTickCount() - lastSent) > 1000 to detect a long pause.
88 lastSent
= ::GetTickCount()-1000;
90 m_bAccelerateUpload
= false;
92 m_actualPayloadSize
= 0;
93 m_actualPayloadSizeSent
= 0;
98 lastFinishedStandard
= 0;
// Destructor. Flags the socket ES_DISCONNECTED while holding m_sendLocker,
// then removes it from all upload bandwidth throttler queues if a throttler
// exists, and finally calls SetNotify(0).
101 CEMSocket::~CEMSocket()
103 // need to be locked here to know that the other methods
104 // won't be in the middle of things
106 wxMutexLocker
lock(m_sendLocker
);
107 byConnected
= ES_DISCONNECTED
;
110 // now that we know no other method will keep adding to the queue
111 // we can remove ourself from the queue
// The throttler pointer is checked for null before it is used here.
112 if (theApp
->uploadBandwidthThrottler
) {
113 theApp
->uploadBandwidthThrottler
->RemoveFromAllQueues(this);
119 SetNotify(0); // this is already done in Destroy()
// Empties both send queues and resets the download-side pending state.
// m_sendLocker is taken at the top of the function.
125 void CEMSocket::ClearQueues()
127 wxMutexLocker
lock(m_sendLocker
);
// Control queue: handled by the DeleteContents() helper
// (NOTE(review): presumably deletes the queued packets and empties the
// container - confirm against the helper's definition).
129 DeleteContents(m_control_queue
);
// Standard queue: every entry is visited, then the container is cleared.
132 CStdPacketQueue::iterator it
= m_standard_queue
.begin();
133 for (; it
!= m_standard_queue
.end(); ++it
) {
136 m_standard_queue
.clear();
139 // Download (pseudo) rate control
141 downloadLimitEnable
= false;
142 pendingOnReceive
= false;
144 // Download partial header
145 pendingHeaderSize
= 0;
147 // Download partial packet
// The partially received payload buffer is released and its bookkeeping reset.
148 delete[] pendingPacket
;
149 pendingPacket
= NULL
;
150 pendingPacketSize
= 0;
// Close notification. Flags the socket ES_DISCONNECTED while holding
// m_sendLocker, then removes it from all upload bandwidth throttler queues.
// The error code argument is unused (WXUNUSED).
160 void CEMSocket::OnClose(int WXUNUSED(nErrorCode
))
162 // need to be locked here to know that the other methods
163 // won't be in the middle of things
165 wxMutexLocker
lock(m_sendLocker
);
166 byConnected
= ES_DISCONNECTED
;
169 // now that we know no other method will keep adding to the queue
170 // we can remove ourself from the queue
// NOTE(review): unlike the destructor, the throttler pointer is dereferenced
// here without a null check - confirm it is always valid when OnClose runs.
171 theApp
->uploadBandwidthThrottler
->RemoveFromAllQueues(this);
// Receive notification handler. Reads in a loop (subject to the optional
// download limit), first into pendingHeader and then into a freshly allocated
// pendingPacket buffer, and hands each completed packet to PacketReceived().
177 void CEMSocket::OnReceive(int nErrorCode
)
186 // Check current connection state
187 if (byConnected
== ES_DISCONNECTED
) {
// Past the disconnected check, the socket is marked as connected.
190 byConnected
= ES_CONNECTED
; // ES_DISCONNECTED, ES_NOTCONNECTED, ES_CONNECTED
195 // CPU load improvement
// With limiting enabled and no allowance left, remember that a receive is
// pending (SetDownloadLimit/DisableDownloadLimit test this flag).
196 if (downloadLimitEnable
&& downloadLimit
== 0){
197 pendingOnReceive
= true;
// Header still incomplete: drop any payload buffer and read the missing
// header bytes into pendingHeader.
203 if (pendingHeaderSize
< PACKET_HEADER_SIZE
) {
204 delete[] pendingPacket
;
205 pendingPacket
= NULL
;
206 buf
= pendingHeader
+ pendingHeaderSize
;
207 readMax
= PACKET_HEADER_SIZE
- pendingHeaderSize
;
// Header complete but no payload buffer yet: take the payload size from the
// header.
208 } else if (pendingPacket
== NULL
) {
209 pendingPacketSize
= 0;
210 readMax
= CPacket::GetPacketSizeFromHeader(pendingHeader
);
// Announced sizes above MAX_PACKET_SIZE discard the pending header.
211 if (readMax
> MAX_PACKET_SIZE
) {
212 pendingHeaderSize
= 0;
// Note the allocation is one byte larger than the announced size.
216 pendingPacket
= new byte
[readMax
+ 1];
// Payload partially received: continue after the bytes already stored and
// ask only for what is still missing of the announced size.
219 buf
= pendingPacket
+ pendingPacketSize
;
220 readMax
= CPacket::GetPacketSizeFromHeader(pendingHeader
) - pendingPacketSize
;
// Never request more than the remaining download allowance.
223 if (downloadLimitEnable
&& readMax
> downloadLimit
) {
224 readMax
= downloadLimit
;
// The socket read is performed while holding m_sendLocker.
229 wxMutexLocker
lock(m_sendLocker
);
230 ret
= Read(buf
, readMax
);
232 pendingOnReceive
= true;
// A socket error or a zero-byte read is tested here.
235 if (LastError() || ret
== 0) {
// Charge the bytes just read against the download allowance.
241 if (downloadLimitEnable
) {
243 if (ret
>= downloadLimit
) {
246 downloadLimit
-= ret
;
250 // CPU load improvement
251 // Detect if the socket's buffer is empty (or the size did match...)
252 pendingOnReceive
= (ret
== readMax
);
// Payload phase: account for the new bytes; once the announced size has been
// reached, wrap header + payload in a CPacket held by a CScopedPtr.
254 if (pendingHeaderSize
>= PACKET_HEADER_SIZE
) {
255 pendingPacketSize
+= ret
;
256 if (pendingPacketSize
>= CPacket::GetPacketSizeFromHeader(pendingHeader
)) {
257 CScopedPtr
<CPacket
> packet(new CPacket(pendingHeader
, pendingPacket
));
// pendingPacket is only set to NULL here, not deleted
// (NOTE(review): presumably the CPacket took ownership of the buffer - confirm).
258 pendingPacket
= NULL
;
259 pendingPacketSize
= 0;
260 pendingHeaderSize
= 0;
262 // Bugfix We still need to check for a valid protocol
263 // Remark: the default eMule v0.26b had removed this test......
264 switch (packet
->GetProtocol()){
268 case OP_ED2KV2HEADER
:
269 case OP_ED2KV2PACKEDPROT
:
272 OnError(ERR_WRONGHEADER
);
// Hand the finished packet to the receiver; the CScopedPtr keeps ownership.
277 PacketReceived(packet
.get());
// Header phase: count the header bytes just read.
280 pendingHeaderSize
+= ret
;
// Keep looping while data arrived and a complete header is pending.
282 } while (ret
&& pendingHeaderSize
>= PACKET_HEADER_SIZE
);
// Stores `limit` as the current download allowance and turns download
// limiting on. Afterwards it checks whether the allowance is non-zero and a
// receive was left pending (pendingOnReceive).
286 void CEMSocket::SetDownloadLimit(uint32 limit
)
288 downloadLimit
= limit
;
289 downloadLimitEnable
= true;
291 // CPU load improvement
292 if(limit
> 0 && pendingOnReceive
== true){
// Turns download limiting off, then checks whether a receive was left
// pending (pendingOnReceive).
298 void CEMSocket::DisableDownloadLimit()
300 downloadLimitEnable
= false;
302 // CPU load improvement
303 if (pendingOnReceive
== true){
310 * Queues up the packet to be sent. Another thread will actually send the packet.
312 * If the packet is not a control packet, and if the socket decides that its queue is
313 * full and forceAdd is false, then the socket is allowed to refuse to add the packet
314 * to its queue. It will then return false and it is up to the calling thread to try
315 * to call SendPacket for that packet again at a later time.
317 * @param packet address to the packet that should be added to the queue
319 * @param delpacket if true, the responsibility for deleting the packet after it has been sent
320 * has been transferred to this object. If false, don't delete the packet after it has been sent.
323 * @param controlpacket the packet is a controlpacket
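325 * @param forceAdd this packet must be added to the queue, even if it is full. If this flag is true
326 * then the method cannot refuse to add the packet, and therefore not return false.
* @param actualPayloadSize payload size stored together with the packet in the standard queue entry
328 * @return true if the packet was added to the queue, false otherwise
* NOTE: the current signature has no forceAdd parameter and returns void; the forceAdd and
* return-value descriptions above do not match it and should be reviewed.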
// Queues `packet` for a later Send() call; nothing is written to the socket
// here. Control packets go to m_control_queue, all others to
// m_standard_queue together with actualPayloadSize. The whole operation runs
// under m_sendLocker.
330 void CEMSocket::SendPacket(CPacket
* packet
, bool delpacket
, bool controlpacket
, uint32 actualPayloadSize
)
332 //printf("* SendPacket called on socket %p\n", this);
333 wxMutexLocker
lock(m_sendLocker
);
// Packets offered to a disconnected socket are not queued.
335 if (byConnected
== ES_DISCONNECTED
) {
336 //printf("* Disconnected, drop packet\n");
// The caller's pointer is replaced by a heap copy of the packet
// (NOTE(review): presumably only when delpacket is false - confirm).
342 packet
= new CPacket(*packet
);
346 //printf("* Adding a control packet\n");
347 m_control_queue
.push_back(packet
);
349 // queue up for controlpacket
350 theApp
->uploadBandwidthThrottler
->QueueForSendingControlPacket(this, HasSent());
352 //printf("* Adding a normal packet to the queue\n");
// `first` is true when no standard packet is in flight in sendbuffer and the
// standard queue is still empty, i.e. this packet starts a new block.
353 bool first
= !((sendbuffer
&& !m_currentPacket_is_controlpacket
) || !m_standard_queue
.empty());
354 StandardPacketQueueEntry queueEntry
= { actualPayloadSize
, packet
};
355 m_standard_queue
.push_back(queueEntry
);
357 // reset timeout for the first time
359 lastFinishedStandard
= ::GetTickCount();
360 m_bAccelerateUpload
= true; // Always accelerate first packet in a block
// Reads and resets the complete-file byte counter: while holding
// m_sendLocker, m_numberOfSentBytesCompleteFile is copied to a local and then
// zeroed. Send() adds to this counter for standard packets that are not from
// a part file.
367 uint64
CEMSocket::GetSentBytesCompleteFileSinceLastCallAndReset()
369 wxMutexLocker
lock( m_sendLocker
);
371 uint64 sentBytes
= m_numberOfSentBytesCompleteFile
;
372 m_numberOfSentBytesCompleteFile
= 0;
// Reads and resets the part-file byte counter: while holding m_sendLocker,
// m_numberOfSentBytesPartFile is copied to a local and then zeroed. Send()
// adds to this counter for standard packets flagged as coming from a part
// file.
378 uint64
CEMSocket::GetSentBytesPartFileSinceLastCallAndReset()
380 wxMutexLocker
lock( m_sendLocker
);
382 uint64 sentBytes
= m_numberOfSentBytesPartFile
;
383 m_numberOfSentBytesPartFile
= 0;
// Reads and resets the control-packet byte counter: while holding
// m_sendLocker, m_numberOfSentBytesControlPacket is copied to a local and
// then zeroed. Send() adds to this counter for control packets.
388 uint64
CEMSocket::GetSentBytesControlPacketSinceLastCallAndReset()
390 wxMutexLocker
lock( m_sendLocker
);
392 uint64 sentBytes
= m_numberOfSentBytesControlPacket
;
393 m_numberOfSentBytesControlPacket
= 0;
// Reads and resets the payload counter: while holding m_sendLocker,
// m_actualPayloadSizeSent is copied to a local and then zeroed. Send() adds a
// standard packet's recorded payload size to this counter once that packet
// has been sent completely.
398 uint64
CEMSocket::GetSentPayloadSinceLastCallAndReset()
400 wxMutexLocker
lock( m_sendLocker
);
402 uint64 sentBytes
= m_actualPayloadSizeSent
;
403 m_actualPayloadSizeSent
= 0;
// Send-ready notification. Forwards to CEncryptedStreamSocket::OnSend(0),
// then, under m_sendLocker, marks a socket that is not disconnected as
// ES_CONNECTED and re-registers with the throttler when the packet currently
// in flight is a control packet.
409 void CEMSocket::OnSend(int nErrorCode
)
416 CEncryptedStreamSocket::OnSend(0);
418 wxMutexLocker
lock( m_sendLocker
);
421 if (byConnected
!= ES_DISCONNECTED
) {
422 byConnected
= ES_CONNECTED
;
424 if (m_currentPacket_is_controlpacket
) {
425 // queue up for control packet
426 theApp
->uploadBandwidthThrottler
->QueueForSendingControlPacket(this, HasSent());
433 * Try to put queued up data on the socket.
435 * Control packets have higher priority, and will be sent first, if possible.
436 * Standard packets can be split up in several package containers. In that case
437 * all the parts of a split package must be sent in a row, without any control packet in between.
440 * @param maxNumberOfBytesToSend This is the maximum number of bytes that is allowed to be put on the socket
441 * this call. The actual number of sent bytes will be returned from the method.
443 * @param onlyAllowedToSendControlPacket This call we only try to put control packets on the sockets.
444 * If there's a standard packet "in the way", and we think that this socket
445 * is no longer an upload slot, then it is ok to send the standard packet to
446 * get it out of the way. But it is not allowed to pick a new standard packet
447 * from the queue during this call. Several split packets are counted as one
448 * standard packet though, so it is ok to finish them all off if necessary.
450 * @return the actual number of bytes that were put on the socket.
// Writes queued data to the socket, control packets first, all under
// m_sendLocker.
//   maxNumberOfBytesToSend          byte budget for this call; aligned to the
//                                   fragment size via GetNextFragSize().
//   minFragSize                     fragment granularity used for that
//                                   alignment and for the "finish the current
//                                   fragment" conditions below.
//   onlyAllowedToSendControlPacket  restricts the call to control packets,
//                                   with the exceptions spelled out in the
//                                   loop conditions.
// The result is a SocketSentBytes initialised as
// { success flag, standard bytes sent, control bytes sent } for this call.
452 SocketSentBytes
CEMSocket::Send(uint32 maxNumberOfBytesToSend
, uint32 minFragSize
, bool onlyAllowedToSendControlPacket
)
454 wxMutexLocker
lock(m_sendLocker
);
456 //printf("* Attempt to send a packet on socket %p\n", this);
// Disconnected socket: the result is { false, 0, 0 }.
458 if (byConnected
== ES_DISCONNECTED
) {
459 //printf("* Disconnected socket %p\n", this);
460 SocketSentBytes returnVal
= { false, 0, 0 };
// Busy socket on a control-only call: the result is { true, 0, 0 }.
462 } else if (m_bBusy
&& onlyAllowedToSendControlPacket
) {
463 //printf("* Busy socket %p\n", this);
464 SocketSentBytes returnVal
= { true, 0, 0 };
468 bool anErrorHasOccured
= false;
469 uint32 sentStandardPacketBytesThisCall
= 0;
470 uint32 sentControlPacketBytesThisCall
= 0;
472 if (byConnected
== ES_CONNECTED
&& IsEncryptionLayerReady() && (!m_bBusy
|| onlyAllowedToSendControlPacket
)) {
474 //printf("* Internal attemptto send on %p\n", this);
// minFragSize is used as a modulus/divisor further down; values below 1 are
// handled here.
476 if(minFragSize
< 1) {
// Align the byte budget to the fragment size.
480 maxNumberOfBytesToSend
= GetNextFragSize(maxNumberOfBytesToSend
, minFragSize
);
// True when more than 1000 ticks have passed since lastSent was last stamped.
482 bool bWasLongTimeSinceSend
= (::GetTickCount() - lastSent
) > 1000;
484 lastCalledSend
= ::GetTickCount();
// Outer loop: each iteration works on one packet (or the unsent rest of one).
487 while(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
< maxNumberOfBytesToSend
&& anErrorHasOccured
== false && // don't send more than allowed. Also, there should have been no error in earlier loop
488 (!m_control_queue
.empty() || !m_standard_queue
.empty() || sendbuffer
!= NULL
) && // there must exist something to send
489 (onlyAllowedToSendControlPacket
== false || // this means we are allowed to send both types of packets, so proceed
490 (sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
> 0 && (sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
) % minFragSize
!= 0) ||
491 (sendbuffer
== NULL
&& !m_control_queue
.empty()) || // There's a control packet in queue, and we are not currently sending anything, so we will handle the control packet next
492 (sendbuffer
!= NULL
&& m_currentPacket_is_controlpacket
== true) || // We are in the progress of sending a control packet. We are always allowed to send those
493 (sendbuffer
!= NULL
&& m_currentPacket_is_controlpacket
== false && bWasLongTimeSinceSend
&& !m_control_queue
.empty() && m_standard_queue
.empty() && (sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
) < minFragSize
) // We have waited too long to clean the current packet (which may be a standard packet that is in the way). Proceed no matter what the value of onlyAllowedToSendControlPacket.
497 // If we are currently not in the progress of sending a packet, we will need to find the next one to send
498 if(sendbuffer
== NULL
) {
499 CPacket
* curPacket
= NULL
;
// The control queue is always consulted before the standard queue.
500 if(!m_control_queue
.empty()) {
501 // There's a control packet to send
502 m_currentPacket_is_controlpacket
= true;
503 curPacket
= m_control_queue
.front();
504 m_control_queue
.pop_front();
505 } else if(!m_standard_queue
.empty() /*&& onlyAllowedToSendControlPacket == false*/) {
506 // There's a standard packet to send
507 m_currentPacket_is_controlpacket
= false;
508 StandardPacketQueueEntry queueEntry
= m_standard_queue
.front();
509 m_standard_queue
.pop_front();
510 curPacket
= queueEntry
.packet
;
// Remember the payload size recorded at SendPacket() time; it is credited
// to m_actualPayloadSizeSent once this packet has been sent completely.
511 m_actualPayloadSize
= queueEntry
.actualPayloadSize
;
513 // remember this for statistics purposes.
514 m_currentPackageIsFromPartFile
= curPacket
->IsFromPF();
516 // Just to be safe. Shouldn't happen?
517 // if we reach this point, then there's something wrong with the while condition above!
519 AddDebugLogLineC(logGeneral
, wxT("EMSocket: Couldn't get a new packet! There's an error in the first while condition in EMSocket::Send()"));
521 SocketSentBytes returnVal
= { true, sentStandardPacketBytesThisCall
, sentControlPacketBytesThisCall
};
525 // We found a packet to send. Get the data to send from the
526 // package container and dispose of the container.
527 sendblen
= curPacket
->GetRealPacketSize();
528 sendbuffer
= curPacket
->DetachPacket();
// The detached bytes are passed through CryptPrepareSendData() before any
// of them are written.
532 CryptPrepareSendData((byte
*)sendbuffer
, sendblen
);
535 // At this point we've got a packet to send in sendbuffer. Try to send it. Loop until entire packet
536 // is sent, or until we reach maximum bytes to send for this call, or until we get an error.
537 // NOTE! If send would block (returns WOULDBLOCK), we will return from this method INSIDE this loop.
538 while (sent
< sendblen
&&
539 sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
< maxNumberOfBytesToSend
&&
541 onlyAllowedToSendControlPacket
== false || // this means we are allowed to send both types of packets, so proceed
542 m_currentPacket_is_controlpacket
||
543 (bWasLongTimeSinceSend
&& (sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
) < minFragSize
) ||
544 (sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
) % minFragSize
!= 0
546 anErrorHasOccured
== false) {
// tosend starts as the unsent remainder of the packet and is then capped
// according to whichever rule permits this write.
547 uint32 tosend
= sendblen
-sent
;
// Regular case: cap to what is left of this call's byte budget.
548 if(!onlyAllowedToSendControlPacket
|| m_currentPacket_is_controlpacket
) {
549 if (maxNumberOfBytesToSend
>= sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
&& tosend
> maxNumberOfBytesToSend
-(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
))
550 tosend
= maxNumberOfBytesToSend
-(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
);
// Standard packet on a control-only call after a long pause: cap to what is
// left of the first fragment.
551 } else if(bWasLongTimeSinceSend
&& (sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
) < minFragSize
) {
552 if (minFragSize
>= sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
&& tosend
> minFragSize
-(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
))
553 tosend
= minFragSize
-(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
);
// Remaining case: cap to the fragment boundary computed by GetNextFragSize().
555 uint32 nextFragMaxBytesToSent
= GetNextFragSize(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
, minFragSize
);
556 if (nextFragMaxBytesToSent
>= sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
&& tosend
> nextFragMaxBytesToSent
-(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
))
557 tosend
= nextFragMaxBytesToSent
-(sentStandardPacketBytesThisCall
+ sentControlPacketBytesThisCall
);
559 wxASSERT(tosend
!= 0 && tosend
<= sendblen
-sent
);
561 //DWORD tempStartSendTick = ::GetTickCount();
// lastSent is stamped before the write is attempted.
563 lastSent
= ::GetTickCount();
565 uint32 result
= CEncryptedStreamSocket::Write(sendbuffer
+sent
,tosend
);
569 SocketSentBytes returnVal
= { true, sentStandardPacketBytesThisCall
, sentControlPacketBytesThisCall
};
570 return returnVal
; // Send() blocked, onsend will be called when ready to send again
571 } else if (LastError()) {
572 // Send() gave an error
573 anErrorHasOccured
= true;
575 // we managed to send some bytes. Perform bookkeeping.
581 // Log send bytes in correct class
582 if(m_currentPacket_is_controlpacket
== false) {
583 sentStandardPacketBytesThisCall
+= result
;
585 if(m_currentPackageIsFromPartFile
== true) {
586 m_numberOfSentBytesPartFile
+= result
;
588 m_numberOfSentBytesCompleteFile
+= result
;
591 sentControlPacketBytesThisCall
+= result
;
592 m_numberOfSentBytesControlPacket
+= result
;
597 if (sent
== sendblen
){
598 // we are done sending the current packet. Delete it and set
599 // sendbuffer to NULL so a new packet can be fetched.
// A finished standard packet credits its recorded payload size to the
// payload counter and restarts the "last finished" timer.
604 if(!m_currentPacket_is_controlpacket
) {
605 m_actualPayloadSizeSent
+= m_actualPayloadSize
;
606 m_actualPayloadSize
= 0;
608 lastFinishedStandard
= ::GetTickCount(); // reset timeout
609 m_bAccelerateUpload
= false; // Safe until told otherwise
617 if(onlyAllowedToSendControlPacket
&& (!m_control_queue
.empty() || (sendbuffer
!= NULL
&& m_currentPacket_is_controlpacket
))) {
618 // enter control packet send queue
619 // we might enter control packet queue several times for the same package,
620 // but that costs very little overhead. Less overhead than trying to make sure
621 // that we only enter the queue once.
622 //printf("* Requeueing control packet on %p\n", this);
623 theApp
->uploadBandwidthThrottler
->QueueForSendingControlPacket(this, HasSent());
626 //printf("* Finishing send debug on %p\n",this);
// The success flag is the negation of anErrorHasOccured.
628 SocketSentBytes returnVal
= { !anErrorHasOccured
, sentStandardPacketBytesThisCall
, sentControlPacketBytesThisCall
};
// Fragment-boundary helper used by Send(). When `current` is not an exact
// multiple of minFragSize, the result is the next multiple above it:
// minFragSize * (current / minFragSize + 1).
// NOTE(review): minFragSize == 0 would divide by zero in both expressions;
// Send() tests minFragSize < 1 before calling - confirm no other caller can
// pass 0.
634 uint32
CEMSocket::GetNextFragSize(uint32 current
, uint32 minFragSize
)
636 if(current
% minFragSize
== 0) {
639 return minFragSize
*(current
/minFragSize
+1);
645 * Decides the (minimum) amount the socket needs to send to prevent timeout.
// Estimates how many bytes of the current (or next) standard packet have to
// be sent now so that it finishes within its time window: 45000 ticks when
// m_bAccelerateUpload is set, 90000 otherwise. Runs under m_sendLocker.
649 uint32
CEMSocket::GetNeededBytes()
655 uint64 sizeleft
, sizetotal
;
658 wxMutexLocker
lock(m_sendLocker
);
660 if (byConnected
== ES_DISCONNECTED
) {
// Neither a standard packet in flight nor one queued.
664 if (!((sendbuffer
&& !m_currentPacket_is_controlpacket
) || !m_standard_queue
.empty())) {
665 // No standard packet to send. Even if data needs to be sent to prevent timeout, there's nothing to send.
// A standard packet in flight is blocking queued control packets.
669 if (((sendbuffer
&& !m_currentPacket_is_controlpacket
)) && !m_control_queue
.empty())
670 m_bAccelerateUpload
= true; // We might be trying to send a block request, accelerate packet
// Time since Send() was last called.
672 sendgap
= ::GetTickCount() - lastCalledSend
;
674 timetotal
= m_bAccelerateUpload
?45000:90000;
// At this point timeleft holds the time ELAPSED since the last standard
// packet finished; it is converted to the remaining time further down.
675 timeleft
= ::GetTickCount() - lastFinishedStandard
;
// Sizes come from the packet in flight if there is one, otherwise from the
// packet at the head of the standard queue.
676 if (sendbuffer
&& !m_currentPacket_is_controlpacket
) {
677 sizeleft
= sendblen
-sent
;
678 sizetotal
= sendblen
;
680 sizeleft
= sizetotal
= m_standard_queue
.front().packet
->GetRealPacketSize();
// The whole window has already elapsed.
684 if (timeleft
>= timetotal
)
// Convert elapsed time into remaining time.
686 timeleft
= timetotal
-timeleft
;
// Equivalent to timeleft/timetotal >= sizeleft/sizetotal: the share of time
// remaining is at least the share of data remaining.
687 if (timeleft
*sizetotal
>= timetotal
*sizeleft
) {
688 // don't use 'GetTimeOut' here in case the timeout value is high,
689 if (sendgap
> SEC2MS(20))
690 return 1; // Don't let the socket itself time out - Might happen when switching from spread(non-focus) slot to trickle slot
// decval: how many bytes may still be outstanding at this point in time if
// the packet is to finish exactly at the end of the window.
693 uint64 decval
= timeleft
*sizetotal
/timetotal
;
696 if (decval
< sizeleft
)
697 return sizeleft
-decval
+1; // Round up
704 * Removes all packets from the standard queue that don't have to be sent for the socket to be able to send a control packet.
706 * Before a socket can send a new packet, the current packet has to be finished. If the current packet is part of
707 * a split packet, then all parts of that split packet must be sent before the socket can send a control packet.
709 * This method keeps in standard queue only those packets that must be sent (rest of split packet), and removes everything
710 * after it. The method doesn't touch the control packet queue.
// Under m_sendLocker, visits every entry of the standard queue and then
// clears the queue. m_control_queue and sendbuffer are not referenced here.
712 void CEMSocket::TruncateQueues()
714 wxMutexLocker
lock(m_sendLocker
);
716 // Clear the standard queue totally
717 // Please note! There may still be a standardpacket in the sendbuffer variable!
718 CStdPacketQueue::iterator it
= m_standard_queue
.begin();
719 for (; it
!= m_standard_queue
.end(); ++it
) {
723 m_standard_queue
.clear();
// Const accessor paired with SetTimeOut().
// NOTE(review): presumably returns m_uTimeOut - confirm.
727 uint32
CEMSocket::GetTimeOut() const
// Stores uTimeOut in m_uTimeOut, replacing the CONNECTION_TIMEOUT default
// assigned by the constructor.
733 void CEMSocket::SetTimeOut(uint32 uTimeOut
)
735 m_uTimeOut
= uTimeOut
;
737 // File_checked_for_headers