Transmission 2.33
[tomato.git] / release / src / router / transmission / libtransmission / peer-msgs.c
blob796071019c3f7ab0792d394caffe23025475a1e8
1 /*
2 * This file Copyright (C) Mnemosyne LLC
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
10 * $Id: peer-msgs.c 12555 2011-07-17 18:11:34Z jordan $
13 #include <assert.h>
14 #include <errno.h>
15 #include <stdarg.h>
16 #include <stdlib.h>
17 #include <string.h>
19 #include <alloca.h>
21 #include <event2/buffer.h>
22 #include <event2/bufferevent.h>
23 #include <event2/event.h>
25 #include "transmission.h"
26 #include "bencode.h"
27 #include "cache.h"
28 #include "completion.h"
29 #include "crypto.h" /* tr_sha1() */
30 #include "peer-io.h"
31 #include "peer-mgr.h"
32 #include "peer-msgs.h"
33 #include "session.h"
34 #include "torrent.h"
35 #include "torrent-magnet.h"
36 #include "tr-dht.h"
37 #include "utils.h"
38 #include "version.h"
40 /**
41 ***
42 **/
44 enum
46 BT_CHOKE = 0,
47 BT_UNCHOKE = 1,
48 BT_INTERESTED = 2,
49 BT_NOT_INTERESTED = 3,
50 BT_HAVE = 4,
51 BT_BITFIELD = 5,
52 BT_REQUEST = 6,
53 BT_PIECE = 7,
54 BT_CANCEL = 8,
55 BT_PORT = 9,
57 BT_FEXT_SUGGEST = 13,
58 BT_FEXT_HAVE_ALL = 14,
59 BT_FEXT_HAVE_NONE = 15,
60 BT_FEXT_REJECT = 16,
61 BT_FEXT_ALLOWED_FAST = 17,
63 BT_LTEP = 20,
65 LTEP_HANDSHAKE = 0,
67 UT_PEX_ID = 1,
68 UT_METADATA_ID = 3,
70 MAX_PEX_PEER_COUNT = 50,
72 MIN_CHOKE_PERIOD_SEC = 10,
74 /* idle seconds before we send a keepalive */
75 KEEPALIVE_INTERVAL_SECS = 100,
77 PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */
79 REQQ = 512,
81 METADATA_REQQ = 64,
83 /* used in lowering the outMessages queue period */
84 IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
85 HIGH_PRIORITY_INTERVAL_SECS = 2,
86 LOW_PRIORITY_INTERVAL_SECS = 10,
88 /* number of pieces we'll allow in our fast set */
89 MAX_FAST_SET_SIZE = 3,
91 /* defined in BEP #9 */
92 METADATA_MSG_TYPE_REQUEST = 0,
93 METADATA_MSG_TYPE_DATA = 1,
94 METADATA_MSG_TYPE_REJECT = 2
97 enum
99 AWAITING_BT_LENGTH,
100 AWAITING_BT_ID,
101 AWAITING_BT_MESSAGE,
102 AWAITING_BT_PIECE
109 struct peer_request
111 uint32_t index;
112 uint32_t offset;
113 uint32_t length;
116 static void
117 blockToReq( const tr_torrent * tor,
118 tr_block_index_t block,
119 struct peer_request * setme )
121 tr_torrentGetBlockLocation( tor, block, &setme->index,
122 &setme->offset,
123 &setme->length );
130 /* this is raw, unchanged data from the peer regarding
131 * the current message that it's sending us. */
132 struct tr_incoming
134 uint8_t id;
135 uint32_t length; /* includes the +1 for id length */
136 struct peer_request blockReq; /* metadata for incoming blocks */
137 struct evbuffer * block; /* piece data for incoming blocks */
141 * Low-level communication state information about a connected peer.
143 * This structure remembers the low-level protocol states that we're
144 * in with this peer, such as active requests, pex messages, and so on.
145 * Its fields are all private to peer-msgs.c.
147 * Data not directly involved with sending & receiving messages is
148 * stored in tr_peer, where it can be accessed by both peermsgs and
149 * the peer manager.
151 * @see struct peer_atom
152 * @see tr_peer
154 struct tr_peermsgs
156 bool peerSupportsPex;
157 bool peerSupportsMetadataXfer;
158 bool clientSentLtepHandshake;
159 bool peerSentLtepHandshake;
161 /*bool haveFastSet;*/
163 int desiredRequestCount;
165 int prefetchCount;
167 /* how long the outMessages batch should be allowed to grow before
168 * it's flushed -- some messages (like requests >:) should be sent
169 * very quickly; others aren't as urgent. */
170 int8_t outMessagesBatchPeriod;
172 uint8_t state;
173 uint8_t ut_pex_id;
174 uint8_t ut_metadata_id;
175 uint16_t pexCount;
176 uint16_t pexCount6;
178 size_t metadata_size_hint;
179 #if 0
180 size_t fastsetSize;
181 tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
182 #endif
184 tr_peer * peer;
186 tr_torrent * torrent;
188 tr_peer_callback * callback;
189 void * callbackData;
191 struct evbuffer * outMessages; /* all the non-piece messages */
193 struct peer_request peerAskedFor[REQQ];
195 int peerAskedForMetadata[METADATA_REQQ];
196 int peerAskedForMetadataCount;
198 tr_pex * pex;
199 tr_pex * pex6;
201 /*time_t clientSentPexAt;*/
202 time_t clientSentAnythingAt;
204 /* when we started batching the outMessages */
205 time_t outMessagesBatchedAt;
207 struct tr_incoming incoming;
209 /* if the peer supports the Extension Protocol in BEP 10 and
210 supplied a reqq argument, it's stored here. Otherwise, the
211 value is zero and should be ignored. */
212 int64_t reqq;
214 struct event * pexTimer;
221 static inline tr_session*
222 getSession( struct tr_peermsgs * msgs )
224 return msgs->torrent->session;
231 static void
232 myDebug( const char * file, int line,
233 const struct tr_peermsgs * msgs,
234 const char * fmt, ... )
236 FILE * fp = tr_getLog( );
238 if( fp )
240 va_list args;
241 char timestr[64];
242 struct evbuffer * buf = evbuffer_new( );
243 char * base = tr_basename( file );
245 evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
246 tr_getLogTimeStr( timestr, sizeof( timestr ) ),
247 tr_torrentName( msgs->torrent ),
248 tr_peerIoGetAddrStr( msgs->peer->io ),
249 msgs->peer->client );
250 va_start( args, fmt );
251 evbuffer_add_vprintf( buf, fmt, args );
252 va_end( args );
253 evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
254 fputs( (const char*)evbuffer_pullup( buf, -1 ), fp );
256 tr_free( base );
257 evbuffer_free( buf );
261 #define dbgmsg( msgs, ... ) \
262 do { \
263 if( tr_deepLoggingIsActive( ) ) \
264 myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
265 } while( 0 )
271 static void
272 pokeBatchPeriod( tr_peermsgs * msgs, int interval )
274 if( msgs->outMessagesBatchPeriod > interval )
276 msgs->outMessagesBatchPeriod = interval;
277 dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
281 static void
282 dbgOutMessageLen( tr_peermsgs * msgs )
284 dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) );
287 static void
288 protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
290 struct evbuffer * out = msgs->outMessages;
292 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
294 evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
295 evbuffer_add_uint8 ( out, BT_FEXT_REJECT );
296 evbuffer_add_uint32( out, req->index );
297 evbuffer_add_uint32( out, req->offset );
298 evbuffer_add_uint32( out, req->length );
300 dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
301 dbgOutMessageLen( msgs );
304 static void
305 protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
307 struct evbuffer * out = msgs->outMessages;
309 evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
310 evbuffer_add_uint8 ( out, BT_REQUEST );
311 evbuffer_add_uint32( out, req->index );
312 evbuffer_add_uint32( out, req->offset );
313 evbuffer_add_uint32( out, req->length );
315 dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
316 dbgOutMessageLen( msgs );
317 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
320 static void
321 protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
323 struct evbuffer * out = msgs->outMessages;
325 evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
326 evbuffer_add_uint8 ( out, BT_CANCEL );
327 evbuffer_add_uint32( out, req->index );
328 evbuffer_add_uint32( out, req->offset );
329 evbuffer_add_uint32( out, req->length );
331 dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
332 dbgOutMessageLen( msgs );
333 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
336 static void
337 protocolSendPort(tr_peermsgs *msgs, uint16_t port)
339 struct evbuffer * out = msgs->outMessages;
341 dbgmsg( msgs, "sending Port %u", port);
342 evbuffer_add_uint32( out, 3 );
343 evbuffer_add_uint8 ( out, BT_PORT );
344 evbuffer_add_uint16( out, port);
347 static void
348 protocolSendHave( tr_peermsgs * msgs, uint32_t index )
350 struct evbuffer * out = msgs->outMessages;
352 evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) );
353 evbuffer_add_uint8 ( out, BT_HAVE );
354 evbuffer_add_uint32( out, index );
356 dbgmsg( msgs, "sending Have %u", index );
357 dbgOutMessageLen( msgs );
358 pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
361 #if 0
362 static void
363 protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
365 tr_peerIo * io = msgs->peer->io;
366 struct evbuffer * out = msgs->outMessages;
368 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
370 evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
371 evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST );
372 evbuffer_add_uint32( io, out, pieceIndex );
374 dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
375 dbgOutMessageLen( msgs );
377 #endif
379 static void
380 protocolSendChoke( tr_peermsgs * msgs, int choke )
382 struct evbuffer * out = msgs->outMessages;
384 evbuffer_add_uint32( out, sizeof( uint8_t ) );
385 evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE );
387 dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
388 dbgOutMessageLen( msgs );
389 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
392 static void
393 protocolSendHaveAll( tr_peermsgs * msgs )
395 struct evbuffer * out = msgs->outMessages;
397 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
399 evbuffer_add_uint32( out, sizeof( uint8_t ) );
400 evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL );
402 dbgmsg( msgs, "sending HAVE_ALL..." );
403 dbgOutMessageLen( msgs );
404 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
407 static void
408 protocolSendHaveNone( tr_peermsgs * msgs )
410 struct evbuffer * out = msgs->outMessages;
412 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
414 evbuffer_add_uint32( out, sizeof( uint8_t ) );
415 evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE );
417 dbgmsg( msgs, "sending HAVE_NONE..." );
418 dbgOutMessageLen( msgs );
419 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
423 *** EVENTS
426 static void
427 publish( tr_peermsgs * msgs, tr_peer_event * e )
429 assert( msgs->peer );
430 assert( msgs->peer->msgs == msgs );
432 if( msgs->callback != NULL )
433 msgs->callback( msgs->peer, e, msgs->callbackData );
436 static void
437 fireError( tr_peermsgs * msgs, int err )
439 tr_peer_event e = TR_PEER_EVENT_INIT;
440 e.eventType = TR_PEER_ERROR;
441 e.err = err;
442 publish( msgs, &e );
445 static void
446 fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
448 tr_peer_event e = TR_PEER_EVENT_INIT;
449 e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
450 e.pieceIndex = req->index;
451 e.offset = req->offset;
452 e.length = req->length;
453 publish( msgs, &e );
456 static void
457 fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
459 tr_peer_event e = TR_PEER_EVENT_INIT;
460 e.eventType = TR_PEER_CLIENT_GOT_REJ;
461 e.pieceIndex = req->index;
462 e.offset = req->offset;
463 e.length = req->length;
464 publish( msgs, &e );
467 static void
468 fireGotChoke( tr_peermsgs * msgs )
470 tr_peer_event e = TR_PEER_EVENT_INIT;
471 e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
472 publish( msgs, &e );
475 static void
476 fireClientGotHaveAll( tr_peermsgs * msgs )
478 tr_peer_event e = TR_PEER_EVENT_INIT;
479 e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
480 publish( msgs, &e );
483 static void
484 fireClientGotHaveNone( tr_peermsgs * msgs )
486 tr_peer_event e = TR_PEER_EVENT_INIT;
487 e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
488 publish( msgs, &e );
491 static void
492 fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData )
494 tr_peer_event e = TR_PEER_EVENT_INIT;
496 e.length = length;
497 e.eventType = TR_PEER_CLIENT_GOT_DATA;
498 e.wasPieceData = wasPieceData;
499 publish( msgs, &e );
502 static void
503 fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
505 tr_peer_event e = TR_PEER_EVENT_INIT;
506 e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
507 e.pieceIndex = pieceIndex;
508 publish( msgs, &e );
511 static void
512 fireClientGotPort( tr_peermsgs * msgs, tr_port port )
514 tr_peer_event e = TR_PEER_EVENT_INIT;
515 e.eventType = TR_PEER_CLIENT_GOT_PORT;
516 e.port = port;
517 publish( msgs, &e );
520 static void
521 fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
523 tr_peer_event e = TR_PEER_EVENT_INIT;
524 e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
525 e.pieceIndex = pieceIndex;
526 publish( msgs, &e );
529 static void
530 fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield )
532 tr_peer_event e = TR_PEER_EVENT_INIT;
533 e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
534 e.bitfield = bitfield;
535 publish( msgs, &e );
538 static void
539 fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index )
541 tr_peer_event e = TR_PEER_EVENT_INIT;
542 e.eventType = TR_PEER_CLIENT_GOT_HAVE;
543 e.pieceIndex = index;
544 publish( msgs, &e );
547 static void
548 firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData )
550 tr_peer_event e = TR_PEER_EVENT_INIT;
552 e.length = length;
553 e.eventType = TR_PEER_PEER_GOT_DATA;
554 e.wasPieceData = wasPieceData;
556 publish( msgs, &e );
560 *** ALLOWED FAST SET
561 *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
564 #if 0
565 size_t
566 tr_generateAllowedSet( tr_piece_index_t * setmePieces,
567 size_t desiredSetSize,
568 size_t pieceCount,
569 const uint8_t * infohash,
570 const tr_address * addr )
572 size_t setSize = 0;
574 assert( setmePieces );
575 assert( desiredSetSize <= pieceCount );
576 assert( desiredSetSize );
577 assert( pieceCount );
578 assert( infohash );
579 assert( addr );
581 if( addr->type == TR_AF_INET )
583 uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
584 uint8_t x[SHA_DIGEST_LENGTH];
586 uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 ); /* (1) */
587 memcpy( w, &ui32, sizeof( uint32_t ) );
588 walk += sizeof( uint32_t );
589 memcpy( walk, infohash, SHA_DIGEST_LENGTH ); /* (2) */
590 walk += SHA_DIGEST_LENGTH;
591 tr_sha1( x, w, walk-w, NULL ); /* (3) */
592 assert( sizeof( w ) == walk-w );
594 while( setSize<desiredSetSize )
596 int i;
597 for( i=0; i<5 && setSize<desiredSetSize; ++i ) /* (4) */
599 size_t k;
600 uint32_t j = i * 4; /* (5) */
601 uint32_t y = ntohl( *( uint32_t* )( x + j ) ); /* (6) */
602 uint32_t index = y % pieceCount; /* (7) */
604 for( k=0; k<setSize; ++k ) /* (8) */
605 if( setmePieces[k] == index )
606 break;
608 if( k == setSize )
609 setmePieces[setSize++] = index; /* (9) */
612 tr_sha1( x, x, sizeof( x ), NULL ); /* (3) */
616 return setSize;
619 static void
620 updateFastSet( tr_peermsgs * msgs UNUSED )
622 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
623 const int peerIsNeedy = msgs->peer->progress < 0.10;
625 if( fext && peerIsNeedy && !msgs->haveFastSet )
627 size_t i;
628 const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
629 const tr_info * inf = &msgs->torrent->info;
630 const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
632 /* build the fast set */
633 msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
634 msgs->haveFastSet = 1;
636 /* send it to the peer */
637 for( i=0; i<msgs->fastsetSize; ++i )
638 protocolSendAllowedFast( msgs, msgs->fastset[i] );
641 #endif
644 *** INTEREST
647 static void
648 sendInterest( tr_peermsgs * msgs, bool clientIsInterested )
650 struct evbuffer * out = msgs->outMessages;
652 assert( msgs );
653 assert( tr_isBool( clientIsInterested ) );
655 msgs->peer->clientIsInterested = clientIsInterested;
656 dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
657 evbuffer_add_uint32( out, sizeof( uint8_t ) );
658 evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
660 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
661 dbgOutMessageLen( msgs );
664 static void
665 updateInterest( tr_peermsgs * msgs UNUSED )
667 /* FIXME -- might need to poke the mgr on startup */
670 void
671 tr_peerMsgsSetInterested( tr_peermsgs * msgs, bool clientIsInterested )
673 assert( tr_isBool( clientIsInterested ) );
675 if( clientIsInterested != msgs->peer->clientIsInterested )
676 sendInterest( msgs, clientIsInterested );
679 static bool
680 popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
682 if( msgs->peerAskedForMetadataCount == 0 )
683 return false;
685 *piece = msgs->peerAskedForMetadata[0];
687 tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
688 msgs->peerAskedForMetadataCount-- );
690 return true;
693 static bool
694 popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
696 if( msgs->peer->pendingReqsToClient == 0 )
697 return false;
699 *setme = msgs->peerAskedFor[0];
701 tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
702 msgs->peer->pendingReqsToClient-- );
704 return true;
707 static void
708 cancelAllRequestsToClient( tr_peermsgs * msgs )
710 struct peer_request req;
711 const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
713 while( popNextRequest( msgs, &req ))
714 if( mustSendCancel )
715 protocolSendReject( msgs, &req );
718 void
719 tr_peerMsgsSetChoke( tr_peermsgs * msgs, bool peerIsChoked )
721 const time_t now = tr_time( );
722 const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
724 assert( msgs );
725 assert( msgs->peer );
726 assert( tr_isBool( peerIsChoked ) );
728 if( msgs->peer->chokeChangedAt > fibrillationTime )
730 dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked );
732 else if( msgs->peer->peerIsChoked != peerIsChoked )
734 msgs->peer->peerIsChoked = peerIsChoked;
735 if( peerIsChoked )
736 cancelAllRequestsToClient( msgs );
737 protocolSendChoke( msgs, peerIsChoked );
738 msgs->peer->chokeChangedAt = now;
746 void
747 tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index )
749 protocolSendHave( msgs, index );
751 /* since we have more pieces now, we might not be interested in this peer */
752 updateInterest( msgs );
759 static bool
760 reqIsValid( const tr_peermsgs * peer,
761 uint32_t index,
762 uint32_t offset,
763 uint32_t length )
765 return tr_torrentReqIsValid( peer->torrent, index, offset, length );
768 static bool
769 requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
771 return reqIsValid( msgs, req->index, req->offset, req->length );
774 void
775 tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
777 struct peer_request req;
778 /*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
779 blockToReq( msgs->torrent, block, &req );
780 protocolSendCancel( msgs, &req );
787 static void
788 sendLtepHandshake( tr_peermsgs * msgs )
790 tr_benc val, *m;
791 bool allow_pex;
792 bool allow_metadata_xfer;
793 struct evbuffer * payload;
794 struct evbuffer * out = msgs->outMessages;
795 const unsigned char * ipv6 = tr_globalIPv6();
797 if( msgs->clientSentLtepHandshake )
798 return;
800 dbgmsg( msgs, "sending an ltep handshake" );
801 msgs->clientSentLtepHandshake = 1;
803 /* decide if we want to advertise metadata xfer support (BEP 9) */
804 if( tr_torrentIsPrivate( msgs->torrent ) )
805 allow_metadata_xfer = 0;
806 else
807 allow_metadata_xfer = 1;
809 /* decide if we want to advertise pex support */
810 if( !tr_torrentAllowsPex( msgs->torrent ) )
811 allow_pex = 0;
812 else if( msgs->peerSentLtepHandshake )
813 allow_pex = msgs->peerSupportsPex ? 1 : 0;
814 else
815 allow_pex = 1;
817 tr_bencInitDict( &val, 8 );
818 tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
819 if( ipv6 != NULL )
820 tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
821 if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
822 && ( msgs->torrent->infoDictLength > 0 ) )
823 tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
824 tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
825 tr_bencDictAddInt( &val, "reqq", REQQ );
826 tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
827 tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
828 m = tr_bencDictAddDict( &val, "m", 2 );
829 if( allow_metadata_xfer )
830 tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
831 if( allow_pex )
832 tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
834 payload = tr_bencToBuf( &val, TR_FMT_BENC );
836 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
837 evbuffer_add_uint8 ( out, BT_LTEP );
838 evbuffer_add_uint8 ( out, LTEP_HANDSHAKE );
839 evbuffer_add_buffer( out, payload );
840 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
841 dbgOutMessageLen( msgs );
843 /* cleanup */
844 evbuffer_free( payload );
845 tr_bencFree( &val );
848 static void
849 parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
851 int64_t i;
852 tr_benc val, * sub;
853 uint8_t * tmp = tr_new( uint8_t, len );
854 const uint8_t *addr;
855 size_t addr_len;
856 tr_pex pex;
857 int8_t seedProbability = -1;
859 memset( &pex, 0, sizeof( tr_pex ) );
861 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
862 msgs->peerSentLtepHandshake = 1;
864 if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
866 dbgmsg( msgs, "GET extended-handshake, couldn't get dictionary" );
867 tr_free( tmp );
868 return;
871 dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len, tmp );
873 /* does the peer prefer encrypted connections? */
874 if( tr_bencDictFindInt( &val, "e", &i ) ) {
875 msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
876 : ENCRYPTION_PREFERENCE_NO;
877 if( i )
878 pex.flags |= ADDED_F_ENCRYPTION_FLAG;
881 /* check supported messages for utorrent pex */
882 msgs->peerSupportsPex = 0;
883 msgs->peerSupportsMetadataXfer = 0;
885 if( tr_bencDictFindDict( &val, "m", &sub ) ) {
886 if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
887 msgs->peerSupportsPex = i != 0;
888 msgs->ut_pex_id = (uint8_t) i;
889 dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
891 if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
892 msgs->peerSupportsMetadataXfer = i != 0;
893 msgs->ut_metadata_id = (uint8_t) i;
894 dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
896 if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) {
897 /* Mysterious µTorrent extension that we don't grok. However,
898 it implies support for µTP, so use it to indicate that. */
899 tr_peerMgrSetUtpFailed( msgs->torrent,
900 tr_peerIoGetAddress( msgs->peer->io, NULL ),
901 false );
905 /* look for metainfo size (BEP 9) */
906 if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) {
907 tr_torrentSetMetadataSizeHint( msgs->torrent, i );
908 msgs->metadata_size_hint = (size_t) i;
911 /* look for upload_only (BEP 21) */
912 if( tr_bencDictFindInt( &val, "upload_only", &i ) )
913 seedProbability = i==0 ? 0 : 100;
915 /* get peer's listening port */
916 if( tr_bencDictFindInt( &val, "p", &i ) ) {
917 pex.port = htons( (uint16_t)i );
918 fireClientGotPort( msgs, pex.port );
919 dbgmsg( msgs, "peer's port is now %d", (int)i );
922 if( tr_peerIoIsIncoming( msgs->peer->io )
923 && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
924 && ( addr_len == 4 ) )
926 pex.addr.type = TR_AF_INET;
927 memcpy( &pex.addr.addr.addr4, addr, 4 );
928 tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
931 if( tr_peerIoIsIncoming( msgs->peer->io )
932 && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
933 && ( addr_len == 16 ) )
935 pex.addr.type = TR_AF_INET6;
936 memcpy( &pex.addr.addr.addr6, addr, 16 );
937 tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
940 /* get peer's maximum request queue size */
941 if( tr_bencDictFindInt( &val, "reqq", &i ) )
942 msgs->reqq = i;
944 tr_bencFree( &val );
945 tr_free( tmp );
948 static void
949 parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
951 tr_benc dict;
952 char * msg_end;
953 char * benc_end;
954 int64_t msg_type = -1;
955 int64_t piece = -1;
956 int64_t total_size = 0;
957 uint8_t * tmp = tr_new( uint8_t, msglen );
959 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
960 msg_end = (char*)tmp + msglen;
962 if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
964 tr_bencDictFindInt( &dict, "msg_type", &msg_type );
965 tr_bencDictFindInt( &dict, "piece", &piece );
966 tr_bencDictFindInt( &dict, "total_size", &total_size );
967 tr_bencFree( &dict );
970 dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
971 (int)msg_type, (int)piece, (int)total_size );
973 if( msg_type == METADATA_MSG_TYPE_REJECT )
975 /* NOOP */
978 if( ( msg_type == METADATA_MSG_TYPE_DATA )
979 && ( !tr_torrentHasMetadata( msgs->torrent ) )
980 && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
981 && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
983 const int pieceLen = msg_end - benc_end;
984 tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
987 if( msg_type == METADATA_MSG_TYPE_REQUEST )
989 if( ( piece >= 0 )
990 && tr_torrentHasMetadata( msgs->torrent )
991 && !tr_torrentIsPrivate( msgs->torrent )
992 && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
994 msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
996 else
998 tr_benc tmp;
999 struct evbuffer * payload;
1000 struct evbuffer * out = msgs->outMessages;
1002 /* build the rejection message */
1003 tr_bencInitDict( &tmp, 2 );
1004 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1005 tr_bencDictAddInt( &tmp, "piece", piece );
1006 payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1008 /* write it out as a LTEP message to our outMessages buffer */
1009 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1010 evbuffer_add_uint8 ( out, BT_LTEP );
1011 evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1012 evbuffer_add_buffer( out, payload );
1013 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1014 dbgOutMessageLen( msgs );
1016 /* cleanup */
1017 evbuffer_free( payload );
1018 tr_bencFree( &tmp );
1022 tr_free( tmp );
1025 static void
1026 parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1028 int loaded = 0;
1029 uint8_t * tmp = tr_new( uint8_t, msglen );
1030 tr_benc val;
1031 tr_torrent * tor = msgs->torrent;
1032 const uint8_t * added;
1033 size_t added_len;
1035 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1037 if( tr_torrentAllowsPex( tor )
1038 && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1040 if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1042 tr_pex * pex;
1043 size_t i, n;
1044 size_t added_f_len = 0;
1045 const uint8_t * added_f = NULL;
1047 tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1048 pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1050 n = MIN( n, MAX_PEX_PEER_COUNT );
1051 for( i=0; i<n; ++i )
1053 int seedProbability = -1;
1054 if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1055 tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1058 tr_free( pex );
1061 if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1063 tr_pex * pex;
1064 size_t i, n;
1065 size_t added_f_len = 0;
1066 const uint8_t * added_f = NULL;
1068 tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1069 pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1071 n = MIN( n, MAX_PEX_PEER_COUNT );
1072 for( i=0; i<n; ++i )
1074 int seedProbability = -1;
1075 if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1076 tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1079 tr_free( pex );
1083 if( loaded )
1084 tr_bencFree( &val );
1085 tr_free( tmp );
1088 static void sendPex( tr_peermsgs * msgs );
1090 static void
1091 parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1093 uint8_t ltep_msgid;
1095 tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1096 msglen--;
1098 if( ltep_msgid == LTEP_HANDSHAKE )
1100 dbgmsg( msgs, "got ltep handshake" );
1101 parseLtepHandshake( msgs, msglen, inbuf );
1102 if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1104 sendLtepHandshake( msgs );
1105 sendPex( msgs );
1108 else if( ltep_msgid == UT_PEX_ID )
1110 dbgmsg( msgs, "got ut pex" );
1111 msgs->peerSupportsPex = 1;
1112 parseUtPex( msgs, msglen, inbuf );
1114 else if( ltep_msgid == UT_METADATA_ID )
1116 dbgmsg( msgs, "got ut metadata" );
1117 msgs->peerSupportsMetadataXfer = 1;
1118 parseUtMetadata( msgs, msglen, inbuf );
1120 else
1122 dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1123 evbuffer_drain( inbuf, msglen );
1127 static int
1128 readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1130 uint32_t len;
1132 if( inlen < sizeof( len ) )
1133 return READ_LATER;
1135 tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1137 if( len == 0 ) /* peer sent us a keepalive message */
1138 dbgmsg( msgs, "got KeepAlive" );
1139 else
1141 msgs->incoming.length = len;
1142 msgs->state = AWAITING_BT_ID;
1145 return READ_NOW;
1148 static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t );
1150 static int
1151 readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1153 uint8_t id;
1155 if( inlen < sizeof( uint8_t ) )
1156 return READ_LATER;
1158 tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1159 msgs->incoming.id = id;
1160 dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1162 if( id == BT_PIECE )
1164 msgs->state = AWAITING_BT_PIECE;
1165 return READ_NOW;
1167 else if( msgs->incoming.length != 1 )
1169 msgs->state = AWAITING_BT_MESSAGE;
1170 return READ_NOW;
1172 else return readBtMessage( msgs, inbuf, inlen - 1 );
1175 static void
1176 updatePeerProgress( tr_peermsgs * msgs )
1178 tr_peerUpdateProgress( msgs->torrent, msgs->peer );
1180 /*updateFastSet( msgs );*/
1181 updateInterest( msgs );
1184 static void
1185 prefetchPieces( tr_peermsgs *msgs )
1187 int i;
1189 if( !getSession(msgs)->isPrefetchEnabled )
1190 return;
1192 /* Maintain 12 prefetched blocks per unchoked peer */
1193 for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1195 const struct peer_request * req = msgs->peerAskedFor + i;
1196 if( requestIsValid( msgs, req ) )
1198 tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1199 ++msgs->prefetchCount;
1204 static void
1205 peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
1207 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1208 const int reqIsValid = requestIsValid( msgs, req );
1209 const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1210 const int peerIsChoked = msgs->peer->peerIsChoked;
1212 int allow = false;
1214 if( !reqIsValid )
1215 dbgmsg( msgs, "rejecting an invalid request." );
1216 else if( !clientHasPiece )
1217 dbgmsg( msgs, "rejecting request for a piece we don't have." );
1218 else if( peerIsChoked )
1219 dbgmsg( msgs, "rejecting request from choked peer" );
1220 else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1221 dbgmsg( msgs, "rejecting request ... reqq is full" );
1222 else
1223 allow = true;
1225 if( allow ) {
1226 msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1227 prefetchPieces( msgs );
1228 } else if( fext ) {
1229 protocolSendReject( msgs, req );
1233 static bool
1234 messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1236 switch( id )
1238 case BT_CHOKE:
1239 case BT_UNCHOKE:
1240 case BT_INTERESTED:
1241 case BT_NOT_INTERESTED:
1242 case BT_FEXT_HAVE_ALL:
1243 case BT_FEXT_HAVE_NONE:
1244 return len == 1;
1246 case BT_HAVE:
1247 case BT_FEXT_SUGGEST:
1248 case BT_FEXT_ALLOWED_FAST:
1249 return len == 5;
1251 case BT_BITFIELD:
1252 if( tr_torrentHasMetadata( msg->torrent ) )
1253 return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1254 /* we don't know the piece count yet,
1255 so we can only guess whether to send true or false */
1256 if( msg->metadata_size_hint > 0 )
1257 return len <= msg->metadata_size_hint;
1258 return true;
1260 case BT_REQUEST:
1261 case BT_CANCEL:
1262 case BT_FEXT_REJECT:
1263 return len == 13;
1265 case BT_PIECE:
1266 return len > 9 && len <= 16393;
1268 case BT_PORT:
1269 return len == 3;
1271 case BT_LTEP:
1272 return len >= 2;
1274 default:
1275 return false;
1279 static int clientGotBlock( tr_peermsgs * msgs,
1280 struct evbuffer * block,
1281 const struct peer_request * req );
1283 static int
1284 readBtPiece( tr_peermsgs * msgs,
1285 struct evbuffer * inbuf,
1286 size_t inlen,
1287 size_t * setme_piece_bytes_read )
1289 struct peer_request * req = &msgs->incoming.blockReq;
1291 assert( evbuffer_get_length( inbuf ) >= inlen );
1292 dbgmsg( msgs, "In readBtPiece" );
1294 if( !req->length )
1296 if( inlen < 8 )
1297 return READ_LATER;
1299 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1300 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1301 req->length = msgs->incoming.length - 9;
1302 dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1303 return READ_NOW;
1305 else
1307 int err;
1308 size_t n;
1309 size_t nLeft;
1310 struct evbuffer * block_buffer;
1312 if( msgs->incoming.block == NULL )
1313 msgs->incoming.block = evbuffer_new( );
1314 block_buffer = msgs->incoming.block;
1316 /* read in another chunk of data */
1317 nLeft = req->length - evbuffer_get_length( block_buffer );
1318 n = MIN( nLeft, inlen );
1320 tr_peerIoReadBytesToBuf( msgs->peer->io, inbuf, block_buffer, n );
1322 fireClientGotData( msgs, n, true );
1323 *setme_piece_bytes_read += n;
1324 dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1325 n, req->index, req->offset, req->length,
1326 (int)( req->length - evbuffer_get_length( block_buffer ) ) );
1327 if( evbuffer_get_length( block_buffer ) < req->length )
1328 return READ_LATER;
1330 /* pass the block along... */
1331 err = clientGotBlock( msgs, block_buffer, req );
1332 evbuffer_drain( block_buffer, evbuffer_get_length( block_buffer ) );
1334 /* cleanup */
1335 req->length = 0;
1336 msgs->state = AWAITING_BT_LENGTH;
1337 return err ? READ_ERR : READ_NOW;
1341 static void updateDesiredRequestCount( tr_peermsgs * msgs );
1343 static int
1344 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1346 uint32_t ui32;
1347 uint32_t msglen = msgs->incoming.length;
1348 const uint8_t id = msgs->incoming.id;
1349 #ifndef NDEBUG
1350 const size_t startBufLen = evbuffer_get_length( inbuf );
1351 #endif
1352 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1354 --msglen; /* id length */
1356 dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1358 if( inlen < msglen )
1359 return READ_LATER;
1361 if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1363 dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1364 fireError( msgs, EMSGSIZE );
1365 return READ_ERR;
1368 switch( id )
1370 case BT_CHOKE:
1371 dbgmsg( msgs, "got Choke" );
1372 msgs->peer->clientIsChoked = 1;
1373 if( !fext )
1374 fireGotChoke( msgs );
1375 break;
1377 case BT_UNCHOKE:
1378 dbgmsg( msgs, "got Unchoke" );
1379 msgs->peer->clientIsChoked = 0;
1380 updateDesiredRequestCount( msgs );
1381 break;
1383 case BT_INTERESTED:
1384 dbgmsg( msgs, "got Interested" );
1385 msgs->peer->peerIsInterested = 1;
1386 break;
1388 case BT_NOT_INTERESTED:
1389 dbgmsg( msgs, "got Not Interested" );
1390 msgs->peer->peerIsInterested = 0;
1391 break;
1393 case BT_HAVE:
1394 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1395 dbgmsg( msgs, "got Have: %u", ui32 );
1396 if( tr_torrentHasMetadata( msgs->torrent )
1397 && ( ui32 >= msgs->torrent->info.pieceCount ) )
1399 fireError( msgs, ERANGE );
1400 return READ_ERR;
1403 /* a peer can send the same HAVE message twice... */
1404 if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) {
1405 tr_bitfieldAdd( &msgs->peer->have, ui32 );
1406 fireClientGotHave( msgs, ui32 );
1408 updatePeerProgress( msgs );
1409 break;
1411 case BT_BITFIELD: {
1412 uint8_t * tmp = tr_new( uint8_t, msglen );
1413 dbgmsg( msgs, "got a bitfield" );
1414 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1415 tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen );
1416 fireClientGotBitfield( msgs, &msgs->peer->have );
1417 updatePeerProgress( msgs );
1418 tr_free( tmp );
1419 break;
1422 case BT_REQUEST:
1424 struct peer_request r;
1425 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1426 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1427 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1428 dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1429 peerMadeRequest( msgs, &r );
1430 break;
1433 case BT_CANCEL:
1435 int i;
1436 struct peer_request r;
1437 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1438 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1439 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1440 tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1441 dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1443 for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1444 const struct peer_request * req = msgs->peerAskedFor + i;
1445 if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1446 break;
1449 if( i < msgs->peer->pendingReqsToClient )
1450 tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1451 msgs->peer->pendingReqsToClient-- );
1452 break;
1455 case BT_PIECE:
1456 assert( 0 ); /* handled elsewhere! */
1457 break;
1459 case BT_PORT:
1460 dbgmsg( msgs, "Got a BT_PORT" );
1461 tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1462 if( msgs->peer->dht_port > 0 )
1463 tr_dhtAddNode( getSession(msgs),
1464 tr_peerAddress( msgs->peer ),
1465 msgs->peer->dht_port, 0 );
1466 break;
1468 case BT_FEXT_SUGGEST:
1469 dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1470 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1471 if( fext )
1472 fireClientGotSuggest( msgs, ui32 );
1473 else {
1474 fireError( msgs, EMSGSIZE );
1475 return READ_ERR;
1477 break;
1479 case BT_FEXT_ALLOWED_FAST:
1480 dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1481 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1482 if( fext )
1483 fireClientGotAllowedFast( msgs, ui32 );
1484 else {
1485 fireError( msgs, EMSGSIZE );
1486 return READ_ERR;
1488 break;
1490 case BT_FEXT_HAVE_ALL:
1491 dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1492 if( fext ) {
1493 tr_bitfieldSetHasAll( &msgs->peer->have );
1494 assert( tr_bitfieldHasAll( &msgs->peer->have ) );
1495 fireClientGotHaveAll( msgs );
1496 updatePeerProgress( msgs );
1497 } else {
1498 fireError( msgs, EMSGSIZE );
1499 return READ_ERR;
1501 break;
1503 case BT_FEXT_HAVE_NONE:
1504 dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1505 if( fext ) {
1506 tr_bitfieldSetHasNone( &msgs->peer->have );
1507 fireClientGotHaveNone( msgs );
1508 updatePeerProgress( msgs );
1509 } else {
1510 fireError( msgs, EMSGSIZE );
1511 return READ_ERR;
1513 break;
1515 case BT_FEXT_REJECT:
1517 struct peer_request r;
1518 dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1519 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1520 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1521 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1522 if( fext )
1523 fireGotRej( msgs, &r );
1524 else {
1525 fireError( msgs, EMSGSIZE );
1526 return READ_ERR;
1528 break;
1531 case BT_LTEP:
1532 dbgmsg( msgs, "Got a BT_LTEP" );
1533 parseLtep( msgs, msglen, inbuf );
1534 break;
1536 default:
1537 dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1538 tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1539 break;
1542 assert( msglen + 1 == msgs->incoming.length );
1543 assert( evbuffer_get_length( inbuf ) == startBufLen - msglen );
1545 msgs->state = AWAITING_BT_LENGTH;
1546 return READ_NOW;
1549 /* returns 0 on success, or an errno on failure */
1550 static int
1551 clientGotBlock( tr_peermsgs * msgs,
1552 struct evbuffer * data,
1553 const struct peer_request * req )
1555 int err;
1556 tr_torrent * tor = msgs->torrent;
1557 const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1559 assert( msgs );
1560 assert( req );
1562 if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1563 dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1564 tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1565 return EMSGSIZE;
1568 dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1570 if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1571 dbgmsg( msgs, "we didn't ask for this message..." );
1572 return 0;
1574 if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1575 dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1576 return 0;
1580 *** Save the block
1583 if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1584 return err;
1586 tr_bitfieldAdd( &msgs->peer->blame, req->index );
1587 fireGotBlock( msgs, req );
1588 return 0;
1591 static int peerPulse( void * vmsgs );
1593 static void
1594 didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1596 tr_peermsgs * msgs = vmsgs;
1597 firePeerGotData( msgs, bytesWritten, wasPieceData );
1599 if ( tr_isPeerIo( io ) && io->userData )
1600 peerPulse( msgs );
1603 static ReadState
1604 canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1606 ReadState ret;
1607 tr_peermsgs * msgs = vmsgs;
1608 struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1609 const size_t inlen = evbuffer_get_length( in );
1611 dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1613 if( !inlen )
1615 ret = READ_LATER;
1617 else if( msgs->state == AWAITING_BT_PIECE )
1619 ret = readBtPiece( msgs, in, inlen, piece );
1621 else switch( msgs->state )
1623 case AWAITING_BT_LENGTH:
1624 ret = readBtLength ( msgs, in, inlen ); break;
1626 case AWAITING_BT_ID:
1627 ret = readBtId ( msgs, in, inlen ); break;
1629 case AWAITING_BT_MESSAGE:
1630 ret = readBtMessage( msgs, in, inlen ); break;
1632 default:
1633 ret = READ_ERR;
1634 assert( 0 );
1637 dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1639 /* log the raw data that was read */
1640 if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) )
1641 fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false );
1643 return ret;
1647 tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1649 if( msgs->state != AWAITING_BT_PIECE )
1650 return false;
1652 return block == _tr_block( msgs->torrent,
1653 msgs->incoming.blockReq.index,
1654 msgs->incoming.blockReq.offset );
1661 static void
1662 updateDesiredRequestCount( tr_peermsgs * msgs )
1664 const tr_torrent * const torrent = msgs->torrent;
1666 /* there are lots of reasons we might not want to request any blocks... */
1667 if( tr_torrentIsSeed( torrent ) || !tr_torrentHasMetadata( torrent )
1668 || msgs->peer->clientIsChoked
1669 || !msgs->peer->clientIsInterested )
1671 msgs->desiredRequestCount = 0;
1673 else
1675 int estimatedBlocksInPeriod;
1676 int rate_Bps;
1677 int irate_Bps;
1678 const int floor = 4;
1679 const int seconds = REQUEST_BUF_SECS;
1680 const uint64_t now = tr_time_msec( );
1682 /* Get the rate limit we should use.
1683 * FIXME: this needs to consider all the other peers as well... */
1684 rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1685 if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1686 rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1688 /* honor the session limits, if enabled */
1689 if( tr_torrentUsesSessionLimits( torrent ) )
1690 if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1691 rate_Bps = MIN( rate_Bps, irate_Bps );
1693 /* use this desired rate to figure out how
1694 * many requests we should send to this peer */
1695 estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1696 msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1698 /* honor the peer's maximum request count, if specified */
1699 if( msgs->reqq > 0 )
1700 if( msgs->desiredRequestCount > msgs->reqq )
1701 msgs->desiredRequestCount = msgs->reqq;
1705 static void
1706 updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1708 int piece;
1710 if( msgs->peerSupportsMetadataXfer
1711 && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1713 tr_benc tmp;
1714 struct evbuffer * payload;
1715 struct evbuffer * out = msgs->outMessages;
1717 /* build the data message */
1718 tr_bencInitDict( &tmp, 3 );
1719 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1720 tr_bencDictAddInt( &tmp, "piece", piece );
1721 payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1723 dbgmsg( msgs, "requesting metadata piece #%d", piece );
1725 /* write it out as a LTEP message to our outMessages buffer */
1726 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1727 evbuffer_add_uint8 ( out, BT_LTEP );
1728 evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1729 evbuffer_add_buffer( out, payload );
1730 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1731 dbgOutMessageLen( msgs );
1733 /* cleanup */
1734 evbuffer_free( payload );
1735 tr_bencFree( &tmp );
1739 static void
1740 updateBlockRequests( tr_peermsgs * msgs )
1742 if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1743 && ( msgs->desiredRequestCount > 0 )
1744 && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1746 int i;
1747 int n;
1748 const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1749 tr_block_index_t * blocks = alloca( sizeof( tr_block_index_t ) * numwant );
1751 tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n, false );
1753 for( i=0; i<n; ++i )
1755 struct peer_request req;
1756 blockToReq( msgs->torrent, blocks[i], &req );
1757 protocolSendRequest( msgs, &req );
1762 static size_t
1763 fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1765 int piece;
1766 size_t bytesWritten = 0;
1767 struct peer_request req;
1768 const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0;
1769 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1772 *** Protocol messages
1775 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1777 dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) );
1778 msgs->outMessagesBatchedAt = now;
1780 else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1782 const size_t len = evbuffer_get_length( msgs->outMessages );
1783 /* flush the protocol messages */
1784 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1785 tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false );
1786 msgs->clientSentAnythingAt = now;
1787 msgs->outMessagesBatchedAt = 0;
1788 msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1789 bytesWritten += len;
1793 *** Metadata Pieces
1796 if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1797 && popNextMetadataRequest( msgs, &piece ) )
1799 char * data;
1800 int dataLen;
1801 bool ok = false;
1803 data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1804 if( ( dataLen > 0 ) && ( data != NULL ) )
1806 tr_benc tmp;
1807 struct evbuffer * payload;
1808 struct evbuffer * out = msgs->outMessages;
1810 /* build the data message */
1811 tr_bencInitDict( &tmp, 3 );
1812 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1813 tr_bencDictAddInt( &tmp, "piece", piece );
1814 tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1815 payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1817 /* write it out as a LTEP message to our outMessages buffer */
1818 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) + dataLen );
1819 evbuffer_add_uint8 ( out, BT_LTEP );
1820 evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1821 evbuffer_add_buffer( out, payload );
1822 evbuffer_add ( out, data, dataLen );
1823 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1824 dbgOutMessageLen( msgs );
1826 evbuffer_free( payload );
1827 tr_bencFree( &tmp );
1828 tr_free( data );
1830 ok = true;
1833 if( !ok ) /* send a rejection message */
1835 tr_benc tmp;
1836 struct evbuffer * payload;
1837 struct evbuffer * out = msgs->outMessages;
1839 /* build the rejection message */
1840 tr_bencInitDict( &tmp, 2 );
1841 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1842 tr_bencDictAddInt( &tmp, "piece", piece );
1843 payload = tr_bencToBuf( &tmp, TR_FMT_BENC );
1845 /* write it out as a LTEP message to our outMessages buffer */
1846 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
1847 evbuffer_add_uint8 ( out, BT_LTEP );
1848 evbuffer_add_uint8 ( out, msgs->ut_metadata_id );
1849 evbuffer_add_buffer( out, payload );
1850 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1851 dbgOutMessageLen( msgs );
1853 evbuffer_free( payload );
1854 tr_bencFree( &tmp );
1859 *** Data Blocks
1862 if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1863 && popNextRequest( msgs, &req ) )
1865 --msgs->prefetchCount;
1867 if( requestIsValid( msgs, &req )
1868 && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1870 int err;
1871 const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1872 struct evbuffer * out;
1873 struct evbuffer_iovec iovec[1];
1875 out = evbuffer_new( );
1876 evbuffer_expand( out, msglen );
1878 evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1879 evbuffer_add_uint8 ( out, BT_PIECE );
1880 evbuffer_add_uint32( out, req.index );
1881 evbuffer_add_uint32( out, req.offset );
1883 evbuffer_reserve_space( out, req.length, iovec, 1 );
1884 err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base );
1885 iovec[0].iov_len = req.length;
1886 evbuffer_commit_space( out, iovec, 1 );
1888 /* check the piece if it needs checking... */
1889 if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) )
1890 if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index )))
1891 tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index );
1893 if( err )
1895 if( fext )
1896 protocolSendReject( msgs, &req );
1898 else
1900 const size_t n = evbuffer_get_length( out );
1901 dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1902 assert( n == msglen );
1903 tr_peerIoWriteBuf( msgs->peer->io, out, true );
1904 bytesWritten += n;
1905 msgs->clientSentAnythingAt = now;
1906 tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1909 evbuffer_free( out );
1911 if( err )
1913 bytesWritten = 0;
1914 msgs = NULL;
1917 else if( fext ) /* peer needs a reject message */
1919 protocolSendReject( msgs, &req );
1922 if( msgs != NULL )
1923 prefetchPieces( msgs );
1927 *** Keepalive
1930 if( ( msgs != NULL )
1931 && ( msgs->clientSentAnythingAt != 0 )
1932 && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1934 dbgmsg( msgs, "sending a keepalive message" );
1935 evbuffer_add_uint32( msgs->outMessages, 0 );
1936 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1939 return bytesWritten;
1942 static int
1943 peerPulse( void * vmsgs )
1945 tr_peermsgs * msgs = vmsgs;
1946 const time_t now = tr_time( );
1948 if ( tr_isPeerIo( msgs->peer->io ) ) {
1949 updateDesiredRequestCount( msgs );
1950 updateBlockRequests( msgs );
1951 updateMetadataRequests( msgs, now );
1954 for( ;; )
1955 if( fillOutputBuffer( msgs, now ) < 1 )
1956 break;
1958 return true; /* loop forever */
1961 void
1962 tr_peerMsgsPulse( tr_peermsgs * msgs )
1964 if( msgs != NULL )
1965 peerPulse( msgs );
1968 static void
1969 gotError( tr_peerIo * io UNUSED, short what, void * vmsgs )
1971 if( what & BEV_EVENT_TIMEOUT )
1972 dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
1973 if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) )
1974 dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
1975 what, errno, tr_strerror( errno ) );
1976 fireError( vmsgs, ENOTCONN );
1979 static void
1980 sendBitfield( tr_peermsgs * msgs )
1982 size_t byte_count = 0;
1983 struct evbuffer * out = msgs->outMessages;
1984 void * bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count );
1986 evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count );
1987 evbuffer_add_uint8 ( out, BT_BITFIELD );
1988 evbuffer_add ( out, bytes, byte_count );
1989 dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) );
1990 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1992 tr_free( bytes );
1995 static void
1996 tellPeerWhatWeHave( tr_peermsgs * msgs )
1998 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2000 if( fext && tr_cpHasAll( &msgs->torrent->completion ) )
2002 protocolSendHaveAll( msgs );
2004 else if( fext && tr_cpHasNone( &msgs->torrent->completion ) )
2006 protocolSendHaveNone( msgs );
2008 else if( !tr_cpHasNone( &msgs->torrent->completion ) )
2010 sendBitfield( msgs );
2018 /* some peers give us error messages if we send
2019 more than this many peers in a single pex message
2020 http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2021 #define MAX_PEX_ADDED 50
2022 #define MAX_PEX_DROPPED 50
2024 typedef struct
2026 tr_pex * added;
2027 tr_pex * dropped;
2028 tr_pex * elements;
2029 int addedCount;
2030 int droppedCount;
2031 int elementCount;
2033 PexDiffs;
2035 static void
2036 pexAddedCb( void * vpex, void * userData )
2038 PexDiffs * diffs = userData;
2039 tr_pex * pex = vpex;
2041 if( diffs->addedCount < MAX_PEX_ADDED )
2043 diffs->added[diffs->addedCount++] = *pex;
2044 diffs->elements[diffs->elementCount++] = *pex;
2048 static inline void
2049 pexDroppedCb( void * vpex, void * userData )
2051 PexDiffs * diffs = userData;
2052 tr_pex * pex = vpex;
2054 if( diffs->droppedCount < MAX_PEX_DROPPED )
2056 diffs->dropped[diffs->droppedCount++] = *pex;
2060 static inline void
2061 pexElementCb( void * vpex, void * userData )
2063 PexDiffs * diffs = userData;
2064 tr_pex * pex = vpex;
2066 diffs->elements[diffs->elementCount++] = *pex;
2069 typedef void ( tr_set_func )( void * element, void * userData );
2072 * @brief find the differences and commonalities in two sorted sets
2073 * @param a the first set
2074 * @param aCount the number of elements in the set 'a'
2075 * @param b the second set
2076 * @param bCount the number of elements in the set 'b'
2077 * @param compare the sorting method for both sets
2078 * @param elementSize the sizeof the element in the two sorted sets
2079 * @param in_a called for items in set 'a' but not set 'b'
2080 * @param in_b called for items in set 'b' but not set 'a'
2081 * @param in_both called for items that are in both sets
2082 * @param userData user data passed along to in_a, in_b, and in_both
2084 static void
2085 tr_set_compare( const void * va, size_t aCount,
2086 const void * vb, size_t bCount,
2087 int compare( const void * a, const void * b ),
2088 size_t elementSize,
2089 tr_set_func in_a_cb,
2090 tr_set_func in_b_cb,
2091 tr_set_func in_both_cb,
2092 void * userData )
2094 const uint8_t * a = va;
2095 const uint8_t * b = vb;
2096 const uint8_t * aend = a + elementSize * aCount;
2097 const uint8_t * bend = b + elementSize * bCount;
2099 while( a != aend || b != bend )
2101 if( a == aend )
2103 ( *in_b_cb )( (void*)b, userData );
2104 b += elementSize;
2106 else if( b == bend )
2108 ( *in_a_cb )( (void*)a, userData );
2109 a += elementSize;
2111 else
2113 const int val = ( *compare )( a, b );
2115 if( !val )
2117 ( *in_both_cb )( (void*)a, userData );
2118 a += elementSize;
2119 b += elementSize;
2121 else if( val < 0 )
2123 ( *in_a_cb )( (void*)a, userData );
2124 a += elementSize;
2126 else if( val > 0 )
2128 ( *in_b_cb )( (void*)b, userData );
2129 b += elementSize;
2136 static void
2137 sendPex( tr_peermsgs * msgs )
2139 if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2141 PexDiffs diffs;
2142 PexDiffs diffs6;
2143 tr_pex * newPex = NULL;
2144 tr_pex * newPex6 = NULL;
2145 const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2146 const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2148 /* build the diffs */
2149 diffs.added = tr_new( tr_pex, newCount );
2150 diffs.addedCount = 0;
2151 diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2152 diffs.droppedCount = 0;
2153 diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2154 diffs.elementCount = 0;
2155 tr_set_compare( msgs->pex, msgs->pexCount,
2156 newPex, newCount,
2157 tr_pexCompare, sizeof( tr_pex ),
2158 pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2159 diffs6.added = tr_new( tr_pex, newCount6 );
2160 diffs6.addedCount = 0;
2161 diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2162 diffs6.droppedCount = 0;
2163 diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2164 diffs6.elementCount = 0;
2165 tr_set_compare( msgs->pex6, msgs->pexCount6,
2166 newPex6, newCount6,
2167 tr_pexCompare, sizeof( tr_pex ),
2168 pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2169 dbgmsg(
2170 msgs,
2171 "pex: old peer count %d+%d, new peer count %d+%d, "
2172 "added %d+%d, removed %d+%d",
2173 msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2174 diffs.addedCount, diffs6.addedCount,
2175 diffs.droppedCount, diffs6.droppedCount );
2177 if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2178 !diffs6.droppedCount )
2180 tr_free( diffs.elements );
2181 tr_free( diffs6.elements );
2183 else
2185 int i;
2186 tr_benc val;
2187 uint8_t * tmp, *walk;
2188 struct evbuffer * payload;
2189 struct evbuffer * out = msgs->outMessages;
2191 /* update peer */
2192 tr_free( msgs->pex );
2193 msgs->pex = diffs.elements;
2194 msgs->pexCount = diffs.elementCount;
2195 tr_free( msgs->pex6 );
2196 msgs->pex6 = diffs6.elements;
2197 msgs->pexCount6 = diffs6.elementCount;
2199 /* build the pex payload */
2200 tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2201 * speed vs. likelihood? */
2203 if( diffs.addedCount > 0)
2205 /* "added" */
2206 tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2207 for( i = 0; i < diffs.addedCount; ++i ) {
2208 memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2209 memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2211 assert( ( walk - tmp ) == diffs.addedCount * 6 );
2212 tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2213 tr_free( tmp );
2215 /* "added.f"
2216 * unset each holepunch flag because we don't support it. */
2217 tmp = walk = tr_new( uint8_t, diffs.addedCount );
2218 for( i = 0; i < diffs.addedCount; ++i )
2219 *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2220 assert( ( walk - tmp ) == diffs.addedCount );
2221 tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2222 tr_free( tmp );
2225 if( diffs.droppedCount > 0 )
2227 /* "dropped" */
2228 tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2229 for( i = 0; i < diffs.droppedCount; ++i ) {
2230 memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2231 memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2233 assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2234 tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2235 tr_free( tmp );
2238 if( diffs6.addedCount > 0 )
2240 /* "added6" */
2241 tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2242 for( i = 0; i < diffs6.addedCount; ++i ) {
2243 memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2244 walk += 16;
2245 memcpy( walk, &diffs6.added[i].port, 2 );
2246 walk += 2;
2248 assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2249 tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2250 tr_free( tmp );
2252 /* "added6.f"
2253 * unset each holepunch flag because we don't support it. */
2254 tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2255 for( i = 0; i < diffs6.addedCount; ++i )
2256 *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2257 assert( ( walk - tmp ) == diffs6.addedCount );
2258 tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2259 tr_free( tmp );
2262 if( diffs6.droppedCount > 0 )
2264 /* "dropped6" */
2265 tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2266 for( i = 0; i < diffs6.droppedCount; ++i ) {
2267 memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2268 walk += 16;
2269 memcpy( walk, &diffs6.dropped[i].port, 2 );
2270 walk += 2;
2272 assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2273 tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2274 tr_free( tmp );
2277 /* write the pex message */
2278 payload = tr_bencToBuf( &val, TR_FMT_BENC );
2279 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) );
2280 evbuffer_add_uint8 ( out, BT_LTEP );
2281 evbuffer_add_uint8 ( out, msgs->ut_pex_id );
2282 evbuffer_add_buffer( out, payload );
2283 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2284 dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) );
2285 dbgOutMessageLen( msgs );
2287 evbuffer_free( payload );
2288 tr_bencFree( &val );
2291 /* cleanup */
2292 tr_free( diffs.added );
2293 tr_free( diffs.dropped );
2294 tr_free( newPex );
2295 tr_free( diffs6.added );
2296 tr_free( diffs6.dropped );
2297 tr_free( newPex6 );
2299 /*msgs->clientSentPexAt = tr_time( );*/
2303 static void
2304 pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2306 struct tr_peermsgs * msgs = vmsgs;
2308 sendPex( msgs );
2310 assert( msgs->pexTimer != NULL );
2311 tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2318 tr_peermsgs*
2319 tr_peerMsgsNew( struct tr_torrent * torrent,
2320 struct tr_peer * peer,
2321 tr_peer_callback * callback,
2322 void * callbackData )
2324 tr_peermsgs * m;
2326 assert( peer );
2327 assert( peer->io );
2329 m = tr_new0( tr_peermsgs, 1 );
2330 m->callback = callback;
2331 m->callbackData = callbackData;
2332 m->peer = peer;
2333 m->torrent = torrent;
2334 m->peer->clientIsChoked = 1;
2335 m->peer->peerIsChoked = 1;
2336 m->peer->clientIsInterested = 0;
2337 m->peer->peerIsInterested = 0;
2338 m->state = AWAITING_BT_LENGTH;
2339 m->outMessages = evbuffer_new( );
2340 m->outMessagesBatchedAt = 0;
2341 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2342 peer->msgs = m;
2344 if( tr_torrentAllowsPex( torrent ) ) {
2345 m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m );
2346 tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 );
2349 if( tr_peerIoSupportsUTP( peer->io ) ) {
2350 const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );
2351 tr_peerMgrSetUtpSupported( torrent, addr );
2352 tr_peerMgrSetUtpFailed( torrent, addr, false );
2355 if( tr_peerIoSupportsLTEP( peer->io ) )
2356 sendLtepHandshake( m );
2358 tellPeerWhatWeHave( m );
2360 if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2362 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2363 const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2364 if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2365 protocolSendPort( m, tr_dhtPort( torrent->session ) );
2369 tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2370 updateDesiredRequestCount( m );
2372 return m;
2375 void
2376 tr_peerMsgsFree( tr_peermsgs* msgs )
2378 if( msgs )
2380 if( msgs->pexTimer != NULL )
2381 event_free( msgs->pexTimer );
2383 if( msgs->incoming.block != NULL )
2384 evbuffer_free( msgs->incoming.block );
2386 evbuffer_free( msgs->outMessages );
2387 tr_free( msgs->pex6 );
2388 tr_free( msgs->pex );
2390 memset( msgs, ~0, sizeof( tr_peermsgs ) );
2391 tr_free( msgs );