Review Kademlia assertions
[amule.git] / src / DownloadClient.cpp
blobfd673bf15ebbd4fa0402a1c6ff3fffd5517ff71d
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2003-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2011 Merkur ( devs@emule-project.net / http://www.emule-project.net )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "updownclient.h" // Needed for CUpDownClient
28 #include <protocol/Protocols.h>
29 #include <protocol/ed2k/Client2Client/TCP.h>
30 #include <protocol/ed2k/Client2Client/UDP.h>
31 #include <common/EventIDs.h>
32 #include <common/Macros.h>
33 #include <common/Constants.h>
35 #include <zlib.h>
36 #include <cmath> // Needed for std:exp
38 #include "ClientCredits.h" // Needed for CClientCredits
39 #include "ClientUDPSocket.h" // Needed for CClientUDPSocket
40 #include "DownloadQueue.h" // Needed for CDownloadQueue
41 #include "Preferences.h" // Needed for thePrefs
42 #include "Packet.h" // Needed for CPacket
43 #include "MemFile.h" // Needed for CMemFile
44 #include "ClientTCPSocket.h"// Needed for CClientTCPSocket
45 #include "ListenSocket.h" // Needed for CListenSocket
46 #include "amule.h" // Needed for theApp
47 #include "PartFile.h" // Needed for CPartFile
48 #include "SharedFileList.h"
49 #include "Statistics.h" // Needed for theStats
50 #include "Logger.h"
51 #include "GuiEvents.h" // Needed for Notify_*
52 #include "UploadQueue.h" // Needed for CUploadQueue
55 #ifdef __MULE_UNUSED_CODE__
56 // This function is left as a reminder.
57 // Changes here _must_ be reflected in CClientList::FindMatchingClient.
58 bool CUpDownClient::Compare(const CUpDownClient* tocomp, bool bIgnoreUserhash) const
60 if (!tocomp) {
61 // should we wxASSERT here?
62 return false;
65 //Compare only the user hash..
66 if(!bIgnoreUserhash && HasValidHash() && tocomp->HasValidHash()) {
67 return GetUserHash() == tocomp->GetUserHash();
70 if (HasLowID()) {
71 //User is firewalled.. Must do two checks..
72 if (GetIP()!=0 && GetIP() == tocomp->GetIP()) {
73 //The IP of both match
74 if (GetUserPort()!=0 && GetUserPort() == tocomp->GetUserPort()) {
75 //IP-UserPort matches
76 return true;
78 if (GetKadPort()!=0 && GetKadPort() == tocomp->GetKadPort()) {
79 //IP-KadPort Matches
80 return true;
84 if (GetUserIDHybrid()!=0
85 && GetUserIDHybrid() == tocomp->GetUserIDHybrid()
86 && GetServerIP()!=0
87 && GetServerIP() == tocomp->GetServerIP()
88 && GetServerPort()!=0
89 && GetServerPort() == tocomp->GetServerPort()) {
90 //Both have the same lowID, Same serverIP and Port..
91 return true;
94 //Both IP, and Server do not match..
95 return false;
98 //User is not firewalled.
99 if (GetUserPort()!=0) {
100 //User has a Port, lets check the rest.
101 if (GetIP() != 0 && tocomp->GetIP() != 0) {
102 //Both clients have a verified IP..
103 if(GetIP() == tocomp->GetIP() && GetUserPort() == tocomp->GetUserPort()) {
104 //IP and UserPort match..
105 return true;
107 } else {
108 //One of the two clients do not have a verified IP
109 if (GetUserIDHybrid() == tocomp->GetUserIDHybrid() && GetUserPort() == tocomp->GetUserPort()) {
110 //ID and Port Match..
111 return true;
116 if(GetKadPort()!=0) {
117 //User has a Kad Port.
118 if(GetIP() != 0 && tocomp->GetIP() != 0) {
119 //Both clients have a verified IP.
120 if(GetIP() == tocomp->GetIP() && GetKadPort() == tocomp->GetKadPort()) {
121 //IP and KadPort Match..
122 return true;
124 } else {
125 //One of the users do not have a verified IP.
126 if (GetUserIDHybrid() == tocomp->GetUserIDHybrid() && GetKadPort() == tocomp->GetKadPort()) {
127 //ID and KadProt Match..
128 return true;
133 //No Matches..
134 return false;
136 #endif
139 bool CUpDownClient::AskForDownload()
141 // 0.42e
142 if (theApp->listensocket->TooManySockets()) {
143 if (!m_socket) {
144 if (GetDownloadState() != DS_TOOMANYCONNS) {
145 SetDownloadState(DS_TOOMANYCONNS);
147 return true;
148 } else if (!m_socket->IsConnected()) {
149 if (GetDownloadState() != DS_TOOMANYCONNS) {
150 SetDownloadState(DS_TOOMANYCONNS);
152 return true;
155 m_bUDPPending = false;
156 m_dwLastAskedTime = ::GetTickCount();
157 SetDownloadState(DS_CONNECTING);
158 SetSentCancelTransfer(0);
159 return TryToConnect();
163 void CUpDownClient::SendStartupLoadReq()
165 // 0.42e
166 if (m_socket==NULL || m_reqfile==NULL) {
167 return;
169 SetDownloadState(DS_ONQUEUE);
170 CMemFile dataStartupLoadReq(16);
171 dataStartupLoadReq.WriteHash(m_reqfile->GetFileHash());
172 CPacket* packet = new CPacket(dataStartupLoadReq, OP_EDONKEYPROT, OP_STARTUPLOADREQ);
173 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
174 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_STARTUPLOADREQ to ") + GetFullIP());
175 SendPacket(packet, true, true);
179 bool CUpDownClient::IsSourceRequestAllowed()
181 //#warning REWRITE - Source swapping from eMule.
182 // 0.42e
183 uint32 dwTickCount = ::GetTickCount() + CONNECTION_LATENCY;
184 uint32 nTimePassedClient = dwTickCount - GetLastSrcAnswerTime();
185 uint32 nTimePassedFile = dwTickCount - m_reqfile->GetLastAnsweredTime();
186 bool bNeverAskedBefore = (GetLastAskedForSources() == 0);
188 uint32 uSources = m_reqfile->GetSourceCount();
189 return (
190 // if client has the correct extended protocol
191 ExtProtocolAvailable() && (SupportsSourceExchange2() || GetSourceExchange1Version() > 1) &&
192 // AND if we need more sources
193 thePrefs::GetMaxSourcePerFileSoft() > uSources &&
194 // AND if...
196 //source is not complete and file is very rare
197 ( !m_bCompleteSource
198 && (bNeverAskedBefore || nTimePassedClient > SOURCECLIENTREASKS)
199 && (uSources <= RARE_FILE/5)
200 ) ||
201 //source is not complete and file is rare
202 ( !m_bCompleteSource
203 && (bNeverAskedBefore || nTimePassedClient > SOURCECLIENTREASKS)
204 && (uSources <= RARE_FILE || uSources - m_reqfile->GetValidSourcesCount() <= RARE_FILE / 2)
205 && (nTimePassedFile > SOURCECLIENTREASKF)
206 ) ||
207 // OR if file is not rare
208 ( (bNeverAskedBefore || nTimePassedClient > (unsigned)(SOURCECLIENTREASKS * MINCOMMONPENALTY))
209 && (nTimePassedFile > (unsigned)(SOURCECLIENTREASKF * MINCOMMONPENALTY))
216 void CUpDownClient::SendFileRequest()
218 wxCHECK_RET(m_reqfile, wxT("Cannot request file when no reqfile is set"));
220 CMemFile dataFileReq(16+16);
221 dataFileReq.WriteHash(m_reqfile->GetFileHash());
223 if (SupportMultiPacket()) {
224 DEBUG_ONLY( wxString sent_opcodes; )
226 if (SupportExtMultiPacket()) {
227 dataFileReq.WriteUInt64(m_reqfile->GetFileSize());
230 AddDebugLogLineN(logClient, wxT("Sending file request to client"));
232 dataFileReq.WriteUInt8(OP_REQUESTFILENAME);
233 DEBUG_ONLY( sent_opcodes += wxT("|RFNM|"); )
234 // Extended information
235 if (GetExtendedRequestsVersion() > 0) {
236 m_reqfile->WritePartStatus(&dataFileReq);
238 if (GetExtendedRequestsVersion() > 1) {
239 m_reqfile->WriteCompleteSourcesCount(&dataFileReq);
241 if (m_reqfile->GetPartCount() > 1) {
242 DEBUG_ONLY( sent_opcodes += wxT("|RFID|"); )
243 dataFileReq.WriteUInt8(OP_SETREQFILEID);
245 if (IsEmuleClient()) {
246 SetRemoteQueueFull( true );
247 SetRemoteQueueRank(0);
249 if (IsSourceRequestAllowed()) {
250 if (SupportsSourceExchange2()){
251 DEBUG_ONLY( sent_opcodes += wxT("|RSRC2|"); )
252 dataFileReq.WriteUInt8(OP_REQUESTSOURCES2);
253 dataFileReq.WriteUInt8(SOURCEEXCHANGE2_VERSION);
254 const uint16 nOptions = 0; // 16 ... Reserved
255 dataFileReq.WriteUInt16(nOptions);
256 } else {
257 DEBUG_ONLY( sent_opcodes += wxT("|RSRC|"); )
258 dataFileReq.WriteUInt8(OP_REQUESTSOURCES);
260 m_reqfile->SetLastAnsweredTimeTimeout();
261 SetLastAskedForSources();
263 if (IsSupportingAICH()) {
264 DEBUG_ONLY( sent_opcodes += wxT("|AFHR|"); )
265 dataFileReq.WriteUInt8(OP_AICHFILEHASHREQ);
267 CPacket* packet = new CPacket(dataFileReq, OP_EMULEPROT, (SupportExtMultiPacket() ? OP_MULTIPACKET_EXT : OP_MULTIPACKET));
268 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
269 AddDebugLogLineN(logLocalClient, CFormat(wxT("Local Client: %s (%s) to %s"))
270 % (SupportExtMultiPacket() ? wxT("OP_MULTIPACKET_EXT") : wxT("OP_MULTIPACKET")) % sent_opcodes % GetFullIP());
271 SendPacket(packet, true);
272 } else {
273 //This is extended information
274 if (GetExtendedRequestsVersion() > 0 ) {
275 m_reqfile->WritePartStatus(&dataFileReq);
277 if (GetExtendedRequestsVersion() > 1 ) {
278 m_reqfile->WriteCompleteSourcesCount(&dataFileReq);
280 CPacket* packet = new CPacket(dataFileReq, OP_EDONKEYPROT, OP_REQUESTFILENAME);
281 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
282 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_REQUESTFILENAME to ") + GetFullIP() );
283 SendPacket(packet, true);
285 // 26-Jul-2003: removed requesting the file status for files <= PARTSIZE for better compatibility with ed2k protocol (eDonkeyHybrid).
286 // if the remote client answers the OP_REQUESTFILENAME with OP_REQFILENAMEANSWER the file is shared by the remote client. if we
287 // know that the file is shared, we know also that the file is complete and don't need to request the file status.
289 // Sending the packet could have deleted the client, check m_reqfile
290 if (m_reqfile && (m_reqfile->GetPartCount() > 1)) {
291 CMemFile dataSetReqFileID(16);
292 dataSetReqFileID.WriteHash(m_reqfile->GetFileHash());
293 packet = new CPacket(dataSetReqFileID, OP_EDONKEYPROT, OP_SETREQFILEID);
294 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
295 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_SETREQFILEID to ") + GetFullIP());
296 SendPacket(packet, true);
299 if (IsEmuleClient()) {
300 SetRemoteQueueFull( true );
301 SetRemoteQueueRank(0);
304 // Sending the packet could have deleted the client, check m_reqfile
305 if (m_reqfile && IsSourceRequestAllowed()) {
306 m_reqfile->SetLastAnsweredTimeTimeout();
308 CMemFile packetdata;
310 if (SupportsSourceExchange2()) {
311 packetdata.WriteUInt8(SOURCEEXCHANGE2_VERSION);
312 packetdata.WriteUInt16(0 /* Reserved */);
315 packetdata.WriteHash(m_reqfile->GetFileHash());
317 packet = new CPacket(packetdata, OP_EMULEPROT, SupportsSourceExchange2() ? OP_REQUESTSOURCES2 : OP_REQUESTSOURCES);
319 theStats::AddUpOverheadSourceExchange(packet->GetPacketSize());
320 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_REQUESTSOURCES to ") + GetFullIP() );
321 SendPacket(packet,true,true);
322 SetLastAskedForSources();
325 // Sending the packet could have deleted the client, check m_reqfile
326 if (m_reqfile && IsSupportingAICH()) {
327 packet = new CPacket(OP_AICHFILEHASHREQ,16,OP_EMULEPROT);
328 packet->Copy16ToDataBuffer((const char *)m_reqfile->GetFileHash().GetHash());
329 theStats::AddUpOverheadOther(packet->GetPacketSize());
330 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_AICHFILEHASHREQ to ") + GetFullIP());
331 SendPacket(packet,true,true);
337 void CUpDownClient::ProcessFileInfo(const CMemFile* data, const CPartFile* file)
339 // 0.42e
340 if (file==NULL) {
341 throw wxString(wxT("ERROR: Wrong file ID (ProcessFileInfo; file==NULL)"));
343 if (m_reqfile==NULL) {
344 throw wxString(wxT("ERROR: Wrong file ID (ProcessFileInfo; m_reqfile==NULL)"));
346 if (file != m_reqfile) {
347 throw wxString(wxT("ERROR: Wrong file ID (ProcessFileInfo; m_reqfile!=file)"));
350 m_clientFilename = data->ReadString((GetUnicodeSupport() != utf8strNone));
352 // 26-Jul-2003: removed requesting the file status for files <= PARTSIZE for better compatibility with ed2k protocol (eDonkeyHybrid).
353 // if the remote client answers the OP_REQUESTFILENAME with OP_REQFILENAMEANSWER the file is shared by the remote client. if we
354 // know that the file is shared, we know also that the file is complete and don't need to request the file status.
355 if (m_reqfile->GetPartCount() == 1) {
356 m_nPartCount = m_reqfile->GetPartCount();
358 m_reqfile->UpdatePartsFrequency( this, false ); // Decrement
359 m_downPartStatus.setsize( m_nPartCount, 1 );
360 m_reqfile->UpdatePartsFrequency( this, true ); // Increment
362 m_bCompleteSource = true;
364 UpdateDisplayedInfo();
365 // even if the file is <= PARTSIZE, we _may_ need the hashset for that file (if the file size == PARTSIZE)
366 if (m_reqfile->IsHashSetNeeded()) {
367 if (m_socket) {
368 CPacket* packet = new CPacket(OP_HASHSETREQUEST,16, OP_EDONKEYPROT);
369 packet->Copy16ToDataBuffer((const char *)m_reqfile->GetFileHash().GetHash());
370 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
371 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_HASHSETREQUEST to ") + GetFullIP());
372 SendPacket(packet,true,true);
373 SetDownloadState(DS_REQHASHSET);
374 m_fHashsetRequesting = 1;
375 m_reqfile->SetHashSetNeeded(false);
376 } else {
377 wxFAIL;
379 } else {
380 SendStartupLoadReq();
382 m_reqfile->UpdatePartsInfo();
386 void CUpDownClient::ProcessFileStatus(bool bUdpPacket, const CMemFile* data, const CPartFile* file)
388 // 0.42e
389 wxString strReqFileNull(wxT("ERROR: Wrong file ID (ProcessFileStatus; m_reqfile==NULL)"));
391 if ( !m_reqfile || file != m_reqfile ){
392 if (!m_reqfile) {
393 throw strReqFileNull;
395 throw wxString(wxT("ERROR: Wrong file ID (ProcessFileStatus; m_reqfile!=file)"));
398 uint16 nED2KPartCount = data->ReadUInt16();
400 m_reqfile->UpdatePartsFrequency( this, false ); // Decrement
401 m_downPartStatus.clear();
403 bool bPartsNeeded = false;
404 if (!nED2KPartCount)
406 m_nPartCount = m_reqfile->GetPartCount();
407 m_downPartStatus.setsize( m_nPartCount, 1);
408 bPartsNeeded = true;
409 m_bCompleteSource = true;
411 else
413 // Somehow this happened.
414 if (!m_reqfile) {
415 throw strReqFileNull;
417 if (m_reqfile->GetED2KPartCount() != nED2KPartCount)
419 wxString strError;
420 strError << wxT("ProcessFileStatus - wrong part number recv=") << nED2KPartCount <<
421 wxT(" expected=") << m_reqfile->GetED2KPartCount() << wxT(" ") <<
422 m_reqfile->GetFileHash().Encode();
423 m_nPartCount = 0;
424 throw strError;
426 m_nPartCount = m_reqfile->GetPartCount();
428 m_bCompleteSource = false;
429 m_downPartStatus.setsize( m_nPartCount, 0 );
430 uint16 done = 0;
432 try {
433 while (done != m_nPartCount) {
434 uint8 toread = data->ReadUInt8();
436 for ( uint8 i = 0;i < 8; i++ ) {
437 bool status = ((toread>>i)&1)? 1:0;
438 m_downPartStatus.set(done, status);
440 if (status) {
441 if (!m_reqfile->IsComplete(done)){
442 bPartsNeeded = true;
445 done++;
446 if (done == m_nPartCount) {
447 break;
451 } catch( ... ) {
452 // We want the counts to be updated, even if we fail to read everything
453 m_reqfile->UpdatePartsFrequency( this, true ); // Increment
455 throw;
459 m_reqfile->UpdatePartsFrequency( this, true ); // Increment
461 UpdateDisplayedInfo();
463 // NOTE: This function is invoked from TCP and UDP socket!
464 if (!bUdpPacket) {
465 if (!bPartsNeeded) {
466 SetDownloadState(DS_NONEEDEDPARTS);
467 } else if (m_reqfile->IsHashSetNeeded()) {
468 //If we are using the eMule filerequest packets, this is taken care of in the Multipacket!
469 if (m_socket) {
470 CPacket* packet = new CPacket(OP_HASHSETREQUEST,16, OP_EDONKEYPROT);
471 packet->Copy16ToDataBuffer((const char *)m_reqfile->GetFileHash().GetHash());
472 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
473 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_HASHSETREQUEST to ") + GetFullIP());
474 SendPacket(packet, true, true);
475 SetDownloadState(DS_REQHASHSET);
476 m_fHashsetRequesting = 1;
477 m_reqfile->SetHashSetNeeded(false);
478 } else {
479 wxFAIL;
482 else {
483 SendStartupLoadReq();
486 else {
487 if (!bPartsNeeded) {
488 SetDownloadState(DS_NONEEDEDPARTS);
489 } else {
490 SetDownloadState(DS_ONQUEUE);
493 m_reqfile->UpdatePartsInfo();
496 bool CUpDownClient::AddRequestForAnotherFile(CPartFile* file)
498 if ( m_A4AF_list.find( file ) == m_A4AF_list.end() ) {
499 // When we access a non-existing entry entry, it will be zeroed by default,
500 // so we have to set NeededParts. All in one go.
501 m_A4AF_list[file].NeededParts = true;
502 file->AddA4AFSource( this );
503 return true;
504 } else {
505 return false;
509 bool CUpDownClient::DeleteFileRequest(CPartFile* file)
511 return (m_A4AF_list.erase( file ) > 0);
514 void CUpDownClient::DeleteAllFileRequests()
516 m_A4AF_list.clear();
520 /* eMule 0.30c implementation, i give it a try (Creteil) BEGIN ... */
521 void CUpDownClient::SetDownloadState(uint8 byNewState)
523 if (m_nDownloadState != byNewState) {
524 if (m_reqfile) {
525 // Notify the client that this source has changed its state
526 m_reqfile->ClientStateChanged( m_nDownloadState, byNewState );
528 if (byNewState == DS_DOWNLOADING) {
529 m_reqfile->AddDownloadingSource(this);
530 } else if (m_nDownloadState == DS_DOWNLOADING) {
531 m_reqfile->RemoveDownloadingSource(this);
534 if (byNewState == DS_DOWNLOADING) {
535 msReceivedPrev = GetTickCount();
536 theStats::AddDownloadingSource();
537 } else if (m_nDownloadState == DS_DOWNLOADING) {
538 theStats::RemoveDownloadingSource();
541 if (m_nDownloadState == DS_DOWNLOADING) {
542 m_nDownloadState = byNewState;
543 ClearDownloadBlockRequests();
545 kBpsDown = 0.0;
546 bytesReceivedCycle = 0;
547 msReceivedPrev = 0;
548 if (byNewState == DS_NONE) {
549 if (m_reqfile) {
550 m_reqfile->UpdatePartsFrequency( this, false ); // Decrement
552 m_downPartStatus.clear();
553 m_nPartCount = 0;
555 if (m_socket && byNewState != DS_ERROR) {
556 m_socket->DisableDownloadLimit();
559 m_nDownloadState = byNewState;
560 if(GetDownloadState() == DS_DOWNLOADING) {
561 if (IsEmuleClient()) {
562 SetRemoteQueueFull(false);
564 SetRemoteQueueRank(0); // eMule 0.30c set like this ...
566 UpdateDisplayedInfo(true);
569 /* eMule 0.30c implementation, i give it a try (Creteil) END ... */
571 void CUpDownClient::ProcessHashSet(const byte* packet, uint32 size)
573 if ((!m_reqfile) || md4cmp(packet,m_reqfile->GetFileHash().GetHash())) {
574 throw wxString(wxT("Wrong fileid sent (ProcessHashSet)"));
576 if (!m_fHashsetRequesting) {
577 throw wxString(wxT("Received unsolicited hashset, ignoring it."));
579 CMemFile data(packet,size);
580 if (m_reqfile->LoadHashsetFromFile(&data,true)) {
581 m_fHashsetRequesting = 0;
582 } else {
583 m_reqfile->SetHashSetNeeded(true);
584 throw wxString(wxT("Corrupted or invalid hashset received"));
586 SendStartupLoadReq();
589 void CUpDownClient::SendBlockRequests()
591 uint32 current_time = ::GetTickCount();
592 if (GetVBTTags()) {
594 // Ask new blocks only when all completed
595 if (!m_PendingBlocks_list.empty()) {
596 return;
599 if ((m_dwLastBlockReceived + SEC2MS(5)) > current_time) {
600 // We received last block in less than 5 secs? Let's request faster.
601 m_MaxBlockRequests = m_MaxBlockRequests << 1;
602 if ( m_MaxBlockRequests > 0x20) {
603 m_MaxBlockRequests = 0x20;
605 } else {
606 m_MaxBlockRequests = m_MaxBlockRequests >> 1;
607 if ( m_MaxBlockRequests < STANDARD_BLOCKS_REQUEST) {
608 m_MaxBlockRequests = STANDARD_BLOCKS_REQUEST;
613 m_dwLastBlockReceived = current_time;
615 if (!m_reqfile) {
616 return;
619 uint8 version = GetVBTTags() ? 2 : 1;
621 if (m_DownloadBlocks_list.empty()) {
622 // Barry - instead of getting 3, just get how many is needed
623 uint16 count = m_MaxBlockRequests - m_PendingBlocks_list.size();
624 std::vector<Requested_Block_Struct*> toadd;
625 if (m_reqfile->GetNextRequestedBlock(this, toadd, count)) {
626 for (int i = 0; i != count; i++) {
627 m_DownloadBlocks_list.push_back(toadd[i]);
632 // Barry - Why are unfinished blocks requested again, not just new ones?
634 while (m_PendingBlocks_list.size() < m_MaxBlockRequests && !m_DownloadBlocks_list.empty()) {
635 Pending_Block_Struct* pblock = new Pending_Block_Struct;
636 pblock->block = m_DownloadBlocks_list.front();
637 pblock->zStream = NULL;
638 pblock->totalUnzipped = 0;
639 pblock->fZStreamError = 0;
640 pblock->fRecovered = 0;
641 m_PendingBlocks_list.push_back(pblock);
642 m_DownloadBlocks_list.pop_front();
646 if (m_PendingBlocks_list.empty()) {
648 CUpDownClient* slower_client = NULL;
650 if (thePrefs::GetDropSlowSources()) {
651 slower_client = m_reqfile->GetSlowerDownloadingClient(m_lastaverage, this);
654 if (slower_client == NULL) {
655 slower_client = this;
658 if (!slower_client->GetSentCancelTransfer()) {
659 CPacket* packet = new CPacket(OP_CANCELTRANSFER, 0, OP_EDONKEYPROT);
660 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
661 // if (slower_client != this) {
662 // printf("Dropped client %p to allow client %p to download\n",slower_client, this);
663 // }
664 slower_client->ClearDownloadBlockRequests();
665 slower_client->SendPacket(packet,true,true);
666 slower_client->SetSentCancelTransfer(1);
669 slower_client->SetDownloadState(DS_NONEEDEDPARTS);
671 if (slower_client != this) {
672 // Re-request freed blocks.
673 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_CANCELTRANSFER (faster source eager to transfer) to ") + slower_client->GetFullIP() );
674 wxASSERT(m_DownloadBlocks_list.empty());
675 wxASSERT(m_PendingBlocks_list.empty());
676 uint16 count = m_MaxBlockRequests;
677 std::vector<Requested_Block_Struct*> toadd;
678 if (m_reqfile->GetNextRequestedBlock(this, toadd, count)) {
679 for (int i = 0; i != count; i++) {
680 Pending_Block_Struct* pblock = new Pending_Block_Struct;
681 pblock->block = toadd[i];
682 pblock->zStream = NULL;
683 pblock->totalUnzipped = 0;
684 pblock->fZStreamError = 0;
685 pblock->fRecovered = 0;
686 m_PendingBlocks_list.push_back(pblock);
688 } else {
689 // WTF, we just freed blocks.
690 wxFAIL;
691 return;
693 } else {
694 // Drop this one.
695 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_CANCELTRANSFER (no free blocks) to ") + GetFullIP() );
696 //#warning Kry - Would be nice to swap A4AF here.
697 return;
701 CPacket* packet = NULL;
703 switch (version) {
704 case 2: {
705 // ED2Kv2 packet...
706 // Most common scenario: hash + blocks to request + every one
707 // having 2 uint32 tags
709 uint8 nBlocks = m_PendingBlocks_list.size();
710 if (nBlocks > m_MaxBlockRequests) {
711 nBlocks = m_MaxBlockRequests;
714 CMemFile data(16 + 1 + nBlocks*((2+4)*2));
716 data.WriteHash(m_reqfile->GetFileHash());
718 data.WriteUInt8(nBlocks);
720 std::list<Pending_Block_Struct*>::iterator it = m_PendingBlocks_list.begin();
721 while (nBlocks) {
722 wxASSERT(it != m_PendingBlocks_list.end());
723 wxASSERT( (*it)->block->StartOffset <= (*it)->block->EndOffset );
724 (*it)->fZStreamError = 0;
725 (*it)->fRecovered = 0;
726 CTagVarInt(/*Noname*/0,(*it)->block->StartOffset).WriteTagToFile(&data);
727 CTagVarInt(/*Noname*/0,(*it)->block->EndOffset).WriteTagToFile(&data);
728 ++it;
729 nBlocks--;
732 packet = new CPacket(data, OP_ED2KV2HEADER, OP_REQUESTPARTS);
733 AddDebugLogLineN( logLocalClient, CFormat(wxT("Local Client ED2Kv2: OP_REQUESTPARTS(%i) to %s"))
734 % (m_PendingBlocks_list.size()<m_MaxBlockRequests ? m_PendingBlocks_list.size() : m_MaxBlockRequests) % GetFullIP() );
736 break;
738 case 1: {
739 wxASSERT(m_MaxBlockRequests == STANDARD_BLOCKS_REQUEST);
741 //#warning Kry - I dont specially like this approach, we iterate one time too many
743 bool bHasLongBlocks = false;
745 std::list<Pending_Block_Struct*>::iterator it = m_PendingBlocks_list.begin();
746 for (uint32 i = 0; i != m_MaxBlockRequests; i++){
747 if (it != m_PendingBlocks_list.end()) {
748 Pending_Block_Struct* pending = *it++;
749 wxASSERT( pending->block->StartOffset <= pending->block->EndOffset );
750 if (pending->block->StartOffset > 0xFFFFFFFF || pending->block->EndOffset > 0xFFFFFFFF){
751 bHasLongBlocks = true;
752 if (!SupportsLargeFiles()){
753 // Requesting a large block from a client that doesn't support large files?
754 wxFAIL;
755 if (!GetSentCancelTransfer()){
756 CPacket* cancel_packet = new CPacket(OP_CANCELTRANSFER, 0, OP_EDONKEYPROT);
757 theStats::AddUpOverheadFileRequest(cancel_packet->GetPacketSize());
758 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_CANCELTRANSFER to ") + GetFullIP() );
759 SendPacket(cancel_packet,true,true);
760 SetSentCancelTransfer(1);
762 SetDownloadState(DS_ERROR);
764 break;
769 CMemFile data(16 /*Hash*/ + (m_MaxBlockRequests*(bHasLongBlocks ? 8 : 4) /* uint32/64 start*/) + (3*(bHasLongBlocks ? 8 : 4)/* uint32/64 end*/));
770 data.WriteHash(m_reqfile->GetFileHash());
772 it = m_PendingBlocks_list.begin();
773 for (uint32 i = 0; i != m_MaxBlockRequests; i++) {
774 if (it != m_PendingBlocks_list.end()) {
775 Pending_Block_Struct* pending = *it++;
776 wxASSERT( pending->block->StartOffset <= pending->block->EndOffset );
777 pending->fZStreamError = 0;
778 pending->fRecovered = 0;
779 if (bHasLongBlocks) {
780 data.WriteUInt64(pending->block->StartOffset);
781 } else {
782 data.WriteUInt32(pending->block->StartOffset);
784 } else {
785 if (bHasLongBlocks) {
786 data.WriteUInt64(0);
787 } else {
788 data.WriteUInt32(0);
793 it = m_PendingBlocks_list.begin();
794 for (uint32 i = 0; i != m_MaxBlockRequests; i++) {
795 if (it != m_PendingBlocks_list.end()) {
796 Requested_Block_Struct* block = (*it++)->block;
797 if (bHasLongBlocks) {
798 data.WriteUInt64(block->EndOffset+1);
799 } else {
800 data.WriteUInt32(block->EndOffset+1);
802 } else {
803 if (bHasLongBlocks) {
804 data.WriteUInt64(0);
805 } else {
806 data.WriteUInt32(0);
810 packet = new CPacket(data, (bHasLongBlocks ? OP_EMULEPROT : OP_EDONKEYPROT), (bHasLongBlocks ? (uint8)OP_REQUESTPARTS_I64 : (uint8)OP_REQUESTPARTS));
811 AddDebugLogLineN(logLocalClient, CFormat(wxT("Local Client: %s to %s")) % (bHasLongBlocks ? wxT("OP_REQUESTPARTS_I64") : wxT("OP_REQUESTPARTS")) % GetFullIP());
812 break;
814 default:
815 wxFAIL;
818 if (packet) {
819 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
820 SendPacket(packet, true, true);
821 } else {
822 wxFAIL;
827 Barry - Originally this only wrote to disk when a full 180k block
828 had been received from a client, and only asked for data in
829 180k blocks.
831 This meant that on average 90k was lost for every connection
832 to a client data source. That is a lot of wasted data.
834 To reduce the lost data, packets are now written to a buffer
835 and flushed to disk regularly regardless of size downloaded.
837 This includes compressed packets.
839 Data is also requested only where gaps are, not in 180k blocks.
840 The requests will still not exceed 180k, but may be smaller to
841 fill a gap.
844 void CUpDownClient::ProcessBlockPacket(const byte* packet, uint32 size, bool packed, bool largeblocks)
846 // Ignore if no data required
847 if (!(GetDownloadState() == DS_DOWNLOADING || GetDownloadState() == DS_NONEEDEDPARTS)) {
848 return;
851 // This vars are defined here to be able to use them on the catch
852 int header_size = 16;
853 uint64 nStartPos = 0;
854 uint64 nEndPos = 0;
855 uint32 nBlockSize = 0;
856 uint32 lenUnzipped = 0;
858 // Update stats
859 m_dwLastBlockReceived = ::GetTickCount();
861 try {
863 // Read data from packet
864 const CMemFile data(packet, size);
866 // Check that this data is for the correct file
867 if ((!m_reqfile) || data.ReadHash() != m_reqfile->GetFileHash()) {
868 throw wxString(wxT("Wrong fileid sent (ProcessBlockPacket)"));
871 // Find the start & end positions, and size of this chunk of data
873 if (largeblocks) {
874 nStartPos = data.ReadUInt64();
875 header_size += 8;
876 } else {
877 nStartPos = data.ReadUInt32();
878 header_size += 4;
881 if (packed) {
882 nBlockSize = data.ReadUInt32();
883 header_size += 4;
884 nEndPos = nStartPos + (size - header_size);
885 } else {
886 if (largeblocks) {
887 nEndPos = data.ReadUInt64();
888 header_size += 8;
889 } else {
890 nEndPos = data.ReadUInt32();
891 header_size += 4;
895 // Check that packet size matches the declared data size + header size
896 if ( nEndPos == nStartPos || size != ((nEndPos - nStartPos) + header_size)) {
897 throw wxString(wxT("Corrupted or invalid DataBlock received (ProcessBlockPacket)"));
899 theStats::AddDownloadFromSoft(GetClientSoft(),size - header_size);
900 bytesReceivedCycle += size - header_size;
902 credits->AddDownloaded(size - header_size, GetIP(), theApp->CryptoAvailable());
904 // Move end back one, should be inclusive
905 nEndPos--;
907 // Loop through to find the reserved block that this is within
908 std::list<Pending_Block_Struct*>::iterator it = m_PendingBlocks_list.begin();
909 for (; it != m_PendingBlocks_list.end(); ++it) {
910 Pending_Block_Struct* cur_block = *it;
912 if ((cur_block->block->StartOffset <= nStartPos) && (cur_block->block->EndOffset >= nStartPos)) {
913 // Found reserved block
915 if (cur_block->block->StartOffset == nStartPos) {
916 // This block just started transfering. Set the start time.
917 m_last_block_start = ::GetTickCountFullRes();
920 if (cur_block->fZStreamError){
921 AddDebugLogLineN(logZLib,
922 CFormat(wxT("Ignoring %u bytes of block %u-%u because of erroneous zstream state for file: %s"))
923 % (size - header_size) % nStartPos % nEndPos % m_reqfile->GetFileName());
924 m_reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
925 return;
928 // Remember this start pos, used to draw part downloading in list
929 m_lastDownloadingPart = nStartPos / PARTSIZE;
931 // Occasionally packets are duplicated, no point writing it twice
932 // This will be 0 in these cases, or the length written otherwise
933 uint32 lenWritten = 0;
935 // Handle differently depending on whether packed or not
936 if (!packed) {
937 // security sanitize check
938 if (nEndPos > cur_block->block->EndOffset) {
939 AddDebugLogLineN(logRemoteClient, CFormat(wxT("Received Blockpacket exceeds requested boundaries (requested end: %u, Part: %u, received end: %u, Part: %u), file: %s remote IP: %s")) % cur_block->block->EndOffset % (uint32)(cur_block->block->EndOffset / PARTSIZE) % nEndPos % (uint32)(nEndPos / PARTSIZE) % m_reqfile->GetFileName() % Uint32toStringIP(GetIP()));
940 m_reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
941 return;
943 // Write to disk (will be buffered in part file class)
944 lenWritten = m_reqfile->WriteToBuffer( size - header_size, (byte*)(packet + header_size), nStartPos, nEndPos, cur_block->block, this);
945 } else {
946 // Packed
947 wxASSERT( (long int)size > 0 );
948 // Create space to store unzipped data, the size is
949 // only an initial guess, will be resized in unzip()
950 // if not big enough
951 lenUnzipped = (size * 2);
952 // Don't get too big
953 if (lenUnzipped > (BLOCKSIZE + 300)) {
954 lenUnzipped = (BLOCKSIZE + 300);
956 byte *unzipped = new byte[lenUnzipped];
958 // Try to unzip the packet
959 int result = unzip(cur_block, (byte*)(packet + header_size), (size - header_size), &unzipped, &lenUnzipped);
961 // no block can be uncompressed to >2GB, 'lenUnzipped' is obviously erroneous.
962 if (result == Z_OK && ((int)lenUnzipped >= 0)) {
964 // Write any unzipped data to disk
965 if (lenUnzipped > 0) {
966 wxASSERT( (int)lenUnzipped > 0 );
968 // Use the current start and end positions for the uncompressed data
969 nStartPos = cur_block->block->StartOffset + cur_block->totalUnzipped - lenUnzipped;
970 nEndPos = cur_block->block->StartOffset + cur_block->totalUnzipped - 1;
972 if (nStartPos > cur_block->block->EndOffset || nEndPos > cur_block->block->EndOffset) {
973 AddDebugLogLineN(logZLib,
974 CFormat(wxT("Corrupted compressed packet for '%s' received (error 666)")) % m_reqfile->GetFileName());
975 m_reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
976 } else {
977 // Write uncompressed data to file
978 lenWritten = m_reqfile->WriteToBuffer( size - header_size,
979 unzipped,
980 nStartPos,
981 nEndPos,
982 cur_block->block,
983 this);
986 } else {
987 wxString strZipError;
988 if (cur_block->zStream && cur_block->zStream->msg) {
989 strZipError = wxT(" - ") + wxString::FromAscii(cur_block->zStream->msg);
992 AddDebugLogLineN(logZLib,
993 CFormat(wxT("Corrupted compressed packet for '%s' received (error %i): %s"))
994 % m_reqfile->GetFileName() % result % strZipError);
996 m_reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
998 // If we had an zstream error, there is no chance that we could recover from it nor that we
999 // could use the current zstream (which is in error state) any longer.
1000 if (cur_block->zStream){
1001 inflateEnd(cur_block->zStream);
1002 delete cur_block->zStream;
1003 cur_block->zStream = NULL;
1006 // Although we can't further use the current zstream, there is no need to disconnect the sending
1007 // client because the next zstream (a series of 10K-blocks which build a 180K-block) could be
1008 // valid again. Just ignore all further blocks for the current zstream.
1009 cur_block->fZStreamError = 1;
1010 cur_block->totalUnzipped = 0; // bluecow's fix
1012 delete [] unzipped;
1014 // These checks only need to be done if any data was written
1015 if (lenWritten > 0) {
1016 m_nTransferredDown += lenWritten;
1018 // If finished reserved block
1019 if (nEndPos == cur_block->block->EndOffset) {
1021 // Save last average speed based on data and time.
1022 // This should do bytes/sec.
1023 uint32 average_time = (::GetTickCountFullRes() - m_last_block_start);
1025 // Avoid divide by 0.
1026 if (average_time == 0) {
1027 average_time++;
1030 m_lastaverage = ((cur_block->block->EndOffset - cur_block->block->StartOffset) * 1000) / average_time;
1032 m_reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
1033 delete cur_block->block;
1034 // Not always allocated
1035 if (cur_block->zStream) {
1036 inflateEnd(cur_block->zStream);
1037 delete cur_block->zStream;
1039 delete cur_block;
1040 m_PendingBlocks_list.erase(it);
1042 // Request next block
1043 SendBlockRequests();
1046 // Stop looping and exit method
1047 return;
1050 } catch (const CEOFException& e) {
1051 wxString error = wxString(wxT("Error reading "));
1052 if (packed) error += CFormat(wxT("packed (LU: %i) largeblocks ")) % lenUnzipped;
1053 error += CFormat(wxT("data packet: RS: %i HS: %i SP: %i EP: %i BS: %i -> "))
1054 % size % header_size % nStartPos % nEndPos % nBlockSize;
1055 AddDebugLogLineC(logRemoteClient, error + e.what());
1056 return;
1060 int CUpDownClient::unzip(Pending_Block_Struct *block, byte *zipped, uint32 lenZipped, byte **unzipped, uint32 *lenUnzipped, int iRecursion)
1062 int err = Z_DATA_ERROR;
1064 // Save some typing
1065 z_stream *zS = block->zStream;
1067 // Is this the first time this block has been unzipped
1068 if (zS == NULL) {
1069 // Create stream
1070 block->zStream = new z_stream;
1071 zS = block->zStream;
1073 // Initialise stream values
1074 zS->zalloc = (alloc_func)0;
1075 zS->zfree = (free_func)0;
1076 zS->opaque = (voidpf)0;
1078 // Set output data streams, do this here to avoid overwriting on recursive calls
1079 zS->next_out = (*unzipped);
1080 zS->avail_out = (*lenUnzipped);
1082 // Initialise the z_stream
1083 err = inflateInit(zS);
1084 if (err != Z_OK) {
1085 return err;
1089 // Use whatever input is provided
1090 zS->next_in = zipped;
1091 zS->avail_in = lenZipped;
1093 // Only set the output if not being called recursively
1094 if (iRecursion == 0) {
1095 zS->next_out = (*unzipped);
1096 zS->avail_out = (*lenUnzipped);
1099 // Try to unzip the data
1100 err = inflate(zS, Z_SYNC_FLUSH);
1102 // Is zip finished reading all currently available input and writing
1103 // all generated output
1104 if (err == Z_STREAM_END) {
1105 // Finish up
1106 err = inflateEnd(zS);
1107 if (err != Z_OK) {
1108 return err;
1111 // Got a good result, set the size to the amount unzipped in this call
1112 // (including all recursive calls)
1113 (*lenUnzipped) = (zS->total_out - block->totalUnzipped);
1114 block->totalUnzipped = zS->total_out;
1115 } else if ((err == Z_OK) && (zS->avail_out == 0) && (zS->avail_in != 0)) {
1117 // Output array was not big enough,
1118 // call recursively until there is enough space
1120 // What size should we try next
1121 uint32 newLength = (*lenUnzipped) *= 2;
1122 if (newLength == 0) {
1123 newLength = lenZipped * 2;
1125 // Copy any data that was successfully unzipped to new array
1126 byte *temp = new byte[newLength];
1127 wxASSERT( zS->total_out - block->totalUnzipped <= newLength );
1128 memcpy(temp, (*unzipped), (zS->total_out - block->totalUnzipped));
1129 delete [] (*unzipped);
1130 (*unzipped) = temp;
1131 (*lenUnzipped) = newLength;
1133 // Position stream output to correct place in new array
1134 zS->next_out = (*unzipped) + (zS->total_out - block->totalUnzipped);
1135 zS->avail_out = (*lenUnzipped) - (zS->total_out - block->totalUnzipped);
1137 // Try again
1138 err = unzip(block, zS->next_in, zS->avail_in, unzipped, lenUnzipped, iRecursion + 1);
1139 } else if ((err == Z_OK) && (zS->avail_in == 0)) {
1140 // All available input has been processed, everything ok.
1141 // Set the size to the amount unzipped in this call
1142 // (including all recursive calls)
1143 (*lenUnzipped) = (zS->total_out - block->totalUnzipped);
1144 block->totalUnzipped = zS->total_out;
1145 } else {
1146 // Should not get here unless input data is corrupt
1147 wxString strZipError;
1149 if ( zS->msg ) {
1150 strZipError = CFormat(wxT(" %d '%s'")) % err % wxString::FromAscii(zS->msg);
1151 } else if (err != Z_OK) {
1152 strZipError = CFormat(wxT(" %d")) % err;
1155 AddDebugLogLineN(logZLib,
1156 CFormat(wxT("Unexpected zip error %s in file '%s'"))
1157 % strZipError % (m_reqfile ? m_reqfile->GetFileName() : CPath(wxT("?"))));
1160 if (err != Z_OK) {
1161 (*lenUnzipped) = 0;
1164 return err;
1168 // Speed is now updated only when data was received, calculated as
1169 // (data received) / (time since last receiption)
1170 // and slightly filtered (10s average).
1171 // Result is quite precise now and makes the DownloadRateAdjust workaround obsolete.
1173 float CUpDownClient::CalculateKBpsDown()
1175 const float tAverage = 10.0;
1176 uint32 msCur = GetTickCount();
1178 if (bytesReceivedCycle) {
1179 float dt = (msCur - msReceivedPrev) / 1000.0; // time since last reception
1180 if (dt < 0.01) { // (safeguard against divide-by-zero)
1181 dt = 0.01f; // diff should be 100ms actually
1183 float kBpsDownCur = bytesReceivedCycle / 1024.0 / dt;
1184 if (dt >= tAverage) {
1185 kBpsDown = kBpsDownCur;
1186 } else {
1187 kBpsDown = (kBpsDown * (tAverage - dt) + kBpsDownCur * dt) / tAverage;
1189 //AddDebugLogLineN(logLocalClient, CFormat(wxT("CalculateKBpsDown %p kbps %.1f kbpsCur %.1f dt %.3f rcv %d "))
1190 // % this % kBpsDown % kBpsDownCur % dt % bytesReceivedCycle);
1191 bytesReceivedCycle = 0;
1192 msReceivedPrev = msCur;
1195 m_cShowDR++;
1196 if (m_cShowDR == 30){
1197 m_cShowDR = 0;
1198 UpdateDisplayedInfo();
1200 if (msCur - m_dwLastBlockReceived > DOWNLOADTIMEOUT) {
1201 if (!GetSentCancelTransfer()){
1202 CPacket* packet = new CPacket(OP_CANCELTRANSFER, 0, OP_EDONKEYPROT);
1203 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
1204 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_CANCELTRANSFER to ") + GetFullIP() );
1205 SendPacket(packet,true,true);
1206 SetSentCancelTransfer(1);
1208 SetDownloadState(DS_ONQUEUE);
1211 return kBpsDown;
1214 uint16 CUpDownClient::GetAvailablePartCount() const
1216 uint16 result = 0;
1217 for (int i = 0;i != m_nPartCount;i++){
1218 if (IsPartAvailable(i))
1219 result++;
1221 return result;
1224 void CUpDownClient::SetRemoteQueueRank(uint16 nr)
1226 m_nOldRemoteQueueRank = m_nRemoteQueueRank;
1227 m_nRemoteQueueRank = nr;
1228 UpdateDisplayedInfo();
1231 void CUpDownClient::UDPReaskACK(uint16 nNewQR)
1233 // 0.42e
1234 m_bUDPPending = false;
1235 SetRemoteQueueRank(nNewQR);
1236 m_dwLastAskedTime = ::GetTickCount();
1239 void CUpDownClient::UDPReaskFNF()
1241 m_bUDPPending = false;
1243 // avoid premature deletion of 'this' client
1244 if (GetDownloadState() != DS_DOWNLOADING){
1245 if (m_reqfile) {
1246 m_reqfile->AddDeadSource(this);
1249 theApp->downloadqueue->RemoveSource(this);
1250 if (!m_socket) {
1251 if (Disconnected(wxT("UDPReaskFNF m_socket=NULL"))) {
1252 Safe_Delete();
1255 } else {
1256 AddDebugLogLineN( logRemoteClient, wxT("UDP ANSWER FNF : ") + GetUserName() + wxT(" - did not remove client because of current download state") );
1260 void CUpDownClient::UDPReaskForDownload()
1263 wxASSERT(m_reqfile);
1265 if(!m_reqfile || m_bUDPPending ) {
1266 return;
1269 //#warning We should implement the quality tests for udp reliability
1271 if( m_nTotalUDPPackets > 3 && ((float)(m_nFailedUDPPackets/m_nTotalUDPPackets) > .3)) {
1272 return;
1276 if (thePrefs::GetEffectiveUDPPort() == 0) {
1277 return;
1280 if (m_nUDPPort != 0 && !theApp->IsFirewalled() && !IsConnected()) {
1281 //don't use udp to ask for sources
1282 if(IsSourceRequestAllowed()) {
1283 return;
1286 m_bUDPPending = true;
1288 CMemFile data(128);
1289 data.WriteHash(m_reqfile->GetFileHash());
1291 if (GetUDPVersion() > 3) {
1292 if (m_reqfile->IsPartFile()) {
1293 static_cast<CPartFile*>(m_reqfile)->WritePartStatus(&data);
1295 else {
1296 data.WriteUInt16(0);
1300 if (GetUDPVersion() > 2) {
1301 data.WriteUInt16(m_reqfile->m_nCompleteSourcesCount);
1304 CPacket* response = new CPacket(data, OP_EMULEPROT, OP_REASKFILEPING);
1305 AddDebugLogLineN( logClientUDP, wxT("Client UDP socket: send OP_REASKFILEPING") );
1306 theStats::AddUpOverheadFileRequest(response->GetPacketSize());
1307 theApp->clientudp->SendPacket(response,GetConnectIP(),GetUDPPort(), ShouldReceiveCryptUDPPackets(), GetUserHash().GetHash(), false, 0);
1308 } else if (HasLowID() && GetBuddyIP() && GetBuddyPort() && HasValidBuddyID()) {
1310 m_bUDPPending = true;
1312 CMemFile data(128);
1314 data.WriteHash(CMD4Hash(GetBuddyID()));
1315 data.WriteHash(m_reqfile->GetFileHash());
1317 if (GetUDPVersion() > 3) {
1318 if (m_reqfile->IsPartFile()) {
1319 static_cast<CPartFile*>(m_reqfile)->WritePartStatus(&data);
1320 } else {
1321 data.WriteUInt16(0);
1325 if (GetUDPVersion() > 2) {
1326 data.WriteUInt16(m_reqfile->m_nCompleteSourcesCount);
1329 CPacket* response = new CPacket(data, OP_EMULEPROT, OP_REASKCALLBACKUDP);
1330 AddDebugLogLineN( logClientUDP, wxT("Client UDP socket: send OP_REASKCALLBACKUDP") );
1331 theStats::AddUpOverheadFileRequest(response->GetPacketSize());
1332 theApp->clientudp->SendPacket(response, GetBuddyIP(), GetBuddyPort(), false, NULL, true, 0 );
1337 // Get the next part that is requested
1338 uint16 CUpDownClient::GetNextRequestedPart() const
1340 uint16 part = 0xffff;
1342 std::list<Pending_Block_Struct*>::const_iterator it = m_PendingBlocks_list.begin();
1343 for (; it != m_PendingBlocks_list.end(); ++it) {
1344 part = (*it)->block->StartOffset / PARTSIZE;
1345 if (part != m_lastDownloadingPart) {
1346 break;
1350 return part;
1354 void CUpDownClient::UpdateDisplayedInfo(bool force)
1356 uint32 curTick = ::GetTickCount();
1357 if (force || curTick-m_lastRefreshedDLDisplay > MINWAIT_BEFORE_DLDISPLAY_WINDOWUPDATE) {
1358 // Check if we actually need to notify of changes
1359 bool update = m_reqfile && m_reqfile->ShowSources();
1361 // Check A4AF files only if needed
1362 if ( !update ) {
1363 A4AFList::iterator it = m_A4AF_list.begin();
1364 for ( ; it != m_A4AF_list.end(); ++it ) {
1365 if ( it->first->ShowSources() ) {
1366 update = true;
1367 break;
1372 // And finnaly trigger an event if there's any reason
1373 if ( update ) {
1374 SourceItemType type;
1375 switch (GetDownloadState()) {
1376 case DS_DOWNLOADING:
1377 case DS_ONQUEUE:
1378 // We will send A4AF, which will be checked.
1379 type = A4AF_SOURCE;
1380 break;
1381 default:
1382 type = UNAVAILABLE_SOURCE;
1383 break;
1386 Notify_SourceCtrlUpdateSource(ECID(), type );
1389 // Shared files view
1390 if (m_uploadingfile && m_uploadingfile->ShowPeers()) {
1391 Notify_SharedCtrlRefreshClient(ECID(), AVAILABLE_SOURCE);
1394 m_lastRefreshedDLDisplay = curTick;
1398 uint8 CUpDownClient::GetObfuscationStatus() const
1400 uint8 ret = OBST_UNDEFINED;
1401 if (thePrefs::IsClientCryptLayerSupported()) {
1402 if (SupportsCryptLayer()) {
1403 if ((RequestsCryptLayer() || thePrefs::IsClientCryptLayerRequested()) && HasObfuscatedConnectionBeenEstablished()) {
1404 ret = OBST_ENABLED;
1405 } else {
1406 ret = OBST_SUPPORTED;
1408 } else {
1409 ret = OBST_NOT_SUPPORTED;
1411 } else {
1412 ret = OBST_DISABLED;
1414 return ret;
1417 // IgnoreNoNeeded = will switch to files of which this source has no needed parts (if no better fiels found)
1418 // ignoreSuspensions = ignore timelimit for A4Af jumping
1419 // bRemoveCompletely = do not readd the file which the source is swapped from to the A4AF lists (needed if deleting or stopping a file)
1420 // toFile = Try to swap to this partfile only
1422 bool CUpDownClient::SwapToAnotherFile(bool bIgnoreNoNeeded, bool ignoreSuspensions, bool bRemoveCompletely, CPartFile* toFile)
1424 // Fail if m_reqfile is invalid
1425 if ( m_reqfile == NULL ) {
1426 return false;
1429 // It would be stupid to swap away a downloading source
1430 if (GetDownloadState() == DS_DOWNLOADING) {
1431 return false;
1434 // The iterator of the final target
1435 A4AFList::iterator target = m_A4AF_list.end();
1437 // Do we want to swap to a specific file?
1438 if ( toFile != NULL ) {
1439 A4AFList::iterator it = m_A4AF_list.find( toFile );
1440 if ( it != m_A4AF_list.end() ) {
1442 // We force ignoring of timestamps
1443 if ( IsValidSwapTarget( it, bIgnoreNoNeeded, true ) ) {
1444 // Set the target
1445 target = it;
1448 } else {
1449 // We want highest priority possible, but need to start with
1450 // a value less than any other priority
1451 char priority = -1;
1453 A4AFList::iterator it = m_A4AF_list.begin();
1454 for ( ; it != m_A4AF_list.end(); ++it ) {
1455 if ( IsValidSwapTarget( it, bIgnoreNoNeeded, ignoreSuspensions ) ) {
1456 char cur_priority = it->first->GetDownPriority();
1458 // We would prefer to get files with needed parts, thus rate them higher.
1459 // However, this really only matters if bIgnoreNoNeeded is true.
1460 if ( it->second.NeededParts )
1461 cur_priority += 10;
1463 // Change target if the current file has a higher rate than the previous
1464 if ( cur_priority > priority ) {
1465 priority = cur_priority;
1467 // Set the new target
1468 target = it;
1470 // Break on the first High-priority file with needed parts
1471 if ( priority == PR_HIGH + 10 ) {
1472 break;
1479 // Try to swap if we found a valid target
1480 if ( target != m_A4AF_list.end() ) {
1482 // Sanity check, if reqfile doesn't own the source, then something
1483 // is wrong and the swap cannot proceed.
1484 if ( m_reqfile->DelSource( this ) ) {
1485 CPartFile* SwapTo = target->first;
1487 // remove this client from the A4AF list of our new m_reqfile
1488 if ( SwapTo->RemoveA4AFSource( this ) ) {
1489 Notify_SourceCtrlRemoveSource(ECID(), SwapTo);
1492 m_reqfile->RemoveDownloadingSource( this );
1494 // Do we want to remove it completly? Say if the old file is getting deleted
1495 if ( !bRemoveCompletely ) {
1496 m_reqfile->AddA4AFSource( this );
1498 // Set the status of the old file
1499 m_A4AF_list[m_reqfile].NeededParts = (GetDownloadState() != DS_NONEEDEDPARTS);
1501 // Avoid swapping to this file for a while
1502 m_A4AF_list[m_reqfile].timestamp = ::GetTickCount();
1504 Notify_SourceCtrlAddSource(m_reqfile, CCLIENTREF(this, wxT("CUpDownClient::SwapToAnotherFile Notify_SourceCtrlAddSource 1")), A4AF_SOURCE);
1505 } else {
1506 Notify_SourceCtrlRemoveSource(ECID(), m_reqfile);
1509 SetDownloadState(DS_NONE);
1510 ResetFileStatusInfo();
1512 m_nRemoteQueueRank = 0;
1513 m_nOldRemoteQueueRank = 0;
1515 m_reqfile->UpdatePartsInfo();
1517 SetRequestFile( SwapTo );
1519 SwapTo->AddSource( this );
1521 Notify_SourceCtrlAddSource(SwapTo, CCLIENTREF(this, wxT("CUpDownClient::SwapToAnotherFile Notify_SourceCtrlAddSource 2")), UNAVAILABLE_SOURCE);
1523 // Remove the new reqfile from the list of other files
1524 m_A4AF_list.erase( target );
1526 return true;
1530 return false;
1534 bool CUpDownClient::IsValidSwapTarget( A4AFList::iterator it, bool ignorenoneeded, bool ignoresuspended )
1536 wxASSERT( it != m_A4AF_list.end() && it->first );
1538 // Check if this file has been suspended
1539 if ( !ignoresuspended ) {
1540 if ( ::GetTickCount() - it->second.timestamp >= PURGESOURCESWAPSTOP ) {
1541 // The wait-time has been exceeded and the file is now a valid target
1542 it->second.timestamp = 0;
1543 } else {
1544 // The file was still suspended and we are not ignoring suspensions
1545 return false;
1549 // Check if the client has needed parts
1550 if ( !ignorenoneeded ) {
1551 if ( !it->second.NeededParts ) {
1552 return false;
1556 // Final checks to see if the client is a valid target
1557 CPartFile* cur_file = it->first;
1558 if ( ( cur_file != m_reqfile && !cur_file->IsStopped() ) &&
1559 ( cur_file->GetStatus() == PS_READY || cur_file->GetStatus() == PS_EMPTY ) &&
1560 ( cur_file->IsPartFile() ) )
1562 return true;
1563 } else {
1564 return false;
1569 void CUpDownClient::SetRequestFile(CPartFile* reqfile)
1571 if ( m_reqfile != reqfile ) {
1572 // Decrement the source-count of the old request-file
1573 if ( m_reqfile ) {
1574 m_reqfile->ClientStateChanged( GetDownloadState(), -1 );
1575 m_reqfile->UpdatePartsFrequency( this, false );
1578 m_nPartCount = 0;
1579 m_downPartStatus.clear();
1581 m_reqfile = reqfile;
1583 if ( reqfile ) {
1584 // Increment the source-count of the new request-file
1585 m_reqfile->ClientStateChanged( -1, GetDownloadState() );
1587 m_nPartCount = reqfile->GetPartCount();
1592 void CUpDownClient::SetReqFileAICHHash(CAICHHash* val){
1593 if(m_pReqFileAICHHash != NULL && m_pReqFileAICHHash != val)
1594 delete m_pReqFileAICHHash;
1595 m_pReqFileAICHHash = val;
1598 void CUpDownClient::SendAICHRequest(CPartFile* pForFile, uint16 nPart){
1599 CAICHRequestedData request;
1600 request.m_nPart = nPart;
1601 request.m_pClient.Link(this CLIENT_DEBUGSTRING("CUpDownClient::SendAICHRequest"));
1602 request.m_pPartFile = pForFile;
1603 CAICHHashSet::m_liRequestedData.push_back(request);
1604 m_fAICHRequested = TRUE;
1605 CMemFile data;
1606 data.WriteHash(pForFile->GetFileHash());
1607 data.WriteUInt16(nPart);
1608 pForFile->GetAICHHashset()->GetMasterHash().Write(&data);
1609 CPacket* packet = new CPacket(data, OP_EMULEPROT, OP_AICHREQUEST);
1610 theStats::AddUpOverheadOther(packet->GetPacketSize());
1611 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_AICHREQUEST to") + GetFullIP());
1612 SafeSendPacket(packet);
1615 void CUpDownClient::ProcessAICHAnswer(const byte* packet, uint32 size)
1617 if (m_fAICHRequested == FALSE){
1618 throw wxString(wxT("Received unrequested AICH Packet"));
1620 m_fAICHRequested = FALSE;
1622 CMemFile data(packet, size);
1623 if (size <= 16){
1624 CAICHHashSet::ClientAICHRequestFailed(this);
1625 return;
1628 CMD4Hash hash = data.ReadHash();
1629 CPartFile* pPartFile = theApp->downloadqueue->GetFileByID(hash);
1630 CAICHRequestedData request = CAICHHashSet::GetAICHReqDetails(this);
1631 uint16 nPart = data.ReadUInt16();
1632 if (pPartFile != NULL && request.m_pPartFile == pPartFile && request.m_pClient.GetClient() == this && nPart == request.m_nPart){
1633 CAICHHash ahMasterHash(&data);
1634 if ( (pPartFile->GetAICHHashset()->GetStatus() == AICH_TRUSTED || pPartFile->GetAICHHashset()->GetStatus() == AICH_VERIFIED)
1635 && ahMasterHash == pPartFile->GetAICHHashset()->GetMasterHash())
1637 if(pPartFile->GetAICHHashset()->ReadRecoveryData(request.m_nPart*PARTSIZE, &data)){
1638 // finally all checks passed, everythings seem to be fine
1639 AddDebugLogLineN(logAICHTransfer, wxT("AICH Packet Answer: Succeeded to read and validate received recoverydata"));
1640 CAICHHashSet::RemoveClientAICHRequest(this);
1641 pPartFile->AICHRecoveryDataAvailable(request.m_nPart);
1642 return;
1643 } else {
1644 AddDebugLogLineN(logAICHTransfer, wxT("AICH Packet Answer: Succeeded to read and validate received recoverydata"));
1646 } else {
1647 AddDebugLogLineN( logAICHTransfer, wxT("AICH Packet Answer: Masterhash differs from packethash or hashset has no trusted Masterhash") );
1649 } else {
1650 AddDebugLogLineN( logAICHTransfer, wxT("AICH Packet Answer: requested values differ from values in packet") );
1653 CAICHHashSet::ClientAICHRequestFailed(this);
1657 void CUpDownClient::ProcessAICHRequest(const byte* packet, uint32 size)
1659 if (size != 16 + 2 + CAICHHash::GetHashSize()) {
1660 throw wxString(wxT("Received AICH Request Packet with wrong size"));
1663 CMemFile data(packet, size);
1665 CMD4Hash hash = data.ReadHash();
1666 uint16 nPart = data.ReadUInt16();
1667 CAICHHash ahMasterHash(&data);
1668 CKnownFile* pKnownFile = theApp->sharedfiles->GetFileByID(hash);
1669 if (pKnownFile != NULL){
1670 if (pKnownFile->GetAICHHashset()->GetStatus() == AICH_HASHSETCOMPLETE && pKnownFile->GetAICHHashset()->HasValidMasterHash()
1671 && pKnownFile->GetAICHHashset()->GetMasterHash() == ahMasterHash && pKnownFile->GetPartCount() > nPart
1672 && pKnownFile->GetFileSize() > EMBLOCKSIZE && pKnownFile->GetFileSize() - PARTSIZE*nPart > EMBLOCKSIZE)
1674 CMemFile fileResponse;
1675 fileResponse.WriteHash(pKnownFile->GetFileHash());
1676 fileResponse.WriteUInt16(nPart);
1677 pKnownFile->GetAICHHashset()->GetMasterHash().Write(&fileResponse);
1678 if (pKnownFile->GetAICHHashset()->CreatePartRecoveryData(nPart*PARTSIZE, &fileResponse)){
1679 AddDebugLogLineN(logAICHTransfer,
1680 CFormat(wxT("AICH Packet Request: Sucessfully created and send recoverydata for '%s' to %s"))
1681 % pKnownFile->GetFileName() % GetClientFullInfo());
1683 CPacket* packAnswer = new CPacket(fileResponse, OP_EMULEPROT, OP_AICHANSWER);
1684 theStats::AddUpOverheadOther(packAnswer->GetPacketSize());
1685 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_AICHANSWER to") + GetFullIP());
1686 SafeSendPacket(packAnswer);
1687 return;
1688 } else {
1689 AddDebugLogLineN(logAICHTransfer,
1690 CFormat(wxT("AICH Packet Request: Failed to create recoverydata for '%s' to %s"))
1691 % pKnownFile->GetFileName() % GetClientFullInfo());
1693 } else {
1694 AddDebugLogLineN(logAICHTransfer,
1695 CFormat(wxT("AICH Packet Request: Failed to create recoverydata - Hashset not ready or requested Hash differs from Masterhash for '%s' to %s"))
1696 % pKnownFile->GetFileName() % GetClientFullInfo());
1698 } else {
1699 AddDebugLogLineN( logAICHTransfer, wxT("AICH Packet Request: Failed to find requested shared file - ") + GetClientFullInfo() );
1702 CPacket* packAnswer = new CPacket(OP_AICHANSWER, 16, OP_EMULEPROT);
1703 packAnswer->Copy16ToDataBuffer(hash.GetHash());
1704 theStats::AddUpOverheadOther(packAnswer->GetPacketSize());
1705 AddDebugLogLineN(logLocalClient, wxT("Local Client: OP_AICHANSWER to") + GetFullIP());
1706 SafeSendPacket(packAnswer);
1709 void CUpDownClient::ProcessAICHFileHash(CMemFile* data, const CPartFile* file){
1710 CPartFile* pPartFile;
1711 if (file == NULL){
1712 pPartFile = theApp->downloadqueue->GetFileByID(data->ReadHash());
1713 } else {
1714 pPartFile = const_cast<CPartFile*>(file);
1716 CAICHHash ahMasterHash(data);
1718 if(pPartFile != NULL && pPartFile == GetRequestFile()){
1719 SetReqFileAICHHash(new CAICHHash(ahMasterHash));
1720 pPartFile->GetAICHHashset()->UntrustedHashReceived(ahMasterHash, GetConnectIP());
1721 } else {
1722 AddDebugLogLineN( logAICHTransfer, wxT("ProcessAICHFileHash(): PartFile not found or Partfile differs from requested file, ") + GetClientFullInfo() );
1725 // File_checked_for_headers