transmission 2.83
[tomato.git] / release / src-rt-6.x.4708 / router / transmission / libtransmission / peer-mgr.c
blob03682aa8fac4a18edbd2759b92d73bf03940ee8f
1 /*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
7 * $Id: peer-mgr.c 14241 2014-01-21 03:10:30Z jordan $
8 */
10 #include <assert.h>
11 #include <errno.h> /* error codes ERANGE, ... */
12 #include <limits.h> /* INT_MAX */
13 #include <string.h> /* memcpy, memcmp, strstr */
14 #include <stdlib.h> /* qsort */
16 #include <event2/event.h>
18 #include <libutp/utp.h>
20 #include "transmission.h"
21 #include "announcer.h"
22 #include "bandwidth.h"
23 #include "blocklist.h"
24 #include "cache.h"
25 #include "clients.h"
26 #include "completion.h"
27 #include "crypto.h"
28 #include "handshake.h"
29 #include "log.h"
30 #include "net.h"
31 #include "peer-io.h"
32 #include "peer-mgr.h"
33 #include "peer-msgs.h"
34 #include "ptrarray.h"
35 #include "session.h"
36 #include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
37 #include "torrent.h"
38 #include "tr-utp.h"
39 #include "utils.h"
40 #include "webseed.h"
/* Tunable constants for the peer manager. */
enum
{
  /* how frequently to cull old atoms */
  ATOM_PERIOD_MSEC = (60 * 1000),

  /* how frequently to change which peers are choked */
  RECHOKE_PERIOD_MSEC = (10 * 1000),

  /* an optimistically unchoked peer is immune from rechoking
     for this many calls to rechokeUploads (). */
  OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

  /* how frequently to reallocate bandwidth */
  BANDWIDTH_PERIOD_MSEC = 500,

  /* how frequently to age out old piece request lists */
  REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),

  /* how frequently to decide which peers live and die */
  RECONNECT_PERIOD_MSEC = 500,

  /* when many peers are available, keep idle ones this long */
  MIN_UPLOAD_IDLE_SECS = (60),

  /* when few peers are available, keep idle ones this long */
  MAX_UPLOAD_IDLE_SECS = (60 * 5),

  /* max number of peers to ask for per second overall.
   * this throttle is to avoid overloading the router */
  MAX_CONNECTIONS_PER_SECOND = 12,

  /* the per-second limit scaled down to one reconnect pulse.
   * Computed with integer arithmetic only, so that it is a strict
   * integer constant expression (no floating-point operands). */
  MAX_CONNECTIONS_PER_PULSE = ((MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC) / 1000),

  /* number of bad pieces a peer is allowed to send before we ban them */
  MAX_BAD_PIECES_PER_PEER = 5,

  /* amount of time to keep a list of request pieces lying around
     before it's considered too old and needs to be rebuilt */
  PIECE_LIST_SHELF_LIFE_SECS = 60,

  /* use for bitwise operations w/peer_atom.flags2 */
  MYFLAG_BANNED = 1,

  /* use for bitwise operations w/peer_atom.flags2 */
  /* unreachable for now... but not banned.
   * if they try to connect to us it's okay */
  MYFLAG_UNREACHABLE = 2,

  /* the minimum we'll wait before attempting to reconnect to a peer */
  MINIMUM_RECONNECT_INTERVAL_SECS = 5,

  /** how long we'll let requests we've made linger before we cancel them */
  REQUEST_TTL_SECS = 90,

  NO_BLOCKS_CANCEL_HISTORY = 120,

  CANCEL_HISTORY_SEC = 60
};
101 const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, 0 };
103 const tr_swarm_stats TR_SWARM_STATS_INIT = { { 0, 0 }, 0, 0, { 0, 0, 0, 0, 0, 0, 0 } };
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
115 * @see tr_peer
116 * @see tr_peerMsgs
118 struct peer_atom
120 uint8_t fromFirst; /* where the peer was first found */
121 uint8_t fromBest; /* the "best" value of where the peer has been found */
122 uint8_t flags; /* these match the added_f flags */
123 uint8_t flags2; /* flags that aren't defined in added_f */
124 int8_t seedProbability; /* how likely is this to be a seed... [0..100] or -1 for unknown */
125 int8_t blocklisted; /* -1 for unknown, true for blocklisted, false for not blocklisted */
127 tr_port port;
128 bool utp_failed; /* We recently failed to connect over uTP */
129 uint16_t numFails;
130 time_t time; /* when the peer's connection status last changed */
131 time_t piece_data_time;
133 time_t lastConnectionAttemptAt;
134 time_t lastConnectionAt;
136 /* similar to a TTL field, but less rigid --
137 * if the swarm is small, the atom will be kept past this date. */
138 time_t shelf_date;
139 tr_peer * peer; /* will be NULL if not connected */
140 tr_address addr;
143 #ifdef NDEBUG
144 #define tr_isAtom(a) (TRUE)
145 #else
146 static bool
147 tr_isAtom (const struct peer_atom * atom)
149 return (atom != NULL)
150 && (atom->fromFirst < TR_PEER_FROM__MAX)
151 && (atom->fromBest < TR_PEER_FROM__MAX)
152 && (tr_address_is_valid (&atom->addr));
154 #endif
156 static const char*
157 tr_atomAddrStr (const struct peer_atom * atom)
159 return atom ? tr_peerIoAddrStr (&atom->addr, atom->port) : "[no atom]";
162 struct block_request
164 tr_block_index_t block;
165 tr_peer * peer;
166 time_t sentAt;
169 struct weighted_piece
171 tr_piece_index_t index;
172 int16_t salt;
173 int16_t requestCount;
/* Which ordering, if any, tr_swarm::pieces currently has. */
enum piece_sort_state
{
  PIECES_UNSORTED,          /* no ordering can be assumed */
  PIECES_SORTED_BY_INDEX,   /* ordered by comparePieceByIndex () */
  PIECES_SORTED_BY_WEIGHT   /* ordered by comparePieceByWeight () */
};
183 /** @brief Opaque, per-torrent data structure for peer connection information */
184 typedef struct tr_swarm
186 tr_swarm_stats stats;
188 tr_ptrArray outgoingHandshakes; /* tr_handshake */
189 tr_ptrArray pool; /* struct peer_atom */
190 tr_ptrArray peers; /* tr_peerMsgs */
191 tr_ptrArray webseeds; /* tr_webseed */
193 tr_torrent * tor;
194 struct tr_peerMgr * manager;
196 tr_peerMsgs * optimistic; /* the optimistic peer, or NULL if none */
197 int optimisticUnchokeTimeScaler;
199 bool isRunning;
200 bool needsCompletenessCheck;
202 struct block_request * requests;
203 int requestCount;
204 int requestAlloc;
206 struct weighted_piece * pieces;
207 int pieceCount;
208 enum piece_sort_state pieceSortState;
210 /* An array of pieceCount items stating how many peers have each piece.
211 This is used to help us for downloading pieces "rarest first."
212 This may be NULL if we don't have metainfo yet, or if we're not
213 downloading and don't care about rarity */
214 uint16_t * pieceReplication;
215 size_t pieceReplicationSize;
217 int interestedCount;
218 int maxPeers;
219 time_t lastCancel;
221 /* Before the endgame this should be 0. In endgame, is contains the average
222 * number of pending requests per peer. Only peers which have more pending
223 * requests are considered 'fast' are allowed to request a block that's
224 * already been requested from another (slower?) peer. */
225 int endgame;
227 tr_swarm;
229 struct tr_peerMgr
231 tr_session * session;
232 tr_ptrArray incomingHandshakes; /* tr_handshake */
233 struct event * bandwidthTimer;
234 struct event * rechokeTimer;
235 struct event * refillUpkeepTimer;
236 struct event * atomTimer;
/* Deep-log a printf-style message tagged with the name of the swarm's torrent.
 * `t' is a tr_swarm pointer expression; it is parenthesized in the expansion
 * so that arguments such as `cond ? a : b' or `&swarm' expand correctly. */
#define tordbg(t, ...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        tr_logAddDeep (__FILE__, __LINE__, tr_torrentName ((t)->tor), __VA_ARGS__); \
    } \
  while (0)
/* Deep-log a printf-style message with no torrent name attached (NULL). */
#define dbgmsg(...) \
  do \
    { \
      if (tr_logGetDeepEnabled ()) \
        { \
          tr_logAddDeep (__FILE__, __LINE__, NULL, __VA_ARGS__); \
        } \
    } \
  while (0)
256 *** tr_peer virtual functions
259 static bool
260 tr_peerIsTransferringPieces (const tr_peer * peer,
261 uint64_t now,
262 tr_direction direction,
263 unsigned int * Bps)
265 assert (peer != NULL);
266 assert (peer->funcs != NULL);
268 return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
271 unsigned int
272 tr_peerGetPieceSpeed_Bps (const tr_peer * peer,
273 uint64_t now,
274 tr_direction direction)
276 unsigned int Bps = 0;
277 tr_peerIsTransferringPieces (peer, now, direction, &Bps);
278 return Bps;
281 static void
282 tr_peerFree (tr_peer * peer)
284 assert (peer != NULL);
285 assert (peer->funcs != NULL);
287 (*peer->funcs->destruct)(peer);
289 tr_free (peer);
292 void
293 tr_peerConstruct (tr_peer * peer, const tr_torrent * tor)
295 assert (peer != NULL);
296 assert (tr_isTorrent (tor));
298 memset (peer, 0, sizeof (tr_peer));
300 peer->client = TR_KEY_NONE;
301 peer->swarm = tor->swarm;
302 tr_bitfieldConstruct (&peer->have, tor->info.pieceCount);
303 tr_bitfieldConstruct (&peer->blame, tor->blockCount);
306 static void peerDeclinedAllRequests (tr_swarm *, const tr_peer *);
308 void
309 tr_peerDestruct (tr_peer * peer)
311 assert (peer != NULL);
313 if (peer->swarm != NULL)
314 peerDeclinedAllRequests (peer->swarm, peer);
316 tr_bitfieldDestruct (&peer->have);
317 tr_bitfieldDestruct (&peer->blame);
319 if (peer->atom)
320 peer->atom->peer = NULL;
327 static inline void
328 managerLock (const struct tr_peerMgr * manager)
330 tr_sessionLock (manager->session);
333 static inline void
334 managerUnlock (const struct tr_peerMgr * manager)
336 tr_sessionUnlock (manager->session);
339 static inline void
340 swarmLock (tr_swarm * swarm)
342 managerLock (swarm->manager);
345 static inline void
346 swarmUnlock (tr_swarm * swarm)
348 managerUnlock (swarm->manager);
351 static inline int
352 swarmIsLocked (const tr_swarm * swarm)
354 return tr_sessionIsLocked (swarm->manager->session);
361 static int
362 handshakeCompareToAddr (const void * va, const void * vb)
364 const tr_handshake * a = va;
366 return tr_address_compare (tr_handshakeGetAddr (a, NULL), vb);
369 static int
370 handshakeCompare (const void * a, const void * b)
372 return handshakeCompareToAddr (a, tr_handshakeGetAddr (b, NULL));
375 static inline tr_handshake*
376 getExistingHandshake (tr_ptrArray * handshakes, const tr_address * addr)
378 if (tr_ptrArrayEmpty (handshakes))
379 return NULL;
381 return tr_ptrArrayFindSorted (handshakes, addr, handshakeCompareToAddr);
384 static int
385 comparePeerAtomToAddress (const void * va, const void * vb)
387 const struct peer_atom * a = va;
389 return tr_address_compare (&a->addr, vb);
392 static int
393 compareAtomsByAddress (const void * va, const void * vb)
395 const struct peer_atom * b = vb;
397 assert (tr_isAtom (b));
399 return comparePeerAtomToAddress (va, &b->addr);
406 const tr_address *
407 tr_peerAddress (const tr_peer * peer)
409 return &peer->atom->addr;
412 static tr_swarm *
413 getExistingSwarm (tr_peerMgr * manager,
414 const uint8_t * hash)
416 tr_torrent * tor = tr_torrentFindFromHash (manager->session, hash);
418 return tor == NULL ? NULL : tor->swarm;
421 static int
422 peerCompare (const void * a, const void * b)
424 return tr_address_compare (tr_peerAddress (a), tr_peerAddress (b));
427 static struct peer_atom*
428 getExistingAtom (const tr_swarm * cswarm,
429 const tr_address * addr)
431 tr_swarm * swarm = (tr_swarm*) cswarm;
432 return tr_ptrArrayFindSorted (&swarm->pool, addr, comparePeerAtomToAddress);
435 static bool
436 peerIsInUse (const tr_swarm * cs, const struct peer_atom * atom)
438 tr_swarm * s = (tr_swarm*) cs;
440 assert (swarmIsLocked (s));
442 return (atom->peer != NULL)
443 || getExistingHandshake (&s->outgoingHandshakes, &atom->addr)
444 || getExistingHandshake (&s->manager->incomingHandshakes, &atom->addr);
447 static inline bool
448 replicationExists (const tr_swarm * s)
450 return s->pieceReplication != NULL;
453 static void
454 replicationFree (tr_swarm * s)
456 tr_free (s->pieceReplication);
457 s->pieceReplication = NULL;
458 s->pieceReplicationSize = 0;
461 static void
462 replicationNew (tr_swarm * s)
464 tr_piece_index_t piece_i;
465 const tr_piece_index_t piece_count = s->tor->info.pieceCount;
466 const int n = tr_ptrArraySize (&s->peers);
468 assert (!replicationExists (s));
470 s->pieceReplicationSize = piece_count;
471 s->pieceReplication = tr_new0 (uint16_t, piece_count);
473 for (piece_i=0; piece_i<piece_count; ++piece_i)
475 int peer_i;
476 uint16_t r = 0;
478 for (peer_i=0; peer_i<n; ++peer_i)
480 tr_peer * peer = tr_ptrArrayNth (&s->peers, peer_i);
481 if (tr_bitfieldHas (&peer->have, piece_i))
482 ++r;
485 s->pieceReplication[piece_i] = r;
489 static void
490 swarmFree (void * vs)
492 tr_swarm * s = vs;
494 assert (s);
495 assert (!s->isRunning);
496 assert (swarmIsLocked (s));
497 assert (tr_ptrArrayEmpty (&s->outgoingHandshakes));
498 assert (tr_ptrArrayEmpty (&s->peers));
500 tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
501 tr_ptrArrayDestruct (&s->pool, (PtrArrayForeachFunc)tr_free);
502 tr_ptrArrayDestruct (&s->outgoingHandshakes, NULL);
503 tr_ptrArrayDestruct (&s->peers, NULL);
504 s->stats = TR_SWARM_STATS_INIT;
506 replicationFree (s);
508 tr_free (s->requests);
509 tr_free (s->pieces);
510 tr_free (s);
513 static void peerCallbackFunc (tr_peer *, const tr_peer_event *, void *);
515 static void
516 rebuildWebseedArray (tr_swarm * s, tr_torrent * tor)
518 unsigned int i;
519 const tr_info * inf = &tor->info;
521 /* clear the array */
522 tr_ptrArrayDestruct (&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
523 s->webseeds = TR_PTR_ARRAY_INIT;
524 s->stats.activeWebseedCount = 0;
526 /* repopulate it */
527 for (i=0; i<inf->webseedCount; ++i)
529 tr_webseed * w = tr_webseedNew (tor, inf->webseeds[i], peerCallbackFunc, s);
530 tr_ptrArrayAppend (&s->webseeds, w);
534 static tr_swarm *
535 swarmNew (tr_peerMgr * manager, tr_torrent * tor)
537 tr_swarm * s;
539 s = tr_new0 (tr_swarm, 1);
540 s->manager = manager;
541 s->tor = tor;
542 s->pool = TR_PTR_ARRAY_INIT;
543 s->peers = TR_PTR_ARRAY_INIT;
544 s->webseeds = TR_PTR_ARRAY_INIT;
545 s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
547 rebuildWebseedArray (s, tor);
549 return s;
552 static void ensureMgrTimersExist (struct tr_peerMgr * m);
554 tr_peerMgr*
555 tr_peerMgrNew (tr_session * session)
557 tr_peerMgr * m = tr_new0 (tr_peerMgr, 1);
558 m->session = session;
559 m->incomingHandshakes = TR_PTR_ARRAY_INIT;
560 ensureMgrTimersExist (m);
561 return m;
/* Frees the event at *t, if any, and resets *t to NULL. */
static void
deleteTimer (struct event ** t)
{
  if (*t == NULL)
    return;

  event_free (*t);
  *t = NULL;
}
574 static void
575 deleteTimers (struct tr_peerMgr * m)
577 deleteTimer (&m->atomTimer);
578 deleteTimer (&m->bandwidthTimer);
579 deleteTimer (&m->rechokeTimer);
580 deleteTimer (&m->refillUpkeepTimer);
583 void
584 tr_peerMgrFree (tr_peerMgr * manager)
586 managerLock (manager);
588 deleteTimers (manager);
590 /* free the handshakes. Abort invokes handshakeDoneCB (), which removes
591 * the item from manager->handshakes, so this is a little roundabout... */
592 while (!tr_ptrArrayEmpty (&manager->incomingHandshakes))
593 tr_handshakeAbort (tr_ptrArrayNth (&manager->incomingHandshakes, 0));
595 tr_ptrArrayDestruct (&manager->incomingHandshakes, NULL);
597 managerUnlock (manager);
598 tr_free (manager);
601 /***
602 ****
603 ***/
605 void
606 tr_peerMgrOnBlocklistChanged (tr_peerMgr * mgr)
608 tr_torrent * tor = NULL;
609 tr_session * session = mgr->session;
611 /* we cache whether or not a peer is blocklisted...
612 since the blocklist has changed, erase that cached value */
613 while ((tor = tr_torrentNext (session, tor)))
615 int i;
616 tr_swarm * s = tor->swarm;
617 const int n = tr_ptrArraySize (&s->pool);
618 for (i=0; i<n; ++i)
620 struct peer_atom * atom = tr_ptrArrayNth (&s->pool, i);
621 atom->blocklisted = -1;
626 static bool
627 isAtomBlocklisted (tr_session * session, struct peer_atom * atom)
629 if (atom->blocklisted < 0)
630 atom->blocklisted = tr_sessionIsAddressBlocked (session, &atom->addr);
632 assert (tr_isBool (atom->blocklisted));
633 return atom->blocklisted;
637 /***
638 ****
639 ***/
641 static void
642 atomSetSeedProbability (struct peer_atom * atom, int seedProbability)
644 assert (atom != NULL);
645 assert (-1<=seedProbability && seedProbability<=100);
647 atom->seedProbability = seedProbability;
649 if (seedProbability == 100)
650 atom->flags |= ADDED_F_SEED_FLAG;
651 else if (seedProbability != -1)
652 atom->flags &= ~ADDED_F_SEED_FLAG;
655 static inline bool
656 atomIsSeed (const struct peer_atom * atom)
658 return atom->seedProbability == 100;
661 static void
662 atomSetSeed (const tr_swarm * s, struct peer_atom * atom)
664 if (!atomIsSeed (atom))
666 tordbg (s, "marking peer %s as a seed", tr_atomAddrStr (atom));
668 atomSetSeedProbability (atom, 100);
673 bool
674 tr_peerMgrPeerIsSeed (const tr_torrent * tor,
675 const tr_address * addr)
677 bool isSeed = false;
678 const tr_swarm * s = tor->swarm;
679 const struct peer_atom * atom = getExistingAtom (s, addr);
681 if (atom)
682 isSeed = atomIsSeed (atom);
684 return isSeed;
687 void
688 tr_peerMgrSetUtpSupported (tr_torrent * tor, const tr_address * addr)
690 struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
692 if (atom)
693 atom->flags |= ADDED_F_UTP_FLAGS;
696 void
697 tr_peerMgrSetUtpFailed (tr_torrent *tor, const tr_address *addr, bool failed)
699 struct peer_atom * atom = getExistingAtom (tor->swarm, addr);
701 if (atom)
702 atom->utp_failed = failed;
707 *** REQUESTS
709 *** There are two data structures associated with managing block requests:
711 *** 1. tr_swarm::requests, an array of "struct block_request" which keeps
712 *** track of which blocks have been requested, and when, and by which peers.
713 *** This list is used for (a) cancelling requests that have been pending
714 *** for too long and (b) avoiding duplicate requests before endgame.
716 *** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
717 *** pieces that we want to request. It's used to decide which blocks to
718 *** return next when tr_peerMgrGetBlockRequests () is called.
722 *** struct block_request
725 static int
726 compareReqByBlock (const void * va, const void * vb)
728 const struct block_request * a = va;
729 const struct block_request * b = vb;
731 /* primary key: block */
732 if (a->block < b->block) return -1;
733 if (a->block > b->block) return 1;
735 /* secondary key: peer */
736 if (a->peer < b->peer) return -1;
737 if (a->peer > b->peer) return 1;
739 return 0;
742 static void
743 requestListAdd (tr_swarm * s, tr_block_index_t block, tr_peer * peer)
745 struct block_request key;
747 /* ensure enough room is available... */
748 if (s->requestCount + 1 >= s->requestAlloc)
750 const int CHUNK_SIZE = 128;
751 s->requestAlloc += CHUNK_SIZE;
752 s->requests = tr_renew (struct block_request,
753 s->requests, s->requestAlloc);
756 /* populate the record we're inserting */
757 key.block = block;
758 key.peer = peer;
759 key.sentAt = tr_time ();
761 /* insert the request to our array... */
763 bool exact;
764 const int pos = tr_lowerBound (&key, s->requests, s->requestCount,
765 sizeof (struct block_request),
766 compareReqByBlock, &exact);
767 assert (!exact);
768 memmove (s->requests + pos + 1,
769 s->requests + pos,
770 sizeof (struct block_request) * (s->requestCount++ - pos));
771 s->requests[pos] = key;
774 if (peer != NULL)
776 ++peer->pendingReqsToPeer;
777 assert (peer->pendingReqsToPeer >= 0);
780 /*fprintf (stderr, "added request of block %lu from peer %s... "
781 "there are now %d block\n",
782 (unsigned long)block, tr_atomAddrStr (peer->atom), s->requestCount);*/
785 static struct block_request *
786 requestListLookup (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
788 struct block_request key;
789 key.block = block;
790 key.peer = (tr_peer*) peer;
792 return bsearch (&key, s->requests, s->requestCount,
793 sizeof (struct block_request),
794 compareReqByBlock);
798 * Find the peers are we currently requesting the block
799 * with index @a block from and append them to @a peerArr.
801 static void
802 getBlockRequestPeers (tr_swarm * s, tr_block_index_t block,
803 tr_ptrArray * peerArr)
805 bool exact;
806 int i, pos;
807 struct block_request key;
809 key.block = block;
810 key.peer = NULL;
811 pos = tr_lowerBound (&key, s->requests, s->requestCount,
812 sizeof (struct block_request),
813 compareReqByBlock, &exact);
815 assert (!exact); /* shouldn't have a request with .peer == NULL */
817 for (i=pos; i<s->requestCount; ++i)
819 if (s->requests[i].block != block)
820 break;
821 tr_ptrArrayAppend (peerArr, s->requests[i].peer);
825 static void
826 decrementPendingReqCount (const struct block_request * b)
828 if (b->peer != NULL)
829 if (b->peer->pendingReqsToPeer > 0)
830 --b->peer->pendingReqsToPeer;
833 static void
834 requestListRemove (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
836 const struct block_request * b = requestListLookup (s, block, peer);
838 if (b != NULL)
840 const int pos = b - s->requests;
841 assert (pos < s->requestCount);
843 decrementPendingReqCount (b);
845 tr_removeElementFromArray (s->requests,
846 pos,
847 sizeof (struct block_request),
848 s->requestCount--);
850 /*fprintf (stderr, "removing request of block %lu from peer %s... "
851 "there are now %d block requests left\n",
852 (unsigned long)block, tr_atomAddrStr (peer->atom), t->requestCount);*/
856 static int
857 countActiveWebseeds (tr_swarm * s)
859 int activeCount = 0;
861 if (s->tor->isRunning && !tr_torrentIsSeed (s->tor))
863 int i;
864 const int n = tr_ptrArraySize (&s->webseeds);
865 const uint64_t now = tr_time_msec ();
867 for (i=0; i<n; ++i)
868 if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, NULL))
869 ++activeCount;
872 return activeCount;
875 static bool
876 testForEndgame (const tr_swarm * s)
878 /* we consider ourselves to be in endgame if the number of bytes
879 we've got requested is >= the number of bytes left to download */
880 return (s->requestCount * s->tor->blockSize)
881 >= tr_torrentGetLeftUntilDone (s->tor);
884 static void
885 updateEndgame (tr_swarm * s)
887 assert (s->requestCount >= 0);
889 if (!testForEndgame (s))
891 /* not in endgame */
892 s->endgame = 0;
894 else if (!s->endgame) /* only recalculate when endgame first begins */
896 int i;
897 int numDownloading = 0;
898 const int n = tr_ptrArraySize (&s->peers);
900 /* add the active bittorrent peers... */
901 for (i=0; i<n; ++i)
903 const tr_peer * p = tr_ptrArrayNth (&s->peers, i);
904 if (p->pendingReqsToPeer > 0)
905 ++numDownloading;
908 /* add the active webseeds... */
909 numDownloading += countActiveWebseeds (s);
911 /* average number of pending requests per downloading peer */
912 s->endgame = s->requestCount / MAX (numDownloading, 1);
917 /****
918 *****
919 ***** Piece List Manipulation / Accessors
920 *****
921 ****/
923 static inline void
924 invalidatePieceSorting (tr_swarm * s)
926 s->pieceSortState = PIECES_UNSORTED;
929 static const tr_torrent * weightTorrent;
931 static const uint16_t * weightReplication;
933 static void
934 setComparePieceByWeightTorrent (tr_swarm * s)
936 if (!replicationExists (s))
937 replicationNew (s);
939 weightTorrent = s->tor;
940 weightReplication = s->pieceReplication;
943 /* we try to create a "weight" s.t. high-priority pieces come before others,
944 * and that partially-complete pieces come before empty ones. */
945 static int
946 comparePieceByWeight (const void * va, const void * vb)
948 const struct weighted_piece * a = va;
949 const struct weighted_piece * b = vb;
950 int ia, ib, missing, pending;
951 const tr_torrent * tor = weightTorrent;
952 const uint16_t * rep = weightReplication;
954 /* primary key: weight */
955 missing = tr_torrentMissingBlocksInPiece (tor, a->index);
956 pending = a->requestCount;
957 ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
958 missing = tr_torrentMissingBlocksInPiece (tor, b->index);
959 pending = b->requestCount;
960 ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
961 if (ia < ib) return -1;
962 if (ia > ib) return 1;
964 /* secondary key: higher priorities go first */
965 ia = tor->info.pieces[a->index].priority;
966 ib = tor->info.pieces[b->index].priority;
967 if (ia > ib) return -1;
968 if (ia < ib) return 1;
970 /* tertiary key: rarest first. */
971 ia = rep[a->index];
972 ib = rep[b->index];
973 if (ia < ib) return -1;
974 if (ia > ib) return 1;
976 /* quaternary key: random */
977 if (a->salt < b->salt) return -1;
978 if (a->salt > b->salt) return 1;
980 /* okay, they're equal */
981 return 0;
984 static int
985 comparePieceByIndex (const void * va, const void * vb)
987 const struct weighted_piece * a = va;
988 const struct weighted_piece * b = vb;
989 if (a->index < b->index) return -1;
990 if (a->index > b->index) return 1;
991 return 0;
994 static void
995 pieceListSort (tr_swarm * s, enum piece_sort_state state)
997 assert (state==PIECES_SORTED_BY_INDEX
998 || state==PIECES_SORTED_BY_WEIGHT);
1000 if (state == PIECES_SORTED_BY_WEIGHT)
1002 setComparePieceByWeightTorrent (s);
1003 qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByWeight);
1005 else
1007 qsort (s->pieces, s->pieceCount, sizeof (struct weighted_piece), comparePieceByIndex);
1010 s->pieceSortState = state;
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0".
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Asserts that, outside of endgame, adjacent entries of the
 * piece list are in comparePieceByWeight () order. */
static void
assertWeightedPiecesAreSorted (tr_swarm * t)
{
  if (!t->endgame)
    {
      int i;
      setComparePieceByWeightTorrent (t);
      for (i=0; i<t->pieceCount-1; ++i)
        assert (comparePieceByWeight (&t->pieces[i], &t->pieces[i+1]) <= 0);
    }
}

/* Asserts that every replication counter equals the number
 * of connected peers whose `have' bitfield contains that piece. */
static void
assertReplicationCountIsExact (tr_swarm * t)
{
  /* This assert might fail due to implementation errors in other
   * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
   * from a client. If such behavior is noticed,
   * a bug report should be filed against the faulty client. */

  size_t piece_i;
  const uint16_t * rep = t->pieceReplication;
  const size_t piece_count = t->pieceReplicationSize;
  const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&t->peers);
  const int peer_count = tr_ptrArraySize (&t->peers);

  assert (piece_count == t->tor->info.pieceCount);

  for (piece_i=0; piece_i<piece_count; ++piece_i)
    {
      int peer_i;
      uint16_t r = 0;

      for (peer_i=0; peer_i<peer_count; ++peer_i)
        if (tr_bitfieldHas (&peers[peer_i]->have, piece_i))
          ++r;

      assert (rep[piece_i] == r);
    }
}
#endif
1062 static struct weighted_piece *
1063 pieceListLookup (tr_swarm * s, tr_piece_index_t index)
1065 int i;
1067 for (i=0; i<s->pieceCount; ++i)
1068 if (s->pieces[i].index == index)
1069 return &s->pieces[i];
1071 return NULL;
1074 static void
1075 pieceListRebuild (tr_swarm * s)
1077 if (!tr_torrentIsSeed (s->tor))
1079 tr_piece_index_t i;
1080 tr_piece_index_t * pool;
1081 tr_piece_index_t poolCount = 0;
1082 const tr_torrent * tor = s->tor;
1083 const tr_info * inf = tr_torrentInfo (tor);
1084 struct weighted_piece * pieces;
1085 int pieceCount;
1087 /* build the new list */
1088 pool = tr_new (tr_piece_index_t, inf->pieceCount);
1089 for (i=0; i<inf->pieceCount; ++i)
1090 if (!inf->pieces[i].dnd)
1091 if (!tr_torrentPieceIsComplete (tor, i))
1092 pool[poolCount++] = i;
1093 pieceCount = poolCount;
1094 pieces = tr_new0 (struct weighted_piece, pieceCount);
1095 for (i=0; i<poolCount; ++i)
1097 struct weighted_piece * piece = pieces + i;
1098 piece->index = pool[i];
1099 piece->requestCount = 0;
1100 piece->salt = tr_cryptoWeakRandInt (4096);
1103 /* if we already had a list of pieces, merge it into
1104 * the new list so we don't lose its requestCounts */
1105 if (s->pieces != NULL)
1107 struct weighted_piece * o = s->pieces;
1108 struct weighted_piece * oend = o + s->pieceCount;
1109 struct weighted_piece * n = pieces;
1110 struct weighted_piece * nend = n + pieceCount;
1112 pieceListSort (s, PIECES_SORTED_BY_INDEX);
1114 while (o!=oend && n!=nend)
1116 if (o->index < n->index)
1117 ++o;
1118 else if (o->index > n->index)
1119 ++n;
1120 else
1121 *n++ = *o++;
1124 tr_free (s->pieces);
1127 s->pieces = pieces;
1128 s->pieceCount = pieceCount;
1130 pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1132 /* cleanup */
1133 tr_free (pool);
1137 static void
1138 pieceListRemovePiece (tr_swarm * s, tr_piece_index_t piece)
1140 struct weighted_piece * p;
1142 if ((p = pieceListLookup (s, piece)))
1144 const int pos = p - s->pieces;
1146 tr_removeElementFromArray (s->pieces,
1147 pos,
1148 sizeof (struct weighted_piece),
1149 s->pieceCount--);
1151 if (s->pieceCount == 0)
1153 tr_free (s->pieces);
1154 s->pieces = NULL;
1159 static void
1160 pieceListResortPiece (tr_swarm * s, struct weighted_piece * p)
1162 int pos;
1163 bool isSorted = true;
1165 if (p == NULL)
1166 return;
1168 /* is the torrent already sorted? */
1169 pos = p - s->pieces;
1170 setComparePieceByWeightTorrent (s);
1171 if (isSorted && (pos > 0) && (comparePieceByWeight (p-1, p) > 0))
1172 isSorted = false;
1173 if (isSorted && (pos < s->pieceCount - 1) && (comparePieceByWeight (p, p+1) > 0))
1174 isSorted = false;
1176 if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1178 pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1179 isSorted = true;
1182 /* if it's not sorted, move it around */
1183 if (!isSorted)
1185 bool exact;
1186 const struct weighted_piece tmp = *p;
1188 tr_removeElementFromArray (s->pieces,
1189 pos,
1190 sizeof (struct weighted_piece),
1191 s->pieceCount--);
1193 pos = tr_lowerBound (&tmp, s->pieces, s->pieceCount,
1194 sizeof (struct weighted_piece),
1195 comparePieceByWeight, &exact);
1197 memmove (&s->pieces[pos + 1],
1198 &s->pieces[pos],
1199 sizeof (struct weighted_piece) * (s->pieceCount++ - pos));
1201 s->pieces[pos] = tmp;
1204 assertWeightedPiecesAreSorted (s);
1207 static void
1208 pieceListRemoveRequest (tr_swarm * s, tr_block_index_t block)
1210 struct weighted_piece * p;
1211 const tr_piece_index_t index = tr_torBlockPiece (s->tor, block);
1213 if (((p = pieceListLookup (s, index))) && (p->requestCount > 0))
1215 --p->requestCount;
1216 pieceListResortPiece (s, p);
1221 /****
1222 *****
1223 ***** Replication count (for rarest first policy)
1224 *****
1225 ****/
1228 * Increase the replication count of this piece and sort it if the
1229 * piece list is already sorted
1231 static void
1232 tr_incrReplicationOfPiece (tr_swarm * s, const size_t index)
1234 assert (replicationExists (s));
1235 assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1237 /* One more replication of this piece is present in the swarm */
1238 ++s->pieceReplication[index];
1240 /* we only resort the piece if the list is already sorted */
1241 if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1242 pieceListResortPiece (s, pieceListLookup (s, index));
1246 * Increases the replication count of pieces present in the bitfield
1248 static void
1249 tr_incrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1251 size_t i;
1252 uint16_t * rep = s->pieceReplication;
1253 const size_t n = s->tor->info.pieceCount;
1255 assert (replicationExists (s));
1257 for (i=0; i<n; ++i)
1258 if (tr_bitfieldHas (b, i))
1259 ++rep[i];
1261 if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1262 invalidatePieceSorting (s);
1266 * Increase the replication count of every piece
1268 static void
1269 tr_incrReplication (tr_swarm * s)
1271 int i;
1272 const int n = s->pieceReplicationSize;
1274 assert (replicationExists (s));
1275 assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1277 for (i=0; i<n; ++i)
1278 ++s->pieceReplication[i];
1282 * Decrease the replication count of pieces present in the bitset.
1284 static void
1285 tr_decrReplicationFromBitfield (tr_swarm * s, const tr_bitfield * b)
1287 int i;
1288 const int n = s->pieceReplicationSize;
1290 assert (replicationExists (s));
1291 assert (s->pieceReplicationSize == s->tor->info.pieceCount);
1293 if (tr_bitfieldHasAll (b))
1295 for (i=0; i<n; ++i)
1296 --s->pieceReplication[i];
1298 else if (!tr_bitfieldHasNone (b))
1300 for (i=0; i<n; ++i)
1301 if (tr_bitfieldHas (b, i))
1302 --s->pieceReplication[i];
1304 if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1305 invalidatePieceSorting (s);
1313 void
1314 tr_peerMgrRebuildRequests (tr_torrent * tor)
1316 assert (tr_isTorrent (tor));
1318 pieceListRebuild (tor->swarm);
1321 void
1322 tr_peerMgrGetNextRequests (tr_torrent * tor,
1323 tr_peer * peer,
1324 int numwant,
1325 tr_block_index_t * setme,
1326 int * numgot,
1327 bool get_intervals)
1329 int i;
1330 int got;
1331 tr_swarm * s;
1332 struct weighted_piece * pieces;
1333 const tr_bitfield * const have = &peer->have;
1335 /* sanity clause */
1336 assert (tr_isTorrent (tor));
1337 assert (numwant > 0);
1339 /* walk through the pieces and find blocks that should be requested */
1340 got = 0;
1341 s = tor->swarm;
1343 /* prep the pieces list */
1344 if (s->pieces == NULL)
1345 pieceListRebuild (s);
1347 if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1348 pieceListSort (s, PIECES_SORTED_BY_WEIGHT);
1350 assertReplicationCountIsExact (s);
1351 assertWeightedPiecesAreSorted (s);
1353 updateEndgame (s);
1354 pieces = s->pieces;
1355 for (i=0; i<s->pieceCount && got<numwant; ++i)
1357 struct weighted_piece * p = pieces + i;
1359 /* if the peer has this piece that we want... */
1360 if (tr_bitfieldHas (have, p->index))
1362 tr_block_index_t b;
1363 tr_block_index_t first;
1364 tr_block_index_t last;
1365 tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1367 tr_torGetPieceBlockRange (tor, p->index, &first, &last);
1369 for (b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b)
1371 int peerCount;
1372 tr_peer ** peers;
1374 /* don't request blocks we've already got */
1375 if (tr_torrentBlockIsComplete (tor, b))
1376 continue;
1378 /* always add peer if this block has no peers yet */
1379 tr_ptrArrayClear (&peerArr);
1380 getBlockRequestPeers (s, b, &peerArr);
1381 peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1382 if (peerCount != 0)
1384 /* don't make a second block request until the endgame */
1385 if (!s->endgame)
1386 continue;
1388 /* don't have more than two peers requesting this block */
1389 if (peerCount > 1)
1390 continue;
1392 /* don't send the same request to the same peer twice */
1393 if (peer == peers[0])
1394 continue;
1396 /* in the endgame allow an additional peer to download a
1397 block but only if the peer seems to be handling requests
1398 relatively fast */
1399 if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1400 continue;
1403 /* update the caller's table */
1404 if (!get_intervals)
1406 setme[got++] = b;
1408 /* if intervals are requested two array entries are necessarry:
1409 one for the interval's starting block and one for its end block */
1410 else if (got && setme[2 * got - 1] == b - 1 && b != first)
1412 /* expand the last interval */
1413 ++setme[2 * got - 1];
1415 else
1417 /* begin a new interval */
1418 setme[2 * got] = setme[2 * got + 1] = b;
1419 ++got;
1422 /* update our own tables */
1423 requestListAdd (s, b, peer);
1424 ++p->requestCount;
1427 tr_ptrArrayDestruct (&peerArr, NULL);
1431 /* In most cases we've just changed the weights of a small number of pieces.
1432 * So rather than qsort ()ing the entire array, it's faster to apply an
1433 * adaptive insertion sort algorithm. */
1434 if (got > 0)
1436 /* not enough requests || last piece modified */
1437 if (i == s->pieceCount)
1438 --i;
1440 setComparePieceByWeightTorrent (s);
1441 while (--i >= 0)
1443 bool exact;
1445 /* relative position! */
1446 const int newpos = tr_lowerBound (&s->pieces[i], &s->pieces[i + 1],
1447 s->pieceCount - (i + 1),
1448 sizeof (struct weighted_piece),
1449 comparePieceByWeight, &exact);
1450 if (newpos > 0)
1452 const struct weighted_piece piece = s->pieces[i];
1453 memmove (&s->pieces[i],
1454 &s->pieces[i + 1],
1455 sizeof (struct weighted_piece) * (newpos));
1456 s->pieces[i + newpos] = piece;
1461 assertWeightedPiecesAreSorted (t);
1462 *numgot = got;
1465 bool
1466 tr_peerMgrDidPeerRequest (const tr_torrent * tor,
1467 const tr_peer * peer,
1468 tr_block_index_t block)
1470 return requestListLookup ((tr_swarm*)tor->swarm, block, peer) != NULL;
1473 /* cancel requests that are too old */
1474 static void
1475 refillUpkeep (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
1477 time_t now;
1478 time_t too_old;
1479 tr_torrent * tor;
1480 int cancel_buflen = 0;
1481 struct block_request * cancel = NULL;
1482 tr_peerMgr * mgr = vmgr;
1483 managerLock (mgr);
1485 now = tr_time ();
1486 too_old = now - REQUEST_TTL_SECS;
1488 /* alloc the temporary "cancel" buffer */
1489 tor = NULL;
1490 while ((tor = tr_torrentNext (mgr->session, tor)))
1491 cancel_buflen = MAX (cancel_buflen, tor->swarm->requestCount);
1492 if (cancel_buflen > 0)
1493 cancel = tr_new (struct block_request, cancel_buflen);
1495 /* prune requests that are too old */
1496 tor = NULL;
1497 while ((tor = tr_torrentNext (mgr->session, tor)))
1499 tr_swarm * s = tor->swarm;
1500 const int n = s->requestCount;
1501 if (n > 0)
1503 int keepCount = 0;
1504 int cancelCount = 0;
1505 const struct block_request * it;
1506 const struct block_request * end;
1508 for (it=s->requests, end=it+n; it!=end; ++it)
1510 tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1512 if ((msgs !=NULL) && (it->sentAt <= too_old) && !tr_peerMsgsIsReadingBlock (msgs, it->block))
1513 cancel[cancelCount++] = *it;
1514 else
1516 if (it != &s->requests[keepCount])
1517 s->requests[keepCount] = *it;
1518 keepCount++;
1522 /* prune out the ones we aren't keeping */
1523 s->requestCount = keepCount;
1525 /* send cancel messages for all the "cancel" ones */
1526 for (it=cancel, end=it+cancelCount; it!=end; ++it)
1528 tr_peerMsgs * msgs = PEER_MSGS(it->peer);
1530 if (msgs != NULL)
1532 tr_historyAdd (&it->peer->cancelsSentToPeer, now, 1);
1533 tr_peerMsgsCancel (msgs, it->block);
1534 decrementPendingReqCount (it);
1538 /* decrement the pending request counts for the timed-out blocks */
1539 for (it=cancel, end=it+cancelCount; it!=end; ++it)
1540 pieceListRemoveRequest (s, it->block);
1544 tr_free (cancel);
1545 tr_timerAddMsec (mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1546 managerUnlock (mgr);
1549 static void
1550 addStrike (tr_swarm * s, tr_peer * peer)
1552 tordbg (s, "increasing peer %s strike count to %d",
1553 tr_atomAddrStr (peer->atom), peer->strikes + 1);
1555 if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1557 struct peer_atom * atom = peer->atom;
1558 atom->flags2 |= MYFLAG_BANNED;
1559 peer->doPurge = 1;
1560 tordbg (s, "banning peer %s", tr_atomAddrStr (atom));
1564 static void
1565 peerSuggestedPiece (tr_swarm * s UNUSED,
1566 tr_peer * peer UNUSED,
1567 tr_piece_index_t pieceIndex UNUSED,
1568 int isFastAllowed UNUSED)
1570 #if 0
1571 assert (t);
1572 assert (peer);
1573 assert (peer->msgs);
1575 /* is this a valid piece? */
1576 if (pieceIndex >= t->tor->info.pieceCount)
1577 return;
1579 /* don't ask for it if we've already got it */
1580 if (tr_torrentPieceIsComplete (t->tor, pieceIndex))
1581 return;
1583 /* don't ask for it if they don't have it */
1584 if (!tr_bitfieldHas (peer->have, pieceIndex))
1585 return;
1587 /* don't ask for it if we're choked and it's not fast */
1588 if (!isFastAllowed && peer->clientIsChoked)
1589 return;
1591 /* request the blocks that we don't have in this piece */
1593 tr_block_index_t b;
1594 tr_block_index_t first;
1595 tr_block_index_t last;
1596 const tr_torrent * tor = t->tor;
1598 tr_torGetPieceBlockRange (t->tor, pieceIndex, &first, &last);
1600 for (b=first; b<=last; ++b)
1602 if (tr_torrentBlockIsComplete (tor, b))
1604 const uint32_t offset = getBlockOffsetInPiece (tor, b);
1605 const uint32_t length = tr_torBlockCountBytes (tor, b);
1606 tr_peerMsgsAddRequest (peer->msgs, pieceIndex, offset, length);
1607 incrementPieceRequests (t, pieceIndex);
1611 #endif
1614 static void
1615 removeRequestFromTables (tr_swarm * s, tr_block_index_t block, const tr_peer * peer)
1617 requestListRemove (s, block, peer);
1618 pieceListRemoveRequest (s, block);
1621 /* peer choked us, or maybe it disconnected.
1622 either way we need to remove all its requests */
1623 static void
1624 peerDeclinedAllRequests (tr_swarm * s, const tr_peer * peer)
1626 int i, n;
1627 tr_block_index_t * blocks = tr_new (tr_block_index_t, s->requestCount);
1629 for (i=n=0; i<s->requestCount; ++i)
1630 if (peer == s->requests[i].peer)
1631 blocks[n++] = s->requests[i].block;
1633 for (i=0; i<n; ++i)
1634 removeRequestFromTables (s, blocks[i], peer);
1636 tr_free (blocks);
1639 static void
1640 cancelAllRequestsForBlock (tr_swarm * s,
1641 tr_block_index_t block,
1642 tr_peer * no_notify)
1644 int i;
1645 int peerCount;
1646 tr_peer ** peers;
1647 tr_ptrArray peerArr;
1649 peerArr = TR_PTR_ARRAY_INIT;
1650 getBlockRequestPeers (s, block, &peerArr);
1651 peers = (tr_peer **) tr_ptrArrayPeek (&peerArr, &peerCount);
1652 for (i=0; i<peerCount; ++i)
1654 tr_peer * p = peers[i];
1656 if ((p != no_notify) && tr_isPeerMsgs (p))
1658 tr_historyAdd (&p->cancelsSentToPeer, tr_time (), 1);
1659 tr_peerMsgsCancel (PEER_MSGS(p), block);
1662 removeRequestFromTables (s, block, p);
1665 tr_ptrArrayDestruct (&peerArr, NULL);
1668 void
1669 tr_peerMgrPieceCompleted (tr_torrent * tor, tr_piece_index_t p)
1671 int i;
1672 bool pieceCameFromPeers = false;
1673 tr_swarm * const s = tor->swarm;
1674 const int n = tr_ptrArraySize (&s->peers);
1676 /* walk through our peers */
1677 for (i=0; i<n; ++i)
1679 tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
1681 /* notify the peer that we now have this piece */
1682 tr_peerMsgsHave (PEER_MSGS(peer), p);
1684 if (!pieceCameFromPeers)
1685 pieceCameFromPeers = tr_bitfieldHas (&peer->blame, p);
1688 if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1689 tr_announcerAddBytes (tor, TR_ANN_DOWN, tr_torPieceCountBytes (tor, p));
1691 /* bookkeeping */
1692 pieceListRemovePiece (s, p);
1693 s->needsCompletenessCheck = true;
1696 static void
1697 peerCallbackFunc (tr_peer * peer, const tr_peer_event * e, void * vs)
1699 tr_swarm * s = vs;
1701 swarmLock (s);
1703 assert (peer != NULL);
1705 switch (e->eventType)
1707 case TR_PEER_PEER_GOT_PIECE_DATA:
1709 const time_t now = tr_time ();
1710 tr_torrent * tor = s->tor;
1712 tor->uploadedCur += e->length;
1713 tr_announcerAddBytes (tor, TR_ANN_UP, e->length);
1714 tr_torrentSetActivityDate (tor, now);
1715 tr_torrentSetDirty (tor);
1716 tr_statsAddUploaded (tor->session, e->length);
1718 if (peer->atom != NULL)
1719 peer->atom->piece_data_time = now;
1721 break;
1724 case TR_PEER_CLIENT_GOT_PIECE_DATA:
1726 const time_t now = tr_time ();
1727 tr_torrent * tor = s->tor;
1729 tor->downloadedCur += e->length;
1730 tr_torrentSetActivityDate (tor, now);
1731 tr_torrentSetDirty (tor);
1733 tr_statsAddDownloaded (tor->session, e->length);
1735 if (peer->atom != NULL)
1736 peer->atom->piece_data_time = now;
1738 break;
1741 case TR_PEER_CLIENT_GOT_HAVE:
1742 if (replicationExists (s))
1744 tr_incrReplicationOfPiece (s, e->pieceIndex);
1745 assertReplicationCountIsExact (s);
1747 break;
1749 case TR_PEER_CLIENT_GOT_HAVE_ALL:
1750 if (replicationExists (s))
1752 tr_incrReplication (s);
1753 assertReplicationCountIsExact (s);
1755 break;
1757 case TR_PEER_CLIENT_GOT_HAVE_NONE:
1758 /* noop */
1759 break;
1761 case TR_PEER_CLIENT_GOT_BITFIELD:
1762 assert (e->bitfield != NULL);
1763 if (replicationExists (s))
1765 tr_incrReplicationFromBitfield (s, e->bitfield);
1766 assertReplicationCountIsExact (s);
1768 break;
1770 case TR_PEER_CLIENT_GOT_REJ:
1772 tr_block_index_t b = _tr_block (s->tor, e->pieceIndex, e->offset);
1773 if (b < s->tor->blockCount)
1774 removeRequestFromTables (s, b, peer);
1775 else
1776 tordbg (s, "Peer %s sent an out-of-range reject message",
1777 tr_atomAddrStr (peer->atom));
1778 break;
1781 case TR_PEER_CLIENT_GOT_CHOKE:
1782 peerDeclinedAllRequests (s, peer);
1783 break;
1785 case TR_PEER_CLIENT_GOT_PORT:
1786 if (peer->atom)
1787 peer->atom->port = e->port;
1788 break;
1790 case TR_PEER_CLIENT_GOT_SUGGEST:
1791 peerSuggestedPiece (s, peer, e->pieceIndex, false);
1792 break;
1794 case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1795 peerSuggestedPiece (s, peer, e->pieceIndex, true);
1796 break;
1798 case TR_PEER_CLIENT_GOT_BLOCK:
1800 tr_torrent * tor = s->tor;
1801 const tr_piece_index_t p = e->pieceIndex;
1802 const tr_block_index_t block = _tr_block (tor, p, e->offset);
1803 cancelAllRequestsForBlock (s, block, peer);
1804 tr_historyAdd (&peer->blocksSentToClient, tr_time(), 1);
1805 pieceListResortPiece (s, pieceListLookup (s, p));
1806 tr_torrentGotBlock (tor, block);
1807 break;
1810 case TR_PEER_ERROR:
1811 if ((e->err == ERANGE) || (e->err == EMSGSIZE) || (e->err == ENOTCONN))
1813 /* some protocol error from the peer */
1814 peer->doPurge = 1;
1815 tordbg (s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
1816 tr_atomAddrStr (peer->atom));
1818 else
1820 tordbg (s, "unhandled error: %s", tr_strerror (e->err));
1822 break;
1824 default:
1825 assert (0);
1828 swarmUnlock (s);
1831 static int
1832 getDefaultShelfLife (uint8_t from)
1834 /* in general, peers obtained from firsthand contact
1835 * are better than those from secondhand, etc etc */
1836 switch (from)
1838 case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1839 case TR_PEER_FROM_LTEP : return 60 * 60 * 6;
1840 case TR_PEER_FROM_TRACKER : return 60 * 60 * 3;
1841 case TR_PEER_FROM_DHT : return 60 * 60 * 3;
1842 case TR_PEER_FROM_PEX : return 60 * 60 * 2;
1843 case TR_PEER_FROM_RESUME : return 60 * 60;
1844 case TR_PEER_FROM_LPD : return 10 * 60;
1845 default : return 60 * 60;
1849 static void
1850 ensureAtomExists (tr_swarm * s,
1851 const tr_address * addr,
1852 const tr_port port,
1853 const uint8_t flags,
1854 const int8_t seedProbability,
1855 const uint8_t from)
1857 struct peer_atom * a;
1859 assert (tr_address_is_valid (addr));
1860 assert (from < TR_PEER_FROM__MAX);
1862 a = getExistingAtom (s, addr);
1864 if (a == NULL)
1866 const int jitter = tr_cryptoWeakRandInt (60*10);
1867 a = tr_new0 (struct peer_atom, 1);
1868 a->addr = *addr;
1869 a->port = port;
1870 a->flags = flags;
1871 a->fromFirst = from;
1872 a->fromBest = from;
1873 a->shelf_date = tr_time () + getDefaultShelfLife (from) + jitter;
1874 a->blocklisted = -1;
1875 atomSetSeedProbability (a, seedProbability);
1876 tr_ptrArrayInsertSorted (&s->pool, a, compareAtomsByAddress);
1878 tordbg (s, "got a new atom: %s", tr_atomAddrStr (a));
1880 else
1882 if (from < a->fromBest)
1883 a->fromBest = from;
1885 if (a->seedProbability == -1)
1886 atomSetSeedProbability (a, seedProbability);
1888 a->flags |= flags;
1892 static int
1893 getMaxPeerCount (const tr_torrent * tor)
1895 return tor->maxConnectedPeers;
1898 static int
1899 getPeerCount (const tr_swarm * s)
1901 return tr_ptrArraySize (&s->peers);/* + tr_ptrArraySize (&t->outgoingHandshakes); */
1905 static void
1906 createBitTorrentPeer (tr_torrent * tor,
1907 struct tr_peerIo * io,
1908 struct peer_atom * atom,
1909 tr_quark client)
1911 tr_peer * peer;
1912 tr_peerMsgs * msgs;
1913 tr_swarm * swarm;
1915 assert (atom != NULL);
1916 assert (tr_isTorrent (tor));
1917 assert (tor->swarm != NULL);
1919 swarm = tor->swarm;
1921 peer = (tr_peer*) tr_peerMsgsNew (tor, io, peerCallbackFunc, swarm);
1922 peer->atom = atom;
1923 peer->client = client;
1924 atom->peer = peer;
1926 tr_ptrArrayInsertSorted (&swarm->peers, peer, peerCompare);
1927 ++swarm->stats.peerCount;
1928 ++swarm->stats.peerFromCount[atom->fromFirst];
1930 assert (swarm->stats.peerCount == tr_ptrArraySize (&swarm->peers));
1931 assert (swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount);
1933 msgs = PEER_MSGS (peer);
1934 tr_peerMsgsUpdateActive (msgs, TR_UP);
1935 tr_peerMsgsUpdateActive (msgs, TR_DOWN);
1939 /* FIXME: this is kind of a mess. */
1940 static bool
1941 myHandshakeDoneCB (tr_handshake * handshake,
1942 tr_peerIo * io,
1943 bool readAnythingFromPeer,
1944 bool isConnected,
1945 const uint8_t * peer_id,
1946 void * vmanager)
1948 bool ok = isConnected;
1949 bool success = false;
1950 tr_port port;
1951 const tr_address * addr;
1952 tr_peerMgr * manager = vmanager;
1953 tr_swarm * s;
1955 assert (io);
1956 assert (tr_isBool (ok));
1958 s = tr_peerIoHasTorrentHash (io)
1959 ? getExistingSwarm (manager, tr_peerIoGetTorrentHash (io))
1960 : NULL;
1962 if (tr_peerIoIsIncoming (io))
1963 tr_ptrArrayRemoveSortedPointer (&manager->incomingHandshakes,
1964 handshake, handshakeCompare);
1965 else if (s)
1966 tr_ptrArrayRemoveSortedPointer (&s->outgoingHandshakes,
1967 handshake, handshakeCompare);
1969 if (s)
1970 swarmLock (s);
1972 addr = tr_peerIoGetAddress (io, &port);
1974 if (!ok || !s || !s->isRunning)
1976 if (s)
1978 struct peer_atom * atom = getExistingAtom (s, addr);
1979 if (atom)
1981 ++atom->numFails;
1983 if (!readAnythingFromPeer)
1985 tordbg (s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr (atom), (int)atom->numFails);
1986 atom->flags2 |= MYFLAG_UNREACHABLE;
1991 else /* looking good */
1993 struct peer_atom * atom;
1995 ensureAtomExists (s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
1996 atom = getExistingAtom (s, addr);
1997 atom->time = tr_time ();
1998 atom->piece_data_time = 0;
1999 atom->lastConnectionAt = tr_time ();
2001 if (!tr_peerIoIsIncoming (io))
2003 atom->flags |= ADDED_F_CONNECTABLE;
2004 atom->flags2 &= ~MYFLAG_UNREACHABLE;
2007 /* In principle, this flag specifies whether the peer groks uTP,
2008 not whether it's currently connected over uTP. */
2009 if (io->utp_socket)
2010 atom->flags |= ADDED_F_UTP_FLAGS;
2012 if (atom->flags2 & MYFLAG_BANNED)
2014 tordbg (s, "banned peer %s tried to reconnect",
2015 tr_atomAddrStr (atom));
2017 else if (tr_peerIoIsIncoming (io) && (getPeerCount (s) >= getMaxPeerCount (s->tor)))
2020 else
2022 tr_peer * peer = atom->peer;
2024 if (peer) /* we already have this peer */
2027 else
2029 tr_quark client;
2030 tr_peerIo * io;
2031 char buf[128];
2033 if (peer_id != NULL)
2034 client = tr_quark_new (tr_clientForId (buf, sizeof (buf), peer_id), -1);
2035 else
2036 client = TR_KEY_NONE;
2038 io = tr_handshakeStealIO (handshake); /* this steals its refcount too, which is
2039 balanced by our unref in peerDelete () */
2040 tr_peerIoSetParent (io, &s->tor->bandwidth);
2041 createBitTorrentPeer (s->tor, io, atom, client);
2043 success = true;
2048 if (s != NULL)
2049 swarmUnlock (s);
2051 return success;
2054 void
2055 tr_peerMgrAddIncoming (tr_peerMgr * manager,
2056 tr_address * addr,
2057 tr_port port,
2058 int socket,
2059 struct UTPSocket * utp_socket)
2061 tr_session * session;
2063 managerLock (manager);
2065 assert (tr_isSession (manager->session));
2066 session = manager->session;
2068 if (tr_sessionIsAddressBlocked (session, addr))
2070 tr_logAddDebug ("Banned IP address \"%s\" tried to connect to us", tr_address_to_string (addr));
2071 if (socket >= 0)
2072 tr_netClose (session, socket);
2073 else
2074 UTP_Close (utp_socket);
2076 else if (getExistingHandshake (&manager->incomingHandshakes, addr))
2078 if (socket >= 0)
2079 tr_netClose (session, socket);
2080 else
2081 UTP_Close (utp_socket);
2083 else /* we don't have a connection to them yet... */
2085 tr_peerIo * io;
2086 tr_handshake * handshake;
2088 io = tr_peerIoNewIncoming (session, &session->bandwidth, addr, port, socket, utp_socket);
2090 handshake = tr_handshakeNew (io,
2091 session->encryptionMode,
2092 myHandshakeDoneCB,
2093 manager);
2095 tr_peerIoUnref (io); /* balanced by the implicit ref in tr_peerIoNewIncoming () */
2097 tr_ptrArrayInsertSorted (&manager->incomingHandshakes, handshake,
2098 handshakeCompare);
2101 managerUnlock (manager);
2104 void
2105 tr_peerMgrAddPex (tr_torrent * tor, uint8_t from,
2106 const tr_pex * pex, int8_t seedProbability)
2108 if (tr_isPex (pex)) /* safeguard against corrupt data */
2110 tr_swarm * s = tor->swarm;
2111 managerLock (s->manager);
2113 if (!tr_sessionIsAddressBlocked (s->manager->session, &pex->addr))
2114 if (tr_address_is_valid_for_peers (&pex->addr, pex->port))
2115 ensureAtomExists (s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2117 managerUnlock (s->manager);
2121 void
2122 tr_peerMgrMarkAllAsSeeds (tr_torrent * tor)
2124 tr_swarm * s = tor->swarm;
2125 const int n = tr_ptrArraySize (&s->pool);
2126 struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2127 struct peer_atom ** end = it + n;
2129 while (it != end)
2130 atomSetSeed (s, *it++);
2133 tr_pex *
2134 tr_peerMgrCompactToPex (const void * compact,
2135 size_t compactLen,
2136 const uint8_t * added_f,
2137 size_t added_f_len,
2138 size_t * pexCount)
2140 size_t i;
2141 size_t n = compactLen / 6;
2142 const uint8_t * walk = compact;
2143 tr_pex * pex = tr_new0 (tr_pex, n);
2145 for (i=0; i<n; ++i)
2147 pex[i].addr.type = TR_AF_INET;
2148 memcpy (&pex[i].addr.addr, walk, 4); walk += 4;
2149 memcpy (&pex[i].port, walk, 2); walk += 2;
2150 if (added_f && (n == added_f_len))
2151 pex[i].flags = added_f[i];
2154 *pexCount = n;
2155 return pex;
2158 tr_pex *
2159 tr_peerMgrCompact6ToPex (const void * compact,
2160 size_t compactLen,
2161 const uint8_t * added_f,
2162 size_t added_f_len,
2163 size_t * pexCount)
2165 size_t i;
2166 size_t n = compactLen / 18;
2167 const uint8_t * walk = compact;
2168 tr_pex * pex = tr_new0 (tr_pex, n);
2170 for (i=0; i<n; ++i)
2172 pex[i].addr.type = TR_AF_INET6;
2173 memcpy (&pex[i].addr.addr.addr6.s6_addr, walk, 16); walk += 16;
2174 memcpy (&pex[i].port, walk, 2); walk += 2;
2175 if (added_f && (n == added_f_len))
2176 pex[i].flags = added_f[i];
2179 *pexCount = n;
2180 return pex;
2183 tr_pex *
2184 tr_peerMgrArrayToPex (const void * array,
2185 size_t arrayLen,
2186 size_t * pexCount)
2188 size_t i;
2189 size_t n = arrayLen / (sizeof (tr_address) + 2);
2190 const uint8_t * walk = array;
2191 tr_pex * pex = tr_new0 (tr_pex, n);
2193 for (i=0 ; i<n ; ++i)
2195 memcpy (&pex[i].addr, walk, sizeof (tr_address));
2196 memcpy (&pex[i].port, walk + sizeof (tr_address), 2);
2197 pex[i].flags = 0x00;
2198 walk += sizeof (tr_address) + 2;
2201 *pexCount = n;
2202 return pex;
2209 void
2210 tr_peerMgrGotBadPiece (tr_torrent * tor, tr_piece_index_t pieceIndex)
2212 int i;
2213 int n;
2214 tr_swarm * s = tor->swarm;
2215 const uint32_t byteCount = tr_torPieceCountBytes (tor, pieceIndex);
2217 for (i=0, n=tr_ptrArraySize(&s->peers); i!=n; ++i)
2219 tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2221 if (tr_bitfieldHas (&peer->blame, pieceIndex))
2223 tordbg (s, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2224 tr_atomAddrStr(peer->atom), pieceIndex, (int)peer->strikes + 1);
2225 addStrike (s, peer);
2230 tr_announcerAddBytes (tor, TR_ANN_CORRUPT, byteCount);
2234 tr_pexCompare (const void * va, const void * vb)
2236 int i;
2237 const tr_pex * a = va;
2238 const tr_pex * b = vb;
2240 assert (tr_isPex (a));
2241 assert (tr_isPex (b));
2243 if ((i = tr_address_compare (&a->addr, &b->addr)))
2244 return i;
2246 if (a->port != b->port)
2247 return a->port < b->port ? -1 : 1;
2249 return 0;
2252 /* better goes first */
2253 static int
2254 compareAtomsByUsefulness (const void * va, const void *vb)
2256 const struct peer_atom * a = * (const struct peer_atom**) va;
2257 const struct peer_atom * b = * (const struct peer_atom**) vb;
2259 assert (tr_isAtom (a));
2260 assert (tr_isAtom (b));
2262 if (a->piece_data_time != b->piece_data_time)
2263 return a->piece_data_time > b->piece_data_time ? -1 : 1;
2264 if (a->fromBest != b->fromBest)
2265 return a->fromBest < b->fromBest ? -1 : 1;
2266 if (a->numFails != b->numFails)
2267 return a->numFails < b->numFails ? -1 : 1;
2269 return 0;
2272 static bool
2273 isAtomInteresting (const tr_torrent * tor, struct peer_atom * atom)
2275 if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
2276 return false;
2278 if (peerIsInUse (tor->swarm, atom))
2279 return true;
2281 if (isAtomBlocklisted (tor->session, atom))
2282 return false;
2284 if (atom->flags2 & MYFLAG_BANNED)
2285 return false;
2287 return true;
2291 tr_peerMgrGetPeers (tr_torrent * tor,
2292 tr_pex ** setme_pex,
2293 uint8_t af,
2294 uint8_t list_mode,
2295 int maxCount)
2297 int i;
2298 int n;
2299 int count = 0;
2300 int atomCount = 0;
2301 const tr_swarm * s = tor->swarm;
2302 struct peer_atom ** atoms = NULL;
2303 tr_pex * pex;
2304 tr_pex * walk;
2306 assert (tr_isTorrent (tor));
2307 assert (setme_pex != NULL);
2308 assert (af==TR_AF_INET || af==TR_AF_INET6);
2309 assert (list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING);
2311 managerLock (s->manager);
2314 *** build a list of atoms
2317 if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2319 int i;
2320 const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase (&s->peers);
2321 atomCount = tr_ptrArraySize (&s->peers);
2322 atoms = tr_new (struct peer_atom *, atomCount);
2323 for (i=0; i<atomCount; ++i)
2324 atoms[i] = peers[i]->atom;
2326 else /* TR_PEERS_INTERESTING */
2328 int i;
2329 struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase (&s->pool);
2330 n = tr_ptrArraySize (&s->pool);
2331 atoms = tr_new (struct peer_atom *, n);
2332 for (i=0; i<n; ++i)
2333 if (isAtomInteresting (tor, atomBase[i]))
2334 atoms[atomCount++] = atomBase[i];
2337 qsort (atoms, atomCount, sizeof (struct peer_atom *), compareAtomsByUsefulness);
2340 *** add the first N of them into our return list
2343 n = MIN (atomCount, maxCount);
2344 pex = walk = tr_new0 (tr_pex, n);
2346 for (i=0; i<atomCount && count<n; ++i)
2348 const struct peer_atom * atom = atoms[i];
2349 if (atom->addr.type == af)
2351 assert (tr_address_is_valid (&atom->addr));
2352 walk->addr = atom->addr;
2353 walk->port = atom->port;
2354 walk->flags = atom->flags;
2355 ++count;
2356 ++walk;
2360 qsort (pex, count, sizeof (tr_pex), tr_pexCompare);
2362 assert ((walk - pex) == count);
2363 *setme_pex = pex;
2365 /* cleanup */
2366 tr_free (atoms);
2367 managerUnlock (s->manager);
2368 return count;
2371 static void atomPulse (evutil_socket_t, short, void *);
2372 static void bandwidthPulse (evutil_socket_t, short, void *);
2373 static void rechokePulse (evutil_socket_t, short, void *);
2374 static void reconnectPulse (evutil_socket_t, short, void *);
2376 static struct event *
2377 createTimer (tr_session * session, int msec, event_callback_fn callback, void * cbdata)
2379 struct event * timer = evtimer_new (session->event_base, callback, cbdata);
2380 tr_timerAddMsec (timer, msec);
2381 return timer;
2384 static void
2385 ensureMgrTimersExist (struct tr_peerMgr * m)
2387 if (m->atomTimer == NULL)
2388 m->atomTimer = createTimer (m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2390 if (m->bandwidthTimer == NULL)
2391 m->bandwidthTimer = createTimer (m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2393 if (m->rechokeTimer == NULL)
2394 m->rechokeTimer = createTimer (m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2396 if (m->refillUpkeepTimer == NULL)
2397 m->refillUpkeepTimer = createTimer (m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2400 void
2401 tr_peerMgrStartTorrent (tr_torrent * tor)
2403 tr_swarm * s;
2405 assert (tr_isTorrent (tor));
2406 assert (tr_torrentIsLocked (tor));
2408 s = tor->swarm;
2409 ensureMgrTimersExist (s->manager);
2411 s->isRunning = true;
2412 s->maxPeers = tor->maxConnectedPeers;
2413 s->pieceSortState = PIECES_UNSORTED;
2415 rechokePulse (0, 0, s->manager);
2418 static void removeAllPeers (tr_swarm *);
2420 static void
2421 stopSwarm (tr_swarm * swarm)
2423 swarm->isRunning = false;
2425 replicationFree (swarm);
2426 invalidatePieceSorting (swarm);
2428 removeAllPeers (swarm);
2430 /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB (),
2431 * which removes the handshake from t->outgoingHandshakes... */
2432 while (!tr_ptrArrayEmpty (&swarm->outgoingHandshakes))
2433 tr_handshakeAbort (tr_ptrArrayNth (&swarm->outgoingHandshakes, 0));
2436 void
2437 tr_peerMgrStopTorrent (tr_torrent * tor)
2439 assert (tr_isTorrent (tor));
2440 assert (tr_torrentIsLocked (tor));
2442 stopSwarm (tor->swarm);
2445 void
2446 tr_peerMgrAddTorrent (tr_peerMgr * manager, tr_torrent * tor)
2448 assert (tr_isTorrent (tor));
2449 assert (tr_torrentIsLocked (tor));
2450 assert (tor->swarm == NULL);
2452 tor->swarm = swarmNew (manager, tor);
2455 void
2456 tr_peerMgrRemoveTorrent (tr_torrent * tor)
2458 assert (tr_isTorrent (tor));
2459 assert (tr_torrentIsLocked (tor));
2461 stopSwarm (tor->swarm);
2462 swarmFree (tor->swarm);
2465 void
2466 tr_peerUpdateProgress (tr_torrent * tor, tr_peer * peer)
2468 const tr_bitfield * have = &peer->have;
2470 if (tr_bitfieldHasAll (have))
2472 peer->progress = 1.0;
2474 else if (tr_bitfieldHasNone (have))
2476 peer->progress = 0.0;
2478 else
2480 const float true_count = tr_bitfieldCountTrueBits (have);
2482 if (tr_torrentHasMetadata (tor))
2484 peer->progress = true_count / tor->info.pieceCount;
2486 else /* without pieceCount, this result is only a best guess... */
2488 peer->progress = true_count / (have->bit_count + 1);
2492 /* clamp the progress range */
2493 if (peer->progress < 0.0)
2494 peer->progress = 0.0;
2495 if (peer->progress > 1.0)
2496 peer->progress = 1.0;
2498 if (peer->atom && (peer->progress >= 1.0))
2499 atomSetSeed (tor->swarm, peer->atom);
2502 void
2503 tr_peerMgrOnTorrentGotMetainfo (tr_torrent * tor)
2505 int i;
2506 int peerCount;
2507 tr_peer ** peers;
2509 /* the webseed list may have changed... */
2510 rebuildWebseedArray (tor->swarm, tor);
2512 /* some peer_msgs' progress fields may not be accurate if we
2513 didn't have the metadata before now... so refresh them all... */
2514 peerCount = tr_ptrArraySize (&tor->swarm->peers);
2515 peers = (tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2516 for (i=0; i<peerCount; ++i)
2517 tr_peerUpdateProgress (tor, peers[i]);
2519 /* update the bittorrent peers' willingnes... */
2520 for (i=0; i<peerCount; ++i)
2522 tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_UP);
2523 tr_peerMsgsUpdateActive (tr_peerMsgsCast(peers[i]), TR_DOWN);
2527 void
2528 tr_peerMgrTorrentAvailability (const tr_torrent * tor,
2529 int8_t * tab,
2530 unsigned int tabCount)
2532 assert (tr_isTorrent (tor));
2533 assert (tab != NULL);
2534 assert (tabCount > 0);
2536 memset (tab, 0, tabCount);
2538 if (tr_torrentHasMetadata (tor))
2540 tr_piece_index_t i;
2541 const int peerCount = tr_ptrArraySize (&tor->swarm->peers);
2542 const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&tor->swarm->peers);
2543 const float interval = tor->info.pieceCount / (float)tabCount;
2544 const bool isSeed = tr_torrentGetCompleteness (tor) == TR_SEED;
2546 for (i=0; i<tabCount; ++i)
2548 const int piece = i * interval;
2550 if (isSeed || tr_torrentPieceIsComplete (tor, piece))
2552 tab[i] = -1;
2554 else if (peerCount)
2556 int j;
2557 for (j=0; j<peerCount; ++j)
2558 if (tr_bitfieldHas (&peers[j]->have, piece))
2559 ++tab[i];
2565 void
2566 tr_swarmGetStats (const tr_swarm * swarm, tr_swarm_stats * setme)
2568 assert (swarm != NULL);
2569 assert (setme != NULL);
2571 *setme = swarm->stats;
2574 void
2575 tr_swarmIncrementActivePeers (tr_swarm * swarm, tr_direction direction, bool is_active)
2577 int n = swarm->stats.activePeerCount[direction];
2579 if (is_active)
2580 ++n;
2581 else
2582 --n;
2584 assert (0 <= n);
2585 assert (n <= swarm->stats.peerCount);
2587 swarm->stats.activePeerCount[direction] = n;
2590 bool
2591 tr_peerIsSeed (const tr_peer * peer)
2593 if (peer->progress >= 1.0)
2594 return true;
2596 if (peer->atom && atomIsSeed (peer->atom))
2597 return true;
2599 return false;
2602 /* count how many bytes we want that connected peers have */
2603 uint64_t
2604 tr_peerMgrGetDesiredAvailable (const tr_torrent * tor)
2606 size_t i;
2607 size_t n;
2608 uint64_t desiredAvailable;
2609 const tr_swarm * s;
2611 assert (tr_isTorrent (tor));
2613 /* common shortcuts... */
2615 if (tr_torrentIsSeed (tor))
2616 return 0;
2618 if (!tr_torrentHasMetadata (tor))
2619 return 0;
2621 s = tor->swarm;
2622 if (s == NULL)
2623 return 0;
2625 n = tr_ptrArraySize (&s->peers);
2626 if (n == 0)
2628 return 0;
2630 else
2632 const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase (&s->peers);
2633 for (i=0; i<n; ++i)
2634 if (peers[i]->atom && atomIsSeed (peers[i]->atom))
2635 return tr_torrentGetLeftUntilDone (tor);
2638 if (!s->pieceReplication || !s->pieceReplicationSize)
2639 return 0;
2641 /* do it the hard way */
2643 desiredAvailable = 0;
2644 for (i=0, n=MIN (tor->info.pieceCount, s->pieceReplicationSize); i<n; ++i)
2645 if (!tor->info.pieces[i].dnd && (s->pieceReplication[i] > 0))
2646 desiredAvailable += tr_torrentMissingBytesInPiece (tor, i);
2648 assert (desiredAvailable <= tor->info.totalSize);
2649 return desiredAvailable;
2652 double*
2653 tr_peerMgrWebSpeeds_KBps (const tr_torrent * tor)
2655 unsigned int i;
2656 tr_swarm * s;
2657 unsigned int n;
2658 double * ret = NULL;
2659 const uint64_t now = tr_time_msec ();
2661 assert (tr_isTorrent (tor));
2663 s = tor->swarm;
2664 n = tr_ptrArraySize (&s->webseeds);
2665 ret = tr_new0 (double, n);
2667 assert (s->manager != NULL);
2668 assert (n == tor->info.webseedCount);
2670 for (i=0; i<n; ++i)
2672 unsigned int Bps = 0;
2673 if (tr_peerIsTransferringPieces (tr_ptrArrayNth(&s->webseeds,i), now, TR_DOWN, &Bps))
2674 ret[i] = Bps / (double)tr_speed_K;
2675 else
2676 ret[i] = -1.0;
2679 return ret;
2682 struct tr_peer_stat *
2683 tr_peerMgrPeerStats (const tr_torrent * tor, int * setmeCount)
2685 int i;
2686 int size = 0;
2687 tr_peer_stat * ret;
2688 const tr_swarm * s;
2689 tr_peer ** peers;
2690 const time_t now = tr_time ();
2691 const uint64_t now_msec = tr_time_msec ();
2693 assert (tr_isTorrent (tor));
2694 assert (tor->swarm->manager != NULL);
2696 s = tor->swarm;
2697 peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
2698 size = tr_ptrArraySize (&s->peers);
2699 ret = tr_new0 (tr_peer_stat, size);
2701 for (i=0; i<size; ++i)
2703 char * pch;
2704 tr_peer * peer = peers[i];
2705 tr_peerMsgs * msgs = PEER_MSGS (peer);
2706 const struct peer_atom * atom = peer->atom;
2707 tr_peer_stat * stat = ret + i;
2709 tr_address_to_string_with_buf (&atom->addr, stat->addr, sizeof (stat->addr));
2710 tr_strlcpy (stat->client, tr_quark_get_string(peer->client,NULL), sizeof (stat->client));
2711 stat->port = ntohs (peer->atom->port);
2712 stat->from = atom->fromFirst;
2713 stat->progress = peer->progress;
2714 stat->isUTP = tr_peerMsgsIsUtpConnection (msgs);
2715 stat->isEncrypted = tr_peerMsgsIsEncrypted (msgs);
2716 stat->rateToPeer_KBps = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_CLIENT_TO_PEER));
2717 stat->rateToClient_KBps = toSpeedKBps (tr_peerGetPieceSpeed_Bps (peer, now_msec, TR_PEER_TO_CLIENT));
2718 stat->peerIsChoked = tr_peerMsgsIsPeerChoked (msgs);
2719 stat->peerIsInterested = tr_peerMsgsIsPeerInterested (msgs);
2720 stat->clientIsChoked = tr_peerMsgsIsClientChoked (msgs);
2721 stat->clientIsInterested = tr_peerMsgsIsClientInterested (msgs);
2722 stat->isIncoming = tr_peerMsgsIsIncomingConnection (msgs);
2723 stat->isDownloadingFrom = tr_peerMsgsIsActive (msgs, TR_PEER_TO_CLIENT);
2724 stat->isUploadingTo = tr_peerMsgsIsActive (msgs, TR_CLIENT_TO_PEER);
2725 stat->isSeed = tr_peerIsSeed (peer);
2727 stat->blocksToPeer = tr_historyGet (&peer->blocksSentToPeer, now, CANCEL_HISTORY_SEC);
2728 stat->blocksToClient = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2729 stat->cancelsToPeer = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2730 stat->cancelsToClient = tr_historyGet (&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2732 stat->pendingReqsToPeer = peer->pendingReqsToPeer;
2733 stat->pendingReqsToClient = peer->pendingReqsToClient;
2735 pch = stat->flagStr;
2736 if (stat->isUTP) *pch++ = 'T';
2737 if (s->optimistic == msgs) *pch++ = 'O';
2738 if (stat->isDownloadingFrom) *pch++ = 'D';
2739 else if (stat->clientIsInterested) *pch++ = 'd';
2740 if (stat->isUploadingTo) *pch++ = 'U';
2741 else if (stat->peerIsInterested) *pch++ = 'u';
2742 if (!stat->clientIsChoked && !stat->clientIsInterested) *pch++ = 'K';
2743 if (!stat->peerIsChoked && !stat->peerIsInterested) *pch++ = '?';
2744 if (stat->isEncrypted) *pch++ = 'E';
2745 if (stat->from == TR_PEER_FROM_DHT) *pch++ = 'H';
2746 else if (stat->from == TR_PEER_FROM_PEX) *pch++ = 'X';
2747 if (stat->isIncoming) *pch++ = 'I';
2748 *pch = '\0';
2751 *setmeCount = size;
2752 return ret;
2755 /***
2756 ****
2757 ****
2758 ***/
2760 void
2761 tr_peerMgrClearInterest (tr_torrent * tor)
2763 int i;
2764 tr_swarm * s = tor->swarm;
2765 const int peerCount = tr_ptrArraySize (&s->peers);
2767 assert (tr_isTorrent (tor));
2768 assert (tr_torrentIsLocked (tor));
2770 for (i=0; i<peerCount; ++i)
2771 tr_peerMsgsSetInterested (tr_ptrArrayNth (&s->peers, i), false);
2774 /* does this peer have any pieces that we want? */
2775 static bool
2776 isPeerInteresting (tr_torrent * const tor,
2777 const bool * const piece_is_interesting,
2778 const tr_peer * const peer)
2780 tr_piece_index_t i, n;
2782 /* these cases should have already been handled by the calling code... */
2783 assert (!tr_torrentIsSeed (tor));
2784 assert (tr_torrentIsPieceTransferAllowed (tor, TR_PEER_TO_CLIENT));
2786 if (tr_peerIsSeed (peer))
2787 return true;
2789 for (i=0, n=tor->info.pieceCount; i<n; ++i)
2790 if (piece_is_interesting[i] && tr_bitfieldHas (&peer->have, i))
2791 return true;
2793 return false;
/* Classification of a peer used when ranking who to be interested in.
   compare_rechoke_info () sorts by these values in ascending order,
   so the numeric order is significant. */
typedef enum
{
  RECHOKE_STATE_GOOD     = 0,
  RECHOKE_STATE_UNTESTED = 1,
  RECHOKE_STATE_BAD      = 2
}
tr_rechoke_state;
2804 struct tr_rechoke_info
2806 tr_peer * peer;
2807 int salt;
2808 int rechoke_state;
2811 static int
2812 compare_rechoke_info (const void * va, const void * vb)
2814 const struct tr_rechoke_info * a = va;
2815 const struct tr_rechoke_info * b = vb;
2817 if (a->rechoke_state != b->rechoke_state)
2818 return a->rechoke_state - b->rechoke_state;
2820 return a->salt - b->salt;
2823 /* determines who we send "interested" messages to */
2824 static void
2825 rechokeDownloads (tr_swarm * s)
2827 int i;
2828 int maxPeers = 0;
2829 int rechoke_count = 0;
2830 struct tr_rechoke_info * rechoke = NULL;
2831 const int MIN_INTERESTING_PEERS = 5;
2832 const int peerCount = tr_ptrArraySize (&s->peers);
2833 const time_t now = tr_time ();
2835 /* some cases where this function isn't necessary */
2836 if (tr_torrentIsSeed (s->tor))
2837 return;
2838 if (!tr_torrentIsPieceTransferAllowed (s->tor, TR_PEER_TO_CLIENT))
2839 return;
2841 /* decide HOW MANY peers to be interested in */
2843 int blocks = 0;
2844 int cancels = 0;
2845 time_t timeSinceCancel;
2847 /* Count up how many blocks & cancels each peer has.
2849 * There are two situations where we send out cancels --
2851 * 1. We've got unresponsive peers, which is handled by deciding
2852 * -which- peers to be interested in.
2854 * 2. We've hit our bandwidth cap, which is handled by deciding
2855 * -how many- peers to be interested in.
2857 * We're working on 2. here, so we need to ignore unresponsive
2858 * peers in our calculations lest they confuse Transmission into
2859 * thinking it's hit its bandwidth cap.
2861 for (i=0; i<peerCount; ++i)
2863 const tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2864 const int b = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2865 const int c = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2867 if (b == 0) /* ignore unresponsive peers, as described above */
2868 continue;
2870 blocks += b;
2871 cancels += c;
2874 if (cancels > 0)
2876 /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2877 * higher values indicate more congestion. */
2878 const double cancelRate = cancels / (double)(cancels + blocks);
2879 const double mult = 1 - MIN (cancelRate, 0.5);
2880 maxPeers = s->interestedCount * mult;
2881 tordbg (s, "cancel rate is %.3f -- reducing the "
2882 "number of peers we're interested in by %.0f percent",
2883 cancelRate, mult * 100);
2884 s->lastCancel = now;
2887 timeSinceCancel = now - s->lastCancel;
2888 if (timeSinceCancel)
2890 const int maxIncrease = 15;
2891 const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2892 const double mult = MIN (timeSinceCancel, maxHistory) / (double) maxHistory;
2893 const int inc = maxIncrease * mult;
2894 maxPeers = s->maxPeers + inc;
2895 tordbg (s, "time since last cancel is %li -- increasing the "
2896 "number of peers we're interested in by %d",
2897 timeSinceCancel, inc);
2901 /* don't let the previous section's number tweaking go too far... */
2902 if (maxPeers < MIN_INTERESTING_PEERS)
2903 maxPeers = MIN_INTERESTING_PEERS;
2904 if (maxPeers > s->tor->maxConnectedPeers)
2905 maxPeers = s->tor->maxConnectedPeers;
2907 s->maxPeers = maxPeers;
2909 if (peerCount > 0)
2911 bool * piece_is_interesting;
2912 const tr_torrent * const tor = s->tor;
2913 const int n = tor->info.pieceCount;
2915 /* build a bitfield of interesting pieces... */
2916 piece_is_interesting = tr_new (bool, n);
2917 for (i=0; i<n; i++)
2918 piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_torrentPieceIsComplete (tor, i);
2920 /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2921 for (i=0; i<peerCount; ++i)
2923 tr_peer * peer = tr_ptrArrayNth (&s->peers, i);
2925 if (!isPeerInteresting (s->tor, piece_is_interesting, peer))
2927 tr_peerMsgsSetInterested (PEER_MSGS(peer), false);
2929 else
2931 tr_rechoke_state rechoke_state;
2932 const int blocks = tr_historyGet (&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2933 const int cancels = tr_historyGet (&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2935 if (!blocks && !cancels)
2936 rechoke_state = RECHOKE_STATE_UNTESTED;
2937 else if (!cancels)
2938 rechoke_state = RECHOKE_STATE_GOOD;
2939 else if (!blocks)
2940 rechoke_state = RECHOKE_STATE_BAD;
2941 else if ((cancels * 10) < blocks)
2942 rechoke_state = RECHOKE_STATE_GOOD;
2943 else
2944 rechoke_state = RECHOKE_STATE_BAD;
2946 if (rechoke == NULL)
2947 rechoke = tr_new (struct tr_rechoke_info, peerCount);
2949 rechoke[rechoke_count].peer = peer;
2950 rechoke[rechoke_count].rechoke_state = rechoke_state;
2951 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt (INT_MAX);
2952 rechoke_count++;
2957 tr_free (piece_is_interesting);
2960 /* now that we know which & how many peers to be interested in... update the peer interest */
2961 qsort (rechoke, rechoke_count, sizeof (struct tr_rechoke_info), compare_rechoke_info);
2962 s->interestedCount = MIN (maxPeers, rechoke_count);
2963 for (i=0; i<rechoke_count; ++i)
2964 tr_peerMsgsSetInterested (PEER_MSGS(rechoke[i].peer), i<s->interestedCount);
2966 /* cleanup */
2967 tr_free (rechoke);
2974 struct ChokeData
2976 bool isInterested;
2977 bool wasChoked;
2978 bool isChoked;
2979 int rate;
2980 int salt;
2981 tr_peerMsgs * msgs;
2984 static int
2985 compareChoke (const void * va, const void * vb)
2987 const struct ChokeData * a = va;
2988 const struct ChokeData * b = vb;
2990 if (a->rate != b->rate) /* prefer higher overall speeds */
2991 return a->rate > b->rate ? -1 : 1;
2993 if (a->wasChoked != b->wasChoked) /* prefer unchoked */
2994 return a->wasChoked ? 1 : -1;
2996 if (a->salt != b->salt) /* random order */
2997 return a->salt - b->salt;
2999 return 0;
3002 /* is this a new connection? */
3003 static bool
3004 isNew (const tr_peerMsgs * msgs)
3006 return (msgs != NULL) && (tr_peerMsgsGetConnectionAge (msgs) < 45);
3009 /* get a rate for deciding which peers to choke and unchoke. */
3010 static int
3011 getRate (const tr_torrent * tor, struct peer_atom * atom, uint64_t now)
3013 unsigned int Bps;
3015 if (tr_torrentIsSeed (tor))
3016 Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3018 /* downloading a private torrent... take upload speed into account
3019 * because there may only be a small window of opportunity to share */
3020 else if (tr_torrentIsPrivate (tor))
3021 Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT)
3022 + tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_CLIENT_TO_PEER);
3024 /* downloading a public torrent */
3025 else
3026 Bps = tr_peerGetPieceSpeed_Bps (atom->peer, now, TR_PEER_TO_CLIENT);
3028 /* convert it to bytes per second */
3029 return Bps;
3032 static inline bool
3033 isBandwidthMaxedOut (const tr_bandwidth * b,
3034 const uint64_t now_msec, tr_direction dir)
3036 if (!tr_bandwidthIsLimited (b, dir))
3038 return false;
3040 else
3042 const unsigned int got = tr_bandwidthGetPieceSpeed_Bps (b, now_msec, dir);
3043 const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps (b, dir);
3044 return got >= want;
3048 static void
3049 rechokeUploads (tr_swarm * s, const uint64_t now)
3051 int i, size, unchokedInterested;
3052 const int peerCount = tr_ptrArraySize (&s->peers);
3053 tr_peer ** peers = (tr_peer**) tr_ptrArrayBase (&s->peers);
3054 struct ChokeData * choke = tr_new0 (struct ChokeData, peerCount);
3055 const tr_session * session = s->manager->session;
3056 const int chokeAll = !tr_torrentIsPieceTransferAllowed (s->tor, TR_CLIENT_TO_PEER);
3057 const bool isMaxedOut = isBandwidthMaxedOut (&s->tor->bandwidth, now, TR_UP);
3059 assert (swarmIsLocked (s));
3061 /* an optimistic unchoke peer's "optimistic"
3062 * state lasts for N calls to rechokeUploads (). */
3063 if (s->optimisticUnchokeTimeScaler > 0)
3064 s->optimisticUnchokeTimeScaler--;
3065 else
3066 s->optimistic = NULL;
3068 /* sort the peers by preference and rate */
3069 for (i=0, size=0; i<peerCount; ++i)
3071 tr_peer * peer = peers[i];
3072 tr_peerMsgs * msgs = PEER_MSGS (peer);
3074 struct peer_atom * atom = peer->atom;
3076 if (tr_peerIsSeed (peer)) /* choke seeds and partial seeds */
3078 tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3080 else if (chokeAll) /* choke everyone if we're not uploading */
3082 tr_peerMsgsSetChoke (PEER_MSGS(peer), true);
3084 else if (msgs != s->optimistic)
3086 struct ChokeData * n = &choke[size++];
3087 n->msgs = msgs;
3088 n->isInterested = tr_peerMsgsIsPeerInterested (msgs);
3089 n->wasChoked = tr_peerMsgsIsPeerChoked (msgs);
3090 n->rate = getRate (s->tor, atom, now);
3091 n->salt = tr_cryptoWeakRandInt (INT_MAX);
3092 n->isChoked = true;
3096 qsort (choke, size, sizeof (struct ChokeData), compareChoke);
3099 * Reciprocation and number of uploads capping is managed by unchoking
3100 * the N peers which have the best upload rate and are interested.
3101 * This maximizes the client's download rate. These N peers are
3102 * referred to as downloaders, because they are interested in downloading
3103 * from the client.
3105 * Peers which have a better upload rate (as compared to the downloaders)
3106 * but aren't interested get unchoked. If they become interested, the
3107 * downloader with the worst upload rate gets choked. If a client has
3108 * a complete file, it uses its upload rate rather than its download
3109 * rate to decide which peers to unchoke.
3111 * If our bandwidth is maxed out, don't unchoke any more peers.
3113 unchokedInterested = 0;
3114 for (i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i)
3116 choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3117 if (choke[i].isInterested)
3118 ++unchokedInterested;
3121 /* optimistic unchoke */
3122 if (!s->optimistic && !isMaxedOut && (i<size))
3124 int n;
3125 struct ChokeData * c;
3126 tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3128 for (; i<size; ++i)
3130 if (choke[i].isInterested)
3132 const tr_peerMsgs * msgs = choke[i].msgs;
3133 int x = 1, y;
3134 if (isNew (msgs)) x *= 3;
3135 for (y=0; y<x; ++y)
3136 tr_ptrArrayAppend (&randPool, &choke[i]);
3140 if ((n = tr_ptrArraySize (&randPool)))
3142 c = tr_ptrArrayNth (&randPool, tr_cryptoWeakRandInt (n));
3143 c->isChoked = false;
3144 s->optimistic = c->msgs;
3145 s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3148 tr_ptrArrayDestruct (&randPool, NULL);
3151 for (i=0; i<size; ++i)
3152 tr_peerMsgsSetChoke (choke[i].msgs, choke[i].isChoked);
3154 /* cleanup */
3155 tr_free (choke);
3158 static void
3159 rechokePulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3161 tr_torrent * tor = NULL;
3162 tr_peerMgr * mgr = vmgr;
3163 const uint64_t now = tr_time_msec ();
3165 managerLock (mgr);
3167 while ((tor = tr_torrentNext (mgr->session, tor)))
3169 if (tor->isRunning)
3171 tr_swarm * s = tor->swarm;
3173 if (s->stats.peerCount > 0)
3175 rechokeUploads (s, now);
3176 rechokeDownloads (s);
3181 tr_timerAddMsec (mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3182 managerUnlock (mgr);
3185 /***
3186 ****
3187 **** Life and Death
3188 ****
3189 ***/
3191 static bool
3192 shouldPeerBeClosed (const tr_swarm * s,
3193 const tr_peer * peer,
3194 int peerCount,
3195 const time_t now)
3197 const tr_torrent * tor = s->tor;
3198 const struct peer_atom * atom = peer->atom;
3200 /* if it's marked for purging, close it */
3201 if (peer->doPurge)
3203 tordbg (s, "purging peer %s because its doPurge flag is set",
3204 tr_atomAddrStr (atom));
3205 return true;
3208 /* disconnect if we're both seeds and enough time has passed for PEX */
3209 if (tr_torrentIsSeed (tor) && tr_peerIsSeed (peer))
3210 return !tr_torrentAllowsPex (tor) || (now-atom->time>=30);
3212 /* disconnect if it's been too long since piece data has been transferred.
3213 * this is on a sliding scale based on number of available peers... */
3215 const int relaxStrictnessIfFewerThanN = (int)((getMaxPeerCount (tor) * 0.9) + 0.5);
3216 /* if we have >= relaxIfFewerThan, strictness is 100%.
3217 * if we have zero connections, strictness is 0% */
3218 const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3219 ? 1.0
3220 : peerCount / (float)relaxStrictnessIfFewerThanN;
3221 const int lo = MIN_UPLOAD_IDLE_SECS;
3222 const int hi = MAX_UPLOAD_IDLE_SECS;
3223 const int limit = hi - ((hi - lo) * strictness);
3224 const int idleTime = now - MAX (atom->time, atom->piece_data_time);
3225 /*fprintf (stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit);*/
3226 if (idleTime > limit)
3228 tordbg (s, "purging peer %s because it's been %d secs since we shared anything",
3229 tr_atomAddrStr (atom), idleTime);
3230 return true;
3234 return false;
3237 static tr_peer **
3238 getPeersToClose (tr_swarm * s, const time_t now_sec, int * setmeSize)
3240 int i, peerCount, outsize;
3241 struct tr_peer ** ret = NULL;
3242 tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek (&s->peers, &peerCount);
3244 assert (swarmIsLocked (s));
3246 for (i=outsize=0; i<peerCount; ++i)
3248 if (shouldPeerBeClosed (s, peers[i], peerCount, now_sec))
3250 if (ret == NULL)
3251 ret = tr_new (tr_peer *, peerCount);
3252 ret[outsize++] = peers[i];
3256 *setmeSize = outsize;
3257 return ret;
3260 static int
3261 getReconnectIntervalSecs (const struct peer_atom * atom, const time_t now)
3263 int sec;
3264 const bool unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3266 /* if we were recently connected to this peer and transferring piece
3267 * data, try to reconnect to them sooner rather that later -- we don't
3268 * want network troubles to get in the way of a good peer. */
3269 if (!unreachable && ((now - atom->piece_data_time) <= (MINIMUM_RECONNECT_INTERVAL_SECS * 2)))
3270 sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3272 /* otherwise, the interval depends on how many times we've tried
3273 * and failed to connect to the peer */
3274 else
3276 int step = atom->numFails;
3278 /* penalize peers that were unreachable the last time we tried */
3279 if (unreachable)
3280 step += 2;
3282 switch (step)
3284 case 0: sec = 0; break;
3285 case 1: sec = 10; break;
3286 case 2: sec = 60 * 2; break;
3287 case 3: sec = 60 * 15; break;
3288 case 4: sec = 60 * 30; break;
3289 case 5: sec = 60 * 60; break;
3290 default: sec = 60 * 120; break;
3294 dbgmsg ("reconnect interval for %s is %d seconds", tr_atomAddrStr (atom), sec);
3295 return sec;
3298 static void
3299 removePeer (tr_swarm * s, tr_peer * peer)
3301 struct peer_atom * atom = peer->atom;
3303 assert (swarmIsLocked (s));
3304 assert (atom);
3306 atom->time = tr_time ();
3308 tr_ptrArrayRemoveSortedPointer (&s->peers, peer, peerCompare);
3309 --s->stats.peerCount;
3310 --s->stats.peerFromCount[atom->fromFirst];
3312 if (replicationExists (s))
3313 tr_decrReplicationFromBitfield (s, &peer->have);
3315 assert (s->stats.peerCount == tr_ptrArraySize (&s->peers));
3316 assert (s->stats.peerFromCount[atom->fromFirst] >= 0);
3318 tr_peerFree (peer);
3321 static void
3322 closePeer (tr_swarm * s, tr_peer * peer)
3324 struct peer_atom * atom;
3326 assert (s != NULL);
3327 assert (peer != NULL);
3329 atom = peer->atom;
3331 /* if we transferred piece data, then they might be good peers,
3332 so reset their `numFails' weight to zero. otherwise we connected
3333 to them fruitlessly, so mark it as another fail */
3334 if (atom->piece_data_time)
3336 tordbg (s, "resetting atom %s numFails to 0", tr_atomAddrStr (atom));
3337 atom->numFails = 0;
3339 else
3341 ++atom->numFails;
3342 tordbg (s, "incremented atom %s numFails to %d", tr_atomAddrStr (atom), (int)atom->numFails);
3345 tordbg (s, "removing bad peer %s", tr_atomAddrStr (peer->atom));
3346 removePeer (s, peer);
3349 static void
3350 removeAllPeers (tr_swarm * s)
3352 while (!tr_ptrArrayEmpty (&s->peers))
3353 removePeer (s, tr_ptrArrayNth (&s->peers, 0));
3355 assert (!s->stats.peerCount);
3358 static void
3359 closeBadPeers (tr_swarm * s, const time_t now_sec)
3361 if (!tr_ptrArrayEmpty (&s->peers))
3363 int i;
3364 int peerCount;
3365 struct tr_peer ** peers;
3367 peers = getPeersToClose (s, now_sec, &peerCount);
3368 for (i=0; i<peerCount; ++i)
3369 closePeer (s, peers[i]);
3370 tr_free (peers);
3374 struct peer_liveliness
3376 tr_peer * peer;
3377 void * clientData;
3378 time_t pieceDataTime;
3379 time_t time;
3380 unsigned int speed;
3381 bool doPurge;
3384 static int
3385 comparePeerLiveliness (const void * va, const void * vb)
3387 const struct peer_liveliness * a = va;
3388 const struct peer_liveliness * b = vb;
3390 if (a->doPurge != b->doPurge)
3391 return a->doPurge ? 1 : -1;
3393 if (a->speed != b->speed) /* faster goes first */
3394 return a->speed > b->speed ? -1 : 1;
3396 /* the one to give us data more recently goes first */
3397 if (a->pieceDataTime != b->pieceDataTime)
3398 return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3400 /* the one we connected to most recently goes first */
3401 if (a->time != b->time)
3402 return a->time > b->time ? -1 : 1;
3404 return 0;
3407 static void
3408 sortPeersByLivelinessImpl (tr_peer ** peers,
3409 void ** clientData,
3410 int n,
3411 uint64_t now,
3412 int (*compare)(const void *va, const void *vb))
3414 int i;
3415 struct peer_liveliness *lives, *l;
3417 /* build a sortable array of peer + extra info */
3418 lives = l = tr_new0 (struct peer_liveliness, n);
3419 for (i=0; i<n; ++i, ++l)
3421 tr_peer * p = peers[i];
3422 l->peer = p;
3423 l->doPurge = p->doPurge;
3424 l->pieceDataTime = p->atom->piece_data_time;
3425 l->time = p->atom->time;
3426 l->speed = tr_peerGetPieceSpeed_Bps (p, now, TR_UP)
3427 + tr_peerGetPieceSpeed_Bps (p, now, TR_DOWN);
3428 if (clientData)
3429 l->clientData = clientData[i];
3432 /* sort 'em */
3433 assert (n == (l - lives));
3434 qsort (lives, n, sizeof (struct peer_liveliness), compare);
3436 /* build the peer array */
3437 for (i=0, l=lives; i<n; ++i, ++l)
3439 peers[i] = l->peer;
3440 if (clientData)
3441 clientData[i] = l->clientData;
3443 assert (n == (l - lives));
3445 /* cleanup */
3446 tr_free (lives);
3449 static void
3450 sortPeersByLiveliness (tr_peer ** peers, void ** clientData, int n, uint64_t now)
3452 sortPeersByLivelinessImpl (peers, clientData, n, now, comparePeerLiveliness);
3456 static void
3457 enforceTorrentPeerLimit (tr_swarm * s, uint64_t now)
3459 int n = tr_ptrArraySize (&s->peers);
3460 const int max = tr_torrentGetPeerLimit (s->tor);
3461 if (n > max)
3463 void * base = tr_ptrArrayBase (&s->peers);
3464 tr_peer ** peers = tr_memdup (base, n*sizeof (tr_peer*));
3465 sortPeersByLiveliness (peers, NULL, n, now);
3466 while (n > max)
3467 closePeer (s, peers[--n]);
3468 tr_free (peers);
3472 static void
3473 enforceSessionPeerLimit (tr_session * session, uint64_t now)
3475 int n = 0;
3476 tr_torrent * tor = NULL;
3477 const int max = tr_sessionGetPeerLimit (session);
3479 /* count the total number of peers */
3480 while ((tor = tr_torrentNext (session, tor)))
3481 n += tr_ptrArraySize (&tor->swarm->peers);
3483 /* if there are too many, prune out the worst */
3484 if (n > max)
3486 tr_peer ** peers = tr_new (tr_peer*, n);
3487 tr_swarm ** swarms = tr_new (tr_swarm*, n);
3489 /* populate the peer array */
3490 n = 0;
3491 tor = NULL;
3492 while ((tor = tr_torrentNext (session, tor)))
3494 int i;
3495 tr_swarm * s = tor->swarm;
3496 const int tn = tr_ptrArraySize (&s->peers);
3497 for (i=0; i<tn; ++i, ++n)
3499 peers[n] = tr_ptrArrayNth (&s->peers, i);
3500 swarms[n] = s;
3504 /* sort 'em */
3505 sortPeersByLiveliness (peers, (void**)swarms, n, now);
3507 /* cull out the crappiest */
3508 while (n-- > max)
3509 closePeer (swarms[n], peers[n]);
3511 /* cleanup */
3512 tr_free (swarms);
3513 tr_free (peers);
3517 static void makeNewPeerConnections (tr_peerMgr * mgr, const int max);
3519 static void
3520 reconnectPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3522 tr_torrent * tor;
3523 tr_peerMgr * mgr = vmgr;
3524 const time_t now_sec = tr_time ();
3525 const uint64_t now_msec = tr_time_msec ();
3528 *** enforce the per-session and per-torrent peer limits
3531 /* if we're over the per-torrent peer limits, cull some peers */
3532 tor = NULL;
3533 while ((tor = tr_torrentNext (mgr->session, tor)))
3534 if (tor->isRunning)
3535 enforceTorrentPeerLimit (tor->swarm, now_msec);
3537 /* if we're over the per-session peer limits, cull some peers */
3538 enforceSessionPeerLimit (mgr->session, now_msec);
3540 /* remove crappy peers */
3541 tor = NULL;
3542 while ((tor = tr_torrentNext (mgr->session, tor)))
3543 if (!tor->swarm->isRunning)
3544 removeAllPeers (tor->swarm);
3545 else
3546 closeBadPeers (tor->swarm, now_sec);
3548 /* try to make new peer connections */
3549 makeNewPeerConnections (mgr, MAX_CONNECTIONS_PER_PULSE);
3552 /****
3553 *****
3554 ***** BANDWIDTH ALLOCATION
3555 *****
3556 ****/
3558 static void
3559 pumpAllPeers (tr_peerMgr * mgr)
3561 tr_torrent * tor = NULL;
3563 while ((tor = tr_torrentNext (mgr->session, tor)))
3565 int j;
3566 tr_swarm * s = tor->swarm;
3568 for (j=0; j<tr_ptrArraySize (&s->peers); ++j)
3569 tr_peerMsgsPulse (tr_ptrArrayNth (&s->peers, j));
3574 static void
3575 queuePulseForeach (void * vtor)
3577 tr_torrent * tor = vtor;
3579 tr_torrentStartNow (tor);
3581 if (tor->queue_started_callback != NULL)
3582 (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3585 static void
3586 queuePulse (tr_session * session, tr_direction dir)
3588 assert (tr_isSession (session));
3589 assert (tr_isDirection (dir));
3591 if (tr_sessionGetQueueEnabled (session, dir))
3593 tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3595 tr_sessionGetNextQueuedTorrents (session,
3596 dir,
3597 tr_sessionCountQueueFreeSlots (session, dir),
3598 &torrents);
3600 tr_ptrArrayForeach (&torrents, queuePulseForeach);
3602 tr_ptrArrayDestruct (&torrents, NULL);
3606 static void
3607 bandwidthPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3609 tr_torrent * tor;
3610 tr_peerMgr * mgr = vmgr;
3611 tr_session * session = mgr->session;
3612 managerLock (mgr);
3614 /* FIXME: this next line probably isn't necessary... */
3615 pumpAllPeers (mgr);
3617 /* allocate bandwidth to the peers */
3618 tr_bandwidthAllocate (&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3619 tr_bandwidthAllocate (&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3621 /* torrent upkeep */
3622 tor = NULL;
3623 while ((tor = tr_torrentNext (session, tor)))
3625 /* possibly stop torrents that have seeded enough */
3626 tr_torrentCheckSeedLimit (tor);
3628 /* run the completeness check for any torrents that need it */
3629 if (tor->swarm->needsCompletenessCheck)
3631 tor->swarm->needsCompletenessCheck = false;
3632 tr_torrentRecheckCompleteness (tor);
3635 /* stop torrents that are ready to stop, but couldn't be stopped
3636 earlier during the peer-io callback call chain */
3637 if (tor->isStopping)
3638 tr_torrentStop (tor);
3640 /* update the torrent's stats */
3641 tor->swarm->stats.activeWebseedCount = countActiveWebseeds (tor->swarm);
3644 /* pump the queues */
3645 queuePulse (session, TR_UP);
3646 queuePulse (session, TR_DOWN);
3648 reconnectPulse (0, 0, mgr);
3650 tr_timerAddMsec (mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3651 managerUnlock (mgr);
3654 /***
3655 ****
3656 ***/
3658 static int
3659 compareAtomPtrsByAddress (const void * va, const void *vb)
3661 const struct peer_atom * a = * (const struct peer_atom**) va;
3662 const struct peer_atom * b = * (const struct peer_atom**) vb;
3664 assert (tr_isAtom (a));
3665 assert (tr_isAtom (b));
3667 return tr_address_compare (&a->addr, &b->addr);
3670 /* best come first, worst go last */
3671 static int
3672 compareAtomPtrsByShelfDate (const void * va, const void *vb)
3674 time_t atime;
3675 time_t btime;
3676 const struct peer_atom * a = * (const struct peer_atom**) va;
3677 const struct peer_atom * b = * (const struct peer_atom**) vb;
3678 const int data_time_cutoff_secs = 60 * 60;
3679 const time_t tr_now = tr_time ();
3681 assert (tr_isAtom (a));
3682 assert (tr_isAtom (b));
3684 /* primary key: the last piece data time *if* it was within the last hour */
3685 atime = a->piece_data_time; if (atime + data_time_cutoff_secs < tr_now) atime = 0;
3686 btime = b->piece_data_time; if (btime + data_time_cutoff_secs < tr_now) btime = 0;
3687 if (atime != btime)
3688 return atime > btime ? -1 : 1;
3690 /* secondary key: shelf date. */
3691 if (a->shelf_date != b->shelf_date)
3692 return a->shelf_date > b->shelf_date ? -1 : 1;
3694 return 0;
3697 static int
3698 getMaxAtomCount (const tr_torrent * tor)
3700 return MIN (50, tor->maxConnectedPeers * 3);
3703 static void
3704 atomPulse (evutil_socket_t foo UNUSED, short bar UNUSED, void * vmgr)
3706 tr_torrent * tor = NULL;
3707 tr_peerMgr * mgr = vmgr;
3708 managerLock (mgr);
3710 while ((tor = tr_torrentNext (mgr->session, tor)))
3712 int atomCount;
3713 tr_swarm * s = tor->swarm;
3714 const int maxAtomCount = getMaxAtomCount (tor);
3715 struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek (&s->pool, &atomCount);
3717 if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3719 int i;
3720 int keepCount = 0;
3721 int testCount = 0;
3722 struct peer_atom ** keep = tr_new (struct peer_atom*, atomCount);
3723 struct peer_atom ** test = tr_new (struct peer_atom*, atomCount);
3725 /* keep the ones that are in use */
3726 for (i=0; i<atomCount; ++i)
3728 struct peer_atom * atom = atoms[i];
3729 if (peerIsInUse (s, atom))
3730 keep[keepCount++] = atom;
3731 else
3732 test[testCount++] = atom;
3735 /* if there's room, keep the best of what's left */
3736 i = 0;
3737 if (keepCount < maxAtomCount)
3739 qsort (test, testCount, sizeof (struct peer_atom *), compareAtomPtrsByShelfDate);
3740 while (i<testCount && keepCount<maxAtomCount)
3741 keep[keepCount++] = test[i++];
3744 /* free the culled atoms */
3745 while (i<testCount)
3746 tr_free (test[i++]);
3748 /* rebuild Torrent.pool with what's left */
3749 tr_ptrArrayDestruct (&s->pool, NULL);
3750 s->pool = TR_PTR_ARRAY_INIT;
3751 qsort (keep, keepCount, sizeof (struct peer_atom *), compareAtomPtrsByAddress);
3752 for (i=0; i<keepCount; ++i)
3753 tr_ptrArrayAppend (&s->pool, keep[i]);
3755 tordbg (s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
3757 /* cleanup */
3758 tr_free (test);
3759 tr_free (keep);
3763 tr_timerAddMsec (mgr->atomTimer, ATOM_PERIOD_MSEC);
3764 managerUnlock (mgr);
3767 /***
3768 ****
3769 ****
3770 ****
3771 ***/
3773 /* is this atom someone that we'd want to initiate a connection to? */
3774 static bool
3775 isPeerCandidate (const tr_torrent * tor, struct peer_atom * atom, const time_t now)
3777 /* not if we're both seeds */
3778 if (tr_torrentIsSeed (tor) && atomIsSeed (atom))
3779 return false;
3781 /* not if we've already got a connection to them... */
3782 if (peerIsInUse (tor->swarm, atom))
3783 return false;
3785 /* not if we just tried them already */
3786 if ((now - atom->time) < getReconnectIntervalSecs (atom, now))
3787 return false;
3789 /* not if they're blocklisted */
3790 if (isAtomBlocklisted (tor->session, atom))
3791 return false;
3793 /* not if they're banned... */
3794 if (atom->flags2 & MYFLAG_BANNED)
3795 return false;
3797 return true;
3800 struct peer_candidate
3802 uint64_t score;
3803 tr_torrent * tor;
3804 struct peer_atom * atom;
3807 static bool
3808 torrentWasRecentlyStarted (const tr_torrent * tor)
3810 return difftime (tr_time (), tor->startDate) < 120;
/**
 * Append a field to a packed sort key.
 *
 * Shifts `value' left by `width' bits and ORs `addme' into the result.
 * `addme' is not masked, so the caller must ensure it fits in `width' bits.
 */
static inline uint64_t
addValToKey (uint64_t value, int width, uint64_t addme)
{
  return (value << (uint64_t)width) | addme;
}
3821 /* smaller value is better */
3822 static uint64_t
3823 getPeerCandidateScore (const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt)
3825 uint64_t i;
3826 uint64_t score = 0;
3827 const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3829 /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3830 i = failed ? 1 : 0;
3831 score = addValToKey (score, 1, i);
3833 /* prefer the one we attempted least recently (to cycle through all peers) */
3834 i = atom->lastConnectionAttemptAt;
3835 score = addValToKey (score, 32, i);
3837 /* prefer peers belonging to a torrent of a higher priority */
3838 switch (tr_torrentGetPriority (tor))
3840 case TR_PRI_HIGH: i = 0; break;
3841 case TR_PRI_NORMAL: i = 1; break;
3842 case TR_PRI_LOW: i = 2; break;
3844 score = addValToKey (score, 4, i);
3846 /* prefer recently-started torrents */
3847 i = torrentWasRecentlyStarted (tor) ? 0 : 1;
3848 score = addValToKey (score, 1, i);
3850 /* prefer torrents we're downloading with */
3851 i = tr_torrentIsSeed (tor) ? 1 : 0;
3852 score = addValToKey (score, 1, i);
3854 /* prefer peers that are known to be connectible */
3855 i = (atom->flags & ADDED_F_CONNECTABLE) ? 0 : 1;
3856 score = addValToKey (score, 1, i);
3858 /* prefer peers that we might have a chance of uploading to...
3859 so lower seed probability is better */
3860 if (atom->seedProbability == 100) i = 101;
3861 else if (atom->seedProbability == -1) i = 100;
3862 else i = atom->seedProbability;
3863 score = addValToKey (score, 8, i);
3865 /* Prefer peers that we got from more trusted sources.
3866 * lower `fromBest' values indicate more trusted sources */
3867 score = addValToKey (score, 4, atom->fromBest);
3869 /* salt */
3870 score = addValToKey (score, 8, salt);
3872 return score;
3875 static int
3876 comparePeerCandidates (const void * va, const void * vb)
3878 int ret;
3879 const struct peer_candidate * a = va;
3880 const struct peer_candidate * b = vb;
3882 if (a->score < b->score)
3883 ret = -1;
3884 else if (a->score > b->score)
3885 ret = 1;
3886 else
3887 ret = 0;
3889 return ret;
3892 /* Partial sorting -- selecting the k best candidates
3893 Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3894 static void
3895 selectPeerCandidates (struct peer_candidate * candidates, int candidate_count, int select_count)
3897 tr_quickfindFirstK (candidates,
3898 candidate_count,
3899 sizeof(struct peer_candidate),
3900 comparePeerCandidates,
3901 select_count);
3904 #ifndef NDEBUG
3905 static bool
3906 checkBestScoresComeFirst (const struct peer_candidate * candidates, int n, int k)
3908 int i;
3909 uint64_t worstFirstScore = 0;
3910 const int x = MIN (n, k) - 1;
3912 for (i=0; i<x; i++)
3913 if (worstFirstScore < candidates[i].score)
3914 worstFirstScore = candidates[i].score;
3916 for (i=0; i<x; i++)
3917 assert (candidates[i].score <= worstFirstScore);
3919 for (i=x+1; i<n; i++)
3920 assert (candidates[i].score >= worstFirstScore);
3922 return true;
3924 #endif /* NDEBUG */
3926 /** @return an array of all the atoms we might want to connect to */
3927 static struct peer_candidate*
3928 getPeerCandidates (tr_session * session, int * candidateCount, int max)
3930 int atomCount;
3931 int peerCount;
3932 tr_torrent * tor;
3933 struct peer_candidate * candidates;
3934 struct peer_candidate * walk;
3935 const time_t now = tr_time ();
3936 const uint64_t now_msec = tr_time_msec ();
3937 /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3938 const int maxCandidates = tr_sessionGetPeerLimit (session) * 0.95;
3940 /* count how many peers and atoms we've got */
3941 tor= NULL;
3942 atomCount = 0;
3943 peerCount = 0;
3944 while ((tor = tr_torrentNext (session, tor)))
3946 atomCount += tr_ptrArraySize (&tor->swarm->pool);
3947 peerCount += tr_ptrArraySize (&tor->swarm->peers);
3950 /* don't start any new handshakes if we're full up */
3951 if (maxCandidates <= peerCount)
3953 *candidateCount = 0;
3954 return NULL;
3957 /* allocate an array of candidates */
3958 walk = candidates = tr_new (struct peer_candidate, atomCount);
3960 /* populate the candidate array */
3961 tor = NULL;
3962 while ((tor = tr_torrentNext (session, tor)))
3964 int i, nAtoms;
3965 struct peer_atom ** atoms;
3967 if (!tor->swarm->isRunning)
3968 continue;
3970 /* if we've already got enough peers in this torrent... */
3971 if (tr_torrentGetPeerLimit (tor) <= tr_ptrArraySize (&tor->swarm->peers))
3972 continue;
3974 /* if we've already got enough speed in this torrent... */
3975 if (tr_torrentIsSeed (tor) && isBandwidthMaxedOut (&tor->bandwidth, now_msec, TR_UP))
3976 continue;
3978 atoms = (struct peer_atom**) tr_ptrArrayPeek (&tor->swarm->pool, &nAtoms);
3979 for (i=0; i<nAtoms; ++i)
3981 struct peer_atom * atom = atoms[i];
3983 if (isPeerCandidate (tor, atom, now))
3985 const uint8_t salt = tr_cryptoWeakRandInt (1024);
3986 walk->tor = tor;
3987 walk->atom = atom;
3988 walk->score = getPeerCandidateScore (tor, atom, salt);
3989 ++walk;
3994 *candidateCount = walk - candidates;
3995 if (walk != candidates)
3996 selectPeerCandidates (candidates, walk-candidates, max);
3998 assert (checkBestScoresComeFirst (candidates, *candidateCount, max));
3999 return candidates;
4002 static void
4003 initiateConnection (tr_peerMgr * mgr, tr_swarm * s, struct peer_atom * atom)
4005 tr_peerIo * io;
4006 const time_t now = tr_time ();
4007 bool utp = tr_sessionIsUTPEnabled (mgr->session) && !atom->utp_failed;
4009 if (atom->fromFirst == TR_PEER_FROM_PEX)
4010 /* PEX has explicit signalling for uTP support. If an atom
4011 originally came from PEX and doesn't have the uTP flag, skip the
4012 uTP connection attempt. Are we being optimistic here? */
4013 utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4015 tordbg (s, "Starting an OUTGOING%s connection with %s",
4016 utp ? " µTP" : "", tr_atomAddrStr (atom));
4018 io = tr_peerIoNewOutgoing (mgr->session,
4019 &mgr->session->bandwidth,
4020 &atom->addr,
4021 atom->port,
4022 s->tor->info.hash,
4023 s->tor->completeness == TR_SEED,
4024 utp);
4026 if (io == NULL)
4028 tordbg (s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr (atom));
4029 atom->flags2 |= MYFLAG_UNREACHABLE;
4030 atom->numFails++;
4032 else
4034 tr_handshake * handshake = tr_handshakeNew (io,
4035 mgr->session->encryptionMode,
4036 myHandshakeDoneCB,
4037 mgr);
4039 assert (tr_peerIoGetTorrentHash (io));
4041 tr_peerIoUnref (io); /* balanced by the initial ref
4042 in tr_peerIoNewOutgoing () */
4044 tr_ptrArrayInsertSorted (&s->outgoingHandshakes, handshake,
4045 handshakeCompare);
4048 atom->lastConnectionAttemptAt = now;
4049 atom->time = now;
4052 static void
4053 initiateCandidateConnection (tr_peerMgr * mgr, struct peer_candidate * c)
4055 #if 0
4056 fprintf (stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4057 tr_atomAddrStr (c->atom),
4058 tr_torrentName (c->tor),
4059 (int)c->atom->seedProbability,
4060 tr_torrentIsPrivate (c->tor) ? "private" : "public",
4061 tr_torrentIsSeed (c->tor) ? "seed" : "downloader");
4062 #endif
4064 initiateConnection (mgr, c->tor->swarm, c->atom);
4067 static void
4068 makeNewPeerConnections (struct tr_peerMgr * mgr, const int max)
4070 int i, n;
4071 struct peer_candidate * candidates;
4073 candidates = getPeerCandidates (mgr->session, &n, max);
4075 for (i=0; i<n && i<max; ++i)
4076 initiateCandidateConnection (mgr, &candidates[i]);
4078 tr_free (candidates);