2 * This file Copyright (C) Mnemosyne LLC
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
10 * $Id: peer-msgs.c 12555 2011-07-17 18:11:34Z jordan $
21 #include <event2/buffer.h>
22 #include <event2/bufferevent.h>
23 #include <event2/event.h>
25 #include "transmission.h"
28 #include "completion.h"
29 #include "crypto.h" /* tr_sha1() */
32 #include "peer-msgs.h"
35 #include "torrent-magnet.h"
49 BT_NOT_INTERESTED
= 3,
58 BT_FEXT_HAVE_ALL
= 14,
59 BT_FEXT_HAVE_NONE
= 15,
61 BT_FEXT_ALLOWED_FAST
= 17,
70 MAX_PEX_PEER_COUNT
= 50,
72 MIN_CHOKE_PERIOD_SEC
= 10,
74 /* idle seconds before we send a keepalive */
75 KEEPALIVE_INTERVAL_SECS
= 100,
77 PEX_INTERVAL_SECS
= 90, /* sec between sendPex() calls */
83 /* used in lowering the outMessages queue period */
84 IMMEDIATE_PRIORITY_INTERVAL_SECS
= 0,
85 HIGH_PRIORITY_INTERVAL_SECS
= 2,
86 LOW_PRIORITY_INTERVAL_SECS
= 10,
88 /* number of pieces we'll allow in our fast set */
89 MAX_FAST_SET_SIZE
= 3,
91 /* defined in BEP #9 */
92 METADATA_MSG_TYPE_REQUEST
= 0,
93 METADATA_MSG_TYPE_DATA
= 1,
94 METADATA_MSG_TYPE_REJECT
= 2
117 blockToReq( const tr_torrent
* tor
,
118 tr_block_index_t block
,
119 struct peer_request
* setme
)
121 tr_torrentGetBlockLocation( tor
, block
, &setme
->index
,
130 /* this is raw, unchanged data from the peer regarding
131 * the current message that it's sending us. */
135 uint32_t length
; /* includes the +1 for id length */
136 struct peer_request blockReq
; /* metadata for incoming blocks */
137 struct evbuffer
* block
; /* piece data for incoming blocks */
141 * Low-level communication state information about a connected peer.
143 * This structure remembers the low-level protocol states that we're
144 * in with this peer, such as active requests, pex messages, and so on.
145 * Its fields are all private to peer-msgs.c.
147 * Data not directly involved with sending & receiving messages is
148 * stored in tr_peer, where it can be accessed by both peermsgs and
151 * @see struct peer_atom
156 bool peerSupportsPex
;
157 bool peerSupportsMetadataXfer
;
158 bool clientSentLtepHandshake
;
159 bool peerSentLtepHandshake
;
161 /*bool haveFastSet;*/
163 int desiredRequestCount
;
167 /* how long the outMessages batch should be allowed to grow before
168 * it's flushed -- some messages (like requests >:) should be sent
169 * very quickly; others aren't as urgent. */
170 int8_t outMessagesBatchPeriod
;
174 uint8_t ut_metadata_id
;
178 size_t metadata_size_hint
;
181 tr_piece_index_t fastset
[MAX_FAST_SET_SIZE
];
186 tr_torrent
* torrent
;
188 tr_peer_callback
* callback
;
191 struct evbuffer
* outMessages
; /* all the non-piece messages */
193 struct peer_request peerAskedFor
[REQQ
];
195 int peerAskedForMetadata
[METADATA_REQQ
];
196 int peerAskedForMetadataCount
;
201 /*time_t clientSentPexAt;*/
202 time_t clientSentAnythingAt
;
204 /* when we started batching the outMessages */
205 time_t outMessagesBatchedAt
;
207 struct tr_incoming incoming
;
209 /* if the peer supports the Extension Protocol in BEP 10 and
210 supplied a reqq argument, it's stored here. Otherwise, the
211 value is zero and should be ignored. */
214 struct event
* pexTimer
;
221 static inline tr_session
*
222 getSession( struct tr_peermsgs
* msgs
)
224 return msgs
->torrent
->session
;
232 myDebug( const char * file
, int line
,
233 const struct tr_peermsgs
* msgs
,
234 const char * fmt
, ... )
236 FILE * fp
= tr_getLog( );
242 struct evbuffer
* buf
= evbuffer_new( );
243 char * base
= tr_basename( file
);
245 evbuffer_add_printf( buf
, "[%s] %s - %s [%s]: ",
246 tr_getLogTimeStr( timestr
, sizeof( timestr
) ),
247 tr_torrentName( msgs
->torrent
),
248 tr_peerIoGetAddrStr( msgs
->peer
->io
),
249 msgs
->peer
->client
);
250 va_start( args
, fmt
);
251 evbuffer_add_vprintf( buf
, fmt
, args
);
253 evbuffer_add_printf( buf
, " (%s:%d)\n", base
, line
);
254 fputs( (const char*)evbuffer_pullup( buf
, -1 ), fp
);
257 evbuffer_free( buf
);
261 #define dbgmsg( msgs, ... ) \
263 if( tr_deepLoggingIsActive( ) ) \
264 myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
272 pokeBatchPeriod( tr_peermsgs
* msgs
, int interval
)
274 if( msgs
->outMessagesBatchPeriod
> interval
)
276 msgs
->outMessagesBatchPeriod
= interval
;
277 dbgmsg( msgs
, "lowering batch interval to %d seconds", interval
);
282 dbgOutMessageLen( tr_peermsgs
* msgs
)
284 dbgmsg( msgs
, "outMessage size is now %zu", evbuffer_get_length( msgs
->outMessages
) );
288 protocolSendReject( tr_peermsgs
* msgs
, const struct peer_request
* req
)
290 struct evbuffer
* out
= msgs
->outMessages
;
292 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
294 evbuffer_add_uint32( out
, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
295 evbuffer_add_uint8 ( out
, BT_FEXT_REJECT
);
296 evbuffer_add_uint32( out
, req
->index
);
297 evbuffer_add_uint32( out
, req
->offset
);
298 evbuffer_add_uint32( out
, req
->length
);
300 dbgmsg( msgs
, "rejecting %u:%u->%u...", req
->index
, req
->offset
, req
->length
);
301 dbgOutMessageLen( msgs
);
305 protocolSendRequest( tr_peermsgs
* msgs
, const struct peer_request
* req
)
307 struct evbuffer
* out
= msgs
->outMessages
;
309 evbuffer_add_uint32( out
, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
310 evbuffer_add_uint8 ( out
, BT_REQUEST
);
311 evbuffer_add_uint32( out
, req
->index
);
312 evbuffer_add_uint32( out
, req
->offset
);
313 evbuffer_add_uint32( out
, req
->length
);
315 dbgmsg( msgs
, "requesting %u:%u->%u...", req
->index
, req
->offset
, req
->length
);
316 dbgOutMessageLen( msgs
);
317 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
321 protocolSendCancel( tr_peermsgs
* msgs
, const struct peer_request
* req
)
323 struct evbuffer
* out
= msgs
->outMessages
;
325 evbuffer_add_uint32( out
, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
326 evbuffer_add_uint8 ( out
, BT_CANCEL
);
327 evbuffer_add_uint32( out
, req
->index
);
328 evbuffer_add_uint32( out
, req
->offset
);
329 evbuffer_add_uint32( out
, req
->length
);
331 dbgmsg( msgs
, "cancelling %u:%u->%u...", req
->index
, req
->offset
, req
->length
);
332 dbgOutMessageLen( msgs
);
333 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
337 protocolSendPort(tr_peermsgs
*msgs
, uint16_t port
)
339 struct evbuffer
* out
= msgs
->outMessages
;
341 dbgmsg( msgs
, "sending Port %u", port
);
342 evbuffer_add_uint32( out
, 3 );
343 evbuffer_add_uint8 ( out
, BT_PORT
);
344 evbuffer_add_uint16( out
, port
);
348 protocolSendHave( tr_peermsgs
* msgs
, uint32_t index
)
350 struct evbuffer
* out
= msgs
->outMessages
;
352 evbuffer_add_uint32( out
, sizeof(uint8_t) + sizeof(uint32_t) );
353 evbuffer_add_uint8 ( out
, BT_HAVE
);
354 evbuffer_add_uint32( out
, index
);
356 dbgmsg( msgs
, "sending Have %u", index
);
357 dbgOutMessageLen( msgs
);
358 pokeBatchPeriod( msgs
, LOW_PRIORITY_INTERVAL_SECS
);
363 protocolSendAllowedFast( tr_peermsgs
* msgs
, uint32_t pieceIndex
)
365 tr_peerIo
* io
= msgs
->peer
->io
;
366 struct evbuffer
* out
= msgs
->outMessages
;
368 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
370 evbuffer_add_uint32( io
, out
, sizeof(uint8_t) + sizeof(uint32_t) );
371 evbuffer_add_uint8 ( io
, out
, BT_FEXT_ALLOWED_FAST
);
372 evbuffer_add_uint32( io
, out
, pieceIndex
);
374 dbgmsg( msgs
, "sending Allowed Fast %u...", pieceIndex
);
375 dbgOutMessageLen( msgs
);
380 protocolSendChoke( tr_peermsgs
* msgs
, int choke
)
382 struct evbuffer
* out
= msgs
->outMessages
;
384 evbuffer_add_uint32( out
, sizeof( uint8_t ) );
385 evbuffer_add_uint8 ( out
, choke
? BT_CHOKE
: BT_UNCHOKE
);
387 dbgmsg( msgs
, "sending %s...", choke
? "Choke" : "Unchoke" );
388 dbgOutMessageLen( msgs
);
389 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
393 protocolSendHaveAll( tr_peermsgs
* msgs
)
395 struct evbuffer
* out
= msgs
->outMessages
;
397 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
399 evbuffer_add_uint32( out
, sizeof( uint8_t ) );
400 evbuffer_add_uint8 ( out
, BT_FEXT_HAVE_ALL
);
402 dbgmsg( msgs
, "sending HAVE_ALL..." );
403 dbgOutMessageLen( msgs
);
404 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
408 protocolSendHaveNone( tr_peermsgs
* msgs
)
410 struct evbuffer
* out
= msgs
->outMessages
;
412 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
414 evbuffer_add_uint32( out
, sizeof( uint8_t ) );
415 evbuffer_add_uint8 ( out
, BT_FEXT_HAVE_NONE
);
417 dbgmsg( msgs
, "sending HAVE_NONE..." );
418 dbgOutMessageLen( msgs
);
419 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
427 publish( tr_peermsgs
* msgs
, tr_peer_event
* e
)
429 assert( msgs
->peer
);
430 assert( msgs
->peer
->msgs
== msgs
);
432 if( msgs
->callback
!= NULL
)
433 msgs
->callback( msgs
->peer
, e
, msgs
->callbackData
);
437 fireError( tr_peermsgs
* msgs
, int err
)
439 tr_peer_event e
= TR_PEER_EVENT_INIT
;
440 e
.eventType
= TR_PEER_ERROR
;
446 fireGotBlock( tr_peermsgs
* msgs
, const struct peer_request
* req
)
448 tr_peer_event e
= TR_PEER_EVENT_INIT
;
449 e
.eventType
= TR_PEER_CLIENT_GOT_BLOCK
;
450 e
.pieceIndex
= req
->index
;
451 e
.offset
= req
->offset
;
452 e
.length
= req
->length
;
457 fireGotRej( tr_peermsgs
* msgs
, const struct peer_request
* req
)
459 tr_peer_event e
= TR_PEER_EVENT_INIT
;
460 e
.eventType
= TR_PEER_CLIENT_GOT_REJ
;
461 e
.pieceIndex
= req
->index
;
462 e
.offset
= req
->offset
;
463 e
.length
= req
->length
;
468 fireGotChoke( tr_peermsgs
* msgs
)
470 tr_peer_event e
= TR_PEER_EVENT_INIT
;
471 e
.eventType
= TR_PEER_CLIENT_GOT_CHOKE
;
476 fireClientGotHaveAll( tr_peermsgs
* msgs
)
478 tr_peer_event e
= TR_PEER_EVENT_INIT
;
479 e
.eventType
= TR_PEER_CLIENT_GOT_HAVE_ALL
;
484 fireClientGotHaveNone( tr_peermsgs
* msgs
)
486 tr_peer_event e
= TR_PEER_EVENT_INIT
;
487 e
.eventType
= TR_PEER_CLIENT_GOT_HAVE_NONE
;
492 fireClientGotData( tr_peermsgs
* msgs
, uint32_t length
, int wasPieceData
)
494 tr_peer_event e
= TR_PEER_EVENT_INIT
;
497 e
.eventType
= TR_PEER_CLIENT_GOT_DATA
;
498 e
.wasPieceData
= wasPieceData
;
503 fireClientGotSuggest( tr_peermsgs
* msgs
, uint32_t pieceIndex
)
505 tr_peer_event e
= TR_PEER_EVENT_INIT
;
506 e
.eventType
= TR_PEER_CLIENT_GOT_SUGGEST
;
507 e
.pieceIndex
= pieceIndex
;
512 fireClientGotPort( tr_peermsgs
* msgs
, tr_port port
)
514 tr_peer_event e
= TR_PEER_EVENT_INIT
;
515 e
.eventType
= TR_PEER_CLIENT_GOT_PORT
;
521 fireClientGotAllowedFast( tr_peermsgs
* msgs
, uint32_t pieceIndex
)
523 tr_peer_event e
= TR_PEER_EVENT_INIT
;
524 e
.eventType
= TR_PEER_CLIENT_GOT_ALLOWED_FAST
;
525 e
.pieceIndex
= pieceIndex
;
530 fireClientGotBitfield( tr_peermsgs
* msgs
, tr_bitfield
* bitfield
)
532 tr_peer_event e
= TR_PEER_EVENT_INIT
;
533 e
.eventType
= TR_PEER_CLIENT_GOT_BITFIELD
;
534 e
.bitfield
= bitfield
;
539 fireClientGotHave( tr_peermsgs
* msgs
, tr_piece_index_t index
)
541 tr_peer_event e
= TR_PEER_EVENT_INIT
;
542 e
.eventType
= TR_PEER_CLIENT_GOT_HAVE
;
543 e
.pieceIndex
= index
;
548 firePeerGotData( tr_peermsgs
* msgs
, uint32_t length
, bool wasPieceData
)
550 tr_peer_event e
= TR_PEER_EVENT_INIT
;
553 e
.eventType
= TR_PEER_PEER_GOT_DATA
;
554 e
.wasPieceData
= wasPieceData
;
561 *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
566 tr_generateAllowedSet( tr_piece_index_t
* setmePieces
,
567 size_t desiredSetSize
,
569 const uint8_t * infohash
,
570 const tr_address
* addr
)
574 assert( setmePieces
);
575 assert( desiredSetSize
<= pieceCount
);
576 assert( desiredSetSize
);
577 assert( pieceCount
);
581 if( addr
->type
== TR_AF_INET
)
583 uint8_t w
[SHA_DIGEST_LENGTH
+ 4], *walk
=w
;
584 uint8_t x
[SHA_DIGEST_LENGTH
];
586 uint32_t ui32
= ntohl( htonl( addr
->addr
.addr4
.s_addr
) & 0xffffff00 ); /* (1) */
587 memcpy( w
, &ui32
, sizeof( uint32_t ) );
588 walk
+= sizeof( uint32_t );
589 memcpy( walk
, infohash
, SHA_DIGEST_LENGTH
); /* (2) */
590 walk
+= SHA_DIGEST_LENGTH
;
591 tr_sha1( x
, w
, walk
-w
, NULL
); /* (3) */
592 assert( sizeof( w
) == walk
-w
);
594 while( setSize
<desiredSetSize
)
597 for( i
=0; i
<5 && setSize
<desiredSetSize
; ++i
) /* (4) */
600 uint32_t j
= i
* 4; /* (5) */
601 uint32_t y
= ntohl( *( uint32_t* )( x
+ j
) ); /* (6) */
602 uint32_t index
= y
% pieceCount
; /* (7) */
604 for( k
=0; k
<setSize
; ++k
) /* (8) */
605 if( setmePieces
[k
] == index
)
609 setmePieces
[setSize
++] = index
; /* (9) */
612 tr_sha1( x
, x
, sizeof( x
), NULL
); /* (3) */
620 updateFastSet( tr_peermsgs
* msgs UNUSED
)
622 const bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
623 const int peerIsNeedy
= msgs
->peer
->progress
< 0.10;
625 if( fext
&& peerIsNeedy
&& !msgs
->haveFastSet
)
628 const struct tr_address
* addr
= tr_peerIoGetAddress( msgs
->peer
->io
, NULL
);
629 const tr_info
* inf
= &msgs
->torrent
->info
;
630 const size_t numwant
= MIN( MAX_FAST_SET_SIZE
, inf
->pieceCount
);
632 /* build the fast set */
633 msgs
->fastsetSize
= tr_generateAllowedSet( msgs
->fastset
, numwant
, inf
->pieceCount
, inf
->hash
, addr
);
634 msgs
->haveFastSet
= 1;
636 /* send it to the peer */
637 for( i
=0; i
<msgs
->fastsetSize
; ++i
)
638 protocolSendAllowedFast( msgs
, msgs
->fastset
[i
] );
648 sendInterest( tr_peermsgs
* msgs
, bool clientIsInterested
)
650 struct evbuffer
* out
= msgs
->outMessages
;
653 assert( tr_isBool( clientIsInterested
) );
655 msgs
->peer
->clientIsInterested
= clientIsInterested
;
656 dbgmsg( msgs
, "Sending %s", clientIsInterested
? "Interested" : "Not Interested" );
657 evbuffer_add_uint32( out
, sizeof( uint8_t ) );
658 evbuffer_add_uint8 ( out
, clientIsInterested
? BT_INTERESTED
: BT_NOT_INTERESTED
);
660 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
661 dbgOutMessageLen( msgs
);
665 updateInterest( tr_peermsgs
* msgs UNUSED
)
667 /* FIXME -- might need to poke the mgr on startup */
671 tr_peerMsgsSetInterested( tr_peermsgs
* msgs
, bool clientIsInterested
)
673 assert( tr_isBool( clientIsInterested
) );
675 if( clientIsInterested
!= msgs
->peer
->clientIsInterested
)
676 sendInterest( msgs
, clientIsInterested
);
680 popNextMetadataRequest( tr_peermsgs
* msgs
, int * piece
)
682 if( msgs
->peerAskedForMetadataCount
== 0 )
685 *piece
= msgs
->peerAskedForMetadata
[0];
687 tr_removeElementFromArray( msgs
->peerAskedForMetadata
, 0, sizeof( int ),
688 msgs
->peerAskedForMetadataCount
-- );
694 popNextRequest( tr_peermsgs
* msgs
, struct peer_request
* setme
)
696 if( msgs
->peer
->pendingReqsToClient
== 0 )
699 *setme
= msgs
->peerAskedFor
[0];
701 tr_removeElementFromArray( msgs
->peerAskedFor
, 0, sizeof( struct peer_request
),
702 msgs
->peer
->pendingReqsToClient
-- );
708 cancelAllRequestsToClient( tr_peermsgs
* msgs
)
710 struct peer_request req
;
711 const int mustSendCancel
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
713 while( popNextRequest( msgs
, &req
))
715 protocolSendReject( msgs
, &req
);
719 tr_peerMsgsSetChoke( tr_peermsgs
* msgs
, bool peerIsChoked
)
721 const time_t now
= tr_time( );
722 const time_t fibrillationTime
= now
- MIN_CHOKE_PERIOD_SEC
;
725 assert( msgs
->peer
);
726 assert( tr_isBool( peerIsChoked
) );
728 if( msgs
->peer
->chokeChangedAt
> fibrillationTime
)
730 dbgmsg( msgs
, "Not changing choke to %d to avoid fibrillation", peerIsChoked
);
732 else if( msgs
->peer
->peerIsChoked
!= peerIsChoked
)
734 msgs
->peer
->peerIsChoked
= peerIsChoked
;
736 cancelAllRequestsToClient( msgs
);
737 protocolSendChoke( msgs
, peerIsChoked
);
738 msgs
->peer
->chokeChangedAt
= now
;
747 tr_peerMsgsHave( tr_peermsgs
* msgs
, uint32_t index
)
749 protocolSendHave( msgs
, index
);
751 /* since we have more pieces now, we might not be interested in this peer */
752 updateInterest( msgs
);
760 reqIsValid( const tr_peermsgs
* peer
,
765 return tr_torrentReqIsValid( peer
->torrent
, index
, offset
, length
);
769 requestIsValid( const tr_peermsgs
* msgs
, const struct peer_request
* req
)
771 return reqIsValid( msgs
, req
->index
, req
->offset
, req
->length
);
775 tr_peerMsgsCancel( tr_peermsgs
* msgs
, tr_block_index_t block
)
777 struct peer_request req
;
778 /*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
779 blockToReq( msgs
->torrent
, block
, &req
);
780 protocolSendCancel( msgs
, &req
);
788 sendLtepHandshake( tr_peermsgs
* msgs
)
792 bool allow_metadata_xfer
;
793 struct evbuffer
* payload
;
794 struct evbuffer
* out
= msgs
->outMessages
;
795 const unsigned char * ipv6
= tr_globalIPv6();
797 if( msgs
->clientSentLtepHandshake
)
800 dbgmsg( msgs
, "sending an ltep handshake" );
801 msgs
->clientSentLtepHandshake
= 1;
803 /* decide if we want to advertise metadata xfer support (BEP 9) */
804 if( tr_torrentIsPrivate( msgs
->torrent
) )
805 allow_metadata_xfer
= 0;
807 allow_metadata_xfer
= 1;
809 /* decide if we want to advertise pex support */
810 if( !tr_torrentAllowsPex( msgs
->torrent
) )
812 else if( msgs
->peerSentLtepHandshake
)
813 allow_pex
= msgs
->peerSupportsPex
? 1 : 0;
817 tr_bencInitDict( &val
, 8 );
818 tr_bencDictAddInt( &val
, "e", getSession(msgs
)->encryptionMode
!= TR_CLEAR_PREFERRED
);
820 tr_bencDictAddRaw( &val
, "ipv6", ipv6
, 16 );
821 if( allow_metadata_xfer
&& tr_torrentHasMetadata( msgs
->torrent
)
822 && ( msgs
->torrent
->infoDictLength
> 0 ) )
823 tr_bencDictAddInt( &val
, "metadata_size", msgs
->torrent
->infoDictLength
);
824 tr_bencDictAddInt( &val
, "p", tr_sessionGetPublicPeerPort( getSession(msgs
) ) );
825 tr_bencDictAddInt( &val
, "reqq", REQQ
);
826 tr_bencDictAddInt( &val
, "upload_only", tr_torrentIsSeed( msgs
->torrent
) );
827 tr_bencDictAddStr( &val
, "v", TR_NAME
" " USERAGENT_PREFIX
);
828 m
= tr_bencDictAddDict( &val
, "m", 2 );
829 if( allow_metadata_xfer
)
830 tr_bencDictAddInt( m
, "ut_metadata", UT_METADATA_ID
);
832 tr_bencDictAddInt( m
, "ut_pex", UT_PEX_ID
);
834 payload
= tr_bencToBuf( &val
, TR_FMT_BENC
);
836 evbuffer_add_uint32( out
, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload
) );
837 evbuffer_add_uint8 ( out
, BT_LTEP
);
838 evbuffer_add_uint8 ( out
, LTEP_HANDSHAKE
);
839 evbuffer_add_buffer( out
, payload
);
840 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
841 dbgOutMessageLen( msgs
);
844 evbuffer_free( payload
);
849 parseLtepHandshake( tr_peermsgs
* msgs
, int len
, struct evbuffer
* inbuf
)
853 uint8_t * tmp
= tr_new( uint8_t, len
);
857 int8_t seedProbability
= -1;
859 memset( &pex
, 0, sizeof( tr_pex
) );
861 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, tmp
, len
);
862 msgs
->peerSentLtepHandshake
= 1;
864 if( tr_bencLoad( tmp
, len
, &val
, NULL
) || !tr_bencIsDict( &val
) )
866 dbgmsg( msgs
, "GET extended-handshake, couldn't get dictionary" );
871 dbgmsg( msgs
, "here is the handshake: [%*.*s]", len
, len
, tmp
);
873 /* does the peer prefer encrypted connections? */
874 if( tr_bencDictFindInt( &val
, "e", &i
) ) {
875 msgs
->peer
->encryption_preference
= i
? ENCRYPTION_PREFERENCE_YES
876 : ENCRYPTION_PREFERENCE_NO
;
878 pex
.flags
|= ADDED_F_ENCRYPTION_FLAG
;
881 /* check supported messages for utorrent pex */
882 msgs
->peerSupportsPex
= 0;
883 msgs
->peerSupportsMetadataXfer
= 0;
885 if( tr_bencDictFindDict( &val
, "m", &sub
) ) {
886 if( tr_bencDictFindInt( sub
, "ut_pex", &i
) ) {
887 msgs
->peerSupportsPex
= i
!= 0;
888 msgs
->ut_pex_id
= (uint8_t) i
;
889 dbgmsg( msgs
, "msgs->ut_pex is %d", (int)msgs
->ut_pex_id
);
891 if( tr_bencDictFindInt( sub
, "ut_metadata", &i
) ) {
892 msgs
->peerSupportsMetadataXfer
= i
!= 0;
893 msgs
->ut_metadata_id
= (uint8_t) i
;
894 dbgmsg( msgs
, "msgs->ut_metadata_id is %d", (int)msgs
->ut_metadata_id
);
896 if( tr_bencDictFindInt( sub
, "ut_holepunch", &i
) ) {
897 /* Mysterious µTorrent extension that we don't grok. However,
898 it implies support for µTP, so use it to indicate that. */
899 tr_peerMgrSetUtpFailed( msgs
->torrent
,
900 tr_peerIoGetAddress( msgs
->peer
->io
, NULL
),
905 /* look for metainfo size (BEP 9) */
906 if( tr_bencDictFindInt( &val
, "metadata_size", &i
) ) {
907 tr_torrentSetMetadataSizeHint( msgs
->torrent
, i
);
908 msgs
->metadata_size_hint
= (size_t) i
;
911 /* look for upload_only (BEP 21) */
912 if( tr_bencDictFindInt( &val
, "upload_only", &i
) )
913 seedProbability
= i
==0 ? 0 : 100;
915 /* get peer's listening port */
916 if( tr_bencDictFindInt( &val
, "p", &i
) ) {
917 pex
.port
= htons( (uint16_t)i
);
918 fireClientGotPort( msgs
, pex
.port
);
919 dbgmsg( msgs
, "peer's port is now %d", (int)i
);
922 if( tr_peerIoIsIncoming( msgs
->peer
->io
)
923 && tr_bencDictFindRaw( &val
, "ipv4", &addr
, &addr_len
)
924 && ( addr_len
== 4 ) )
926 pex
.addr
.type
= TR_AF_INET
;
927 memcpy( &pex
.addr
.addr
.addr4
, addr
, 4 );
928 tr_peerMgrAddPex( msgs
->torrent
, TR_PEER_FROM_LTEP
, &pex
, seedProbability
);
931 if( tr_peerIoIsIncoming( msgs
->peer
->io
)
932 && tr_bencDictFindRaw( &val
, "ipv6", &addr
, &addr_len
)
933 && ( addr_len
== 16 ) )
935 pex
.addr
.type
= TR_AF_INET6
;
936 memcpy( &pex
.addr
.addr
.addr6
, addr
, 16 );
937 tr_peerMgrAddPex( msgs
->torrent
, TR_PEER_FROM_LTEP
, &pex
, seedProbability
);
940 /* get peer's maximum request queue size */
941 if( tr_bencDictFindInt( &val
, "reqq", &i
) )
949 parseUtMetadata( tr_peermsgs
* msgs
, int msglen
, struct evbuffer
* inbuf
)
954 int64_t msg_type
= -1;
956 int64_t total_size
= 0;
957 uint8_t * tmp
= tr_new( uint8_t, msglen
);
959 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, tmp
, msglen
);
960 msg_end
= (char*)tmp
+ msglen
;
962 if( !tr_bencLoad( tmp
, msglen
, &dict
, &benc_end
) )
964 tr_bencDictFindInt( &dict
, "msg_type", &msg_type
);
965 tr_bencDictFindInt( &dict
, "piece", &piece
);
966 tr_bencDictFindInt( &dict
, "total_size", &total_size
);
967 tr_bencFree( &dict
);
970 dbgmsg( msgs
, "got ut_metadata msg: type %d, piece %d, total_size %d",
971 (int)msg_type
, (int)piece
, (int)total_size
);
973 if( msg_type
== METADATA_MSG_TYPE_REJECT
)
978 if( ( msg_type
== METADATA_MSG_TYPE_DATA
)
979 && ( !tr_torrentHasMetadata( msgs
->torrent
) )
980 && ( msg_end
- benc_end
<= METADATA_PIECE_SIZE
)
981 && ( piece
* METADATA_PIECE_SIZE
+ (msg_end
- benc_end
) <= total_size
) )
983 const int pieceLen
= msg_end
- benc_end
;
984 tr_torrentSetMetadataPiece( msgs
->torrent
, piece
, benc_end
, pieceLen
);
987 if( msg_type
== METADATA_MSG_TYPE_REQUEST
)
990 && tr_torrentHasMetadata( msgs
->torrent
)
991 && !tr_torrentIsPrivate( msgs
->torrent
)
992 && ( msgs
->peerAskedForMetadataCount
< METADATA_REQQ
) )
994 msgs
->peerAskedForMetadata
[msgs
->peerAskedForMetadataCount
++] = piece
;
999 struct evbuffer
* payload
;
1000 struct evbuffer
* out
= msgs
->outMessages
;
1002 /* build the rejection message */
1003 tr_bencInitDict( &tmp
, 2 );
1004 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_REJECT
);
1005 tr_bencDictAddInt( &tmp
, "piece", piece
);
1006 payload
= tr_bencToBuf( &tmp
, TR_FMT_BENC
);
1008 /* write it out as a LTEP message to our outMessages buffer */
1009 evbuffer_add_uint32( out
, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload
) );
1010 evbuffer_add_uint8 ( out
, BT_LTEP
);
1011 evbuffer_add_uint8 ( out
, msgs
->ut_metadata_id
);
1012 evbuffer_add_buffer( out
, payload
);
1013 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1014 dbgOutMessageLen( msgs
);
1017 evbuffer_free( payload
);
1018 tr_bencFree( &tmp
);
1026 parseUtPex( tr_peermsgs
* msgs
, int msglen
, struct evbuffer
* inbuf
)
1029 uint8_t * tmp
= tr_new( uint8_t, msglen
);
1031 tr_torrent
* tor
= msgs
->torrent
;
1032 const uint8_t * added
;
1035 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, tmp
, msglen
);
1037 if( tr_torrentAllowsPex( tor
)
1038 && ( ( loaded
= !tr_bencLoad( tmp
, msglen
, &val
, NULL
) ) ) )
1040 if( tr_bencDictFindRaw( &val
, "added", &added
, &added_len
) )
1044 size_t added_f_len
= 0;
1045 const uint8_t * added_f
= NULL
;
1047 tr_bencDictFindRaw( &val
, "added.f", &added_f
, &added_f_len
);
1048 pex
= tr_peerMgrCompactToPex( added
, added_len
, added_f
, added_f_len
, &n
);
1050 n
= MIN( n
, MAX_PEX_PEER_COUNT
);
1051 for( i
=0; i
<n
; ++i
)
1053 int seedProbability
= -1;
1054 if( i
< added_f_len
) seedProbability
= ( added_f
[i
] & ADDED_F_SEED_FLAG
) ? 100 : 0;
1055 tr_peerMgrAddPex( tor
, TR_PEER_FROM_PEX
, pex
+i
, seedProbability
);
1061 if( tr_bencDictFindRaw( &val
, "added6", &added
, &added_len
) )
1065 size_t added_f_len
= 0;
1066 const uint8_t * added_f
= NULL
;
1068 tr_bencDictFindRaw( &val
, "added6.f", &added_f
, &added_f_len
);
1069 pex
= tr_peerMgrCompact6ToPex( added
, added_len
, added_f
, added_f_len
, &n
);
1071 n
= MIN( n
, MAX_PEX_PEER_COUNT
);
1072 for( i
=0; i
<n
; ++i
)
1074 int seedProbability
= -1;
1075 if( i
< added_f_len
) seedProbability
= ( added_f
[i
] & ADDED_F_SEED_FLAG
) ? 100 : 0;
1076 tr_peerMgrAddPex( tor
, TR_PEER_FROM_PEX
, pex
+i
, seedProbability
);
1084 tr_bencFree( &val
);
1088 static void sendPex( tr_peermsgs
* msgs
);
1091 parseLtep( tr_peermsgs
* msgs
, int msglen
, struct evbuffer
* inbuf
)
1095 tr_peerIoReadUint8( msgs
->peer
->io
, inbuf
, <ep_msgid
);
1098 if( ltep_msgid
== LTEP_HANDSHAKE
)
1100 dbgmsg( msgs
, "got ltep handshake" );
1101 parseLtepHandshake( msgs
, msglen
, inbuf
);
1102 if( tr_peerIoSupportsLTEP( msgs
->peer
->io
) )
1104 sendLtepHandshake( msgs
);
1108 else if( ltep_msgid
== UT_PEX_ID
)
1110 dbgmsg( msgs
, "got ut pex" );
1111 msgs
->peerSupportsPex
= 1;
1112 parseUtPex( msgs
, msglen
, inbuf
);
1114 else if( ltep_msgid
== UT_METADATA_ID
)
1116 dbgmsg( msgs
, "got ut metadata" );
1117 msgs
->peerSupportsMetadataXfer
= 1;
1118 parseUtMetadata( msgs
, msglen
, inbuf
);
1122 dbgmsg( msgs
, "skipping unknown ltep message (%d)", (int)ltep_msgid
);
1123 evbuffer_drain( inbuf
, msglen
);
1128 readBtLength( tr_peermsgs
* msgs
, struct evbuffer
* inbuf
, size_t inlen
)
1132 if( inlen
< sizeof( len
) )
1135 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &len
);
1137 if( len
== 0 ) /* peer sent us a keepalive message */
1138 dbgmsg( msgs
, "got KeepAlive" );
1141 msgs
->incoming
.length
= len
;
1142 msgs
->state
= AWAITING_BT_ID
;
1148 static int readBtMessage( tr_peermsgs
*, struct evbuffer
*, size_t );
1151 readBtId( tr_peermsgs
* msgs
, struct evbuffer
* inbuf
, size_t inlen
)
1155 if( inlen
< sizeof( uint8_t ) )
1158 tr_peerIoReadUint8( msgs
->peer
->io
, inbuf
, &id
);
1159 msgs
->incoming
.id
= id
;
1160 dbgmsg( msgs
, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id
, (size_t)msgs
->incoming
.length
);
1162 if( id
== BT_PIECE
)
1164 msgs
->state
= AWAITING_BT_PIECE
;
1167 else if( msgs
->incoming
.length
!= 1 )
1169 msgs
->state
= AWAITING_BT_MESSAGE
;
1172 else return readBtMessage( msgs
, inbuf
, inlen
- 1 );
1176 updatePeerProgress( tr_peermsgs
* msgs
)
1178 tr_peerUpdateProgress( msgs
->torrent
, msgs
->peer
);
1180 /*updateFastSet( msgs );*/
1181 updateInterest( msgs
);
1185 prefetchPieces( tr_peermsgs
*msgs
)
1189 if( !getSession(msgs
)->isPrefetchEnabled
)
1192 /* Maintain 12 prefetched blocks per unchoked peer */
1193 for( i
=msgs
->prefetchCount
; i
<msgs
->peer
->pendingReqsToClient
&& i
<12; ++i
)
1195 const struct peer_request
* req
= msgs
->peerAskedFor
+ i
;
1196 if( requestIsValid( msgs
, req
) )
1198 tr_cachePrefetchBlock( getSession(msgs
)->cache
, msgs
->torrent
, req
->index
, req
->offset
, req
->length
);
1199 ++msgs
->prefetchCount
;
1205 peerMadeRequest( tr_peermsgs
* msgs
, const struct peer_request
* req
)
1207 const bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
1208 const int reqIsValid
= requestIsValid( msgs
, req
);
1209 const int clientHasPiece
= reqIsValid
&& tr_cpPieceIsComplete( &msgs
->torrent
->completion
, req
->index
);
1210 const int peerIsChoked
= msgs
->peer
->peerIsChoked
;
1215 dbgmsg( msgs
, "rejecting an invalid request." );
1216 else if( !clientHasPiece
)
1217 dbgmsg( msgs
, "rejecting request for a piece we don't have." );
1218 else if( peerIsChoked
)
1219 dbgmsg( msgs
, "rejecting request from choked peer" );
1220 else if( msgs
->peer
->pendingReqsToClient
+ 1 >= REQQ
)
1221 dbgmsg( msgs
, "rejecting request ... reqq is full" );
1226 msgs
->peerAskedFor
[msgs
->peer
->pendingReqsToClient
++] = *req
;
1227 prefetchPieces( msgs
);
1229 protocolSendReject( msgs
, req
);
1234 messageLengthIsCorrect( const tr_peermsgs
* msg
, uint8_t id
, uint32_t len
)
1241 case BT_NOT_INTERESTED
:
1242 case BT_FEXT_HAVE_ALL
:
1243 case BT_FEXT_HAVE_NONE
:
1247 case BT_FEXT_SUGGEST
:
1248 case BT_FEXT_ALLOWED_FAST
:
1252 if( tr_torrentHasMetadata( msg
->torrent
) )
1253 return len
== ( msg
->torrent
->info
.pieceCount
+ 7u ) / 8u + 1u;
1254 /* we don't know the piece count yet,
1255 so we can only guess whether to send true or false */
1256 if( msg
->metadata_size_hint
> 0 )
1257 return len
<= msg
->metadata_size_hint
;
1262 case BT_FEXT_REJECT
:
1266 return len
> 9 && len
<= 16393;
1279 static int clientGotBlock( tr_peermsgs
* msgs
,
1280 struct evbuffer
* block
,
1281 const struct peer_request
* req
);
1284 readBtPiece( tr_peermsgs
* msgs
,
1285 struct evbuffer
* inbuf
,
1287 size_t * setme_piece_bytes_read
)
1289 struct peer_request
* req
= &msgs
->incoming
.blockReq
;
1291 assert( evbuffer_get_length( inbuf
) >= inlen
);
1292 dbgmsg( msgs
, "In readBtPiece" );
1299 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &req
->index
);
1300 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &req
->offset
);
1301 req
->length
= msgs
->incoming
.length
- 9;
1302 dbgmsg( msgs
, "got incoming block header %u:%u->%u", req
->index
, req
->offset
, req
->length
);
1310 struct evbuffer
* block_buffer
;
1312 if( msgs
->incoming
.block
== NULL
)
1313 msgs
->incoming
.block
= evbuffer_new( );
1314 block_buffer
= msgs
->incoming
.block
;
1316 /* read in another chunk of data */
1317 nLeft
= req
->length
- evbuffer_get_length( block_buffer
);
1318 n
= MIN( nLeft
, inlen
);
1320 tr_peerIoReadBytesToBuf( msgs
->peer
->io
, inbuf
, block_buffer
, n
);
1322 fireClientGotData( msgs
, n
, true );
1323 *setme_piece_bytes_read
+= n
;
1324 dbgmsg( msgs
, "got %zu bytes for block %u:%u->%u ... %d remain",
1325 n
, req
->index
, req
->offset
, req
->length
,
1326 (int)( req
->length
- evbuffer_get_length( block_buffer
) ) );
1327 if( evbuffer_get_length( block_buffer
) < req
->length
)
1330 /* pass the block along... */
1331 err
= clientGotBlock( msgs
, block_buffer
, req
);
1332 evbuffer_drain( block_buffer
, evbuffer_get_length( block_buffer
) );
1336 msgs
->state
= AWAITING_BT_LENGTH
;
1337 return err
? READ_ERR
: READ_NOW
;
1341 static void updateDesiredRequestCount( tr_peermsgs
* msgs
);
1344 readBtMessage( tr_peermsgs
* msgs
, struct evbuffer
* inbuf
, size_t inlen
)
1347 uint32_t msglen
= msgs
->incoming
.length
;
1348 const uint8_t id
= msgs
->incoming
.id
;
1350 const size_t startBufLen
= evbuffer_get_length( inbuf
);
1352 const bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
1354 --msglen
; /* id length */
1356 dbgmsg( msgs
, "got BT id %d, len %d, buffer size is %zu", (int)id
, (int)msglen
, inlen
);
1358 if( inlen
< msglen
)
1361 if( !messageLengthIsCorrect( msgs
, id
, msglen
+ 1 ) )
1363 dbgmsg( msgs
, "bad packet - BT message #%d with a length of %d", (int)id
, (int)msglen
);
1364 fireError( msgs
, EMSGSIZE
);
1371 dbgmsg( msgs
, "got Choke" );
1372 msgs
->peer
->clientIsChoked
= 1;
1374 fireGotChoke( msgs
);
1378 dbgmsg( msgs
, "got Unchoke" );
1379 msgs
->peer
->clientIsChoked
= 0;
1380 updateDesiredRequestCount( msgs
);
1384 dbgmsg( msgs
, "got Interested" );
1385 msgs
->peer
->peerIsInterested
= 1;
1388 case BT_NOT_INTERESTED
:
1389 dbgmsg( msgs
, "got Not Interested" );
1390 msgs
->peer
->peerIsInterested
= 0;
1394 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &ui32
);
1395 dbgmsg( msgs
, "got Have: %u", ui32
);
1396 if( tr_torrentHasMetadata( msgs
->torrent
)
1397 && ( ui32
>= msgs
->torrent
->info
.pieceCount
) )
1399 fireError( msgs
, ERANGE
);
1403 /* a peer can send the same HAVE message twice... */
1404 if( !tr_bitfieldHas( &msgs
->peer
->have
, ui32
) ) {
1405 tr_bitfieldAdd( &msgs
->peer
->have
, ui32
);
1406 fireClientGotHave( msgs
, ui32
);
1408 updatePeerProgress( msgs
);
1412 uint8_t * tmp
= tr_new( uint8_t, msglen
);
1413 dbgmsg( msgs
, "got a bitfield" );
1414 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, tmp
, msglen
);
1415 tr_bitfieldSetRaw( &msgs
->peer
->have
, tmp
, msglen
);
1416 fireClientGotBitfield( msgs
, &msgs
->peer
->have
);
1417 updatePeerProgress( msgs
);
1424 struct peer_request r
;
1425 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.index
);
1426 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.offset
);
1427 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.length
);
1428 dbgmsg( msgs
, "got Request: %u:%u->%u", r
.index
, r
.offset
, r
.length
);
1429 peerMadeRequest( msgs
, &r
);
1436 struct peer_request r
;
1437 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.index
);
1438 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.offset
);
1439 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.length
);
1440 tr_historyAdd( &msgs
->peer
->cancelsSentToClient
, tr_time( ), 1 );
1441 dbgmsg( msgs
, "got a Cancel %u:%u->%u", r
.index
, r
.offset
, r
.length
);
1443 for( i
=0; i
<msgs
->peer
->pendingReqsToClient
; ++i
) {
1444 const struct peer_request
* req
= msgs
->peerAskedFor
+ i
;
1445 if( ( req
->index
== r
.index
) && ( req
->offset
== r
.offset
) && ( req
->length
== r
.length
) )
1449 if( i
< msgs
->peer
->pendingReqsToClient
)
1450 tr_removeElementFromArray( msgs
->peerAskedFor
, i
, sizeof( struct peer_request
),
1451 msgs
->peer
->pendingReqsToClient
-- );
1456 assert( 0 ); /* handled elsewhere! */
1460 dbgmsg( msgs
, "Got a BT_PORT" );
1461 tr_peerIoReadUint16( msgs
->peer
->io
, inbuf
, &msgs
->peer
->dht_port
);
1462 if( msgs
->peer
->dht_port
> 0 )
1463 tr_dhtAddNode( getSession(msgs
),
1464 tr_peerAddress( msgs
->peer
),
1465 msgs
->peer
->dht_port
, 0 );
1468 case BT_FEXT_SUGGEST
:
1469 dbgmsg( msgs
, "Got a BT_FEXT_SUGGEST" );
1470 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &ui32
);
1472 fireClientGotSuggest( msgs
, ui32
);
1474 fireError( msgs
, EMSGSIZE
);
1479 case BT_FEXT_ALLOWED_FAST
:
1480 dbgmsg( msgs
, "Got a BT_FEXT_ALLOWED_FAST" );
1481 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &ui32
);
1483 fireClientGotAllowedFast( msgs
, ui32
);
1485 fireError( msgs
, EMSGSIZE
);
1490 case BT_FEXT_HAVE_ALL
:
1491 dbgmsg( msgs
, "Got a BT_FEXT_HAVE_ALL" );
1493 tr_bitfieldSetHasAll( &msgs
->peer
->have
);
1494 assert( tr_bitfieldHasAll( &msgs
->peer
->have
) );
1495 fireClientGotHaveAll( msgs
);
1496 updatePeerProgress( msgs
);
1498 fireError( msgs
, EMSGSIZE
);
1503 case BT_FEXT_HAVE_NONE
:
1504 dbgmsg( msgs
, "Got a BT_FEXT_HAVE_NONE" );
1506 tr_bitfieldSetHasNone( &msgs
->peer
->have
);
1507 fireClientGotHaveNone( msgs
);
1508 updatePeerProgress( msgs
);
1510 fireError( msgs
, EMSGSIZE
);
1515 case BT_FEXT_REJECT
:
1517 struct peer_request r
;
1518 dbgmsg( msgs
, "Got a BT_FEXT_REJECT" );
1519 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.index
);
1520 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.offset
);
1521 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.length
);
1523 fireGotRej( msgs
, &r
);
1525 fireError( msgs
, EMSGSIZE
);
1532 dbgmsg( msgs
, "Got a BT_LTEP" );
1533 parseLtep( msgs
, msglen
, inbuf
);
1537 dbgmsg( msgs
, "peer sent us an UNKNOWN: %d", (int)id
);
1538 tr_peerIoDrain( msgs
->peer
->io
, inbuf
, msglen
);
1542 assert( msglen
+ 1 == msgs
->incoming
.length
);
1543 assert( evbuffer_get_length( inbuf
) == startBufLen
- msglen
);
1545 msgs
->state
= AWAITING_BT_LENGTH
;
1549 /* returns 0 on success, or an errno on failure */
1551 clientGotBlock( tr_peermsgs
* msgs
,
1552 struct evbuffer
* data
,
1553 const struct peer_request
* req
)
1556 tr_torrent
* tor
= msgs
->torrent
;
1557 const tr_block_index_t block
= _tr_block( tor
, req
->index
, req
->offset
);
1562 if( req
->length
!= tr_torBlockCountBytes( msgs
->torrent
, block
) ) {
1563 dbgmsg( msgs
, "wrong block size -- expected %u, got %d",
1564 tr_torBlockCountBytes( msgs
->torrent
, block
), req
->length
);
1568 dbgmsg( msgs
, "got block %u:%u->%u", req
->index
, req
->offset
, req
->length
);
1570 if( !tr_peerMgrDidPeerRequest( msgs
->torrent
, msgs
->peer
, block
) ) {
1571 dbgmsg( msgs
, "we didn't ask for this message..." );
1574 if( tr_cpPieceIsComplete( &msgs
->torrent
->completion
, req
->index
) ) {
1575 dbgmsg( msgs
, "we did ask for this message, but the piece is already complete..." );
1583 if(( err
= tr_cacheWriteBlock( getSession(msgs
)->cache
, tor
, req
->index
, req
->offset
, req
->length
, data
)))
1586 tr_bitfieldAdd( &msgs
->peer
->blame
, req
->index
);
1587 fireGotBlock( msgs
, req
);
1591 static int peerPulse( void * vmsgs
);
1594 didWrite( tr_peerIo
* io UNUSED
, size_t bytesWritten
, int wasPieceData
, void * vmsgs
)
1596 tr_peermsgs
* msgs
= vmsgs
;
1597 firePeerGotData( msgs
, bytesWritten
, wasPieceData
);
1599 if ( tr_isPeerIo( io
) && io
->userData
)
1604 canRead( tr_peerIo
* io
, void * vmsgs
, size_t * piece
)
1607 tr_peermsgs
* msgs
= vmsgs
;
1608 struct evbuffer
* in
= tr_peerIoGetReadBuffer( io
);
1609 const size_t inlen
= evbuffer_get_length( in
);
1611 dbgmsg( msgs
, "canRead: inlen is %zu, msgs->state is %d", inlen
, msgs
->state
);
1617 else if( msgs
->state
== AWAITING_BT_PIECE
)
1619 ret
= readBtPiece( msgs
, in
, inlen
, piece
);
1621 else switch( msgs
->state
)
1623 case AWAITING_BT_LENGTH
:
1624 ret
= readBtLength ( msgs
, in
, inlen
); break;
1626 case AWAITING_BT_ID
:
1627 ret
= readBtId ( msgs
, in
, inlen
); break;
1629 case AWAITING_BT_MESSAGE
:
1630 ret
= readBtMessage( msgs
, in
, inlen
); break;
1637 dbgmsg( msgs
, "canRead: ret is %d", (int)ret
);
1639 /* log the raw data that was read */
1640 if( ( ret
!= READ_ERR
) && ( evbuffer_get_length( in
) != inlen
) )
1641 fireClientGotData( msgs
, inlen
- evbuffer_get_length( in
), false );
1647 tr_peerMsgsIsReadingBlock( const tr_peermsgs
* msgs
, tr_block_index_t block
)
1649 if( msgs
->state
!= AWAITING_BT_PIECE
)
1652 return block
== _tr_block( msgs
->torrent
,
1653 msgs
->incoming
.blockReq
.index
,
1654 msgs
->incoming
.blockReq
.offset
);
1662 updateDesiredRequestCount( tr_peermsgs
* msgs
)
1664 const tr_torrent
* const torrent
= msgs
->torrent
;
1666 /* there are lots of reasons we might not want to request any blocks... */
1667 if( tr_torrentIsSeed( torrent
) || !tr_torrentHasMetadata( torrent
)
1668 || msgs
->peer
->clientIsChoked
1669 || !msgs
->peer
->clientIsInterested
)
1671 msgs
->desiredRequestCount
= 0;
1675 int estimatedBlocksInPeriod
;
1678 const int floor
= 4;
1679 const int seconds
= REQUEST_BUF_SECS
;
1680 const uint64_t now
= tr_time_msec( );
1682 /* Get the rate limit we should use.
1683 * FIXME: this needs to consider all the other peers as well... */
1684 rate_Bps
= tr_peerGetPieceSpeed_Bps( msgs
->peer
, now
, TR_PEER_TO_CLIENT
);
1685 if( tr_torrentUsesSpeedLimit( torrent
, TR_PEER_TO_CLIENT
) )
1686 rate_Bps
= MIN( rate_Bps
, tr_torrentGetSpeedLimit_Bps( torrent
, TR_PEER_TO_CLIENT
) );
1688 /* honor the session limits, if enabled */
1689 if( tr_torrentUsesSessionLimits( torrent
) )
1690 if( tr_sessionGetActiveSpeedLimit_Bps( torrent
->session
, TR_PEER_TO_CLIENT
, &irate_Bps
) )
1691 rate_Bps
= MIN( rate_Bps
, irate_Bps
);
1693 /* use this desired rate to figure out how
1694 * many requests we should send to this peer */
1695 estimatedBlocksInPeriod
= ( rate_Bps
* seconds
) / torrent
->blockSize
;
1696 msgs
->desiredRequestCount
= MAX( floor
, estimatedBlocksInPeriod
);
1698 /* honor the peer's maximum request count, if specified */
1699 if( msgs
->reqq
> 0 )
1700 if( msgs
->desiredRequestCount
> msgs
->reqq
)
1701 msgs
->desiredRequestCount
= msgs
->reqq
;
1706 updateMetadataRequests( tr_peermsgs
* msgs
, time_t now
)
1710 if( msgs
->peerSupportsMetadataXfer
1711 && tr_torrentGetNextMetadataRequest( msgs
->torrent
, now
, &piece
) )
1714 struct evbuffer
* payload
;
1715 struct evbuffer
* out
= msgs
->outMessages
;
1717 /* build the data message */
1718 tr_bencInitDict( &tmp
, 3 );
1719 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_REQUEST
);
1720 tr_bencDictAddInt( &tmp
, "piece", piece
);
1721 payload
= tr_bencToBuf( &tmp
, TR_FMT_BENC
);
1723 dbgmsg( msgs
, "requesting metadata piece #%d", piece
);
1725 /* write it out as a LTEP message to our outMessages buffer */
1726 evbuffer_add_uint32( out
, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload
) );
1727 evbuffer_add_uint8 ( out
, BT_LTEP
);
1728 evbuffer_add_uint8 ( out
, msgs
->ut_metadata_id
);
1729 evbuffer_add_buffer( out
, payload
);
1730 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1731 dbgOutMessageLen( msgs
);
1734 evbuffer_free( payload
);
1735 tr_bencFree( &tmp
);
1740 updateBlockRequests( tr_peermsgs
* msgs
)
1742 if( tr_torrentIsPieceTransferAllowed( msgs
->torrent
, TR_PEER_TO_CLIENT
)
1743 && ( msgs
->desiredRequestCount
> 0 )
1744 && ( msgs
->peer
->pendingReqsToPeer
<= ( msgs
->desiredRequestCount
* 0.66 ) ) )
1748 const int numwant
= msgs
->desiredRequestCount
- msgs
->peer
->pendingReqsToPeer
;
1749 tr_block_index_t
* blocks
= alloca( sizeof( tr_block_index_t
) * numwant
);
1751 tr_peerMgrGetNextRequests( msgs
->torrent
, msgs
->peer
, numwant
, blocks
, &n
, false );
1753 for( i
=0; i
<n
; ++i
)
1755 struct peer_request req
;
1756 blockToReq( msgs
->torrent
, blocks
[i
], &req
);
1757 protocolSendRequest( msgs
, &req
);
1763 fillOutputBuffer( tr_peermsgs
* msgs
, time_t now
)
1766 size_t bytesWritten
= 0;
1767 struct peer_request req
;
1768 const bool haveMessages
= evbuffer_get_length( msgs
->outMessages
) != 0;
1769 const bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
1772 *** Protocol messages
1775 if( haveMessages
&& !msgs
->outMessagesBatchedAt
) /* fresh batch */
1777 dbgmsg( msgs
, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs
->outMessages
) );
1778 msgs
->outMessagesBatchedAt
= now
;
1780 else if( haveMessages
&& ( ( now
- msgs
->outMessagesBatchedAt
) >= msgs
->outMessagesBatchPeriod
) )
1782 const size_t len
= evbuffer_get_length( msgs
->outMessages
);
1783 /* flush the protocol messages */
1784 dbgmsg( msgs
, "flushing outMessages... to %p (length is %zu)", msgs
->peer
->io
, len
);
1785 tr_peerIoWriteBuf( msgs
->peer
->io
, msgs
->outMessages
, false );
1786 msgs
->clientSentAnythingAt
= now
;
1787 msgs
->outMessagesBatchedAt
= 0;
1788 msgs
->outMessagesBatchPeriod
= LOW_PRIORITY_INTERVAL_SECS
;
1789 bytesWritten
+= len
;
1796 if( ( tr_peerIoGetWriteBufferSpace( msgs
->peer
->io
, now
) >= METADATA_PIECE_SIZE
)
1797 && popNextMetadataRequest( msgs
, &piece
) )
1803 data
= tr_torrentGetMetadataPiece( msgs
->torrent
, piece
, &dataLen
);
1804 if( ( dataLen
> 0 ) && ( data
!= NULL
) )
1807 struct evbuffer
* payload
;
1808 struct evbuffer
* out
= msgs
->outMessages
;
1810 /* build the data message */
1811 tr_bencInitDict( &tmp
, 3 );
1812 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_DATA
);
1813 tr_bencDictAddInt( &tmp
, "piece", piece
);
1814 tr_bencDictAddInt( &tmp
, "total_size", msgs
->torrent
->infoDictLength
);
1815 payload
= tr_bencToBuf( &tmp
, TR_FMT_BENC
);
1817 /* write it out as a LTEP message to our outMessages buffer */
1818 evbuffer_add_uint32( out
, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload
) + dataLen
);
1819 evbuffer_add_uint8 ( out
, BT_LTEP
);
1820 evbuffer_add_uint8 ( out
, msgs
->ut_metadata_id
);
1821 evbuffer_add_buffer( out
, payload
);
1822 evbuffer_add ( out
, data
, dataLen
);
1823 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1824 dbgOutMessageLen( msgs
);
1826 evbuffer_free( payload
);
1827 tr_bencFree( &tmp
);
1833 if( !ok
) /* send a rejection message */
1836 struct evbuffer
* payload
;
1837 struct evbuffer
* out
= msgs
->outMessages
;
1839 /* build the rejection message */
1840 tr_bencInitDict( &tmp
, 2 );
1841 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_REJECT
);
1842 tr_bencDictAddInt( &tmp
, "piece", piece
);
1843 payload
= tr_bencToBuf( &tmp
, TR_FMT_BENC
);
1845 /* write it out as a LTEP message to our outMessages buffer */
1846 evbuffer_add_uint32( out
, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload
) );
1847 evbuffer_add_uint8 ( out
, BT_LTEP
);
1848 evbuffer_add_uint8 ( out
, msgs
->ut_metadata_id
);
1849 evbuffer_add_buffer( out
, payload
);
1850 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1851 dbgOutMessageLen( msgs
);
1853 evbuffer_free( payload
);
1854 tr_bencFree( &tmp
);
1862 if( ( tr_peerIoGetWriteBufferSpace( msgs
->peer
->io
, now
) >= msgs
->torrent
->blockSize
)
1863 && popNextRequest( msgs
, &req
) )
1865 --msgs
->prefetchCount
;
1867 if( requestIsValid( msgs
, &req
)
1868 && tr_cpPieceIsComplete( &msgs
->torrent
->completion
, req
.index
) )
1871 const uint32_t msglen
= 4 + 1 + 4 + 4 + req
.length
;
1872 struct evbuffer
* out
;
1873 struct evbuffer_iovec iovec
[1];
1875 out
= evbuffer_new( );
1876 evbuffer_expand( out
, msglen
);
1878 evbuffer_add_uint32( out
, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req
.length
);
1879 evbuffer_add_uint8 ( out
, BT_PIECE
);
1880 evbuffer_add_uint32( out
, req
.index
);
1881 evbuffer_add_uint32( out
, req
.offset
);
1883 evbuffer_reserve_space( out
, req
.length
, iovec
, 1 );
1884 err
= tr_cacheReadBlock( getSession(msgs
)->cache
, msgs
->torrent
, req
.index
, req
.offset
, req
.length
, iovec
[0].iov_base
);
1885 iovec
[0].iov_len
= req
.length
;
1886 evbuffer_commit_space( out
, iovec
, 1 );
1888 /* check the piece if it needs checking... */
1889 if( !err
&& tr_torrentPieceNeedsCheck( msgs
->torrent
, req
.index
) )
1890 if(( err
= !tr_torrentCheckPiece( msgs
->torrent
, req
.index
)))
1891 tr_torrentSetLocalError( msgs
->torrent
, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req
.index
);
1896 protocolSendReject( msgs
, &req
);
1900 const size_t n
= evbuffer_get_length( out
);
1901 dbgmsg( msgs
, "sending block %u:%u->%u", req
.index
, req
.offset
, req
.length
);
1902 assert( n
== msglen
);
1903 tr_peerIoWriteBuf( msgs
->peer
->io
, out
, true );
1905 msgs
->clientSentAnythingAt
= now
;
1906 tr_historyAdd( &msgs
->peer
->blocksSentToPeer
, tr_time( ), 1 );
1909 evbuffer_free( out
);
1917 else if( fext
) /* peer needs a reject message */
1919 protocolSendReject( msgs
, &req
);
1923 prefetchPieces( msgs
);
1930 if( ( msgs
!= NULL
)
1931 && ( msgs
->clientSentAnythingAt
!= 0 )
1932 && ( ( now
- msgs
->clientSentAnythingAt
) > KEEPALIVE_INTERVAL_SECS
) )
1934 dbgmsg( msgs
, "sending a keepalive message" );
1935 evbuffer_add_uint32( msgs
->outMessages
, 0 );
1936 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
1939 return bytesWritten
;
1943 peerPulse( void * vmsgs
)
1945 tr_peermsgs
* msgs
= vmsgs
;
1946 const time_t now
= tr_time( );
1948 if ( tr_isPeerIo( msgs
->peer
->io
) ) {
1949 updateDesiredRequestCount( msgs
);
1950 updateBlockRequests( msgs
);
1951 updateMetadataRequests( msgs
, now
);
1955 if( fillOutputBuffer( msgs
, now
) < 1 )
1958 return true; /* loop forever */
1962 tr_peerMsgsPulse( tr_peermsgs
* msgs
)
1969 gotError( tr_peerIo
* io UNUSED
, short what
, void * vmsgs
)
1971 if( what
& BEV_EVENT_TIMEOUT
)
1972 dbgmsg( vmsgs
, "libevent got a timeout, what=%hd", what
);
1973 if( what
& ( BEV_EVENT_EOF
| BEV_EVENT_ERROR
) )
1974 dbgmsg( vmsgs
, "libevent got an error! what=%hd, errno=%d (%s)",
1975 what
, errno
, tr_strerror( errno
) );
1976 fireError( vmsgs
, ENOTCONN
);
1980 sendBitfield( tr_peermsgs
* msgs
)
1982 size_t byte_count
= 0;
1983 struct evbuffer
* out
= msgs
->outMessages
;
1984 void * bytes
= tr_cpCreatePieceBitfield( &msgs
->torrent
->completion
, &byte_count
);
1986 evbuffer_add_uint32( out
, sizeof( uint8_t ) + byte_count
);
1987 evbuffer_add_uint8 ( out
, BT_BITFIELD
);
1988 evbuffer_add ( out
, bytes
, byte_count
);
1989 dbgmsg( msgs
, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out
) );
1990 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
1996 tellPeerWhatWeHave( tr_peermsgs
* msgs
)
1998 const bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
2000 if( fext
&& tr_cpHasAll( &msgs
->torrent
->completion
) )
2002 protocolSendHaveAll( msgs
);
2004 else if( fext
&& tr_cpHasNone( &msgs
->torrent
->completion
) )
2006 protocolSendHaveNone( msgs
);
2008 else if( !tr_cpHasNone( &msgs
->torrent
->completion
) )
2010 sendBitfield( msgs
);
2018 /* some peers give us error messages if we send
2019 more than this many peers in a single pex message
2020 http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2021 #define MAX_PEX_ADDED 50
2022 #define MAX_PEX_DROPPED 50
2036 pexAddedCb( void * vpex
, void * userData
)
2038 PexDiffs
* diffs
= userData
;
2039 tr_pex
* pex
= vpex
;
2041 if( diffs
->addedCount
< MAX_PEX_ADDED
)
2043 diffs
->added
[diffs
->addedCount
++] = *pex
;
2044 diffs
->elements
[diffs
->elementCount
++] = *pex
;
2049 pexDroppedCb( void * vpex
, void * userData
)
2051 PexDiffs
* diffs
= userData
;
2052 tr_pex
* pex
= vpex
;
2054 if( diffs
->droppedCount
< MAX_PEX_DROPPED
)
2056 diffs
->dropped
[diffs
->droppedCount
++] = *pex
;
2061 pexElementCb( void * vpex
, void * userData
)
2063 PexDiffs
* diffs
= userData
;
2064 tr_pex
* pex
= vpex
;
2066 diffs
->elements
[diffs
->elementCount
++] = *pex
;
2069 typedef void ( tr_set_func
)( void * element
, void * userData
);
2072 * @brief find the differences and commonalities in two sorted sets
2073 * @param a the first set
2074 * @param aCount the number of elements in the set 'a'
2075 * @param b the second set
2076 * @param bCount the number of elements in the set 'b'
2077 * @param compare the sorting method for both sets
2078 * @param elementSize the sizeof the element in the two sorted sets
2079 * @param in_a called for items in set 'a' but not set 'b'
2080 * @param in_b called for items in set 'b' but not set 'a'
2081 * @param in_both called for items that are in both sets
2082 * @param userData user data passed along to in_a, in_b, and in_both
2085 tr_set_compare( const void * va
, size_t aCount
,
2086 const void * vb
, size_t bCount
,
2087 int compare( const void * a
, const void * b
),
2089 tr_set_func in_a_cb
,
2090 tr_set_func in_b_cb
,
2091 tr_set_func in_both_cb
,
2094 const uint8_t * a
= va
;
2095 const uint8_t * b
= vb
;
2096 const uint8_t * aend
= a
+ elementSize
* aCount
;
2097 const uint8_t * bend
= b
+ elementSize
* bCount
;
2099 while( a
!= aend
|| b
!= bend
)
2103 ( *in_b_cb
)( (void*)b
, userData
);
2106 else if( b
== bend
)
2108 ( *in_a_cb
)( (void*)a
, userData
);
2113 const int val
= ( *compare
)( a
, b
);
2117 ( *in_both_cb
)( (void*)a
, userData
);
2123 ( *in_a_cb
)( (void*)a
, userData
);
2128 ( *in_b_cb
)( (void*)b
, userData
);
2137 sendPex( tr_peermsgs
* msgs
)
2139 if( msgs
->peerSupportsPex
&& tr_torrentAllowsPex( msgs
->torrent
) )
2143 tr_pex
* newPex
= NULL
;
2144 tr_pex
* newPex6
= NULL
;
2145 const int newCount
= tr_peerMgrGetPeers( msgs
->torrent
, &newPex
, TR_AF_INET
, TR_PEERS_CONNECTED
, MAX_PEX_PEER_COUNT
);
2146 const int newCount6
= tr_peerMgrGetPeers( msgs
->torrent
, &newPex6
, TR_AF_INET6
, TR_PEERS_CONNECTED
, MAX_PEX_PEER_COUNT
);
2148 /* build the diffs */
2149 diffs
.added
= tr_new( tr_pex
, newCount
);
2150 diffs
.addedCount
= 0;
2151 diffs
.dropped
= tr_new( tr_pex
, msgs
->pexCount
);
2152 diffs
.droppedCount
= 0;
2153 diffs
.elements
= tr_new( tr_pex
, newCount
+ msgs
->pexCount
);
2154 diffs
.elementCount
= 0;
2155 tr_set_compare( msgs
->pex
, msgs
->pexCount
,
2157 tr_pexCompare
, sizeof( tr_pex
),
2158 pexDroppedCb
, pexAddedCb
, pexElementCb
, &diffs
);
2159 diffs6
.added
= tr_new( tr_pex
, newCount6
);
2160 diffs6
.addedCount
= 0;
2161 diffs6
.dropped
= tr_new( tr_pex
, msgs
->pexCount6
);
2162 diffs6
.droppedCount
= 0;
2163 diffs6
.elements
= tr_new( tr_pex
, newCount6
+ msgs
->pexCount6
);
2164 diffs6
.elementCount
= 0;
2165 tr_set_compare( msgs
->pex6
, msgs
->pexCount6
,
2167 tr_pexCompare
, sizeof( tr_pex
),
2168 pexDroppedCb
, pexAddedCb
, pexElementCb
, &diffs6
);
2171 "pex: old peer count %d+%d, new peer count %d+%d, "
2172 "added %d+%d, removed %d+%d",
2173 msgs
->pexCount
, msgs
->pexCount6
, newCount
, newCount6
,
2174 diffs
.addedCount
, diffs6
.addedCount
,
2175 diffs
.droppedCount
, diffs6
.droppedCount
);
2177 if( !diffs
.addedCount
&& !diffs
.droppedCount
&& !diffs6
.addedCount
&&
2178 !diffs6
.droppedCount
)
2180 tr_free( diffs
.elements
);
2181 tr_free( diffs6
.elements
);
2187 uint8_t * tmp
, *walk
;
2188 struct evbuffer
* payload
;
2189 struct evbuffer
* out
= msgs
->outMessages
;
2192 tr_free( msgs
->pex
);
2193 msgs
->pex
= diffs
.elements
;
2194 msgs
->pexCount
= diffs
.elementCount
;
2195 tr_free( msgs
->pex6
);
2196 msgs
->pex6
= diffs6
.elements
;
2197 msgs
->pexCount6
= diffs6
.elementCount
;
2199 /* build the pex payload */
2200 tr_bencInitDict( &val
, 3 ); /* ipv6 support: left as 3:
2201 * speed vs. likelihood? */
2203 if( diffs
.addedCount
> 0)
2206 tmp
= walk
= tr_new( uint8_t, diffs
.addedCount
* 6 );
2207 for( i
= 0; i
< diffs
.addedCount
; ++i
) {
2208 memcpy( walk
, &diffs
.added
[i
].addr
.addr
, 4 ); walk
+= 4;
2209 memcpy( walk
, &diffs
.added
[i
].port
, 2 ); walk
+= 2;
2211 assert( ( walk
- tmp
) == diffs
.addedCount
* 6 );
2212 tr_bencDictAddRaw( &val
, "added", tmp
, walk
- tmp
);
2216 * unset each holepunch flag because we don't support it. */
2217 tmp
= walk
= tr_new( uint8_t, diffs
.addedCount
);
2218 for( i
= 0; i
< diffs
.addedCount
; ++i
)
2219 *walk
++ = diffs
.added
[i
].flags
& ~ADDED_F_HOLEPUNCH
;
2220 assert( ( walk
- tmp
) == diffs
.addedCount
);
2221 tr_bencDictAddRaw( &val
, "added.f", tmp
, walk
- tmp
);
2225 if( diffs
.droppedCount
> 0 )
2228 tmp
= walk
= tr_new( uint8_t, diffs
.droppedCount
* 6 );
2229 for( i
= 0; i
< diffs
.droppedCount
; ++i
) {
2230 memcpy( walk
, &diffs
.dropped
[i
].addr
.addr
, 4 ); walk
+= 4;
2231 memcpy( walk
, &diffs
.dropped
[i
].port
, 2 ); walk
+= 2;
2233 assert( ( walk
- tmp
) == diffs
.droppedCount
* 6 );
2234 tr_bencDictAddRaw( &val
, "dropped", tmp
, walk
- tmp
);
2238 if( diffs6
.addedCount
> 0 )
2241 tmp
= walk
= tr_new( uint8_t, diffs6
.addedCount
* 18 );
2242 for( i
= 0; i
< diffs6
.addedCount
; ++i
) {
2243 memcpy( walk
, &diffs6
.added
[i
].addr
.addr
.addr6
.s6_addr
, 16 );
2245 memcpy( walk
, &diffs6
.added
[i
].port
, 2 );
2248 assert( ( walk
- tmp
) == diffs6
.addedCount
* 18 );
2249 tr_bencDictAddRaw( &val
, "added6", tmp
, walk
- tmp
);
2253 * unset each holepunch flag because we don't support it. */
2254 tmp
= walk
= tr_new( uint8_t, diffs6
.addedCount
);
2255 for( i
= 0; i
< diffs6
.addedCount
; ++i
)
2256 *walk
++ = diffs6
.added
[i
].flags
& ~ADDED_F_HOLEPUNCH
;
2257 assert( ( walk
- tmp
) == diffs6
.addedCount
);
2258 tr_bencDictAddRaw( &val
, "added6.f", tmp
, walk
- tmp
);
2262 if( diffs6
.droppedCount
> 0 )
2265 tmp
= walk
= tr_new( uint8_t, diffs6
.droppedCount
* 18 );
2266 for( i
= 0; i
< diffs6
.droppedCount
; ++i
) {
2267 memcpy( walk
, &diffs6
.dropped
[i
].addr
.addr
.addr6
.s6_addr
, 16 );
2269 memcpy( walk
, &diffs6
.dropped
[i
].port
, 2 );
2272 assert( ( walk
- tmp
) == diffs6
.droppedCount
* 18);
2273 tr_bencDictAddRaw( &val
, "dropped6", tmp
, walk
- tmp
);
2277 /* write the pex message */
2278 payload
= tr_bencToBuf( &val
, TR_FMT_BENC
);
2279 evbuffer_add_uint32( out
, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload
) );
2280 evbuffer_add_uint8 ( out
, BT_LTEP
);
2281 evbuffer_add_uint8 ( out
, msgs
->ut_pex_id
);
2282 evbuffer_add_buffer( out
, payload
);
2283 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
2284 dbgmsg( msgs
, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out
) );
2285 dbgOutMessageLen( msgs
);
2287 evbuffer_free( payload
);
2288 tr_bencFree( &val
);
2292 tr_free( diffs
.added
);
2293 tr_free( diffs
.dropped
);
2295 tr_free( diffs6
.added
);
2296 tr_free( diffs6
.dropped
);
2299 /*msgs->clientSentPexAt = tr_time( );*/
2304 pexPulse( int foo UNUSED
, short bar UNUSED
, void * vmsgs
)
2306 struct tr_peermsgs
* msgs
= vmsgs
;
2310 assert( msgs
->pexTimer
!= NULL
);
2311 tr_timerAdd( msgs
->pexTimer
, PEX_INTERVAL_SECS
, 0 );
2319 tr_peerMsgsNew( struct tr_torrent
* torrent
,
2320 struct tr_peer
* peer
,
2321 tr_peer_callback
* callback
,
2322 void * callbackData
)
2329 m
= tr_new0( tr_peermsgs
, 1 );
2330 m
->callback
= callback
;
2331 m
->callbackData
= callbackData
;
2333 m
->torrent
= torrent
;
2334 m
->peer
->clientIsChoked
= 1;
2335 m
->peer
->peerIsChoked
= 1;
2336 m
->peer
->clientIsInterested
= 0;
2337 m
->peer
->peerIsInterested
= 0;
2338 m
->state
= AWAITING_BT_LENGTH
;
2339 m
->outMessages
= evbuffer_new( );
2340 m
->outMessagesBatchedAt
= 0;
2341 m
->outMessagesBatchPeriod
= LOW_PRIORITY_INTERVAL_SECS
;
2344 if( tr_torrentAllowsPex( torrent
) ) {
2345 m
->pexTimer
= evtimer_new( torrent
->session
->event_base
, pexPulse
, m
);
2346 tr_timerAdd( m
->pexTimer
, PEX_INTERVAL_SECS
, 0 );
2349 if( tr_peerIoSupportsUTP( peer
->io
) ) {
2350 const tr_address
* addr
= tr_peerIoGetAddress( peer
->io
, NULL
);
2351 tr_peerMgrSetUtpSupported( torrent
, addr
);
2352 tr_peerMgrSetUtpFailed( torrent
, addr
, false );
2355 if( tr_peerIoSupportsLTEP( peer
->io
) )
2356 sendLtepHandshake( m
);
2358 tellPeerWhatWeHave( m
);
2360 if( tr_dhtEnabled( torrent
->session
) && tr_peerIoSupportsDHT( peer
->io
))
2362 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2363 const struct tr_address
*addr
= tr_peerIoGetAddress( peer
->io
, NULL
);
2364 if( addr
->type
== TR_AF_INET
|| tr_globalIPv6() ) {
2365 protocolSendPort( m
, tr_dhtPort( torrent
->session
) );
2369 tr_peerIoSetIOFuncs( m
->peer
->io
, canRead
, didWrite
, gotError
, m
);
2370 updateDesiredRequestCount( m
);
2376 tr_peerMsgsFree( tr_peermsgs
* msgs
)
2380 if( msgs
->pexTimer
!= NULL
)
2381 event_free( msgs
->pexTimer
);
2383 if( msgs
->incoming
.block
!= NULL
)
2384 evbuffer_free( msgs
->incoming
.block
);
2386 evbuffer_free( msgs
->outMessages
);
2387 tr_free( msgs
->pex6
);
2388 tr_free( msgs
->pex
);
2390 memset( msgs
, ~0, sizeof( tr_peermsgs
) );