Revert "transmission: update from 2.13 to 2.22"
[tomato.git] / release / src / router / transmission / libtransmission / peer-msgs.c
bloba76e7addb5e30b1c6185efbff5b6225ba32500b8
1 /*
2 * This file Copyright (C) 2007-2010 Mnemosyne LLC
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
10 * $Id: peer-msgs.c 11425 2010-11-16 15:17:34Z charles $
13 #include <assert.h>
14 #include <errno.h>
15 #include <limits.h> /* INT_MAX */
16 #include <stdarg.h>
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
21 #include <event.h>
23 #include "transmission.h"
24 #include "bencode.h"
25 #include "cache.h"
26 #include "completion.h"
27 #include "crypto.h"
28 #ifdef WIN32
29 #include "net.h" /* for ECONN */
30 #endif
31 #include "peer-io.h"
32 #include "peer-mgr.h"
33 #include "peer-msgs.h"
34 #include "session.h"
35 #include "stats.h"
36 #include "torrent.h"
37 #include "torrent-magnet.h"
38 #include "tr-dht.h"
39 #include "utils.h"
40 #include "version.h"
42 /**
43 ***
44 **/
/* Message ids, extension ids and tuning constants used throughout this file. */
enum
{
    /* peer message ids */
    BT_CHOKE                         = 0,
    BT_UNCHOKE                       = 1,
    BT_INTERESTED                    = 2,
    BT_NOT_INTERESTED                = 3,
    BT_HAVE                          = 4,
    BT_BITFIELD                      = 5,
    BT_REQUEST                       = 6,
    BT_PIECE                         = 7,
    BT_CANCEL                        = 8,
    BT_PORT                          = 9,

    /* BT_FEXT_* message ids */
    BT_FEXT_SUGGEST                  = 13,
    BT_FEXT_HAVE_ALL                 = 14,
    BT_FEXT_HAVE_NONE                = 15,
    BT_FEXT_REJECT                   = 16,
    BT_FEXT_ALLOWED_FAST             = 17,

    /* message id that precedes every LTEP payload */
    BT_LTEP                          = 20,

    /* LTEP sub-id of the extended handshake */
    LTEP_HANDSHAKE                   = 0,

    /* LTEP sub-ids advertised in the "m" dictionary of our handshake */
    UT_PEX_ID                        = 1,
    UT_METADATA_ID                   = 3,

    /* cap on the number of peers taken from one received pex list */
    MAX_PEX_PEER_COUNT               = 50,

    /* a peer's choke state is not changed more often than this */
    MIN_CHOKE_PERIOD_SEC             = 10,

    /* idle seconds before we send a keepalive */
    KEEPALIVE_INTERVAL_SECS          = 100,

    /* sec between sendPex() calls */
    PEX_INTERVAL_SECS                = 90,

    /* capacity of the peerAskedFor queue; also sent as "reqq" in our handshake */
    REQQ                             = 512,

    /* capacity of the peerAskedForMetadata queue */
    METADATA_REQQ                    = 64,

    /* used in lowering the outMessages queue period */
    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
    HIGH_PRIORITY_INTERVAL_SECS      = 2,
    LOW_PRIORITY_INTERVAL_SECS       = 10,

    /* number of pieces to remove from the bitfield when
     * lazy bitfields are turned on */
    LAZY_PIECE_COUNT                 = 26,

    /* number of pieces we'll allow in our fast set */
    MAX_FAST_SET_SIZE                = 3,

    /* defined in BEP #9 */
    METADATA_MSG_TYPE_REQUEST        = 0,
    METADATA_MSG_TYPE_DATA           = 1,
    METADATA_MSG_TYPE_REJECT         = 2
};
/* Values stored in tr_peermsgs.state: which part of an incoming
 * message the reader is waiting for next. */
enum
{
    AWAITING_BT_LENGTH  = 0,
    AWAITING_BT_ID      = 1,
    AWAITING_BT_MESSAGE = 2,
    AWAITING_BT_PIECE   = 3
};
/* One block request: the (index, offset, length) triple that is written
 * to the wire by the request / cancel / reject messages in this file. */
struct peer_request
{
    uint32_t    index;   /* piece index */
    uint32_t    offset;  /* byte offset inside that piece */
    uint32_t    length;  /* number of bytes */
};
122 static uint32_t
123 getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
125 const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
126 const uint64_t blockPos = tor->blockSize * b;
127 assert( blockPos >= piecePos );
128 return (uint32_t)( blockPos - piecePos );
131 static void
132 blockToReq( const tr_torrent * tor,
133 tr_block_index_t block,
134 struct peer_request * setme )
136 assert( setme != NULL );
138 setme->index = tr_torBlockPiece( tor, block );
139 setme->offset = getBlockOffsetInPiece( tor, block );
140 setme->length = tr_torBlockCountBytes( tor, block );
147 /* this is raw, unchanged data from the peer regarding
148 * the current message that it's sending us. */
149 struct tr_incoming
151 uint8_t id;
152 uint32_t length; /* includes the +1 for id length */
153 struct peer_request blockReq; /* metadata for incoming blocks */
154 struct evbuffer * block; /* piece data for incoming blocks */
158 * Low-level communication state information about a connected peer.
160 * This structure remembers the low-level protocol states that we're
161 * in with this peer, such as active requests, pex messages, and so on.
162 * Its fields are all private to peer-msgs.c.
164 * Data not directly involved with sending & receiving messages is
165 * stored in tr_peer, where it can be accessed by both peermsgs and
166 * the peer manager.
168 * @see struct peer_atom
169 * @see tr_peer
171 struct tr_peermsgs
173 tr_bool peerSupportsPex;
174 tr_bool peerSupportsMetadataXfer;
175 tr_bool clientSentLtepHandshake;
176 tr_bool peerSentLtepHandshake;
178 /*tr_bool haveFastSet;*/
180 int desiredRequestCount;
182 int prefetchCount;
184 /* how long the outMessages batch should be allowed to grow before
185 * it's flushed -- some messages (like requests >:) should be sent
186 * very quickly; others aren't as urgent. */
187 int8_t outMessagesBatchPeriod;
189 uint8_t state;
190 uint8_t ut_pex_id;
191 uint8_t ut_metadata_id;
192 uint16_t pexCount;
193 uint16_t pexCount6;
195 #if 0
196 size_t fastsetSize;
197 tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
198 #endif
200 tr_peer * peer;
202 tr_torrent * torrent;
204 tr_peer_callback * callback;
205 void * callbackData;
207 struct evbuffer * outMessages; /* all the non-piece messages */
209 struct peer_request peerAskedFor[REQQ];
211 int peerAskedForMetadata[METADATA_REQQ];
212 int peerAskedForMetadataCount;
214 tr_pex * pex;
215 tr_pex * pex6;
217 /*time_t clientSentPexAt;*/
218 time_t clientSentAnythingAt;
220 /* when we started batching the outMessages */
221 time_t outMessagesBatchedAt;
223 struct tr_incoming incoming;
225 /* if the peer supports the Extension Protocol in BEP 10 and
226 supplied a reqq argument, it's stored here. otherwise the
227 value is zero and should be ignored. */
228 int64_t reqq;
230 struct event pexTimer;
#if 0
/* Disabled: returns the peer's `have` bitfield, creating it on first use. */
static tr_bitfield*
getHave( const struct tr_peermsgs * msgs )
{
    tr_peer * peer = msgs->peer;

    if( peer->have == NULL )
    {
        peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
    }

    return peer->have;
}
#endif
247 static inline tr_session*
248 getSession( struct tr_peermsgs * msgs )
250 return msgs->torrent->session;
257 static void
258 myDebug( const char * file, int line,
259 const struct tr_peermsgs * msgs,
260 const char * fmt, ... )
262 FILE * fp = tr_getLog( );
264 if( fp )
266 va_list args;
267 char timestr[64];
268 struct evbuffer * buf = evbuffer_new( );
269 char * base = tr_basename( file );
271 evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ",
272 tr_getLogTimeStr( timestr, sizeof( timestr ) ),
273 tr_torrentName( msgs->torrent ),
274 tr_peerIoGetAddrStr( msgs->peer->io ),
275 msgs->peer->client );
276 va_start( args, fmt );
277 evbuffer_add_vprintf( buf, fmt, args );
278 va_end( args );
279 evbuffer_add_printf( buf, " (%s:%d)\n", base, line );
280 /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
281 fwrite( EVBUFFER_DATA( buf ), 1, EVBUFFER_LENGTH( buf ), fp );
283 tr_free( base );
284 evbuffer_free( buf );
/* Logs a printf-style message about `msgs` through myDebug(),
 * but only while deep logging is active. */
#define dbgmsg( msgs, ... ) \
    do \
    { \
        if( tr_deepLoggingIsActive( ) ) \
        { \
            myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
        } \
    } \
    while( 0 )
298 static void
299 pokeBatchPeriod( tr_peermsgs * msgs,
300 int interval )
302 if( msgs->outMessagesBatchPeriod > interval )
304 msgs->outMessagesBatchPeriod = interval;
305 dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
309 static void
310 dbgOutMessageLen( tr_peermsgs * msgs )
312 dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
315 static void
316 protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
318 tr_peerIo * io = msgs->peer->io;
319 struct evbuffer * out = msgs->outMessages;
321 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
323 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
324 tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
325 tr_peerIoWriteUint32( io, out, req->index );
326 tr_peerIoWriteUint32( io, out, req->offset );
327 tr_peerIoWriteUint32( io, out, req->length );
329 dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
330 dbgOutMessageLen( msgs );
333 static void
334 protocolSendRequest( tr_peermsgs * msgs,
335 const struct peer_request * req )
337 tr_peerIo * io = msgs->peer->io;
338 struct evbuffer * out = msgs->outMessages;
340 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
341 tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
342 tr_peerIoWriteUint32( io, out, req->index );
343 tr_peerIoWriteUint32( io, out, req->offset );
344 tr_peerIoWriteUint32( io, out, req->length );
346 dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
347 dbgOutMessageLen( msgs );
348 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
351 static void
352 protocolSendCancel( tr_peermsgs * msgs,
353 const struct peer_request * req )
355 tr_peerIo * io = msgs->peer->io;
356 struct evbuffer * out = msgs->outMessages;
358 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
359 tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
360 tr_peerIoWriteUint32( io, out, req->index );
361 tr_peerIoWriteUint32( io, out, req->offset );
362 tr_peerIoWriteUint32( io, out, req->length );
364 dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
365 dbgOutMessageLen( msgs );
366 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
369 static void
370 protocolSendPort(tr_peermsgs *msgs, uint16_t port)
372 tr_peerIo * io = msgs->peer->io;
373 struct evbuffer * out = msgs->outMessages;
375 dbgmsg( msgs, "sending Port %u", port);
376 tr_peerIoWriteUint32( io, out, 3 );
377 tr_peerIoWriteUint8 ( io, out, BT_PORT );
378 tr_peerIoWriteUint16( io, out, port);
381 static void
382 protocolSendHave( tr_peermsgs * msgs,
383 uint32_t index )
385 tr_peerIo * io = msgs->peer->io;
386 struct evbuffer * out = msgs->outMessages;
388 tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
389 tr_peerIoWriteUint8 ( io, out, BT_HAVE );
390 tr_peerIoWriteUint32( io, out, index );
392 dbgmsg( msgs, "sending Have %u", index );
393 dbgOutMessageLen( msgs );
394 pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
#if 0
/* Disabled: queues a BT_FEXT_ALLOWED_FAST message for `pieceIndex`. */
static void
protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
{
    tr_peerIo       * io     = msgs->peer->io;
    struct evbuffer * outbuf = msgs->outMessages;

    /* message length: the id byte plus the 32-bit piece index */
    const size_t      msglen = sizeof(uint8_t) + sizeof(uint32_t);

    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );

    tr_peerIoWriteUint32( io, outbuf, msglen );
    tr_peerIoWriteUint8 ( io, outbuf, BT_FEXT_ALLOWED_FAST );
    tr_peerIoWriteUint32( io, outbuf, pieceIndex );

    dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
    dbgOutMessageLen( msgs );
}
#endif
415 static void
416 protocolSendChoke( tr_peermsgs * msgs,
417 int choke )
419 tr_peerIo * io = msgs->peer->io;
420 struct evbuffer * out = msgs->outMessages;
422 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
423 tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
425 dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
426 dbgOutMessageLen( msgs );
427 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
430 static void
431 protocolSendHaveAll( tr_peermsgs * msgs )
433 tr_peerIo * io = msgs->peer->io;
434 struct evbuffer * out = msgs->outMessages;
436 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
438 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
439 tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
441 dbgmsg( msgs, "sending HAVE_ALL..." );
442 dbgOutMessageLen( msgs );
443 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
446 static void
447 protocolSendHaveNone( tr_peermsgs * msgs )
449 tr_peerIo * io = msgs->peer->io;
450 struct evbuffer * out = msgs->outMessages;
452 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
454 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
455 tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
457 dbgmsg( msgs, "sending HAVE_NONE..." );
458 dbgOutMessageLen( msgs );
459 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
463 *** EVENTS
466 static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
468 static void
469 publish( tr_peermsgs * msgs, tr_peer_event * e )
471 assert( msgs->peer );
472 assert( msgs->peer->msgs == msgs );
474 if( msgs->callback != NULL )
475 msgs->callback( msgs->peer, e, msgs->callbackData );
478 static void
479 fireError( tr_peermsgs * msgs, int err )
481 tr_peer_event e = blankEvent;
482 e.eventType = TR_PEER_ERROR;
483 e.err = err;
484 publish( msgs, &e );
487 static void
488 firePeerProgress( tr_peermsgs * msgs )
490 tr_peer_event e = blankEvent;
491 e.eventType = TR_PEER_PEER_PROGRESS;
492 e.progress = msgs->peer->progress;
493 publish( msgs, &e );
496 static void
497 fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
499 tr_peer_event e = blankEvent;
500 e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
501 e.pieceIndex = req->index;
502 e.offset = req->offset;
503 e.length = req->length;
504 publish( msgs, &e );
507 static void
508 fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
510 tr_peer_event e = blankEvent;
511 e.eventType = TR_PEER_CLIENT_GOT_REJ;
512 e.pieceIndex = req->index;
513 e.offset = req->offset;
514 e.length = req->length;
515 publish( msgs, &e );
518 static void
519 fireGotChoke( tr_peermsgs * msgs )
521 tr_peer_event e = blankEvent;
522 e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
523 publish( msgs, &e );
526 static void
527 fireClientGotData( tr_peermsgs * msgs,
528 uint32_t length,
529 int wasPieceData )
531 tr_peer_event e = blankEvent;
533 e.length = length;
534 e.eventType = TR_PEER_CLIENT_GOT_DATA;
535 e.wasPieceData = wasPieceData;
536 publish( msgs, &e );
539 static void
540 fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
542 tr_peer_event e = blankEvent;
543 e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
544 e.pieceIndex = pieceIndex;
545 publish( msgs, &e );
548 static void
549 fireClientGotPort( tr_peermsgs * msgs, tr_port port )
551 tr_peer_event e = blankEvent;
552 e.eventType = TR_PEER_CLIENT_GOT_PORT;
553 e.port = port;
554 publish( msgs, &e );
557 static void
558 fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
560 tr_peer_event e = blankEvent;
561 e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
562 e.pieceIndex = pieceIndex;
563 publish( msgs, &e );
566 static void
567 firePeerGotData( tr_peermsgs * msgs,
568 uint32_t length,
569 int wasPieceData )
571 tr_peer_event e = blankEvent;
573 e.length = length;
574 e.eventType = TR_PEER_PEER_GOT_DATA;
575 e.wasPieceData = wasPieceData;
577 publish( msgs, &e );
581 *** ALLOWED FAST SET
582 *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
585 size_t
586 tr_generateAllowedSet( tr_piece_index_t * setmePieces,
587 size_t desiredSetSize,
588 size_t pieceCount,
589 const uint8_t * infohash,
590 const tr_address * addr )
592 size_t setSize = 0;
594 assert( setmePieces );
595 assert( desiredSetSize <= pieceCount );
596 assert( desiredSetSize );
597 assert( pieceCount );
598 assert( infohash );
599 assert( addr );
601 if( addr->type == TR_AF_INET )
603 uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
604 uint8_t x[SHA_DIGEST_LENGTH];
606 uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 ); /* (1) */
607 memcpy( w, &ui32, sizeof( uint32_t ) );
608 walk += sizeof( uint32_t );
609 memcpy( walk, infohash, SHA_DIGEST_LENGTH ); /* (2) */
610 walk += SHA_DIGEST_LENGTH;
611 tr_sha1( x, w, walk-w, NULL ); /* (3) */
612 assert( sizeof( w ) == walk-w );
614 while( setSize<desiredSetSize )
616 int i;
617 for( i=0; i<5 && setSize<desiredSetSize; ++i ) /* (4) */
619 size_t k;
620 uint32_t j = i * 4; /* (5) */
621 uint32_t y = ntohl( *( uint32_t* )( x + j ) ); /* (6) */
622 uint32_t index = y % pieceCount; /* (7) */
624 for( k=0; k<setSize; ++k ) /* (8) */
625 if( setmePieces[k] == index )
626 break;
628 if( k == setSize )
629 setmePieces[setSize++] = index; /* (9) */
632 tr_sha1( x, x, sizeof( x ), NULL ); /* (3) */
636 return setSize;
639 static void
640 updateFastSet( tr_peermsgs * msgs UNUSED )
642 #if 0
643 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
644 const int peerIsNeedy = msgs->peer->progress < 0.10;
646 if( fext && peerIsNeedy && !msgs->haveFastSet )
648 size_t i;
649 const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
650 const tr_info * inf = &msgs->torrent->info;
651 const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
653 /* build the fast set */
654 msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
655 msgs->haveFastSet = 1;
657 /* send it to the peer */
658 for( i=0; i<msgs->fastsetSize; ++i )
659 protocolSendAllowedFast( msgs, msgs->fastset[i] );
661 #endif
665 *** INTEREST
668 static void
669 sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
671 struct evbuffer * out = msgs->outMessages;
673 assert( msgs );
674 assert( tr_isBool( clientIsInterested ) );
676 msgs->peer->clientIsInterested = clientIsInterested;
677 dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );
678 tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
679 tr_peerIoWriteUint8 ( msgs->peer->io, out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
681 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
682 dbgOutMessageLen( msgs );
685 static void
686 updateInterest( tr_peermsgs * msgs UNUSED )
688 /* FIXME -- might need to poke the mgr on startup */
691 void
692 tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
694 assert( tr_isBool( isInterested ) );
696 if( isInterested != msgs->peer->clientIsInterested )
697 sendInterest( msgs, isInterested );
700 static tr_bool
701 popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
703 if( msgs->peerAskedForMetadataCount == 0 )
704 return FALSE;
706 *piece = msgs->peerAskedForMetadata[0];
708 tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
709 msgs->peerAskedForMetadataCount-- );
711 return TRUE;
714 static tr_bool
715 popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
717 if( msgs->peer->pendingReqsToClient == 0 )
718 return FALSE;
720 *setme = msgs->peerAskedFor[0];
722 tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
723 msgs->peer->pendingReqsToClient-- );
725 return TRUE;
728 static void
729 cancelAllRequestsToClient( tr_peermsgs * msgs )
731 struct peer_request req;
732 const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
734 while( popNextRequest( msgs, &req ))
735 if( mustSendCancel )
736 protocolSendReject( msgs, &req );
739 void
740 tr_peerMsgsSetChoke( tr_peermsgs * msgs,
741 int choke )
743 const time_t now = tr_time( );
744 const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
746 assert( msgs );
747 assert( msgs->peer );
748 assert( choke == 0 || choke == 1 );
750 if( msgs->peer->chokeChangedAt > fibrillationTime )
752 dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
754 else if( msgs->peer->peerIsChoked != choke )
756 msgs->peer->peerIsChoked = choke;
757 if( choke )
758 cancelAllRequestsToClient( msgs );
759 protocolSendChoke( msgs, choke );
760 msgs->peer->chokeChangedAt = now;
768 void
769 tr_peerMsgsHave( tr_peermsgs * msgs,
770 uint32_t index )
772 protocolSendHave( msgs, index );
774 /* since we have more pieces now, we might not be interested in this peer */
775 updateInterest( msgs );
782 static tr_bool
783 reqIsValid( const tr_peermsgs * peer,
784 uint32_t index,
785 uint32_t offset,
786 uint32_t length )
788 return tr_torrentReqIsValid( peer->torrent, index, offset, length );
791 static tr_bool
792 requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
794 return reqIsValid( msgs, req->index, req->offset, req->length );
797 void
798 tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
800 struct peer_request req;
801 /*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/
802 blockToReq( msgs->torrent, block, &req );
803 protocolSendCancel( msgs, &req );
810 static void
811 sendLtepHandshake( tr_peermsgs * msgs )
813 tr_benc val, *m;
814 char * buf;
815 int len;
816 tr_bool allow_pex;
817 tr_bool allow_metadata_xfer;
818 struct evbuffer * out = msgs->outMessages;
819 const unsigned char * ipv6 = tr_globalIPv6();
821 if( msgs->clientSentLtepHandshake )
822 return;
824 dbgmsg( msgs, "sending an ltep handshake" );
825 msgs->clientSentLtepHandshake = 1;
827 /* decide if we want to advertise metadata xfer support (BEP 9) */
828 if( tr_torrentIsPrivate( msgs->torrent ) )
829 allow_metadata_xfer = 0;
830 else
831 allow_metadata_xfer = 1;
833 /* decide if we want to advertise pex support */
834 if( !tr_torrentAllowsPex( msgs->torrent ) )
835 allow_pex = 0;
836 else if( msgs->peerSentLtepHandshake )
837 allow_pex = msgs->peerSupportsPex ? 1 : 0;
838 else
839 allow_pex = 1;
841 tr_bencInitDict( &val, 8 );
842 tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
843 if( ipv6 != NULL )
844 tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 );
845 if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
846 && ( msgs->torrent->infoDictLength > 0 ) )
847 tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength );
848 tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) );
849 tr_bencDictAddInt( &val, "reqq", REQQ );
850 tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
851 tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
852 m = tr_bencDictAddDict( &val, "m", 2 );
853 if( allow_metadata_xfer )
854 tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID );
855 if( allow_pex )
856 tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID );
858 buf = tr_bencToStr( &val, TR_FMT_BENC, &len );
860 tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + len );
861 tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
862 tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
863 tr_peerIoWriteBytes ( msgs->peer->io, out, buf, len );
864 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
865 dbgOutMessageLen( msgs );
867 /* cleanup */
868 tr_bencFree( &val );
869 tr_free( buf );
872 static void
873 parseLtepHandshake( tr_peermsgs * msgs,
874 int len,
875 struct evbuffer * inbuf )
877 int64_t i;
878 tr_benc val, * sub;
879 uint8_t * tmp = tr_new( uint8_t, len );
880 const uint8_t *addr;
881 size_t addr_len;
882 tr_pex pex;
883 int8_t seedProbability = -1;
885 memset( &pex, 0, sizeof( tr_pex ) );
887 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
888 msgs->peerSentLtepHandshake = 1;
890 if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) )
892 dbgmsg( msgs, "GET extended-handshake, couldn't get dictionary" );
893 tr_free( tmp );
894 return;
897 dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len, tmp );
899 /* does the peer prefer encrypted connections? */
900 if( tr_bencDictFindInt( &val, "e", &i ) ) {
901 msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
902 : ENCRYPTION_PREFERENCE_NO;
903 if( i )
904 pex.flags |= ADDED_F_ENCRYPTION_FLAG;
907 /* check supported messages for utorrent pex */
908 msgs->peerSupportsPex = 0;
909 msgs->peerSupportsMetadataXfer = 0;
911 if( tr_bencDictFindDict( &val, "m", &sub ) ) {
912 if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
913 msgs->peerSupportsPex = i != 0;
914 msgs->ut_pex_id = (uint8_t) i;
915 dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
917 if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
918 msgs->peerSupportsMetadataXfer = i != 0;
919 msgs->ut_metadata_id = (uint8_t) i;
920 dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
924 /* look for metainfo size (BEP 9) */
925 if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
926 tr_torrentSetMetadataSizeHint( msgs->torrent, i );
928 /* look for upload_only (BEP 21) */
929 if( tr_bencDictFindInt( &val, "upload_only", &i ) )
930 seedProbability = i==0 ? 0 : 100;
932 /* get peer's listening port */
933 if( tr_bencDictFindInt( &val, "p", &i ) ) {
934 pex.port = htons( (uint16_t)i );
935 fireClientGotPort( msgs, pex.port );
936 dbgmsg( msgs, "peer's port is now %d", (int)i );
939 if( tr_peerIoIsIncoming( msgs->peer->io )
940 && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
941 && ( addr_len == 4 ) )
943 pex.addr.type = TR_AF_INET;
944 memcpy( &pex.addr.addr.addr4, addr, 4 );
945 tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
948 if( tr_peerIoIsIncoming( msgs->peer->io )
949 && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
950 && ( addr_len == 16 ) )
952 pex.addr.type = TR_AF_INET6;
953 memcpy( &pex.addr.addr.addr6, addr, 16 );
954 tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
957 /* get peer's maximum request queue size */
958 if( tr_bencDictFindInt( &val, "reqq", &i ) )
959 msgs->reqq = i;
961 tr_bencFree( &val );
962 tr_free( tmp );
965 static void
966 parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
968 tr_benc dict;
969 char * msg_end;
970 char * benc_end;
971 int64_t msg_type = -1;
972 int64_t piece = -1;
973 int64_t total_size = 0;
974 uint8_t * tmp = tr_new( uint8_t, msglen );
976 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
977 msg_end = (char*)tmp + msglen;
979 if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
981 tr_bencDictFindInt( &dict, "msg_type", &msg_type );
982 tr_bencDictFindInt( &dict, "piece", &piece );
983 tr_bencDictFindInt( &dict, "total_size", &total_size );
984 tr_bencFree( &dict );
987 dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
988 (int)msg_type, (int)piece, (int)total_size );
990 if( msg_type == METADATA_MSG_TYPE_REJECT )
992 /* NOOP */
995 if( ( msg_type == METADATA_MSG_TYPE_DATA )
996 && ( !tr_torrentHasMetadata( msgs->torrent ) )
997 && ( msg_end - benc_end <= METADATA_PIECE_SIZE )
998 && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) )
1000 const int pieceLen = msg_end - benc_end;
1001 tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen );
1004 if( msg_type == METADATA_MSG_TYPE_REQUEST )
1006 if( ( piece >= 0 )
1007 && tr_torrentHasMetadata( msgs->torrent )
1008 && !tr_torrentIsPrivate( msgs->torrent )
1009 && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
1011 msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1013 else
1015 tr_benc tmp;
1016 int payloadLen;
1017 char * payload;
1018 tr_peerIo * io = msgs->peer->io;
1019 struct evbuffer * out = msgs->outMessages;
1021 /* build the rejection message */
1022 tr_bencInitDict( &tmp, 2 );
1023 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1024 tr_bencDictAddInt( &tmp, "piece", piece );
1025 payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1026 tr_bencFree( &tmp );
1028 /* write it out as a LTEP message to our outMessages buffer */
1029 tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1030 tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1031 tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1032 tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1033 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1034 dbgOutMessageLen( msgs );
1036 tr_free( payload );
1040 tr_free( tmp );
1043 static void
1044 parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1046 int loaded = 0;
1047 uint8_t * tmp = tr_new( uint8_t, msglen );
1048 tr_benc val;
1049 tr_torrent * tor = msgs->torrent;
1050 const uint8_t * added;
1051 size_t added_len;
1053 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
1055 if( tr_torrentAllowsPex( tor )
1056 && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) )
1058 if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
1060 tr_pex * pex;
1061 size_t i, n;
1062 size_t added_f_len = 0;
1063 const uint8_t * added_f = NULL;
1065 tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len );
1066 pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n );
1068 n = MIN( n, MAX_PEX_PEER_COUNT );
1069 for( i=0; i<n; ++i )
1071 int seedProbability = -1;
1072 if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1073 tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1076 tr_free( pex );
1079 if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
1081 tr_pex * pex;
1082 size_t i, n;
1083 size_t added_f_len = 0;
1084 const uint8_t * added_f = NULL;
1086 tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len );
1087 pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n );
1089 n = MIN( n, MAX_PEX_PEER_COUNT );
1090 for( i=0; i<n; ++i )
1092 int seedProbability = -1;
1093 if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
1094 tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
1097 tr_free( pex );
1101 if( loaded )
1102 tr_bencFree( &val );
1103 tr_free( tmp );
1106 static void sendPex( tr_peermsgs * msgs );
1108 static void
1109 parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
1111 uint8_t ltep_msgid;
1113 tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
1114 msglen--;
1116 if( ltep_msgid == LTEP_HANDSHAKE )
1118 dbgmsg( msgs, "got ltep handshake" );
1119 parseLtepHandshake( msgs, msglen, inbuf );
1120 if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
1122 sendLtepHandshake( msgs );
1123 sendPex( msgs );
1126 else if( ltep_msgid == UT_PEX_ID )
1128 dbgmsg( msgs, "got ut pex" );
1129 msgs->peerSupportsPex = 1;
1130 parseUtPex( msgs, msglen, inbuf );
1132 else if( ltep_msgid == UT_METADATA_ID )
1134 dbgmsg( msgs, "got ut metadata" );
1135 msgs->peerSupportsMetadataXfer = 1;
1136 parseUtMetadata( msgs, msglen, inbuf );
1138 else
1140 dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
1141 evbuffer_drain( inbuf, msglen );
1145 static int
1146 readBtLength( tr_peermsgs * msgs,
1147 struct evbuffer * inbuf,
1148 size_t inlen )
1150 uint32_t len;
1152 if( inlen < sizeof( len ) )
1153 return READ_LATER;
1155 tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
1157 if( len == 0 ) /* peer sent us a keepalive message */
1158 dbgmsg( msgs, "got KeepAlive" );
1159 else
1161 msgs->incoming.length = len;
1162 msgs->state = AWAITING_BT_ID;
1165 return READ_NOW;
1168 static int readBtMessage( tr_peermsgs * msgs,
1169 struct evbuffer * inbuf,
1170 size_t inlen );
1172 static int
1173 readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1175 uint8_t id;
1177 if( inlen < sizeof( uint8_t ) )
1178 return READ_LATER;
1180 tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
1181 msgs->incoming.id = id;
1182 dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );
1184 if( id == BT_PIECE )
1186 msgs->state = AWAITING_BT_PIECE;
1187 return READ_NOW;
1189 else if( msgs->incoming.length != 1 )
1191 msgs->state = AWAITING_BT_MESSAGE;
1192 return READ_NOW;
1194 else return readBtMessage( msgs, inbuf, inlen - 1 );
1197 static void
1198 updatePeerProgress( tr_peermsgs * msgs )
1200 msgs->peer->progress = tr_bitsetPercent( &msgs->peer->have );
1201 dbgmsg( msgs, "peer progress is %f", msgs->peer->progress );
1202 updateFastSet( msgs );
1203 updateInterest( msgs );
1204 firePeerProgress( msgs );
1207 static void
1208 prefetchPieces( tr_peermsgs *msgs )
1210 int i;
1212 /* Maintain 12 prefetched blocks per unchoked peer */
1213 for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i )
1215 const struct peer_request * req = msgs->peerAskedFor + i;
1216 if( requestIsValid( msgs, req ) )
1218 tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length );
1219 ++msgs->prefetchCount;
1224 static void
1225 peerMadeRequest( tr_peermsgs * msgs,
1226 const struct peer_request * req )
1228 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1229 const int reqIsValid = requestIsValid( msgs, req );
1230 const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
1231 const int peerIsChoked = msgs->peer->peerIsChoked;
1233 int allow = FALSE;
1235 if( !reqIsValid )
1236 dbgmsg( msgs, "rejecting an invalid request." );
1237 else if( !clientHasPiece )
1238 dbgmsg( msgs, "rejecting request for a piece we don't have." );
1239 else if( peerIsChoked )
1240 dbgmsg( msgs, "rejecting request from choked peer" );
1241 else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
1242 dbgmsg( msgs, "rejecting request ... reqq is full" );
1243 else
1244 allow = TRUE;
1246 if( allow ) {
1247 msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
1248 prefetchPieces( msgs );
1249 } else if( fext ) {
1250 protocolSendReject( msgs, req );
1254 static tr_bool
1255 messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
1257 switch( id )
1259 case BT_CHOKE:
1260 case BT_UNCHOKE:
1261 case BT_INTERESTED:
1262 case BT_NOT_INTERESTED:
1263 case BT_FEXT_HAVE_ALL:
1264 case BT_FEXT_HAVE_NONE:
1265 return len == 1;
1267 case BT_HAVE:
1268 case BT_FEXT_SUGGEST:
1269 case BT_FEXT_ALLOWED_FAST:
1270 return len == 5;
1272 case BT_BITFIELD:
1273 return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
1275 case BT_REQUEST:
1276 case BT_CANCEL:
1277 case BT_FEXT_REJECT:
1278 return len == 13;
1280 case BT_PIECE:
1281 return len > 9 && len <= 16393;
1283 case BT_PORT:
1284 return len == 3;
1286 case BT_LTEP:
1287 return len >= 2;
1289 default:
1290 return FALSE;
1294 static int clientGotBlock( tr_peermsgs * msgs,
1295 const uint8_t * block,
1296 const struct peer_request * req );
1298 static int
1299 readBtPiece( tr_peermsgs * msgs,
1300 struct evbuffer * inbuf,
1301 size_t inlen,
1302 size_t * setme_piece_bytes_read )
1304 struct peer_request * req = &msgs->incoming.blockReq;
1306 assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
1307 dbgmsg( msgs, "In readBtPiece" );
1309 if( !req->length )
1311 if( inlen < 8 )
1312 return READ_LATER;
1314 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
1315 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
1316 req->length = msgs->incoming.length - 9;
1317 dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
1318 return READ_NOW;
1320 else
1322 int err;
1324 /* read in another chunk of data */
1325 const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
1326 size_t n = MIN( nLeft, inlen );
1327 size_t i = n;
1328 void * buf = tr_sessionGetBuffer( getSession( msgs ) );
1329 const size_t buflen = SESSION_BUFFER_SIZE;
1331 while( i > 0 )
1333 const size_t thisPass = MIN( i, buflen );
1334 tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
1335 evbuffer_add( msgs->incoming.block, buf, thisPass );
1336 i -= thisPass;
1339 tr_sessionReleaseBuffer( getSession( msgs ) );
1340 buf = NULL;
1342 fireClientGotData( msgs, n, TRUE );
1343 *setme_piece_bytes_read += n;
1344 dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
1345 n, req->index, req->offset, req->length,
1346 (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
1347 if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
1348 return READ_LATER;
1350 /* we've got the whole block ... process it */
1351 err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
1353 /* cleanup */
1354 evbuffer_free( msgs->incoming.block );
1355 msgs->incoming.block = evbuffer_new( );
1356 req->length = 0;
1357 msgs->state = AWAITING_BT_LENGTH;
1358 return err ? READ_ERR : READ_NOW;
1362 static void updateDesiredRequestCount( tr_peermsgs * msgs );
1364 static int
1365 readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
1367 uint32_t ui32;
1368 uint32_t msglen = msgs->incoming.length;
1369 const uint8_t id = msgs->incoming.id;
1370 #ifndef NDEBUG
1371 const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
1372 #endif
1373 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1375 --msglen; /* id length */
1377 dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
1379 if( inlen < msglen )
1380 return READ_LATER;
1382 if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
1384 dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
1385 fireError( msgs, EMSGSIZE );
1386 return READ_ERR;
1389 switch( id )
1391 case BT_CHOKE:
1392 dbgmsg( msgs, "got Choke" );
1393 msgs->peer->clientIsChoked = 1;
1394 if( !fext )
1395 fireGotChoke( msgs );
1396 break;
1398 case BT_UNCHOKE:
1399 dbgmsg( msgs, "got Unchoke" );
1400 msgs->peer->clientIsChoked = 0;
1401 updateDesiredRequestCount( msgs );
1402 break;
1404 case BT_INTERESTED:
1405 dbgmsg( msgs, "got Interested" );
1406 msgs->peer->peerIsInterested = 1;
1407 break;
1409 case BT_NOT_INTERESTED:
1410 dbgmsg( msgs, "got Not Interested" );
1411 msgs->peer->peerIsInterested = 0;
1412 break;
1414 case BT_HAVE:
1415 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1416 dbgmsg( msgs, "got Have: %u", ui32 );
1417 if( tr_torrentHasMetadata( msgs->torrent )
1418 && ( ui32 >= msgs->torrent->info.pieceCount ) )
1420 fireError( msgs, ERANGE );
1421 return READ_ERR;
1423 if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
1425 fireError( msgs, ERANGE );
1426 return READ_ERR;
1428 updatePeerProgress( msgs );
1429 break;
1431 case BT_BITFIELD: {
1432 const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
1433 ? msgs->torrent->info.pieceCount
1434 : msglen * 8;
1435 dbgmsg( msgs, "got a bitfield" );
1436 tr_bitsetReserve( &msgs->peer->have, bitCount );
1437 tr_peerIoReadBytes( msgs->peer->io, inbuf,
1438 msgs->peer->have.bitfield.bits, msglen );
1439 updatePeerProgress( msgs );
1440 break;
1443 case BT_REQUEST:
1445 struct peer_request r;
1446 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1447 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1448 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1449 dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
1450 peerMadeRequest( msgs, &r );
1451 break;
1454 case BT_CANCEL:
1456 int i;
1457 struct peer_request r;
1458 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1459 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1460 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1461 tr_historyAdd( msgs->peer->cancelsSentToClient, tr_time( ), 1 );
1462 dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
1464 for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
1465 const struct peer_request * req = msgs->peerAskedFor + i;
1466 if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
1467 break;
1470 if( i < msgs->peer->pendingReqsToClient )
1471 tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
1472 msgs->peer->pendingReqsToClient-- );
1473 break;
1476 case BT_PIECE:
1477 assert( 0 ); /* handled elsewhere! */
1478 break;
1480 case BT_PORT:
1481 dbgmsg( msgs, "Got a BT_PORT" );
1482 tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
1483 if( msgs->peer->dht_port > 0 )
1484 tr_dhtAddNode( getSession(msgs),
1485 tr_peerAddress( msgs->peer ),
1486 msgs->peer->dht_port, 0 );
1487 break;
1489 case BT_FEXT_SUGGEST:
1490 dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
1491 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1492 if( fext )
1493 fireClientGotSuggest( msgs, ui32 );
1494 else {
1495 fireError( msgs, EMSGSIZE );
1496 return READ_ERR;
1498 break;
1500 case BT_FEXT_ALLOWED_FAST:
1501 dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
1502 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
1503 if( fext )
1504 fireClientGotAllowedFast( msgs, ui32 );
1505 else {
1506 fireError( msgs, EMSGSIZE );
1507 return READ_ERR;
1509 break;
1511 case BT_FEXT_HAVE_ALL:
1512 dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
1513 if( fext ) {
1514 tr_bitsetSetHaveAll( &msgs->peer->have );
1515 updatePeerProgress( msgs );
1516 } else {
1517 fireError( msgs, EMSGSIZE );
1518 return READ_ERR;
1520 break;
1522 case BT_FEXT_HAVE_NONE:
1523 dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
1524 if( fext ) {
1525 tr_bitsetSetHaveNone( &msgs->peer->have );
1526 updatePeerProgress( msgs );
1527 } else {
1528 fireError( msgs, EMSGSIZE );
1529 return READ_ERR;
1531 break;
1533 case BT_FEXT_REJECT:
1535 struct peer_request r;
1536 dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
1537 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
1538 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
1539 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
1540 if( fext )
1541 fireGotRej( msgs, &r );
1542 else {
1543 fireError( msgs, EMSGSIZE );
1544 return READ_ERR;
1546 break;
1549 case BT_LTEP:
1550 dbgmsg( msgs, "Got a BT_LTEP" );
1551 parseLtep( msgs, msglen, inbuf );
1552 break;
1554 default:
1555 dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
1556 tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
1557 break;
1560 assert( msglen + 1 == msgs->incoming.length );
1561 assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
1563 msgs->state = AWAITING_BT_LENGTH;
1564 return READ_NOW;
1567 static void
1568 addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
1570 if( !msgs->peer->blame )
1571 msgs->peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
1572 tr_bitfieldAdd( msgs->peer->blame, index );
1575 /* returns 0 on success, or an errno on failure */
1576 static int
1577 clientGotBlock( tr_peermsgs * msgs,
1578 const uint8_t * data,
1579 const struct peer_request * req )
1581 int err;
1582 tr_torrent * tor = msgs->torrent;
1583 const tr_block_index_t block = _tr_block( tor, req->index, req->offset );
1585 assert( msgs );
1586 assert( req );
1588 if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
1589 dbgmsg( msgs, "wrong block size -- expected %u, got %d",
1590 tr_torBlockCountBytes( msgs->torrent, block ), req->length );
1591 return EMSGSIZE;
1594 dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
1596 if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
1597 dbgmsg( msgs, "we didn't ask for this message..." );
1598 return 0;
1600 if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
1601 dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
1602 return 0;
1606 *** Save the block
1609 if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data )))
1610 return err;
1612 addPeerToBlamefield( msgs, req->index );
1613 fireGotBlock( msgs, req );
1614 return 0;
1617 static int peerPulse( void * vmsgs );
1619 static void
1620 didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
1622 tr_peermsgs * msgs = vmsgs;
1623 firePeerGotData( msgs, bytesWritten, wasPieceData );
1625 if ( tr_isPeerIo( io ) && io->userData )
1626 peerPulse( msgs );
1629 static ReadState
1630 canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
1632 ReadState ret;
1633 tr_peermsgs * msgs = vmsgs;
1634 struct evbuffer * in = tr_peerIoGetReadBuffer( io );
1635 const size_t inlen = EVBUFFER_LENGTH( in );
1637 dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state );
1639 if( !inlen )
1641 ret = READ_LATER;
1643 else if( msgs->state == AWAITING_BT_PIECE )
1645 ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
1647 else switch( msgs->state )
1649 case AWAITING_BT_LENGTH:
1650 ret = readBtLength ( msgs, in, inlen ); break;
1652 case AWAITING_BT_ID:
1653 ret = readBtId ( msgs, in, inlen ); break;
1655 case AWAITING_BT_MESSAGE:
1656 ret = readBtMessage( msgs, in, inlen ); break;
1658 default:
1659 ret = READ_ERR;
1660 assert( 0 );
1663 dbgmsg( msgs, "canRead: ret is %d", (int)ret );
1665 /* log the raw data that was read */
1666 if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
1667 fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
1669 return ret;
1673 tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
1675 if( msgs->state != AWAITING_BT_PIECE )
1676 return FALSE;
1678 return block == _tr_block( msgs->torrent,
1679 msgs->incoming.blockReq.index,
1680 msgs->incoming.blockReq.offset );
1687 static void
1688 updateDesiredRequestCount( tr_peermsgs * msgs )
1690 const tr_torrent * const torrent = msgs->torrent;
1692 if( tr_torrentIsSeed( msgs->torrent ) )
1694 msgs->desiredRequestCount = 0;
1696 else if( msgs->peer->clientIsChoked )
1698 msgs->desiredRequestCount = 0;
1700 else if( !msgs->peer->clientIsInterested )
1702 msgs->desiredRequestCount = 0;
1704 else
1706 int estimatedBlocksInPeriod;
1707 int rate_Bps;
1708 int irate_Bps;
1709 const int floor = 4;
1710 const int seconds = REQUEST_BUF_SECS;
1711 const uint64_t now = tr_time_msec( );
1713 /* Get the rate limit we should use.
1714 * FIXME: this needs to consider all the other peers as well... */
1715 rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT );
1716 if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
1717 rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) );
1719 /* honor the session limits, if enabled */
1720 if( tr_torrentUsesSessionLimits( torrent ) )
1721 if( tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) )
1722 rate_Bps = MIN( rate_Bps, irate_Bps );
1724 /* use this desired rate to figure out how
1725 * many requests we should send to this peer */
1726 estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize;
1727 msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
1729 /* honor the peer's maximum request count, if specified */
1730 if( msgs->reqq > 0 )
1731 if( msgs->desiredRequestCount > msgs->reqq )
1732 msgs->desiredRequestCount = msgs->reqq;
1736 static void
1737 updateMetadataRequests( tr_peermsgs * msgs, time_t now )
1739 int piece;
1741 if( msgs->peerSupportsMetadataXfer
1742 && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) )
1744 tr_benc tmp;
1745 int payloadLen;
1746 char * payload;
1747 tr_peerIo * io = msgs->peer->io;
1748 struct evbuffer * out = msgs->outMessages;
1750 /* build the data message */
1751 tr_bencInitDict( &tmp, 3 );
1752 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST );
1753 tr_bencDictAddInt( &tmp, "piece", piece );
1754 payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1755 tr_bencFree( &tmp );
1757 dbgmsg( msgs, "requesting metadata piece #%d", piece );
1759 /* write it out as a LTEP message to our outMessages buffer */
1760 tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1761 tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1762 tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1763 tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1764 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1765 dbgOutMessageLen( msgs );
1767 tr_free( payload );
1771 static void
1772 updateBlockRequests( tr_peermsgs * msgs )
1774 if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT )
1775 && ( msgs->desiredRequestCount > 0 )
1776 && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) )
1778 int i;
1779 int n;
1780 const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
1781 tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
1783 tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
1785 for( i=0; i<n; ++i )
1787 struct peer_request req;
1788 blockToReq( msgs->torrent, blocks[i], &req );
1789 protocolSendRequest( msgs, &req );
1792 tr_free( blocks );
1796 static size_t
1797 fillOutputBuffer( tr_peermsgs * msgs, time_t now )
1799 int piece;
1800 size_t bytesWritten = 0;
1801 struct peer_request req;
1802 const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
1803 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
1806 *** Protocol messages
1809 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
1811 dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
1812 msgs->outMessagesBatchedAt = now;
1814 else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
1816 const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
1817 /* flush the protocol messages */
1818 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
1819 tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
1820 msgs->clientSentAnythingAt = now;
1821 msgs->outMessagesBatchedAt = 0;
1822 msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
1823 bytesWritten += len;
1827 *** Metadata Pieces
1830 if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
1831 && popNextMetadataRequest( msgs, &piece ) )
1833 char * data;
1834 int dataLen;
1835 tr_bool ok = FALSE;
1837 data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
1838 if( ( dataLen > 0 ) && ( data != NULL ) )
1840 tr_benc tmp;
1841 int payloadLen;
1842 char * payload;
1843 tr_peerIo * io = msgs->peer->io;
1844 struct evbuffer * out = msgs->outMessages;
1846 /* build the data message */
1847 tr_bencInitDict( &tmp, 3 );
1848 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
1849 tr_bencDictAddInt( &tmp, "piece", piece );
1850 tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
1851 payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1852 tr_bencFree( &tmp );
1854 /* write it out as a LTEP message to our outMessages buffer */
1855 tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
1856 tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1857 tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1858 tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1859 tr_peerIoWriteBytes ( io, out, data, dataLen );
1860 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1861 dbgOutMessageLen( msgs );
1863 tr_free( payload );
1864 tr_free( data );
1866 ok = TRUE;
1869 if( !ok ) /* send a rejection message */
1871 tr_benc tmp;
1872 int payloadLen;
1873 char * payload;
1874 tr_peerIo * io = msgs->peer->io;
1875 struct evbuffer * out = msgs->outMessages;
1877 /* build the rejection message */
1878 tr_bencInitDict( &tmp, 2 );
1879 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
1880 tr_bencDictAddInt( &tmp, "piece", piece );
1881 payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
1882 tr_bencFree( &tmp );
1884 /* write it out as a LTEP message to our outMessages buffer */
1885 tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
1886 tr_peerIoWriteUint8 ( io, out, BT_LTEP );
1887 tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
1888 tr_peerIoWriteBytes ( io, out, payload, payloadLen );
1889 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
1890 dbgOutMessageLen( msgs );
1892 tr_free( payload );
1897 *** Data Blocks
1900 if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
1901 && popNextRequest( msgs, &req ) )
1903 --msgs->prefetchCount;
1905 if( requestIsValid( msgs, &req )
1906 && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
1908 /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
1909 int err;
1910 const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
1911 struct evbuffer * out;
1912 tr_peerIo * io = msgs->peer->io;
1914 out = evbuffer_new( );
1915 evbuffer_expand( out, msglen );
1917 tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
1918 tr_peerIoWriteUint8 ( io, out, BT_PIECE );
1919 tr_peerIoWriteUint32( io, out, req.index );
1920 tr_peerIoWriteUint32( io, out, req.offset );
1922 err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
1923 if( err )
1925 if( fext )
1926 protocolSendReject( msgs, &req );
1928 else
1930 dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
1931 EVBUFFER_LENGTH(out) += req.length;
1932 assert( EVBUFFER_LENGTH( out ) == msglen );
1933 tr_peerIoWriteBuf( io, out, TRUE );
1934 bytesWritten += EVBUFFER_LENGTH( out );
1935 msgs->clientSentAnythingAt = now;
1936 tr_historyAdd( msgs->peer->blocksSentToPeer, tr_time( ), 1 );
1939 evbuffer_free( out );
1941 if( err )
1943 bytesWritten = 0;
1944 msgs = NULL;
1947 else if( fext ) /* peer needs a reject message */
1949 protocolSendReject( msgs, &req );
1952 if( msgs != NULL )
1953 prefetchPieces( msgs );
1957 *** Keepalive
1960 if( ( msgs != NULL )
1961 && ( msgs->clientSentAnythingAt != 0 )
1962 && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
1964 dbgmsg( msgs, "sending a keepalive message" );
1965 tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
1966 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
1969 return bytesWritten;
1972 static int
1973 peerPulse( void * vmsgs )
1975 tr_peermsgs * msgs = vmsgs;
1976 const time_t now = tr_time( );
1978 if ( tr_isPeerIo( msgs->peer->io ) ) {
1979 updateDesiredRequestCount( msgs );
1980 updateBlockRequests( msgs );
1981 updateMetadataRequests( msgs, now );
1984 for( ;; )
1985 if( fillOutputBuffer( msgs, now ) < 1 )
1986 break;
1988 return TRUE; /* loop forever */
1991 void
1992 tr_peerMsgsPulse( tr_peermsgs * msgs )
1994 if( msgs != NULL )
1995 peerPulse( msgs );
1998 static void
1999 gotError( tr_peerIo * io UNUSED,
2000 short what,
2001 void * vmsgs )
2003 if( what & EVBUFFER_TIMEOUT )
2004 dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
2005 if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
2006 dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
2007 what, errno, tr_strerror( errno ) );
2008 fireError( vmsgs, ENOTCONN );
2011 static void
2012 sendBitfield( tr_peermsgs * msgs )
2014 struct evbuffer * out = msgs->outMessages;
2015 tr_bitfield * field;
2016 tr_piece_index_t lazyPieces[LAZY_PIECE_COUNT];
2017 size_t i;
2018 size_t lazyCount = 0;
2020 field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );
2022 if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
2024 /** Lazy bitfields aren't a high priority or secure, so I'm opting for
2025 speed over a truly random sample -- let's limit the pool size to
2026 the first 1000 pieces so large torrents don't bog things down */
2027 size_t poolSize;
2028 const size_t maxPoolSize = MIN( msgs->torrent->info.pieceCount, 1000 );
2029 tr_piece_index_t * pool = tr_new( tr_piece_index_t, maxPoolSize );
2031 /* build the pool */
2032 for( i=poolSize=0; i<maxPoolSize; ++i )
2033 if( tr_bitfieldHas( field, i ) )
2034 pool[poolSize++] = i;
2036 /* pull random piece indices from the pool */
2037 while( ( poolSize > 0 ) && ( lazyCount < LAZY_PIECE_COUNT ) )
2039 const int pos = tr_cryptoWeakRandInt( poolSize );
2040 const tr_piece_index_t piece = pool[pos];
2041 tr_bitfieldRem( field, piece );
2042 lazyPieces[lazyCount++] = piece;
2043 pool[pos] = pool[--poolSize];
2046 /* cleanup */
2047 tr_free( pool );
2050 tr_peerIoWriteUint32( msgs->peer->io, out,
2051 sizeof( uint8_t ) + field->byteCount );
2052 tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
2053 /* FIXME(libevent2): use evbuffer_add_reference() */
2054 tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
2055 dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
2056 EVBUFFER_LENGTH( out ) );
2057 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
2059 for( i = 0; i < lazyCount; ++i )
2060 protocolSendHave( msgs, lazyPieces[i] );
2062 tr_bitfieldFree( field );
2065 static void
2066 tellPeerWhatWeHave( tr_peermsgs * msgs )
2068 const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
2070 if( fext && ( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED ) )
2072 protocolSendHaveAll( msgs );
2074 else if( fext && ( tr_cpHaveValid( &msgs->torrent->completion ) == 0 ) )
2076 protocolSendHaveNone( msgs );
2078 else
2080 sendBitfield( msgs );
2088 /* some peers give us error messages if we send
2089 more than this many peers in a single pex message
2090 http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2091 #define MAX_PEX_ADDED 50
2092 #define MAX_PEX_DROPPED 50
2094 typedef struct
2096 tr_pex * added;
2097 tr_pex * dropped;
2098 tr_pex * elements;
2099 int addedCount;
2100 int droppedCount;
2101 int elementCount;
2103 PexDiffs;
2105 static void
2106 pexAddedCb( void * vpex,
2107 void * userData )
2109 PexDiffs * diffs = userData;
2110 tr_pex * pex = vpex;
2112 if( diffs->addedCount < MAX_PEX_ADDED )
2114 diffs->added[diffs->addedCount++] = *pex;
2115 diffs->elements[diffs->elementCount++] = *pex;
2119 static inline void
2120 pexDroppedCb( void * vpex,
2121 void * userData )
2123 PexDiffs * diffs = userData;
2124 tr_pex * pex = vpex;
2126 if( diffs->droppedCount < MAX_PEX_DROPPED )
2128 diffs->dropped[diffs->droppedCount++] = *pex;
2132 static inline void
2133 pexElementCb( void * vpex,
2134 void * userData )
2136 PexDiffs * diffs = userData;
2137 tr_pex * pex = vpex;
2139 diffs->elements[diffs->elementCount++] = *pex;
2142 static void
2143 sendPex( tr_peermsgs * msgs )
2145 if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
2147 PexDiffs diffs;
2148 PexDiffs diffs6;
2149 tr_pex * newPex = NULL;
2150 tr_pex * newPex6 = NULL;
2151 const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2152 const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
2154 /* build the diffs */
2155 diffs.added = tr_new( tr_pex, newCount );
2156 diffs.addedCount = 0;
2157 diffs.dropped = tr_new( tr_pex, msgs->pexCount );
2158 diffs.droppedCount = 0;
2159 diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
2160 diffs.elementCount = 0;
2161 tr_set_compare( msgs->pex, msgs->pexCount,
2162 newPex, newCount,
2163 tr_pexCompare, sizeof( tr_pex ),
2164 pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
2165 diffs6.added = tr_new( tr_pex, newCount6 );
2166 diffs6.addedCount = 0;
2167 diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
2168 diffs6.droppedCount = 0;
2169 diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
2170 diffs6.elementCount = 0;
2171 tr_set_compare( msgs->pex6, msgs->pexCount6,
2172 newPex6, newCount6,
2173 tr_pexCompare, sizeof( tr_pex ),
2174 pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
2175 dbgmsg(
2176 msgs,
2177 "pex: old peer count %d+%d, new peer count %d+%d, "
2178 "added %d+%d, removed %d+%d",
2179 msgs->pexCount, msgs->pexCount6, newCount, newCount6,
2180 diffs.addedCount, diffs6.addedCount,
2181 diffs.droppedCount, diffs6.droppedCount );
2183 if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
2184 !diffs6.droppedCount )
2186 tr_free( diffs.elements );
2187 tr_free( diffs6.elements );
2189 else
2191 int i;
2192 tr_benc val;
2193 char * benc;
2194 int bencLen;
2195 uint8_t * tmp, *walk;
2196 tr_peerIo * io = msgs->peer->io;
2197 struct evbuffer * out = msgs->outMessages;
2199 /* update peer */
2200 tr_free( msgs->pex );
2201 msgs->pex = diffs.elements;
2202 msgs->pexCount = diffs.elementCount;
2203 tr_free( msgs->pex6 );
2204 msgs->pex6 = diffs6.elements;
2205 msgs->pexCount6 = diffs6.elementCount;
2207 /* build the pex payload */
2208 tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
2209 * speed vs. likelihood? */
2211 if( diffs.addedCount > 0)
2213 /* "added" */
2214 tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
2215 for( i = 0; i < diffs.addedCount; ++i ) {
2216 memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
2217 memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
2219 assert( ( walk - tmp ) == diffs.addedCount * 6 );
2220 tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
2221 tr_free( tmp );
2223 /* "added.f" */
2224 tmp = walk = tr_new( uint8_t, diffs.addedCount );
2225 for( i = 0; i < diffs.addedCount; ++i )
2226 *walk++ = diffs.added[i].flags;
2227 assert( ( walk - tmp ) == diffs.addedCount );
2228 tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
2229 tr_free( tmp );
2232 if( diffs.droppedCount > 0 )
2234 /* "dropped" */
2235 tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
2236 for( i = 0; i < diffs.droppedCount; ++i ) {
2237 memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
2238 memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
2240 assert( ( walk - tmp ) == diffs.droppedCount * 6 );
2241 tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
2242 tr_free( tmp );
2245 if( diffs6.addedCount > 0 )
2247 /* "added6" */
2248 tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
2249 for( i = 0; i < diffs6.addedCount; ++i ) {
2250 memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
2251 walk += 16;
2252 memcpy( walk, &diffs6.added[i].port, 2 );
2253 walk += 2;
2255 assert( ( walk - tmp ) == diffs6.addedCount * 18 );
2256 tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
2257 tr_free( tmp );
2259 /* "added6.f" */
2260 tmp = walk = tr_new( uint8_t, diffs6.addedCount );
2261 for( i = 0; i < diffs6.addedCount; ++i )
2262 *walk++ = diffs6.added[i].flags;
2263 assert( ( walk - tmp ) == diffs6.addedCount );
2264 tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
2265 tr_free( tmp );
2268 if( diffs6.droppedCount > 0 )
2270 /* "dropped6" */
2271 tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
2272 for( i = 0; i < diffs6.droppedCount; ++i ) {
2273 memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
2274 walk += 16;
2275 memcpy( walk, &diffs6.dropped[i].port, 2 );
2276 walk += 2;
2278 assert( ( walk - tmp ) == diffs6.droppedCount * 18);
2279 tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
2280 tr_free( tmp );
2283 /* write the pex message */
2284 benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
2285 tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
2286 tr_peerIoWriteUint8 ( io, out, BT_LTEP );
2287 tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
2288 tr_peerIoWriteBytes ( io, out, benc, bencLen );
2289 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
2290 dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
2291 dbgOutMessageLen( msgs );
2293 tr_free( benc );
2294 tr_bencFree( &val );
2297 /* cleanup */
2298 tr_free( diffs.added );
2299 tr_free( diffs.dropped );
2300 tr_free( newPex );
2301 tr_free( diffs6.added );
2302 tr_free( diffs6.dropped );
2303 tr_free( newPex6 );
2305 /*msgs->clientSentPexAt = tr_time( );*/
2309 static void
2310 pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
2312 struct tr_peermsgs * msgs = vmsgs;
2314 sendPex( msgs );
2316 tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
2323 tr_peermsgs*
2324 tr_peerMsgsNew( struct tr_torrent * torrent,
2325 struct tr_peer * peer,
2326 tr_peer_callback * callback,
2327 void * callbackData )
2329 tr_peermsgs * m;
2331 assert( peer );
2332 assert( peer->io );
2334 m = tr_new0( tr_peermsgs, 1 );
2335 m->callback = callback;
2336 m->callbackData = callbackData;
2337 m->peer = peer;
2338 m->torrent = torrent;
2339 m->peer->clientIsChoked = 1;
2340 m->peer->peerIsChoked = 1;
2341 m->peer->clientIsInterested = 0;
2342 m->peer->peerIsInterested = 0;
2343 m->state = AWAITING_BT_LENGTH;
2344 m->outMessages = evbuffer_new( );
2345 m->outMessagesBatchedAt = 0;
2346 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2347 m->incoming.block = evbuffer_new( );
2348 evtimer_set( &m->pexTimer, pexPulse, m );
2349 tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
2350 peer->msgs = m;
2352 if( tr_peerIoSupportsLTEP( peer->io ) )
2353 sendLtepHandshake( m );
2355 tellPeerWhatWeHave( m );
2357 if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ))
2359 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2360 const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
2361 if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
2362 protocolSendPort( m, tr_dhtPort( torrent->session ) );
2366 tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
2367 updateDesiredRequestCount( m );
2369 return m;
2372 void
2373 tr_peerMsgsFree( tr_peermsgs* msgs )
2375 if( msgs )
2377 evtimer_del( &msgs->pexTimer );
2379 evbuffer_free( msgs->incoming.block );
2380 evbuffer_free( msgs->outMessages );
2381 tr_free( msgs->pex6 );
2382 tr_free( msgs->pex );
2384 memset( msgs, ~0, sizeof( tr_peermsgs ) );
2385 tr_free( msgs );