2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
10 * $Id: peer-msgs.c 11425 2010-11-16 15:17:34Z charles $
15 #include <limits.h> /* INT_MAX */
23 #include "transmission.h"
26 #include "completion.h"
29 #include "net.h" /* for ECONN */
33 #include "peer-msgs.h"
37 #include "torrent-magnet.h"
51 BT_NOT_INTERESTED
= 3,
60 BT_FEXT_HAVE_ALL
= 14,
61 BT_FEXT_HAVE_NONE
= 15,
63 BT_FEXT_ALLOWED_FAST
= 17,
72 MAX_PEX_PEER_COUNT
= 50,
74 MIN_CHOKE_PERIOD_SEC
= 10,
76 /* idle seconds before we send a keepalive */
77 KEEPALIVE_INTERVAL_SECS
= 100,
79 PEX_INTERVAL_SECS
= 90, /* sec between sendPex() calls */
85 /* used in lowering the outMessages queue period */
86 IMMEDIATE_PRIORITY_INTERVAL_SECS
= 0,
87 HIGH_PRIORITY_INTERVAL_SECS
= 2,
88 LOW_PRIORITY_INTERVAL_SECS
= 10,
90 /* number of pieces to remove from the bitfield when
91 * lazy bitfields are turned on */
92 LAZY_PIECE_COUNT
= 26,
94 /* number of pieces we'll allow in our fast set */
95 MAX_FAST_SET_SIZE
= 3,
97 /* defined in BEP #9 */
98 METADATA_MSG_TYPE_REQUEST
= 0,
99 METADATA_MSG_TYPE_DATA
= 1,
100 METADATA_MSG_TYPE_REJECT
= 2
123 getBlockOffsetInPiece( const tr_torrent
* tor
, uint64_t b
)
125 const uint64_t piecePos
= tor
->info
.pieceSize
* tr_torBlockPiece( tor
, b
);
126 const uint64_t blockPos
= tor
->blockSize
* b
;
127 assert( blockPos
>= piecePos
);
128 return (uint32_t)( blockPos
- piecePos
);
132 blockToReq( const tr_torrent
* tor
,
133 tr_block_index_t block
,
134 struct peer_request
* setme
)
136 assert( setme
!= NULL
);
138 setme
->index
= tr_torBlockPiece( tor
, block
);
139 setme
->offset
= getBlockOffsetInPiece( tor
, block
);
140 setme
->length
= tr_torBlockCountBytes( tor
, block
);
147 /* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
152 uint32_t length
; /* includes the +1 for id length */
153 struct peer_request blockReq
; /* metadata for incoming blocks */
154 struct evbuffer
* block
; /* piece data for incoming blocks */
158 * Low-level communication state information about a connected peer.
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
168 * @see struct peer_atom
173 tr_bool peerSupportsPex
;
174 tr_bool peerSupportsMetadataXfer
;
175 tr_bool clientSentLtepHandshake
;
176 tr_bool peerSentLtepHandshake
;
178 /*tr_bool haveFastSet;*/
180 int desiredRequestCount
;
184 /* how long the outMessages batch should be allowed to grow before
185 * it's flushed -- some messages (like requests >:) should be sent
186 * very quickly; others aren't as urgent. */
187 int8_t outMessagesBatchPeriod
;
191 uint8_t ut_metadata_id
;
197 tr_piece_index_t fastset
[MAX_FAST_SET_SIZE
];
202 tr_torrent
* torrent
;
204 tr_peer_callback
* callback
;
207 struct evbuffer
* outMessages
; /* all the non-piece messages */
209 struct peer_request peerAskedFor
[REQQ
];
211 int peerAskedForMetadata
[METADATA_REQQ
];
212 int peerAskedForMetadataCount
;
217 /*time_t clientSentPexAt;*/
218 time_t clientSentAnythingAt
;
220 /* when we started batching the outMessages */
221 time_t outMessagesBatchedAt
;
223 struct tr_incoming incoming
;
225 /* if the peer supports the Extension Protocol in BEP 10 and
226 supplied a reqq argument, it's stored here. otherwise the
227 value is zero and should be ignored. */
230 struct event pexTimer
;
239 getHave( const struct tr_peermsgs
* msgs
)
241 if( msgs
->peer
->have
== NULL
)
242 msgs
->peer
->have
= tr_bitfieldNew( msgs
->torrent
->info
.pieceCount
);
243 return msgs
->peer
->have
;
247 static inline tr_session
*
248 getSession( struct tr_peermsgs
* msgs
)
250 return msgs
->torrent
->session
;
258 myDebug( const char * file
, int line
,
259 const struct tr_peermsgs
* msgs
,
260 const char * fmt
, ... )
262 FILE * fp
= tr_getLog( );
268 struct evbuffer
* buf
= evbuffer_new( );
269 char * base
= tr_basename( file
);
271 evbuffer_add_printf( buf
, "[%s] %s - %s [%s]: ",
272 tr_getLogTimeStr( timestr
, sizeof( timestr
) ),
273 tr_torrentName( msgs
->torrent
),
274 tr_peerIoGetAddrStr( msgs
->peer
->io
),
275 msgs
->peer
->client
);
276 va_start( args
, fmt
);
277 evbuffer_add_vprintf( buf
, fmt
, args
);
279 evbuffer_add_printf( buf
, " (%s:%d)\n", base
, line
);
280 /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
281 fwrite( EVBUFFER_DATA( buf
), 1, EVBUFFER_LENGTH( buf
), fp
);
284 evbuffer_free( buf
);
288 #define dbgmsg( msgs, ... ) \
290 if( tr_deepLoggingIsActive( ) ) \
291 myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
299 pokeBatchPeriod( tr_peermsgs
* msgs
,
302 if( msgs
->outMessagesBatchPeriod
> interval
)
304 msgs
->outMessagesBatchPeriod
= interval
;
305 dbgmsg( msgs
, "lowering batch interval to %d seconds", interval
);
310 dbgOutMessageLen( tr_peermsgs
* msgs
)
312 dbgmsg( msgs
, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs
->outMessages
) );
316 protocolSendReject( tr_peermsgs
* msgs
, const struct peer_request
* req
)
318 tr_peerIo
* io
= msgs
->peer
->io
;
319 struct evbuffer
* out
= msgs
->outMessages
;
321 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
323 tr_peerIoWriteUint32( io
, out
, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324 tr_peerIoWriteUint8 ( io
, out
, BT_FEXT_REJECT
);
325 tr_peerIoWriteUint32( io
, out
, req
->index
);
326 tr_peerIoWriteUint32( io
, out
, req
->offset
);
327 tr_peerIoWriteUint32( io
, out
, req
->length
);
329 dbgmsg( msgs
, "rejecting %u:%u->%u...", req
->index
, req
->offset
, req
->length
);
330 dbgOutMessageLen( msgs
);
334 protocolSendRequest( tr_peermsgs
* msgs
,
335 const struct peer_request
* req
)
337 tr_peerIo
* io
= msgs
->peer
->io
;
338 struct evbuffer
* out
= msgs
->outMessages
;
340 tr_peerIoWriteUint32( io
, out
, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
341 tr_peerIoWriteUint8 ( io
, out
, BT_REQUEST
);
342 tr_peerIoWriteUint32( io
, out
, req
->index
);
343 tr_peerIoWriteUint32( io
, out
, req
->offset
);
344 tr_peerIoWriteUint32( io
, out
, req
->length
);
346 dbgmsg( msgs
, "requesting %u:%u->%u...", req
->index
, req
->offset
, req
->length
);
347 dbgOutMessageLen( msgs
);
348 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
352 protocolSendCancel( tr_peermsgs
* msgs
,
353 const struct peer_request
* req
)
355 tr_peerIo
* io
= msgs
->peer
->io
;
356 struct evbuffer
* out
= msgs
->outMessages
;
358 tr_peerIoWriteUint32( io
, out
, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
359 tr_peerIoWriteUint8 ( io
, out
, BT_CANCEL
);
360 tr_peerIoWriteUint32( io
, out
, req
->index
);
361 tr_peerIoWriteUint32( io
, out
, req
->offset
);
362 tr_peerIoWriteUint32( io
, out
, req
->length
);
364 dbgmsg( msgs
, "cancelling %u:%u->%u...", req
->index
, req
->offset
, req
->length
);
365 dbgOutMessageLen( msgs
);
366 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
370 protocolSendPort(tr_peermsgs
*msgs
, uint16_t port
)
372 tr_peerIo
* io
= msgs
->peer
->io
;
373 struct evbuffer
* out
= msgs
->outMessages
;
375 dbgmsg( msgs
, "sending Port %u", port
);
376 tr_peerIoWriteUint32( io
, out
, 3 );
377 tr_peerIoWriteUint8 ( io
, out
, BT_PORT
);
378 tr_peerIoWriteUint16( io
, out
, port
);
382 protocolSendHave( tr_peermsgs
* msgs
,
385 tr_peerIo
* io
= msgs
->peer
->io
;
386 struct evbuffer
* out
= msgs
->outMessages
;
388 tr_peerIoWriteUint32( io
, out
, sizeof(uint8_t) + sizeof(uint32_t) );
389 tr_peerIoWriteUint8 ( io
, out
, BT_HAVE
);
390 tr_peerIoWriteUint32( io
, out
, index
);
392 dbgmsg( msgs
, "sending Have %u", index
);
393 dbgOutMessageLen( msgs
);
394 pokeBatchPeriod( msgs
, LOW_PRIORITY_INTERVAL_SECS
);
399 protocolSendAllowedFast( tr_peermsgs
* msgs
, uint32_t pieceIndex
)
401 tr_peerIo
* io
= msgs
->peer
->io
;
402 struct evbuffer
* out
= msgs
->outMessages
;
404 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
406 tr_peerIoWriteUint32( io
, out
, sizeof(uint8_t) + sizeof(uint32_t) );
407 tr_peerIoWriteUint8 ( io
, out
, BT_FEXT_ALLOWED_FAST
);
408 tr_peerIoWriteUint32( io
, out
, pieceIndex
);
410 dbgmsg( msgs
, "sending Allowed Fast %u...", pieceIndex
);
411 dbgOutMessageLen( msgs
);
416 protocolSendChoke( tr_peermsgs
* msgs
,
419 tr_peerIo
* io
= msgs
->peer
->io
;
420 struct evbuffer
* out
= msgs
->outMessages
;
422 tr_peerIoWriteUint32( io
, out
, sizeof( uint8_t ) );
423 tr_peerIoWriteUint8 ( io
, out
, choke
? BT_CHOKE
: BT_UNCHOKE
);
425 dbgmsg( msgs
, "sending %s...", choke
? "Choke" : "Unchoke" );
426 dbgOutMessageLen( msgs
);
427 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
431 protocolSendHaveAll( tr_peermsgs
* msgs
)
433 tr_peerIo
* io
= msgs
->peer
->io
;
434 struct evbuffer
* out
= msgs
->outMessages
;
436 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
438 tr_peerIoWriteUint32( io
, out
, sizeof( uint8_t ) );
439 tr_peerIoWriteUint8 ( io
, out
, BT_FEXT_HAVE_ALL
);
441 dbgmsg( msgs
, "sending HAVE_ALL..." );
442 dbgOutMessageLen( msgs
);
443 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
447 protocolSendHaveNone( tr_peermsgs
* msgs
)
449 tr_peerIo
* io
= msgs
->peer
->io
;
450 struct evbuffer
* out
= msgs
->outMessages
;
452 assert( tr_peerIoSupportsFEXT( msgs
->peer
->io
) );
454 tr_peerIoWriteUint32( io
, out
, sizeof( uint8_t ) );
455 tr_peerIoWriteUint8 ( io
, out
, BT_FEXT_HAVE_NONE
);
457 dbgmsg( msgs
, "sending HAVE_NONE..." );
458 dbgOutMessageLen( msgs
);
459 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
466 static const tr_peer_event blankEvent
= { 0, 0, 0, 0, 0.0f
, 0, 0, 0 };
469 publish( tr_peermsgs
* msgs
, tr_peer_event
* e
)
471 assert( msgs
->peer
);
472 assert( msgs
->peer
->msgs
== msgs
);
474 if( msgs
->callback
!= NULL
)
475 msgs
->callback( msgs
->peer
, e
, msgs
->callbackData
);
479 fireError( tr_peermsgs
* msgs
, int err
)
481 tr_peer_event e
= blankEvent
;
482 e
.eventType
= TR_PEER_ERROR
;
488 firePeerProgress( tr_peermsgs
* msgs
)
490 tr_peer_event e
= blankEvent
;
491 e
.eventType
= TR_PEER_PEER_PROGRESS
;
492 e
.progress
= msgs
->peer
->progress
;
497 fireGotBlock( tr_peermsgs
* msgs
, const struct peer_request
* req
)
499 tr_peer_event e
= blankEvent
;
500 e
.eventType
= TR_PEER_CLIENT_GOT_BLOCK
;
501 e
.pieceIndex
= req
->index
;
502 e
.offset
= req
->offset
;
503 e
.length
= req
->length
;
508 fireGotRej( tr_peermsgs
* msgs
, const struct peer_request
* req
)
510 tr_peer_event e
= blankEvent
;
511 e
.eventType
= TR_PEER_CLIENT_GOT_REJ
;
512 e
.pieceIndex
= req
->index
;
513 e
.offset
= req
->offset
;
514 e
.length
= req
->length
;
519 fireGotChoke( tr_peermsgs
* msgs
)
521 tr_peer_event e
= blankEvent
;
522 e
.eventType
= TR_PEER_CLIENT_GOT_CHOKE
;
527 fireClientGotData( tr_peermsgs
* msgs
,
531 tr_peer_event e
= blankEvent
;
534 e
.eventType
= TR_PEER_CLIENT_GOT_DATA
;
535 e
.wasPieceData
= wasPieceData
;
540 fireClientGotSuggest( tr_peermsgs
* msgs
, uint32_t pieceIndex
)
542 tr_peer_event e
= blankEvent
;
543 e
.eventType
= TR_PEER_CLIENT_GOT_SUGGEST
;
544 e
.pieceIndex
= pieceIndex
;
549 fireClientGotPort( tr_peermsgs
* msgs
, tr_port port
)
551 tr_peer_event e
= blankEvent
;
552 e
.eventType
= TR_PEER_CLIENT_GOT_PORT
;
558 fireClientGotAllowedFast( tr_peermsgs
* msgs
, uint32_t pieceIndex
)
560 tr_peer_event e
= blankEvent
;
561 e
.eventType
= TR_PEER_CLIENT_GOT_ALLOWED_FAST
;
562 e
.pieceIndex
= pieceIndex
;
567 firePeerGotData( tr_peermsgs
* msgs
,
571 tr_peer_event e
= blankEvent
;
574 e
.eventType
= TR_PEER_PEER_GOT_DATA
;
575 e
.wasPieceData
= wasPieceData
;
582 *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
586 tr_generateAllowedSet( tr_piece_index_t
* setmePieces
,
587 size_t desiredSetSize
,
589 const uint8_t * infohash
,
590 const tr_address
* addr
)
594 assert( setmePieces
);
595 assert( desiredSetSize
<= pieceCount
);
596 assert( desiredSetSize
);
597 assert( pieceCount
);
601 if( addr
->type
== TR_AF_INET
)
603 uint8_t w
[SHA_DIGEST_LENGTH
+ 4], *walk
=w
;
604 uint8_t x
[SHA_DIGEST_LENGTH
];
606 uint32_t ui32
= ntohl( htonl( addr
->addr
.addr4
.s_addr
) & 0xffffff00 ); /* (1) */
607 memcpy( w
, &ui32
, sizeof( uint32_t ) );
608 walk
+= sizeof( uint32_t );
609 memcpy( walk
, infohash
, SHA_DIGEST_LENGTH
); /* (2) */
610 walk
+= SHA_DIGEST_LENGTH
;
611 tr_sha1( x
, w
, walk
-w
, NULL
); /* (3) */
612 assert( sizeof( w
) == walk
-w
);
614 while( setSize
<desiredSetSize
)
617 for( i
=0; i
<5 && setSize
<desiredSetSize
; ++i
) /* (4) */
620 uint32_t j
= i
* 4; /* (5) */
621 uint32_t y
= ntohl( *( uint32_t* )( x
+ j
) ); /* (6) */
622 uint32_t index
= y
% pieceCount
; /* (7) */
624 for( k
=0; k
<setSize
; ++k
) /* (8) */
625 if( setmePieces
[k
] == index
)
629 setmePieces
[setSize
++] = index
; /* (9) */
632 tr_sha1( x
, x
, sizeof( x
), NULL
); /* (3) */
640 updateFastSet( tr_peermsgs
* msgs UNUSED
)
643 const tr_bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
644 const int peerIsNeedy
= msgs
->peer
->progress
< 0.10;
646 if( fext
&& peerIsNeedy
&& !msgs
->haveFastSet
)
649 const struct tr_address
* addr
= tr_peerIoGetAddress( msgs
->peer
->io
, NULL
);
650 const tr_info
* inf
= &msgs
->torrent
->info
;
651 const size_t numwant
= MIN( MAX_FAST_SET_SIZE
, inf
->pieceCount
);
653 /* build the fast set */
654 msgs
->fastsetSize
= tr_generateAllowedSet( msgs
->fastset
, numwant
, inf
->pieceCount
, inf
->hash
, addr
);
655 msgs
->haveFastSet
= 1;
657 /* send it to the peer */
658 for( i
=0; i
<msgs
->fastsetSize
; ++i
)
659 protocolSendAllowedFast( msgs
, msgs
->fastset
[i
] );
669 sendInterest( tr_peermsgs
* msgs
, tr_bool clientIsInterested
)
671 struct evbuffer
* out
= msgs
->outMessages
;
674 assert( tr_isBool( clientIsInterested
) );
676 msgs
->peer
->clientIsInterested
= clientIsInterested
;
677 dbgmsg( msgs
, "Sending %s", clientIsInterested
? "Interested" : "Not Interested" );
678 tr_peerIoWriteUint32( msgs
->peer
->io
, out
, sizeof( uint8_t ) );
679 tr_peerIoWriteUint8 ( msgs
->peer
->io
, out
, clientIsInterested
? BT_INTERESTED
: BT_NOT_INTERESTED
);
681 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
682 dbgOutMessageLen( msgs
);
686 updateInterest( tr_peermsgs
* msgs UNUSED
)
688 /* FIXME -- might need to poke the mgr on startup */
692 tr_peerMsgsSetInterested( tr_peermsgs
* msgs
, int isInterested
)
694 assert( tr_isBool( isInterested
) );
696 if( isInterested
!= msgs
->peer
->clientIsInterested
)
697 sendInterest( msgs
, isInterested
);
701 popNextMetadataRequest( tr_peermsgs
* msgs
, int * piece
)
703 if( msgs
->peerAskedForMetadataCount
== 0 )
706 *piece
= msgs
->peerAskedForMetadata
[0];
708 tr_removeElementFromArray( msgs
->peerAskedForMetadata
, 0, sizeof( int ),
709 msgs
->peerAskedForMetadataCount
-- );
715 popNextRequest( tr_peermsgs
* msgs
, struct peer_request
* setme
)
717 if( msgs
->peer
->pendingReqsToClient
== 0 )
720 *setme
= msgs
->peerAskedFor
[0];
722 tr_removeElementFromArray( msgs
->peerAskedFor
, 0, sizeof( struct peer_request
),
723 msgs
->peer
->pendingReqsToClient
-- );
729 cancelAllRequestsToClient( tr_peermsgs
* msgs
)
731 struct peer_request req
;
732 const int mustSendCancel
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
734 while( popNextRequest( msgs
, &req
))
736 protocolSendReject( msgs
, &req
);
740 tr_peerMsgsSetChoke( tr_peermsgs
* msgs
,
743 const time_t now
= tr_time( );
744 const time_t fibrillationTime
= now
- MIN_CHOKE_PERIOD_SEC
;
747 assert( msgs
->peer
);
748 assert( choke
== 0 || choke
== 1 );
750 if( msgs
->peer
->chokeChangedAt
> fibrillationTime
)
752 dbgmsg( msgs
, "Not changing choke to %d to avoid fibrillation", choke
);
754 else if( msgs
->peer
->peerIsChoked
!= choke
)
756 msgs
->peer
->peerIsChoked
= choke
;
758 cancelAllRequestsToClient( msgs
);
759 protocolSendChoke( msgs
, choke
);
760 msgs
->peer
->chokeChangedAt
= now
;
769 tr_peerMsgsHave( tr_peermsgs
* msgs
,
772 protocolSendHave( msgs
, index
);
774 /* since we have more pieces now, we might not be interested in this peer */
775 updateInterest( msgs
);
783 reqIsValid( const tr_peermsgs
* peer
,
788 return tr_torrentReqIsValid( peer
->torrent
, index
, offset
, length
);
792 requestIsValid( const tr_peermsgs
* msgs
, const struct peer_request
* req
)
794 return reqIsValid( msgs
, req
->index
, req
->offset
, req
->length
);
798 tr_peerMsgsCancel( tr_peermsgs
* msgs
, tr_block_index_t block
)
800 struct peer_request req
;
801 /*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
802 blockToReq( msgs
->torrent
, block
, &req
);
803 protocolSendCancel( msgs
, &req
);
811 sendLtepHandshake( tr_peermsgs
* msgs
)
817 tr_bool allow_metadata_xfer
;
818 struct evbuffer
* out
= msgs
->outMessages
;
819 const unsigned char * ipv6
= tr_globalIPv6();
821 if( msgs
->clientSentLtepHandshake
)
824 dbgmsg( msgs
, "sending an ltep handshake" );
825 msgs
->clientSentLtepHandshake
= 1;
827 /* decide if we want to advertise metadata xfer support (BEP 9) */
828 if( tr_torrentIsPrivate( msgs
->torrent
) )
829 allow_metadata_xfer
= 0;
831 allow_metadata_xfer
= 1;
833 /* decide if we want to advertise pex support */
834 if( !tr_torrentAllowsPex( msgs
->torrent
) )
836 else if( msgs
->peerSentLtepHandshake
)
837 allow_pex
= msgs
->peerSupportsPex
? 1 : 0;
841 tr_bencInitDict( &val
, 8 );
842 tr_bencDictAddInt( &val
, "e", getSession(msgs
)->encryptionMode
!= TR_CLEAR_PREFERRED
);
844 tr_bencDictAddRaw( &val
, "ipv6", ipv6
, 16 );
845 if( allow_metadata_xfer
&& tr_torrentHasMetadata( msgs
->torrent
)
846 && ( msgs
->torrent
->infoDictLength
> 0 ) )
847 tr_bencDictAddInt( &val
, "metadata_size", msgs
->torrent
->infoDictLength
);
848 tr_bencDictAddInt( &val
, "p", tr_sessionGetPublicPeerPort( getSession(msgs
) ) );
849 tr_bencDictAddInt( &val
, "reqq", REQQ
);
850 tr_bencDictAddInt( &val
, "upload_only", tr_torrentIsSeed( msgs
->torrent
) );
851 tr_bencDictAddStr( &val
, "v", TR_NAME
" " USERAGENT_PREFIX
);
852 m
= tr_bencDictAddDict( &val
, "m", 2 );
853 if( allow_metadata_xfer
)
854 tr_bencDictAddInt( m
, "ut_metadata", UT_METADATA_ID
);
856 tr_bencDictAddInt( m
, "ut_pex", UT_PEX_ID
);
858 buf
= tr_bencToStr( &val
, TR_FMT_BENC
, &len
);
860 tr_peerIoWriteUint32( msgs
->peer
->io
, out
, 2 * sizeof( uint8_t ) + len
);
861 tr_peerIoWriteUint8 ( msgs
->peer
->io
, out
, BT_LTEP
);
862 tr_peerIoWriteUint8 ( msgs
->peer
->io
, out
, LTEP_HANDSHAKE
);
863 tr_peerIoWriteBytes ( msgs
->peer
->io
, out
, buf
, len
);
864 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
865 dbgOutMessageLen( msgs
);
873 parseLtepHandshake( tr_peermsgs
* msgs
,
875 struct evbuffer
* inbuf
)
879 uint8_t * tmp
= tr_new( uint8_t, len
);
883 int8_t seedProbability
= -1;
885 memset( &pex
, 0, sizeof( tr_pex
) );
887 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, tmp
, len
);
888 msgs
->peerSentLtepHandshake
= 1;
890 if( tr_bencLoad( tmp
, len
, &val
, NULL
) || !tr_bencIsDict( &val
) )
892 dbgmsg( msgs
, "GET extended-handshake, couldn't get dictionary" );
897 dbgmsg( msgs
, "here is the handshake: [%*.*s]", len
, len
, tmp
);
899 /* does the peer prefer encrypted connections? */
900 if( tr_bencDictFindInt( &val
, "e", &i
) ) {
901 msgs
->peer
->encryption_preference
= i
? ENCRYPTION_PREFERENCE_YES
902 : ENCRYPTION_PREFERENCE_NO
;
904 pex
.flags
|= ADDED_F_ENCRYPTION_FLAG
;
907 /* check supported messages for utorrent pex */
908 msgs
->peerSupportsPex
= 0;
909 msgs
->peerSupportsMetadataXfer
= 0;
911 if( tr_bencDictFindDict( &val
, "m", &sub
) ) {
912 if( tr_bencDictFindInt( sub
, "ut_pex", &i
) ) {
913 msgs
->peerSupportsPex
= i
!= 0;
914 msgs
->ut_pex_id
= (uint8_t) i
;
915 dbgmsg( msgs
, "msgs->ut_pex is %d", (int)msgs
->ut_pex_id
);
917 if( tr_bencDictFindInt( sub
, "ut_metadata", &i
) ) {
918 msgs
->peerSupportsMetadataXfer
= i
!= 0;
919 msgs
->ut_metadata_id
= (uint8_t) i
;
920 dbgmsg( msgs
, "msgs->ut_metadata_id is %d", (int)msgs
->ut_metadata_id
);
924 /* look for metainfo size (BEP 9) */
925 if( tr_bencDictFindInt( &val
, "metadata_size", &i
) )
926 tr_torrentSetMetadataSizeHint( msgs
->torrent
, i
);
928 /* look for upload_only (BEP 21) */
929 if( tr_bencDictFindInt( &val
, "upload_only", &i
) )
930 seedProbability
= i
==0 ? 0 : 100;
932 /* get peer's listening port */
933 if( tr_bencDictFindInt( &val
, "p", &i
) ) {
934 pex
.port
= htons( (uint16_t)i
);
935 fireClientGotPort( msgs
, pex
.port
);
936 dbgmsg( msgs
, "peer's port is now %d", (int)i
);
939 if( tr_peerIoIsIncoming( msgs
->peer
->io
)
940 && tr_bencDictFindRaw( &val
, "ipv4", &addr
, &addr_len
)
941 && ( addr_len
== 4 ) )
943 pex
.addr
.type
= TR_AF_INET
;
944 memcpy( &pex
.addr
.addr
.addr4
, addr
, 4 );
945 tr_peerMgrAddPex( msgs
->torrent
, TR_PEER_FROM_LTEP
, &pex
, seedProbability
);
948 if( tr_peerIoIsIncoming( msgs
->peer
->io
)
949 && tr_bencDictFindRaw( &val
, "ipv6", &addr
, &addr_len
)
950 && ( addr_len
== 16 ) )
952 pex
.addr
.type
= TR_AF_INET6
;
953 memcpy( &pex
.addr
.addr
.addr6
, addr
, 16 );
954 tr_peerMgrAddPex( msgs
->torrent
, TR_PEER_FROM_LTEP
, &pex
, seedProbability
);
957 /* get peer's maximum request queue size */
958 if( tr_bencDictFindInt( &val
, "reqq", &i
) )
966 parseUtMetadata( tr_peermsgs
* msgs
, int msglen
, struct evbuffer
* inbuf
)
971 int64_t msg_type
= -1;
973 int64_t total_size
= 0;
974 uint8_t * tmp
= tr_new( uint8_t, msglen
);
976 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, tmp
, msglen
);
977 msg_end
= (char*)tmp
+ msglen
;
979 if( !tr_bencLoad( tmp
, msglen
, &dict
, &benc_end
) )
981 tr_bencDictFindInt( &dict
, "msg_type", &msg_type
);
982 tr_bencDictFindInt( &dict
, "piece", &piece
);
983 tr_bencDictFindInt( &dict
, "total_size", &total_size
);
984 tr_bencFree( &dict
);
987 dbgmsg( msgs
, "got ut_metadata msg: type %d, piece %d, total_size %d",
988 (int)msg_type
, (int)piece
, (int)total_size
);
990 if( msg_type
== METADATA_MSG_TYPE_REJECT
)
995 if( ( msg_type
== METADATA_MSG_TYPE_DATA
)
996 && ( !tr_torrentHasMetadata( msgs
->torrent
) )
997 && ( msg_end
- benc_end
<= METADATA_PIECE_SIZE
)
998 && ( piece
* METADATA_PIECE_SIZE
+ (msg_end
- benc_end
) <= total_size
) )
1000 const int pieceLen
= msg_end
- benc_end
;
1001 tr_torrentSetMetadataPiece( msgs
->torrent
, piece
, benc_end
, pieceLen
);
1004 if( msg_type
== METADATA_MSG_TYPE_REQUEST
)
1007 && tr_torrentHasMetadata( msgs
->torrent
)
1008 && !tr_torrentIsPrivate( msgs
->torrent
)
1009 && ( msgs
->peerAskedForMetadataCount
< METADATA_REQQ
) )
1011 msgs
->peerAskedForMetadata
[msgs
->peerAskedForMetadataCount
++] = piece
;
1018 tr_peerIo
* io
= msgs
->peer
->io
;
1019 struct evbuffer
* out
= msgs
->outMessages
;
1021 /* build the rejection message */
1022 tr_bencInitDict( &tmp
, 2 );
1023 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_REJECT
);
1024 tr_bencDictAddInt( &tmp
, "piece", piece
);
1025 payload
= tr_bencToStr( &tmp
, TR_FMT_BENC
, &payloadLen
);
1026 tr_bencFree( &tmp
);
1028 /* write it out as a LTEP message to our outMessages buffer */
1029 tr_peerIoWriteUint32( io
, out
, 2 * sizeof( uint8_t ) + payloadLen
);
1030 tr_peerIoWriteUint8 ( io
, out
, BT_LTEP
);
1031 tr_peerIoWriteUint8 ( io
, out
, msgs
->ut_metadata_id
);
1032 tr_peerIoWriteBytes ( io
, out
, payload
, payloadLen
);
1033 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1034 dbgOutMessageLen( msgs
);
1044 parseUtPex( tr_peermsgs
* msgs
, int msglen
, struct evbuffer
* inbuf
)
1047 uint8_t * tmp
= tr_new( uint8_t, msglen
);
1049 tr_torrent
* tor
= msgs
->torrent
;
1050 const uint8_t * added
;
1053 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, tmp
, msglen
);
1055 if( tr_torrentAllowsPex( tor
)
1056 && ( ( loaded
= !tr_bencLoad( tmp
, msglen
, &val
, NULL
) ) ) )
1058 if( tr_bencDictFindRaw( &val
, "added", &added
, &added_len
) )
1062 size_t added_f_len
= 0;
1063 const uint8_t * added_f
= NULL
;
1065 tr_bencDictFindRaw( &val
, "added.f", &added_f
, &added_f_len
);
1066 pex
= tr_peerMgrCompactToPex( added
, added_len
, added_f
, added_f_len
, &n
);
1068 n
= MIN( n
, MAX_PEX_PEER_COUNT
);
1069 for( i
=0; i
<n
; ++i
)
1071 int seedProbability
= -1;
1072 if( i
< added_f_len
) seedProbability
= ( added_f
[i
] & ADDED_F_SEED_FLAG
) ? 100 : 0;
1073 tr_peerMgrAddPex( tor
, TR_PEER_FROM_PEX
, pex
+i
, seedProbability
);
1079 if( tr_bencDictFindRaw( &val
, "added6", &added
, &added_len
) )
1083 size_t added_f_len
= 0;
1084 const uint8_t * added_f
= NULL
;
1086 tr_bencDictFindRaw( &val
, "added6.f", &added_f
, &added_f_len
);
1087 pex
= tr_peerMgrCompact6ToPex( added
, added_len
, added_f
, added_f_len
, &n
);
1089 n
= MIN( n
, MAX_PEX_PEER_COUNT
);
1090 for( i
=0; i
<n
; ++i
)
1092 int seedProbability
= -1;
1093 if( i
< added_f_len
) seedProbability
= ( added_f
[i
] & ADDED_F_SEED_FLAG
) ? 100 : 0;
1094 tr_peerMgrAddPex( tor
, TR_PEER_FROM_PEX
, pex
+i
, seedProbability
);
1102 tr_bencFree( &val
);
1106 static void sendPex( tr_peermsgs
* msgs
);
1109 parseLtep( tr_peermsgs
* msgs
, int msglen
, struct evbuffer
* inbuf
)
1113 tr_peerIoReadUint8( msgs
->peer
->io
, inbuf
, <ep_msgid
);
1116 if( ltep_msgid
== LTEP_HANDSHAKE
)
1118 dbgmsg( msgs
, "got ltep handshake" );
1119 parseLtepHandshake( msgs
, msglen
, inbuf
);
1120 if( tr_peerIoSupportsLTEP( msgs
->peer
->io
) )
1122 sendLtepHandshake( msgs
);
1126 else if( ltep_msgid
== UT_PEX_ID
)
1128 dbgmsg( msgs
, "got ut pex" );
1129 msgs
->peerSupportsPex
= 1;
1130 parseUtPex( msgs
, msglen
, inbuf
);
1132 else if( ltep_msgid
== UT_METADATA_ID
)
1134 dbgmsg( msgs
, "got ut metadata" );
1135 msgs
->peerSupportsMetadataXfer
= 1;
1136 parseUtMetadata( msgs
, msglen
, inbuf
);
1140 dbgmsg( msgs
, "skipping unknown ltep message (%d)", (int)ltep_msgid
);
1141 evbuffer_drain( inbuf
, msglen
);
1146 readBtLength( tr_peermsgs
* msgs
,
1147 struct evbuffer
* inbuf
,
1152 if( inlen
< sizeof( len
) )
1155 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &len
);
1157 if( len
== 0 ) /* peer sent us a keepalive message */
1158 dbgmsg( msgs
, "got KeepAlive" );
1161 msgs
->incoming
.length
= len
;
1162 msgs
->state
= AWAITING_BT_ID
;
1168 static int readBtMessage( tr_peermsgs
* msgs
,
1169 struct evbuffer
* inbuf
,
1173 readBtId( tr_peermsgs
* msgs
, struct evbuffer
* inbuf
, size_t inlen
)
1177 if( inlen
< sizeof( uint8_t ) )
1180 tr_peerIoReadUint8( msgs
->peer
->io
, inbuf
, &id
);
1181 msgs
->incoming
.id
= id
;
1182 dbgmsg( msgs
, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id
, (size_t)msgs
->incoming
.length
);
1184 if( id
== BT_PIECE
)
1186 msgs
->state
= AWAITING_BT_PIECE
;
1189 else if( msgs
->incoming
.length
!= 1 )
1191 msgs
->state
= AWAITING_BT_MESSAGE
;
1194 else return readBtMessage( msgs
, inbuf
, inlen
- 1 );
1198 updatePeerProgress( tr_peermsgs
* msgs
)
1200 msgs
->peer
->progress
= tr_bitsetPercent( &msgs
->peer
->have
);
1201 dbgmsg( msgs
, "peer progress is %f", msgs
->peer
->progress
);
1202 updateFastSet( msgs
);
1203 updateInterest( msgs
);
1204 firePeerProgress( msgs
);
1208 prefetchPieces( tr_peermsgs
*msgs
)
1212 /* Maintain 12 prefetched blocks per unchoked peer */
1213 for( i
=msgs
->prefetchCount
; i
<msgs
->peer
->pendingReqsToClient
&& i
<12; ++i
)
1215 const struct peer_request
* req
= msgs
->peerAskedFor
+ i
;
1216 if( requestIsValid( msgs
, req
) )
1218 tr_cachePrefetchBlock( getSession(msgs
)->cache
, msgs
->torrent
, req
->index
, req
->offset
, req
->length
);
1219 ++msgs
->prefetchCount
;
1225 peerMadeRequest( tr_peermsgs
* msgs
,
1226 const struct peer_request
* req
)
1228 const tr_bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
1229 const int reqIsValid
= requestIsValid( msgs
, req
);
1230 const int clientHasPiece
= reqIsValid
&& tr_cpPieceIsComplete( &msgs
->torrent
->completion
, req
->index
);
1231 const int peerIsChoked
= msgs
->peer
->peerIsChoked
;
1236 dbgmsg( msgs
, "rejecting an invalid request." );
1237 else if( !clientHasPiece
)
1238 dbgmsg( msgs
, "rejecting request for a piece we don't have." );
1239 else if( peerIsChoked
)
1240 dbgmsg( msgs
, "rejecting request from choked peer" );
1241 else if( msgs
->peer
->pendingReqsToClient
+ 1 >= REQQ
)
1242 dbgmsg( msgs
, "rejecting request ... reqq is full" );
1247 msgs
->peerAskedFor
[msgs
->peer
->pendingReqsToClient
++] = *req
;
1248 prefetchPieces( msgs
);
1250 protocolSendReject( msgs
, req
);
1255 messageLengthIsCorrect( const tr_peermsgs
* msg
, uint8_t id
, uint32_t len
)
1262 case BT_NOT_INTERESTED
:
1263 case BT_FEXT_HAVE_ALL
:
1264 case BT_FEXT_HAVE_NONE
:
1268 case BT_FEXT_SUGGEST
:
1269 case BT_FEXT_ALLOWED_FAST
:
1273 return len
== ( msg
->torrent
->info
.pieceCount
+ 7u ) / 8u + 1u;
1277 case BT_FEXT_REJECT
:
1281 return len
> 9 && len
<= 16393;
1294 static int clientGotBlock( tr_peermsgs
* msgs
,
1295 const uint8_t * block
,
1296 const struct peer_request
* req
);
1299 readBtPiece( tr_peermsgs
* msgs
,
1300 struct evbuffer
* inbuf
,
1302 size_t * setme_piece_bytes_read
)
1304 struct peer_request
* req
= &msgs
->incoming
.blockReq
;
1306 assert( EVBUFFER_LENGTH( inbuf
) >= inlen
);
1307 dbgmsg( msgs
, "In readBtPiece" );
1314 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &req
->index
);
1315 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &req
->offset
);
1316 req
->length
= msgs
->incoming
.length
- 9;
1317 dbgmsg( msgs
, "got incoming block header %u:%u->%u", req
->index
, req
->offset
, req
->length
);
1324 /* read in another chunk of data */
1325 const size_t nLeft
= req
->length
- EVBUFFER_LENGTH( msgs
->incoming
.block
);
1326 size_t n
= MIN( nLeft
, inlen
);
1328 void * buf
= tr_sessionGetBuffer( getSession( msgs
) );
1329 const size_t buflen
= SESSION_BUFFER_SIZE
;
1333 const size_t thisPass
= MIN( i
, buflen
);
1334 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
, buf
, thisPass
);
1335 evbuffer_add( msgs
->incoming
.block
, buf
, thisPass
);
1339 tr_sessionReleaseBuffer( getSession( msgs
) );
1342 fireClientGotData( msgs
, n
, TRUE
);
1343 *setme_piece_bytes_read
+= n
;
1344 dbgmsg( msgs
, "got %zu bytes for block %u:%u->%u ... %d remain",
1345 n
, req
->index
, req
->offset
, req
->length
,
1346 (int)( req
->length
- EVBUFFER_LENGTH( msgs
->incoming
.block
) ) );
1347 if( EVBUFFER_LENGTH( msgs
->incoming
.block
) < req
->length
)
1350 /* we've got the whole block ... process it */
1351 err
= clientGotBlock( msgs
, EVBUFFER_DATA( msgs
->incoming
.block
), req
);
1354 evbuffer_free( msgs
->incoming
.block
);
1355 msgs
->incoming
.block
= evbuffer_new( );
1357 msgs
->state
= AWAITING_BT_LENGTH
;
1358 return err
? READ_ERR
: READ_NOW
;
1362 static void updateDesiredRequestCount( tr_peermsgs
* msgs
);
1365 readBtMessage( tr_peermsgs
* msgs
, struct evbuffer
* inbuf
, size_t inlen
)
1368 uint32_t msglen
= msgs
->incoming
.length
;
1369 const uint8_t id
= msgs
->incoming
.id
;
1371 const size_t startBufLen
= EVBUFFER_LENGTH( inbuf
);
1373 const tr_bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
1375 --msglen
; /* id length */
1377 dbgmsg( msgs
, "got BT id %d, len %d, buffer size is %zu", (int)id
, (int)msglen
, inlen
);
1379 if( inlen
< msglen
)
1382 if( !messageLengthIsCorrect( msgs
, id
, msglen
+ 1 ) )
1384 dbgmsg( msgs
, "bad packet - BT message #%d with a length of %d", (int)id
, (int)msglen
);
1385 fireError( msgs
, EMSGSIZE
);
1392 dbgmsg( msgs
, "got Choke" );
1393 msgs
->peer
->clientIsChoked
= 1;
1395 fireGotChoke( msgs
);
1399 dbgmsg( msgs
, "got Unchoke" );
1400 msgs
->peer
->clientIsChoked
= 0;
1401 updateDesiredRequestCount( msgs
);
1405 dbgmsg( msgs
, "got Interested" );
1406 msgs
->peer
->peerIsInterested
= 1;
1409 case BT_NOT_INTERESTED
:
1410 dbgmsg( msgs
, "got Not Interested" );
1411 msgs
->peer
->peerIsInterested
= 0;
1415 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &ui32
);
1416 dbgmsg( msgs
, "got Have: %u", ui32
);
1417 if( tr_torrentHasMetadata( msgs
->torrent
)
1418 && ( ui32
>= msgs
->torrent
->info
.pieceCount
) )
1420 fireError( msgs
, ERANGE
);
1423 if( tr_bitsetAdd( &msgs
->peer
->have
, ui32
) )
1425 fireError( msgs
, ERANGE
);
1428 updatePeerProgress( msgs
);
1432 const size_t bitCount
= tr_torrentHasMetadata( msgs
->torrent
)
1433 ? msgs
->torrent
->info
.pieceCount
1435 dbgmsg( msgs
, "got a bitfield" );
1436 tr_bitsetReserve( &msgs
->peer
->have
, bitCount
);
1437 tr_peerIoReadBytes( msgs
->peer
->io
, inbuf
,
1438 msgs
->peer
->have
.bitfield
.bits
, msglen
);
1439 updatePeerProgress( msgs
);
1445 struct peer_request r
;
1446 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.index
);
1447 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.offset
);
1448 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.length
);
1449 dbgmsg( msgs
, "got Request: %u:%u->%u", r
.index
, r
.offset
, r
.length
);
1450 peerMadeRequest( msgs
, &r
);
1457 struct peer_request r
;
1458 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.index
);
1459 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.offset
);
1460 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.length
);
1461 tr_historyAdd( msgs
->peer
->cancelsSentToClient
, tr_time( ), 1 );
1462 dbgmsg( msgs
, "got a Cancel %u:%u->%u", r
.index
, r
.offset
, r
.length
);
1464 for( i
=0; i
<msgs
->peer
->pendingReqsToClient
; ++i
) {
1465 const struct peer_request
* req
= msgs
->peerAskedFor
+ i
;
1466 if( ( req
->index
== r
.index
) && ( req
->offset
== r
.offset
) && ( req
->length
== r
.length
) )
1470 if( i
< msgs
->peer
->pendingReqsToClient
)
1471 tr_removeElementFromArray( msgs
->peerAskedFor
, i
, sizeof( struct peer_request
),
1472 msgs
->peer
->pendingReqsToClient
-- );
1477 assert( 0 ); /* handled elsewhere! */
1481 dbgmsg( msgs
, "Got a BT_PORT" );
1482 tr_peerIoReadUint16( msgs
->peer
->io
, inbuf
, &msgs
->peer
->dht_port
);
1483 if( msgs
->peer
->dht_port
> 0 )
1484 tr_dhtAddNode( getSession(msgs
),
1485 tr_peerAddress( msgs
->peer
),
1486 msgs
->peer
->dht_port
, 0 );
1489 case BT_FEXT_SUGGEST
:
1490 dbgmsg( msgs
, "Got a BT_FEXT_SUGGEST" );
1491 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &ui32
);
1493 fireClientGotSuggest( msgs
, ui32
);
1495 fireError( msgs
, EMSGSIZE
);
1500 case BT_FEXT_ALLOWED_FAST
:
1501 dbgmsg( msgs
, "Got a BT_FEXT_ALLOWED_FAST" );
1502 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &ui32
);
1504 fireClientGotAllowedFast( msgs
, ui32
);
1506 fireError( msgs
, EMSGSIZE
);
1511 case BT_FEXT_HAVE_ALL
:
1512 dbgmsg( msgs
, "Got a BT_FEXT_HAVE_ALL" );
1514 tr_bitsetSetHaveAll( &msgs
->peer
->have
);
1515 updatePeerProgress( msgs
);
1517 fireError( msgs
, EMSGSIZE
);
1522 case BT_FEXT_HAVE_NONE
:
1523 dbgmsg( msgs
, "Got a BT_FEXT_HAVE_NONE" );
1525 tr_bitsetSetHaveNone( &msgs
->peer
->have
);
1526 updatePeerProgress( msgs
);
1528 fireError( msgs
, EMSGSIZE
);
1533 case BT_FEXT_REJECT
:
1535 struct peer_request r
;
1536 dbgmsg( msgs
, "Got a BT_FEXT_REJECT" );
1537 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.index
);
1538 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.offset
);
1539 tr_peerIoReadUint32( msgs
->peer
->io
, inbuf
, &r
.length
);
1541 fireGotRej( msgs
, &r
);
1543 fireError( msgs
, EMSGSIZE
);
1550 dbgmsg( msgs
, "Got a BT_LTEP" );
1551 parseLtep( msgs
, msglen
, inbuf
);
1555 dbgmsg( msgs
, "peer sent us an UNKNOWN: %d", (int)id
);
1556 tr_peerIoDrain( msgs
->peer
->io
, inbuf
, msglen
);
1560 assert( msglen
+ 1 == msgs
->incoming
.length
);
1561 assert( EVBUFFER_LENGTH( inbuf
) == startBufLen
- msglen
);
1563 msgs
->state
= AWAITING_BT_LENGTH
;
1568 addPeerToBlamefield( tr_peermsgs
* msgs
, uint32_t index
)
1570 if( !msgs
->peer
->blame
)
1571 msgs
->peer
->blame
= tr_bitfieldNew( msgs
->torrent
->info
.pieceCount
);
1572 tr_bitfieldAdd( msgs
->peer
->blame
, index
);
1575 /* returns 0 on success, or an errno on failure */
1577 clientGotBlock( tr_peermsgs
* msgs
,
1578 const uint8_t * data
,
1579 const struct peer_request
* req
)
1582 tr_torrent
* tor
= msgs
->torrent
;
1583 const tr_block_index_t block
= _tr_block( tor
, req
->index
, req
->offset
);
1588 if( req
->length
!= tr_torBlockCountBytes( msgs
->torrent
, block
) ) {
1589 dbgmsg( msgs
, "wrong block size -- expected %u, got %d",
1590 tr_torBlockCountBytes( msgs
->torrent
, block
), req
->length
);
1594 dbgmsg( msgs
, "got block %u:%u->%u", req
->index
, req
->offset
, req
->length
);
1596 if( !tr_peerMgrDidPeerRequest( msgs
->torrent
, msgs
->peer
, block
) ) {
1597 dbgmsg( msgs
, "we didn't ask for this message..." );
1600 if( tr_cpPieceIsComplete( &msgs
->torrent
->completion
, req
->index
) ) {
1601 dbgmsg( msgs
, "we did ask for this message, but the piece is already complete..." );
1609 if(( err
= tr_cacheWriteBlock( getSession(msgs
)->cache
, tor
, req
->index
, req
->offset
, req
->length
, data
)))
1612 addPeerToBlamefield( msgs
, req
->index
);
1613 fireGotBlock( msgs
, req
);
1617 static int peerPulse( void * vmsgs
);
1620 didWrite( tr_peerIo
* io UNUSED
, size_t bytesWritten
, int wasPieceData
, void * vmsgs
)
1622 tr_peermsgs
* msgs
= vmsgs
;
1623 firePeerGotData( msgs
, bytesWritten
, wasPieceData
);
1625 if ( tr_isPeerIo( io
) && io
->userData
)
1630 canRead( tr_peerIo
* io
, void * vmsgs
, size_t * piece
)
1633 tr_peermsgs
* msgs
= vmsgs
;
1634 struct evbuffer
* in
= tr_peerIoGetReadBuffer( io
);
1635 const size_t inlen
= EVBUFFER_LENGTH( in
);
1637 dbgmsg( msgs
, "canRead: inlen is %zu, msgs->state is %d", inlen
, msgs
->state
);
1643 else if( msgs
->state
== AWAITING_BT_PIECE
)
1645 ret
= inlen
? readBtPiece( msgs
, in
, inlen
, piece
) : READ_LATER
;
1647 else switch( msgs
->state
)
1649 case AWAITING_BT_LENGTH
:
1650 ret
= readBtLength ( msgs
, in
, inlen
); break;
1652 case AWAITING_BT_ID
:
1653 ret
= readBtId ( msgs
, in
, inlen
); break;
1655 case AWAITING_BT_MESSAGE
:
1656 ret
= readBtMessage( msgs
, in
, inlen
); break;
1663 dbgmsg( msgs
, "canRead: ret is %d", (int)ret
);
1665 /* log the raw data that was read */
1666 if( ( ret
!= READ_ERR
) && ( EVBUFFER_LENGTH( in
) != inlen
) )
1667 fireClientGotData( msgs
, inlen
- EVBUFFER_LENGTH( in
), FALSE
);
1673 tr_peerMsgsIsReadingBlock( const tr_peermsgs
* msgs
, tr_block_index_t block
)
1675 if( msgs
->state
!= AWAITING_BT_PIECE
)
1678 return block
== _tr_block( msgs
->torrent
,
1679 msgs
->incoming
.blockReq
.index
,
1680 msgs
->incoming
.blockReq
.offset
);
1688 updateDesiredRequestCount( tr_peermsgs
* msgs
)
1690 const tr_torrent
* const torrent
= msgs
->torrent
;
1692 if( tr_torrentIsSeed( msgs
->torrent
) )
1694 msgs
->desiredRequestCount
= 0;
1696 else if( msgs
->peer
->clientIsChoked
)
1698 msgs
->desiredRequestCount
= 0;
1700 else if( !msgs
->peer
->clientIsInterested
)
1702 msgs
->desiredRequestCount
= 0;
1706 int estimatedBlocksInPeriod
;
1709 const int floor
= 4;
1710 const int seconds
= REQUEST_BUF_SECS
;
1711 const uint64_t now
= tr_time_msec( );
1713 /* Get the rate limit we should use.
1714 * FIXME: this needs to consider all the other peers as well... */
1715 rate_Bps
= tr_peerGetPieceSpeed_Bps( msgs
->peer
, now
, TR_PEER_TO_CLIENT
);
1716 if( tr_torrentUsesSpeedLimit( torrent
, TR_PEER_TO_CLIENT
) )
1717 rate_Bps
= MIN( rate_Bps
, tr_torrentGetSpeedLimit_Bps( torrent
, TR_PEER_TO_CLIENT
) );
1719 /* honor the session limits, if enabled */
1720 if( tr_torrentUsesSessionLimits( torrent
) )
1721 if( tr_sessionGetActiveSpeedLimit_Bps( torrent
->session
, TR_PEER_TO_CLIENT
, &irate_Bps
) )
1722 rate_Bps
= MIN( rate_Bps
, irate_Bps
);
1724 /* use this desired rate to figure out how
1725 * many requests we should send to this peer */
1726 estimatedBlocksInPeriod
= ( rate_Bps
* seconds
) / torrent
->blockSize
;
1727 msgs
->desiredRequestCount
= MAX( floor
, estimatedBlocksInPeriod
);
1729 /* honor the peer's maximum request count, if specified */
1730 if( msgs
->reqq
> 0 )
1731 if( msgs
->desiredRequestCount
> msgs
->reqq
)
1732 msgs
->desiredRequestCount
= msgs
->reqq
;
1737 updateMetadataRequests( tr_peermsgs
* msgs
, time_t now
)
1741 if( msgs
->peerSupportsMetadataXfer
1742 && tr_torrentGetNextMetadataRequest( msgs
->torrent
, now
, &piece
) )
1747 tr_peerIo
* io
= msgs
->peer
->io
;
1748 struct evbuffer
* out
= msgs
->outMessages
;
1750 /* build the data message */
1751 tr_bencInitDict( &tmp
, 3 );
1752 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_REQUEST
);
1753 tr_bencDictAddInt( &tmp
, "piece", piece
);
1754 payload
= tr_bencToStr( &tmp
, TR_FMT_BENC
, &payloadLen
);
1755 tr_bencFree( &tmp
);
1757 dbgmsg( msgs
, "requesting metadata piece #%d", piece
);
1759 /* write it out as a LTEP message to our outMessages buffer */
1760 tr_peerIoWriteUint32( io
, out
, 2 * sizeof( uint8_t ) + payloadLen
);
1761 tr_peerIoWriteUint8 ( io
, out
, BT_LTEP
);
1762 tr_peerIoWriteUint8 ( io
, out
, msgs
->ut_metadata_id
);
1763 tr_peerIoWriteBytes ( io
, out
, payload
, payloadLen
);
1764 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1765 dbgOutMessageLen( msgs
);
1772 updateBlockRequests( tr_peermsgs
* msgs
)
1774 if( tr_torrentIsPieceTransferAllowed( msgs
->torrent
, TR_PEER_TO_CLIENT
)
1775 && ( msgs
->desiredRequestCount
> 0 )
1776 && ( msgs
->peer
->pendingReqsToPeer
<= ( msgs
->desiredRequestCount
* 0.66 ) ) )
1780 const int numwant
= msgs
->desiredRequestCount
- msgs
->peer
->pendingReqsToPeer
;
1781 tr_block_index_t
* blocks
= tr_new( tr_block_index_t
, numwant
);
1783 tr_peerMgrGetNextRequests( msgs
->torrent
, msgs
->peer
, numwant
, blocks
, &n
);
1785 for( i
=0; i
<n
; ++i
)
1787 struct peer_request req
;
1788 blockToReq( msgs
->torrent
, blocks
[i
], &req
);
1789 protocolSendRequest( msgs
, &req
);
1797 fillOutputBuffer( tr_peermsgs
* msgs
, time_t now
)
1800 size_t bytesWritten
= 0;
1801 struct peer_request req
;
1802 const tr_bool haveMessages
= EVBUFFER_LENGTH( msgs
->outMessages
) != 0;
1803 const tr_bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
1806 *** Protocol messages
1809 if( haveMessages
&& !msgs
->outMessagesBatchedAt
) /* fresh batch */
1811 dbgmsg( msgs
, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs
->outMessages
) );
1812 msgs
->outMessagesBatchedAt
= now
;
1814 else if( haveMessages
&& ( ( now
- msgs
->outMessagesBatchedAt
) >= msgs
->outMessagesBatchPeriod
) )
1816 const size_t len
= EVBUFFER_LENGTH( msgs
->outMessages
);
1817 /* flush the protocol messages */
1818 dbgmsg( msgs
, "flushing outMessages... to %p (length is %zu)", msgs
->peer
->io
, len
);
1819 tr_peerIoWriteBuf( msgs
->peer
->io
, msgs
->outMessages
, FALSE
);
1820 msgs
->clientSentAnythingAt
= now
;
1821 msgs
->outMessagesBatchedAt
= 0;
1822 msgs
->outMessagesBatchPeriod
= LOW_PRIORITY_INTERVAL_SECS
;
1823 bytesWritten
+= len
;
1830 if( ( tr_peerIoGetWriteBufferSpace( msgs
->peer
->io
, now
) >= METADATA_PIECE_SIZE
)
1831 && popNextMetadataRequest( msgs
, &piece
) )
1837 data
= tr_torrentGetMetadataPiece( msgs
->torrent
, piece
, &dataLen
);
1838 if( ( dataLen
> 0 ) && ( data
!= NULL
) )
1843 tr_peerIo
* io
= msgs
->peer
->io
;
1844 struct evbuffer
* out
= msgs
->outMessages
;
1846 /* build the data message */
1847 tr_bencInitDict( &tmp
, 3 );
1848 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_DATA
);
1849 tr_bencDictAddInt( &tmp
, "piece", piece
);
1850 tr_bencDictAddInt( &tmp
, "total_size", msgs
->torrent
->infoDictLength
);
1851 payload
= tr_bencToStr( &tmp
, TR_FMT_BENC
, &payloadLen
);
1852 tr_bencFree( &tmp
);
1854 /* write it out as a LTEP message to our outMessages buffer */
1855 tr_peerIoWriteUint32( io
, out
, 2 * sizeof( uint8_t ) + payloadLen
+ dataLen
);
1856 tr_peerIoWriteUint8 ( io
, out
, BT_LTEP
);
1857 tr_peerIoWriteUint8 ( io
, out
, msgs
->ut_metadata_id
);
1858 tr_peerIoWriteBytes ( io
, out
, payload
, payloadLen
);
1859 tr_peerIoWriteBytes ( io
, out
, data
, dataLen
);
1860 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1861 dbgOutMessageLen( msgs
);
1869 if( !ok
) /* send a rejection message */
1874 tr_peerIo
* io
= msgs
->peer
->io
;
1875 struct evbuffer
* out
= msgs
->outMessages
;
1877 /* build the rejection message */
1878 tr_bencInitDict( &tmp
, 2 );
1879 tr_bencDictAddInt( &tmp
, "msg_type", METADATA_MSG_TYPE_REJECT
);
1880 tr_bencDictAddInt( &tmp
, "piece", piece
);
1881 payload
= tr_bencToStr( &tmp
, TR_FMT_BENC
, &payloadLen
);
1882 tr_bencFree( &tmp
);
1884 /* write it out as a LTEP message to our outMessages buffer */
1885 tr_peerIoWriteUint32( io
, out
, 2 * sizeof( uint8_t ) + payloadLen
);
1886 tr_peerIoWriteUint8 ( io
, out
, BT_LTEP
);
1887 tr_peerIoWriteUint8 ( io
, out
, msgs
->ut_metadata_id
);
1888 tr_peerIoWriteBytes ( io
, out
, payload
, payloadLen
);
1889 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
1890 dbgOutMessageLen( msgs
);
1900 if( ( tr_peerIoGetWriteBufferSpace( msgs
->peer
->io
, now
) >= msgs
->torrent
->blockSize
)
1901 && popNextRequest( msgs
, &req
) )
1903 --msgs
->prefetchCount
;
1905 if( requestIsValid( msgs
, &req
)
1906 && tr_cpPieceIsComplete( &msgs
->torrent
->completion
, req
.index
) )
1908 /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1910 const uint32_t msglen
= 4 + 1 + 4 + 4 + req
.length
;
1911 struct evbuffer
* out
;
1912 tr_peerIo
* io
= msgs
->peer
->io
;
1914 out
= evbuffer_new( );
1915 evbuffer_expand( out
, msglen
);
1917 tr_peerIoWriteUint32( io
, out
, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req
.length
);
1918 tr_peerIoWriteUint8 ( io
, out
, BT_PIECE
);
1919 tr_peerIoWriteUint32( io
, out
, req
.index
);
1920 tr_peerIoWriteUint32( io
, out
, req
.offset
);
1922 err
= tr_cacheReadBlock( getSession(msgs
)->cache
, msgs
->torrent
, req
.index
, req
.offset
, req
.length
, EVBUFFER_DATA(out
)+EVBUFFER_LENGTH(out
) );
1926 protocolSendReject( msgs
, &req
);
1930 dbgmsg( msgs
, "sending block %u:%u->%u", req
.index
, req
.offset
, req
.length
);
1931 EVBUFFER_LENGTH(out
) += req
.length
;
1932 assert( EVBUFFER_LENGTH( out
) == msglen
);
1933 tr_peerIoWriteBuf( io
, out
, TRUE
);
1934 bytesWritten
+= EVBUFFER_LENGTH( out
);
1935 msgs
->clientSentAnythingAt
= now
;
1936 tr_historyAdd( msgs
->peer
->blocksSentToPeer
, tr_time( ), 1 );
1939 evbuffer_free( out
);
1947 else if( fext
) /* peer needs a reject message */
1949 protocolSendReject( msgs
, &req
);
1953 prefetchPieces( msgs
);
1960 if( ( msgs
!= NULL
)
1961 && ( msgs
->clientSentAnythingAt
!= 0 )
1962 && ( ( now
- msgs
->clientSentAnythingAt
) > KEEPALIVE_INTERVAL_SECS
) )
1964 dbgmsg( msgs
, "sending a keepalive message" );
1965 tr_peerIoWriteUint32( msgs
->peer
->io
, msgs
->outMessages
, 0 );
1966 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
1969 return bytesWritten
;
1973 peerPulse( void * vmsgs
)
1975 tr_peermsgs
* msgs
= vmsgs
;
1976 const time_t now
= tr_time( );
1978 if ( tr_isPeerIo( msgs
->peer
->io
) ) {
1979 updateDesiredRequestCount( msgs
);
1980 updateBlockRequests( msgs
);
1981 updateMetadataRequests( msgs
, now
);
1985 if( fillOutputBuffer( msgs
, now
) < 1 )
1988 return TRUE
; /* loop forever */
1992 tr_peerMsgsPulse( tr_peermsgs
* msgs
)
1999 gotError( tr_peerIo
* io UNUSED
,
2003 if( what
& EVBUFFER_TIMEOUT
)
2004 dbgmsg( vmsgs
, "libevent got a timeout, what=%hd", what
);
2005 if( what
& ( EVBUFFER_EOF
| EVBUFFER_ERROR
) )
2006 dbgmsg( vmsgs
, "libevent got an error! what=%hd, errno=%d (%s)",
2007 what
, errno
, tr_strerror( errno
) );
2008 fireError( vmsgs
, ENOTCONN
);
2012 sendBitfield( tr_peermsgs
* msgs
)
2014 struct evbuffer
* out
= msgs
->outMessages
;
2015 tr_bitfield
* field
;
2016 tr_piece_index_t lazyPieces
[LAZY_PIECE_COUNT
];
2018 size_t lazyCount
= 0;
2020 field
= tr_bitfieldDup( tr_cpPieceBitfield( &msgs
->torrent
->completion
) );
2022 if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs
) ) )
2024 /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2025 speed over a truly random sample -- let's limit the pool size to
2026 the first 1000 pieces so large torrents don't bog things down */
2028 const size_t maxPoolSize
= MIN( msgs
->torrent
->info
.pieceCount
, 1000 );
2029 tr_piece_index_t
* pool
= tr_new( tr_piece_index_t
, maxPoolSize
);
2031 /* build the pool */
2032 for( i
=poolSize
=0; i
<maxPoolSize
; ++i
)
2033 if( tr_bitfieldHas( field
, i
) )
2034 pool
[poolSize
++] = i
;
2036 /* pull random piece indices from the pool */
2037 while( ( poolSize
> 0 ) && ( lazyCount
< LAZY_PIECE_COUNT
) )
2039 const int pos
= tr_cryptoWeakRandInt( poolSize
);
2040 const tr_piece_index_t piece
= pool
[pos
];
2041 tr_bitfieldRem( field
, piece
);
2042 lazyPieces
[lazyCount
++] = piece
;
2043 pool
[pos
] = pool
[--poolSize
];
2050 tr_peerIoWriteUint32( msgs
->peer
->io
, out
,
2051 sizeof( uint8_t ) + field
->byteCount
);
2052 tr_peerIoWriteUint8 ( msgs
->peer
->io
, out
, BT_BITFIELD
);
2053 /* FIXME(libevent2): use evbuffer_add_reference() */
2054 tr_peerIoWriteBytes ( msgs
->peer
->io
, out
, field
->bits
, field
->byteCount
);
2055 dbgmsg( msgs
, "sending bitfield... outMessage size is now %zu",
2056 EVBUFFER_LENGTH( out
) );
2057 pokeBatchPeriod( msgs
, IMMEDIATE_PRIORITY_INTERVAL_SECS
);
2059 for( i
= 0; i
< lazyCount
; ++i
)
2060 protocolSendHave( msgs
, lazyPieces
[i
] );
2062 tr_bitfieldFree( field
);
2066 tellPeerWhatWeHave( tr_peermsgs
* msgs
)
2068 const tr_bool fext
= tr_peerIoSupportsFEXT( msgs
->peer
->io
);
2070 if( fext
&& ( tr_cpGetStatus( &msgs
->torrent
->completion
) == TR_SEED
) )
2072 protocolSendHaveAll( msgs
);
2074 else if( fext
&& ( tr_cpHaveValid( &msgs
->torrent
->completion
) == 0 ) )
2076 protocolSendHaveNone( msgs
);
2080 sendBitfield( msgs
);
2088 /* some peers give us error messages if we send
2089 more than this many peers in a single pex message
2090 http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2091 #define MAX_PEX_ADDED 50
2092 #define MAX_PEX_DROPPED 50
2106 pexAddedCb( void * vpex
,
2109 PexDiffs
* diffs
= userData
;
2110 tr_pex
* pex
= vpex
;
2112 if( diffs
->addedCount
< MAX_PEX_ADDED
)
2114 diffs
->added
[diffs
->addedCount
++] = *pex
;
2115 diffs
->elements
[diffs
->elementCount
++] = *pex
;
2120 pexDroppedCb( void * vpex
,
2123 PexDiffs
* diffs
= userData
;
2124 tr_pex
* pex
= vpex
;
2126 if( diffs
->droppedCount
< MAX_PEX_DROPPED
)
2128 diffs
->dropped
[diffs
->droppedCount
++] = *pex
;
2133 pexElementCb( void * vpex
,
2136 PexDiffs
* diffs
= userData
;
2137 tr_pex
* pex
= vpex
;
2139 diffs
->elements
[diffs
->elementCount
++] = *pex
;
2143 sendPex( tr_peermsgs
* msgs
)
2145 if( msgs
->peerSupportsPex
&& tr_torrentAllowsPex( msgs
->torrent
) )
2149 tr_pex
* newPex
= NULL
;
2150 tr_pex
* newPex6
= NULL
;
2151 const int newCount
= tr_peerMgrGetPeers( msgs
->torrent
, &newPex
, TR_AF_INET
, TR_PEERS_CONNECTED
, MAX_PEX_PEER_COUNT
);
2152 const int newCount6
= tr_peerMgrGetPeers( msgs
->torrent
, &newPex6
, TR_AF_INET6
, TR_PEERS_CONNECTED
, MAX_PEX_PEER_COUNT
);
2154 /* build the diffs */
2155 diffs
.added
= tr_new( tr_pex
, newCount
);
2156 diffs
.addedCount
= 0;
2157 diffs
.dropped
= tr_new( tr_pex
, msgs
->pexCount
);
2158 diffs
.droppedCount
= 0;
2159 diffs
.elements
= tr_new( tr_pex
, newCount
+ msgs
->pexCount
);
2160 diffs
.elementCount
= 0;
2161 tr_set_compare( msgs
->pex
, msgs
->pexCount
,
2163 tr_pexCompare
, sizeof( tr_pex
),
2164 pexDroppedCb
, pexAddedCb
, pexElementCb
, &diffs
);
2165 diffs6
.added
= tr_new( tr_pex
, newCount6
);
2166 diffs6
.addedCount
= 0;
2167 diffs6
.dropped
= tr_new( tr_pex
, msgs
->pexCount6
);
2168 diffs6
.droppedCount
= 0;
2169 diffs6
.elements
= tr_new( tr_pex
, newCount6
+ msgs
->pexCount6
);
2170 diffs6
.elementCount
= 0;
2171 tr_set_compare( msgs
->pex6
, msgs
->pexCount6
,
2173 tr_pexCompare
, sizeof( tr_pex
),
2174 pexDroppedCb
, pexAddedCb
, pexElementCb
, &diffs6
);
2177 "pex: old peer count %d+%d, new peer count %d+%d, "
2178 "added %d+%d, removed %d+%d",
2179 msgs
->pexCount
, msgs
->pexCount6
, newCount
, newCount6
,
2180 diffs
.addedCount
, diffs6
.addedCount
,
2181 diffs
.droppedCount
, diffs6
.droppedCount
);
2183 if( !diffs
.addedCount
&& !diffs
.droppedCount
&& !diffs6
.addedCount
&&
2184 !diffs6
.droppedCount
)
2186 tr_free( diffs
.elements
);
2187 tr_free( diffs6
.elements
);
2195 uint8_t * tmp
, *walk
;
2196 tr_peerIo
* io
= msgs
->peer
->io
;
2197 struct evbuffer
* out
= msgs
->outMessages
;
2200 tr_free( msgs
->pex
);
2201 msgs
->pex
= diffs
.elements
;
2202 msgs
->pexCount
= diffs
.elementCount
;
2203 tr_free( msgs
->pex6
);
2204 msgs
->pex6
= diffs6
.elements
;
2205 msgs
->pexCount6
= diffs6
.elementCount
;
2207 /* build the pex payload */
2208 tr_bencInitDict( &val
, 3 ); /* ipv6 support: left as 3:
2209 * speed vs. likelihood? */
2211 if( diffs
.addedCount
> 0)
2214 tmp
= walk
= tr_new( uint8_t, diffs
.addedCount
* 6 );
2215 for( i
= 0; i
< diffs
.addedCount
; ++i
) {
2216 memcpy( walk
, &diffs
.added
[i
].addr
.addr
, 4 ); walk
+= 4;
2217 memcpy( walk
, &diffs
.added
[i
].port
, 2 ); walk
+= 2;
2219 assert( ( walk
- tmp
) == diffs
.addedCount
* 6 );
2220 tr_bencDictAddRaw( &val
, "added", tmp
, walk
- tmp
);
2224 tmp
= walk
= tr_new( uint8_t, diffs
.addedCount
);
2225 for( i
= 0; i
< diffs
.addedCount
; ++i
)
2226 *walk
++ = diffs
.added
[i
].flags
;
2227 assert( ( walk
- tmp
) == diffs
.addedCount
);
2228 tr_bencDictAddRaw( &val
, "added.f", tmp
, walk
- tmp
);
2232 if( diffs
.droppedCount
> 0 )
2235 tmp
= walk
= tr_new( uint8_t, diffs
.droppedCount
* 6 );
2236 for( i
= 0; i
< diffs
.droppedCount
; ++i
) {
2237 memcpy( walk
, &diffs
.dropped
[i
].addr
.addr
, 4 ); walk
+= 4;
2238 memcpy( walk
, &diffs
.dropped
[i
].port
, 2 ); walk
+= 2;
2240 assert( ( walk
- tmp
) == diffs
.droppedCount
* 6 );
2241 tr_bencDictAddRaw( &val
, "dropped", tmp
, walk
- tmp
);
2245 if( diffs6
.addedCount
> 0 )
2248 tmp
= walk
= tr_new( uint8_t, diffs6
.addedCount
* 18 );
2249 for( i
= 0; i
< diffs6
.addedCount
; ++i
) {
2250 memcpy( walk
, &diffs6
.added
[i
].addr
.addr
.addr6
.s6_addr
, 16 );
2252 memcpy( walk
, &diffs6
.added
[i
].port
, 2 );
2255 assert( ( walk
- tmp
) == diffs6
.addedCount
* 18 );
2256 tr_bencDictAddRaw( &val
, "added6", tmp
, walk
- tmp
);
2260 tmp
= walk
= tr_new( uint8_t, diffs6
.addedCount
);
2261 for( i
= 0; i
< diffs6
.addedCount
; ++i
)
2262 *walk
++ = diffs6
.added
[i
].flags
;
2263 assert( ( walk
- tmp
) == diffs6
.addedCount
);
2264 tr_bencDictAddRaw( &val
, "added6.f", tmp
, walk
- tmp
);
2268 if( diffs6
.droppedCount
> 0 )
2271 tmp
= walk
= tr_new( uint8_t, diffs6
.droppedCount
* 18 );
2272 for( i
= 0; i
< diffs6
.droppedCount
; ++i
) {
2273 memcpy( walk
, &diffs6
.dropped
[i
].addr
.addr
.addr6
.s6_addr
, 16 );
2275 memcpy( walk
, &diffs6
.dropped
[i
].port
, 2 );
2278 assert( ( walk
- tmp
) == diffs6
.droppedCount
* 18);
2279 tr_bencDictAddRaw( &val
, "dropped6", tmp
, walk
- tmp
);
2283 /* write the pex message */
2284 benc
= tr_bencToStr( &val
, TR_FMT_BENC
, &bencLen
);
2285 tr_peerIoWriteUint32( io
, out
, 2 * sizeof( uint8_t ) + bencLen
);
2286 tr_peerIoWriteUint8 ( io
, out
, BT_LTEP
);
2287 tr_peerIoWriteUint8 ( io
, out
, msgs
->ut_pex_id
);
2288 tr_peerIoWriteBytes ( io
, out
, benc
, bencLen
);
2289 pokeBatchPeriod( msgs
, HIGH_PRIORITY_INTERVAL_SECS
);
2290 dbgmsg( msgs
, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out
) );
2291 dbgOutMessageLen( msgs
);
2294 tr_bencFree( &val
);
2298 tr_free( diffs
.added
);
2299 tr_free( diffs
.dropped
);
2301 tr_free( diffs6
.added
);
2302 tr_free( diffs6
.dropped
);
2305 /*msgs->clientSentPexAt = tr_time( );*/
2310 pexPulse( int foo UNUSED
, short bar UNUSED
, void * vmsgs
)
2312 struct tr_peermsgs
* msgs
= vmsgs
;
2316 tr_timerAdd( &msgs
->pexTimer
, PEX_INTERVAL_SECS
, 0 );
2324 tr_peerMsgsNew( struct tr_torrent
* torrent
,
2325 struct tr_peer
* peer
,
2326 tr_peer_callback
* callback
,
2327 void * callbackData
)
2334 m
= tr_new0( tr_peermsgs
, 1 );
2335 m
->callback
= callback
;
2336 m
->callbackData
= callbackData
;
2338 m
->torrent
= torrent
;
2339 m
->peer
->clientIsChoked
= 1;
2340 m
->peer
->peerIsChoked
= 1;
2341 m
->peer
->clientIsInterested
= 0;
2342 m
->peer
->peerIsInterested
= 0;
2343 m
->state
= AWAITING_BT_LENGTH
;
2344 m
->outMessages
= evbuffer_new( );
2345 m
->outMessagesBatchedAt
= 0;
2346 m
->outMessagesBatchPeriod
= LOW_PRIORITY_INTERVAL_SECS
;
2347 m
->incoming
.block
= evbuffer_new( );
2348 evtimer_set( &m
->pexTimer
, pexPulse
, m
);
2349 tr_timerAdd( &m
->pexTimer
, PEX_INTERVAL_SECS
, 0 );
2352 if( tr_peerIoSupportsLTEP( peer
->io
) )
2353 sendLtepHandshake( m
);
2355 tellPeerWhatWeHave( m
);
2357 if( tr_dhtEnabled( torrent
->session
) && tr_peerIoSupportsDHT( peer
->io
))
2359 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2360 const struct tr_address
*addr
= tr_peerIoGetAddress( peer
->io
, NULL
);
2361 if( addr
->type
== TR_AF_INET
|| tr_globalIPv6() ) {
2362 protocolSendPort( m
, tr_dhtPort( torrent
->session
) );
2366 tr_peerIoSetIOFuncs( m
->peer
->io
, canRead
, didWrite
, gotError
, m
);
2367 updateDesiredRequestCount( m
);
2373 tr_peerMsgsFree( tr_peermsgs
* msgs
)
2377 evtimer_del( &msgs
->pexTimer
);
2379 evbuffer_free( msgs
->incoming
.block
);
2380 evbuffer_free( msgs
->outMessages
);
2381 tr_free( msgs
->pex6
);
2382 tr_free( msgs
->pex
);
2384 memset( msgs
, ~0, sizeof( tr_peermsgs
) );