1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
10 #include <arpa/inet.h>
11 #include <sys/socket.h>
30 #include "trackerlogic.h"
31 #include "ot_vector.h"
35 #ifndef WANT_SYNC_LIVE
36 #define WANT_SYNC_LIVE
38 #include "ot_livesync.h"
/* Global configuration and I/O state for the sync proxy.
   NOTE(review): this chunk is a re-wrapped numbered listing; several original
   lines are missing between the visible ones. */
41 uint16_t g_serverport
= 9009;
/* Random id identifying this tracker instance on the sync mesh (set in main). */
42 uint32_t g_tracker_id
;
/* IPv4 multicast group used for live peer sync. */
43 char groupip_1
[4] = { 224,0,23,5 };
46 /* If you have more than 10 peers, don't use this proxy
47 Use 20 slots for 10 peers to have room for 10 incoming connection slots
51 #define LIVESYNC_INCOMING_BUFFSIZE (256*256)
52 #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256)
54 #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
55 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
56 #define LIVESYNC_MAXDELAY 15 /* seconds */
58 /* The amount of time a complete sync cycle should take */
59 #define OT_SYNC_INTERVAL_MINUTES 2
61 /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
62 #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
/* Live-sync packet type codes (v4 and v6 peer payloads). */
64 enum { OT_SYNC_PEER4
, OT_SYNC_PEER6
};
/* Cookie value marking the listening server socket in the io loop. */
65 enum { FLAG_SERVERSOCKET
= 1 };
67 /* For incoming packets */
68 static int64 g_socket_in
= -1;
69 static uint8_t g_inbuffer
[LIVESYNC_INCOMING_BUFFSIZE
];
71 /* For outgoing packets */
72 static int64 g_socket_out
= -1;
73 static uint8_t g_peerbuffer_start
[LIVESYNC_OUTGOING_BUFFSIZE_PEERS
];
/* Current write position inside g_peerbuffer_start. */
74 static uint8_t *g_peerbuffer_pos
;
/* Flush threshold: leave room for one more hash+peer record. */
75 static uint8_t *g_peerbuffer_highwater
= g_peerbuffer_start
+ LIVESYNC_OUTGOING_BUFFSIZE_PEERS
- LIVESYNC_OUTGOING_WATERMARK_PEERS
;
/* Deadline after which a partially filled buffer is flushed anyway. */
76 static ot_time g_next_packet_time
;
/* Forward declarations for the two worker threads and the tell helper. */
78 static void * livesync_worker( void * args
);
79 static void * streamsync_worker( void * args
);
80 static void livesync_proxytell( uint8_t prefix
, uint8_t *info_hash
, uint8_t *peer
);
/* Print an error message to stderr and terminate the process.
   The listing was truncated here; the exit() call and closing brace are
   restored -- callers throughout this file rely on exerr() not returning. */
void exerr( char * message ) {
  fprintf( stderr, "%s\n", message );
  exit( 1 );
}
/* Stub satisfying the stats hook referenced by trackerlogic.
   NOTE(review): the function body is not visible in this chunk -- presumably
   it ignores its arguments; verify against the full source. */
87 void stats_issue_event( ot_status_event event
, PROTO_FLAG proto
, uintptr_t event_data
) {
/* Create and bind the UDP sockets used for multicast live sync.
   ip/port: local v4-mapped address and port to bind; exits via exerr() on any
   failure. NOTE(review): v4ip is referenced below but its declaration is on a
   line not visible in this chunk -- presumably derived from ip; confirm. */
93 void livesync_bind_mcast( ot_ip6 ip
, uint16_t port
) {
94 char tmpip
[4] = {0,0,0,0};
/* Only v4-mapped addresses are supported for multicast at this point. */
97 if( !ip6_isv4mapped(ip
))
98 exerr("v6 mcast support not yet available.");
101 if( g_socket_in
!= -1 )
102 exerr("Error: Livesync listen ip specified twice.");
/* Incoming socket: blocking, bound to INADDR_ANY, joined to the mcast group. */
104 if( ( g_socket_in
= socket_udp4( )) < 0)
105 exerr("Error: Cant create live sync incoming socket." );
106 ndelay_off(g_socket_in
);
108 if( socket_bind4_reuse( g_socket_in
, tmpip
, port
) == -1 )
109 exerr("Error: Cant bind live sync incoming socket." );
111 if( socket_mcjoin4( g_socket_in
, groupip_1
, v4ip
) )
112 exerr("Error: Cant make live sync incoming socket join mcast group.");
/* Outgoing socket: ttl 1 (stay on link), loopback enabled. */
114 if( ( g_socket_out
= socket_udp4()) < 0)
115 exerr("Error: Cant create live sync outgoing socket." );
116 if( socket_bind4_reuse( g_socket_out
, v4ip
, port
) == -1 )
117 exerr("Error: Cant bind live sync outgoing socket." );
119 socket_mcttl4(g_socket_out
, 1);
120 socket_mcloop4(g_socket_out
, 1);
/* Insert or refresh a peer record for the torrent identified by hash.
   Locks the hash's bucket for the duration; allocates the torrent entry and
   both per-family peer lists on first sight of the hash.
   NOTE(review): declarations of torrent, exactmatch and peer_dest are on
   lines not visible in this chunk, as are several control-flow lines
   (e.g. the branch guarding the creation path and the failure return). */
123 size_t add_peer_to_torrent_proxy( ot_hash hash
, ot_peer
*peer
, size_t peer_size
) {
126 ot_peerlist
*peer_list
;
128 ot_vector
*torrents_list
= mutex_bucket_lock_by_hash( hash
);
129 size_t compare_size
= OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size
);
/* Locate or create the torrent slot inside the bucket's sorted vector. */
131 torrent
= vector_find_or_insert( torrents_list
, (void*)hash
, sizeof( ot_torrent
), compare_size
, &exactmatch
);
136 /* Create a new torrent entry, then */
137 memcpy( torrent
->hash
, hash
, sizeof(ot_hash
) );
/* Allocate both peer lists; on OOM roll the torrent back and unlock. */
139 if( !( torrent
->peer_list6
= malloc( sizeof (ot_peerlist
) ) ) ||
140 !( torrent
->peer_list4
= malloc( sizeof (ot_peerlist
) ) ) ) {
141 vector_remove_torrent( torrents_list
, torrent
);
142 mutex_bucket_unlock_by_hash( hash
, 0 );
146 byte_zero( torrent
->peer_list6
, sizeof( ot_peerlist
) );
147 byte_zero( torrent
->peer_list4
, sizeof( ot_peerlist
) );
/* Pick the v4 or v6 list based on the wire size of the peer record. */
150 peer_list
= peer_size
== OT_PEER_SIZE6
? torrent
->peer_list6
: torrent
->peer_list4
;
152 /* Check for peer in torrent */
153 peer_dest
= vector_find_or_insert_peer( &(peer_list
->peers
), peer
, peer_size
, &exactmatch
);
155 mutex_bucket_unlock_by_hash( hash
, 0 );
158 /* Tell peer that it's fresh */
159 OT_PEERTIME( peer
, peer_size
) = 0;
161 /* If we hadn't had a match create peer there */
163 peer_list
->peer_count
++;
164 if( OT_PEERFLAG_D(peer
, peer_size
) & PEER_FLAG_SEEDING
)
165 peer_list
->seed_count
++;
167 memcpy( peer_dest
, peer
, peer_size
);
168 mutex_bucket_unlock_by_hash( hash
, 0 );
/* Remove a peer record from the torrent identified by hash, adjusting the
   seed/peer counters. Locks the hash's bucket for the duration.
   NOTE(review): the guard for torrent == NULL after binary_search is on a
   line not visible in this chunk -- torrent is dereferenced below. */
172 size_t remove_peer_from_torrent_proxy( ot_hash hash
, ot_peer
*peer
, size_t peer_size
) {
174 ot_vector
*torrents_list
= mutex_bucket_lock_by_hash( hash
);
175 ot_torrent
*torrent
= binary_search( hash
, torrents_list
->data
, torrents_list
->size
, sizeof( ot_torrent
), OT_HASH_COMPARE_SIZE
, &exactmatch
);
/* NOTE(review): the duplicated "peer_list = peer_list =" assignment below is
   harmless but looks like an editing slip; the second assignment wins. */
178 ot_peerlist
*peer_list
= peer_list
= peer_size
== OT_PEER_SIZE6
? torrent
->peer_list6
: torrent
->peer_list4
;
/* vector_remove_peer returns 2 for a removed seed, 1 for a removed leecher. */
179 switch( vector_remove_peer( &peer_list
->peers
, peer
, peer_size
) ) {
180 case 2: peer_list
->seed_count
--; /* Intentional fallthrough */
181 case 1: peer_list
->peer_count
--; /* Intentional fallthrough */
186 mutex_bucket_unlock_by_hash( hash
, 0 );
/* Release a peer list's storage: when the list has overflowed into buckets,
   free each bucket's payload first, then the bucket vector itself.
   NOTE(review): the final free of peer_list itself, if any, is on a line not
   visible in this chunk. */
190 void free_peerlist( ot_peerlist
*peer_list
) {
191 if( peer_list
->peers
.data
) {
192 if( OT_PEERLIST_HASBUCKETS( peer_list
) ) {
/* Bucketed layout: peers.data is an array of ot_vector buckets. */
193 ot_vector
*bucket_list
= (ot_vector
*)(peer_list
->peers
.data
);
195 while( peer_list
->peers
.size
-- )
196 free( bucket_list
++->data
);
198 free( peer_list
->peers
.data
);
/* Walk a received live-sync datagram in g_inbuffer and replay each
   (info_hash, peer) record into the local torrent store.
   datalen: bytes received; peer_size: wire size of one peer record
   (OT_PEER_SIZE4 or OT_PEER_SIZE6, chosen by the caller from the packet
   type). Records are laid out back to back after the 8-byte header. */
203 static void livesync_handle_peersync( ssize_t datalen
, size_t peer_size
) {
/* Skip the header: sender's tracker id plus the packet-type word. */
204 int off
= sizeof( g_tracker_id
) + sizeof( uint32_t );
/* Progress dot per processed datagram (debug aid). */
206 fprintf( stderr
, "." );
208 while( (ssize_t
)(off
+ sizeof( ot_hash
) + peer_size
) <= datalen
) {
209 ot_peer
*peer
= (ot_peer
*)(g_inbuffer
+ off
+ sizeof(ot_hash
));
210 ot_hash
*hash
= (ot_hash
*)(g_inbuffer
+ off
);
/* A stopped peer is removed; otherwise it is (re)announced. */
212 if( OT_PEERFLAG_D(peer
, peer_size
) & PEER_FLAG_STOPPED
)
213 remove_peer_from_torrent_proxy( *hash
, peer
, peer_size
);
215 add_peer_to_torrent_proxy( *hash
, peer
, peer_size
);
217 off
+= sizeof( ot_hash
) + peer_size
;
/* Print the command-line usage summary to stderr.
   NOTE(review): the return statement and closing brace are on lines not
   visible in this chunk. */
221 int usage( char *self
) {
222 fprintf( stderr
, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self
);
/* Connection state machine for proxy peers. FLAG_OUTGOING (high bit) marks a
   connection we initiate; the low bits hold the handshake progress.
   NOTE(review): the enclosing enum braces and FLAG_MASK definition are on
   lines not visible in this chunk. */
227 FLAG_OUTGOING
= 0x80,
229 FLAG_DISCONNECTED
= 0x00,
230 FLAG_CONNECTING
= 0x01,
231 FLAG_WAITTRACKERID
= 0x02,
232 FLAG_CONNECTED
= 0x03,
/* State helpers: each SET* macro preserves the FLAG_OUTGOING bit. */
237 #define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING)
238 #define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED)
239 #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED)
240 #define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING)
241 #define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID)
242 #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
/* Per-connection state block for one sync peer.
   NOTE(review): the struct/typedef header and closing brace are on lines not
   visible in this chunk. */
245 int state
; /* Whether we want to connect, how far our handshake is, etc. */
246 ot_ip6 ip
; /* The peer to connect to */
247 uint16_t port
; /* The peers port */
248 uint8_t indata
[8192*16]; /* Any data not processed yet */
249 size_t indata_length
; /* Length of unprocessed data */
250 uint32_t tracker_id
; /* How the other end greeted */
251 int64 fd
; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
252 io_batch outdata
; /* The iobatch containing our sync data */
254 size_t packet_tcount
; /* Number of unprocessed torrents in packet we currently receive */
255 uint8_t packet_tprefix
; /* Prefix byte for all torrents in current packet */
256 uint8_t packet_type
; /* Type of current packet */
257 uint32_t packet_tid
; /* Tracker id for current packet */
/* Consumes buffered stream data for one peer; defined below. */
260 static void process_indata( proxy_peer
* peer
);
262 void reset_info_block( proxy_peer
* peer
) {
263 peer
->indata_length
= 0;
264 peer
->tracker_id
= 0;
266 peer
->packet_tcount
= 0;
267 iob_reset( &peer
->outdata
);
268 PROXYPEER_SETDISCONNECTED( peer
->state
);
271 /* Number of connections to peers
272 * If a peer's IP is set, we try to reconnect, when the connection drops
273 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
274 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
275 * Reconnect attempts occur only twice a minute
/* Count of configured outgoing connections (slots 0..count-1). */
277 static int g_connection_count
;
/* Earliest time the next reconnect sweep may run. */
278 static ot_time g_connection_reconn
;
/* Fixed pool of connection slots, incoming and outgoing. */
279 static proxy_peer g_connections
[MAX_PEERS
];
/* Sweep the connection table and (re)establish every outgoing connection
   whose slot is in the bare FLAG_OUTGOING state. Sets the next sweep time
   30 seconds ahead. NOTE(review): the declaration of i and the error-path
   closers for the bind/connect failures are on lines not visible here;
   g_serverip is presumably a file-scope global declared outside this chunk. */
281 static void handle_reconnects( void ) {
283 for( i
=0; i
<g_connection_count
; ++i
)
284 if( PROXYPEER_NEEDSCONNECT( g_connections
[i
].state
) ) {
285 int64 newfd
= socket_tcp6( );
286 fprintf( stderr
, "(Re)connecting to peer..." );
287 if( newfd
< 0 ) continue; /* No socket for you */
/* Bind the outgoing socket to our configured source address/port. */
289 if( socket_bind6_reuse(newfd
,g_serverip
,g_serverport
,0) ) {
/* Non-blocking connect: EINPROGRESS/EWOULDBLOCK are expected here. */
293 if( socket_connect6(newfd
,g_connections
[i
].ip
,g_connections
[i
].port
,0) == -1 &&
294 errno
!= EINPROGRESS
&& errno
!= EWOULDBLOCK
) {
298 io_wantwrite(newfd
); /* So we will be informed when it is connected */
299 io_setcookie(newfd
,g_connections
+i
);
301 /* Prepare connection info block */
302 reset_info_block( g_connections
+i
);
303 g_connections
[i
].fd
= newfd
;
304 PROXYPEER_SETCONNECTING( g_connections
[i
].state
);
306 g_connection_reconn
= time(NULL
) + 30;
309 /* Handle incoming connection requests, check against whitelist */
/* Accept loop: drains all pending connections on the server socket, parks
   each in the first free slot of g_connections, and waits for the remote
   tracker id. NOTE(review): declarations of newfd, ip, port and i, plus the
   close path when io_fd() fails or the table is full, are on lines not
   visible in this chunk. */
310 static void handle_accept( int64 serversocket
) {
315 while( ( newfd
= socket_accept6( serversocket
, ip
, &port
, NULL
) ) != -1 ) {
317 /* XXX some access control */
319 /* Put fd into a non-blocking mode */
320 io_nonblock( newfd
);
322 if( !io_fd( newfd
) )
325 /* Find a new home for our incoming connection */
327 for( i
=0; i
<MAX_PEERS
; ++i
)
328 if( g_connections
[i
].state
== FLAG_DISCONNECTED
)
330 if( i
== MAX_PEERS
) {
331 fprintf( stderr
, "No room for incoming connection." );
336 /* Prepare connection info block */
337 reset_info_block( g_connections
+i
);
338 PROXYPEER_SETCONNECTING( g_connections
[i
].state
);
339 g_connections
[i
].port
= port
;
340 g_connections
[i
].fd
= newfd
;
342 io_setcookie( newfd
, g_connections
+ i
);
344 /* We expect the connecting side to begin with its tracker_id */
345 io_wantread( newfd
);
352 /* New sync data on the stream */
/* Readable-event handler: completes the tracker-id handshake for connecting
   peers, rejects duplicate/self connections, and for established peers
   appends stream bytes to indata and hands them to process_indata().
   NOTE(review): declarations of tracker_id, i and datalen, plus several
   branch closers and the established-state case label, are on lines not
   visible in this chunk. */
353 static void handle_read( int64 peersocket
) {
357 proxy_peer
*peer
= io_getcookie( peersocket
);
360 /* Can't happen ;) */
361 io_close( peersocket
);
364 switch( peer
->state
& FLAG_MASK
) {
365 case FLAG_DISCONNECTED
:
366 io_close( peersocket
);
367 break; /* Shouldnt happen */
368 case FLAG_CONNECTING
:
369 case FLAG_WAITTRACKERID
:
370 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
371 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
372 if( io_tryread( peersocket
, (void*)&tracker_id
, sizeof( tracker_id
) ) != sizeof( tracker_id
) )
375 /* See, if we already have a connection to that peer */
376 for( i
=0; i
<MAX_PEERS
; ++i
)
377 if( ( g_connections
[i
].state
& FLAG_MASK
) == FLAG_CONNECTED
&&
378 g_connections
[i
].tracker_id
== tracker_id
) {
379 fprintf( stderr
, "Peer already connected. Closing connection.\n" );
383 /* Also no need for soliloquy */
384 if( tracker_id
== g_tracker_id
)
387 /* The new connection is good, send our tracker_id on incoming connections */
388 if( peer
->state
== FLAG_CONNECTING
)
389 if( io_trywrite( peersocket
, (void*)&g_tracker_id
, sizeof( g_tracker_id
) ) != sizeof( g_tracker_id
) )
392 peer
->tracker_id
= tracker_id
;
393 PROXYPEER_SETCONNECTED( peer
->state
);
395 if( peer
->state
& FLAG_OUTGOING
)
396 fprintf( stderr
, "succeeded.\n" );
398 fprintf( stderr
, "Incoming connection successful.\n" );
/* Handshake failure path: close the socket and recycle the slot. */
402 fprintf( stderr
, "Handshake incomplete, closing socket\n" );
403 io_close( peersocket
);
404 reset_info_block( peer
);
407 /* Here we acutally expect data from peer
408 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
409 datalen
= io_tryread( peersocket
, (void*)(peer
->indata
+ peer
->indata_length
), sizeof( peer
->indata
) - peer
->indata_length
);
410 if( !datalen
|| datalen
< -1 ) {
411 fprintf( stderr
, "Connection closed by remote peer.\n" );
412 io_close( peersocket
);
413 reset_info_block( peer
);
414 } else if( datalen
> 0 ) {
415 peer
->indata_length
+= datalen
;
416 process_indata( peer
);
422 /* Can write new sync data to the stream */
/* Writable-event handler: for connecting outgoing peers, confirms the TCP
   connect and sends our tracker id; for established peers, drains the
   queued iobatch via iob_send(). NOTE(review): several case labels, breaks
   and closing braces are on lines not visible in this chunk. */
423 static void handle_write( int64 peersocket
) {
424 proxy_peer
*peer
= io_getcookie( peersocket
);
427 /* Can't happen ;) */
428 io_close( peersocket
);
432 switch( peer
->state
& FLAG_MASK
) {
433 case FLAG_DISCONNECTED
:
434 default: /* Should not happen */
435 io_close( peersocket
);
437 case FLAG_CONNECTING
:
438 /* Ensure that the connection is established and handle connection error */
439 if( peer
->state
& FLAG_OUTGOING
&& !socket_connected( peersocket
) ) {
440 fprintf( stderr
, "failed\n" );
441 reset_info_block( peer
);
442 io_close( peersocket
);
/* Handshake: send our tracker id, then switch to reading the reply. */
446 if( io_trywrite( peersocket
, (void*)&g_tracker_id
, sizeof( g_tracker_id
) ) == sizeof( g_tracker_id
) ) {
447 PROXYPEER_SETWAITTRACKERID( peer
->state
);
448 io_dontwantwrite( peersocket
);
449 io_wantread( peersocket
);
451 fprintf( stderr
, "Handshake incomplete, closing socket\n" );
452 io_close( peersocket
);
453 reset_info_block( peer
);
/* Established: flush pending sync data from the iobatch. */
457 switch( iob_send( peersocket
, &peer
->outdata
) ) {
458 case 0: /* all data sent */
459 io_dontwantwrite( peersocket
);
461 case -3: /* an error occured */
462 io_close( peersocket
);
463 reset_info_block( peer
);
465 default: /* Normal operation or eagain */
/* Main io loop: initializes the live-sync outgoing buffer header, then
   forever runs reconnects, waits for io events, and dispatches readable and
   writable sockets. NOTE(review): the loop construct, the declaration of
   sock, and the non-server handle_read dispatch are on lines not visible in
   this chunk. */
474 static void server_mainloop() {
477 /* inlined livesync_init() */
478 memset( g_peerbuffer_start
, 0, sizeof( g_peerbuffer_start
) );
479 g_peerbuffer_pos
= g_peerbuffer_start
;
/* Packet header: our tracker id followed by the packet type word. */
480 memcpy( g_peerbuffer_pos
, &g_tracker_id
, sizeof( g_tracker_id
) );
/* NOTE(review): OT_SYNC_PEER is used here but only OT_SYNC_PEER4/6 are
   declared above -- presumably an alias defined on a line not visible in
   this chunk; confirm against the full source. */
481 uint32_pack_big( (char*)g_peerbuffer_pos
+ sizeof( g_tracker_id
), OT_SYNC_PEER
);
482 g_peerbuffer_pos
+= sizeof( g_tracker_id
) + sizeof( uint32_t);
483 g_next_packet_time
= time(NULL
) + LIVESYNC_MAXDELAY
;
486 /* See if we need to connect to anyone */
487 if( time(NULL
) > g_connection_reconn
)
488 handle_reconnects( );
490 /* Wait for io events until next approx reconn check time */
491 io_waituntil2( 30*1000 );
493 /* Loop over readable sockets */
494 while( ( sock
= io_canread( ) ) != -1 ) {
495 const void *cookie
= io_getcookie( sock
);
496 if( (uintptr_t)cookie
== FLAG_SERVERSOCKET
)
497 handle_accept( sock
);
502 /* Loop over writable sockets */
503 while( ( sock
= io_canwrite( ) ) != -1 )
504 handle_write( sock
);
/* Report a fatal error with errno context.
   NOTE(review): the terminating exit() call and closing brace are on lines
   not visible in this chunk -- callers treat this as non-returning. */
510 static void panic( const char *routine
) {
511 fprintf( stderr
, "%s: %s\n", routine
, strerror(errno
) );
/* Create a TCP listen socket bound to ip:port, panicking on failure, and tag
   it with the server-socket cookie so the io loop routes it to
   handle_accept(). NOTE(review): the return statement and any non-blocking
   setup are on lines not visible in this chunk. */
515 static int64_t ot_try_bind( ot_ip6 ip
, uint16_t port
) {
516 int64 sock
= socket_tcp6( );
518 if( socket_bind6_reuse( sock
, ip
, port
, 0 ) == -1 )
519 panic( "socket_bind6_reuse" );
521 if( socket_listen( sock
, SOMAXCONN
) == -1 )
522 panic( "socket_listen" );
527 io_setcookie( sock
, (void*)FLAG_SERVERSOCKET
);
/* Parse "<ip>:<port>" (with optional [..] brackets for v6) from src into ip
   and port. Returns the number of characters consumed, or 0 on parse error;
   a bare address without a port returns early with the consumed length.
   NOTE(review): the declaration of s and the advance "s += off" after
   scan_ip6, plus the final return, are on lines not visible in this chunk. */
533 static int scan_ip6_port( const char *src
, ot_ip6 ip
, uint16
*port
) {
535 int off
, bracket
= 0;
536 while( isspace(*s
) ) ++s
;
537 if( *s
== '[' ) ++s
, ++bracket
; /* for v6 style notation */
538 if( !(off
= scan_ip6( s
, ip
) ) )
541 if( *s
== 0 || isspace(*s
)) return s
-src
;
542 if( *s
== ']' && bracket
) ++s
;
543 if( !ip6_isv4mapped(ip
)){
/* NOTE(review): this compound condition rejects everything except '.' for
   unbracketed pure-v6 input, which looks suspicious -- verify the intended
   separator logic against the full source before changing. */
544 if( ( bracket
&& *(s
) != ':' ) || ( *(s
) != '.' ) ) return 0;
547 if( *(s
++) != ':' ) return 0;
549 if( !(off
= scan_ushort (s
, port
) ) )
/* Entry point: seeds the tracker id, parses -l (listen), -c (connect) and
   -L (livesync multicast) options, then starts the live-sync and
   stream-sync worker threads. NOTE(review): declarations of serverip and
   tmpport, the getopt loop construct, the 'l'/'c'/'L' case labels, the
   ++sbound increment and the call into server_mainloop() are on lines not
   visible in this chunk. */
554 int main( int argc
, char **argv
) {
555 static pthread_t sync_in_thread_id
;
556 static pthread_t sync_out_thread_id
;
559 int scanon
= 1, lbound
= 0, sbound
= 0;
/* Tracker id must be unique per instance; prefer arc4random when built in. */
561 srandom( time(NULL
) );
562 #ifdef WANT_ARC4RANDOM
563 g_tracker_id
= arc4random();
565 g_tracker_id
= random();
569 switch( getopt( argc
, argv
, ":l:c:L:h" ) ) {
570 case -1: scanon
= 0; break;
/* -l: bind the stream-sync listen socket. */
573 if( !scan_ip6_port( optarg
, serverip
, &tmpport
) || !tmpport
) { usage( argv
[0] ); exit( 1 ); }
574 ot_try_bind( serverip
, tmpport
);
/* -c: register an outgoing connection slot (at most MAX_PEERS/2). */
578 if( g_connection_count
> MAX_PEERS
/ 2 ) exerr( "Connection limit exceeded.\n" );
580 if( !scan_ip6_port( optarg
,
581 g_connections
[g_connection_count
].ip
,
582 &g_connections
[g_connection_count
].port
) ||
583 !g_connections
[g_connection_count
].port
) { usage( argv
[0] ); exit( 1 ); }
584 g_connections
[g_connection_count
++].state
= FLAG_OUTGOING
;
/* -L: bind the live-sync multicast sockets. */
588 if( !scan_ip6_port( optarg
, serverip
, &tmpport
) || !tmpport
) { usage( argv
[0] ); exit( 1 ); }
589 livesync_bind_mcast( serverip
, tmpport
); ++lbound
; break;
591 case '?': usage( argv
[0] ); exit( 1 );
595 if( !lbound
) exerr( "No livesync port bound." );
596 if( !g_connection_count
&& !sbound
) exerr( "No streamsync port bound." );
597 pthread_create( &sync_in_thread_id
, NULL
, livesync_worker
, NULL
);
598 pthread_create( &sync_out_thread_id
, NULL
, streamsync_worker
, NULL
);
/* Stream-sync thread: periodically serializes every bucket's torrents into
   up to three packet blocks (type 1: single-peer torrents, type 2: two-peer
   torrents, type 0: the rest with a varint peer count), queues the result on
   every connected proxy peer's iobatch, and empties the bucket.
   NOTE(review): declarations of bucket, dst and i, the varint continuation
   shift, the unlock_continue label, the send-size computation between the
   pointer comparisons and the send loop, and the outer thread loop are on
   lines not visible in this chunk. */
604 static void * streamsync_worker( void * args
) {
608 /* For each bucket... */
609 for( bucket
=0; bucket
<OT_BUCKET_COUNT
; ++bucket
) {
610 /* Get exclusive access to that bucket */
611 ot_vector
*torrents_list
= mutex_bucket_lock( bucket
);
612 size_t tor_offset
, count_def
= 0, count_one
= 0, count_two
= 0, count_peers
= 0;
613 size_t mem
, mem_a
= 0, mem_b
= 0;
614 uint8_t *ptr
= 0, *ptr_a
, *ptr_b
, *ptr_c
;
616 if( !torrents_list
->size
) goto unlock_continue
;
618 /* For each torrent in this bucket.. */
619 for( tor_offset
=0; tor_offset
<torrents_list
->size
; ++tor_offset
) {
620 /* Address torrents members */
/* NOTE(review): ->peer_list here vs ->peer_list4/6 in the proxy functions
   above -- presumably a different ot_torrent layout; confirm which headers
   this file is built against. */
621 ot_peerlist
*peer_list
= ( ((ot_torrent
*)(torrents_list
->data
))[tor_offset
] ).peer_list
;
622 switch( peer_list
->peer_count
) {
623 case 2: count_two
++; break;
624 case 1: count_one
++; break;
626 default: count_def
++;
627 count_peers
+= peer_list
->peer_count
;
631 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
632 mem
= 3 * ( 1 + 1 + 2 ) + ( count_one
+ count_two
) * ( 19 + 1 ) + count_def
* ( 19 + 8 ) +
633 ( count_one
+ 2 * count_two
+ count_peers
) * 7;
635 fprintf( stderr
, "Mem: %zd\n", mem
);
637 ptr
= ptr_a
= ptr_b
= ptr_c
= malloc( mem
);
638 if( !ptr
) goto unlock_continue
;
/* Use a dedicated type-1 block only when it pays off (or nothing else). */
640 if( count_one
> 4 || !count_def
) {
641 mem_a
= 1 + 1 + 2 + count_one
* ( 19 + 7 );
642 ptr_b
+= mem_a
; ptr_c
+= mem_a
;
643 ptr_a
[0] = 1; /* Offset 0: packet type 1 */
644 ptr_a
[1] = (bucket
<< 8) >> OT_BUCKET_COUNT_BITS
; /* Offset 1: the shared prefix */
645 ptr_a
[2] = count_one
>> 8;
646 ptr_a
[3] = count_one
& 255;
649 count_def
+= count_one
;
/* Same decision for the dedicated type-2 block. */
651 if( count_two
> 4 || !count_def
) {
652 mem_b
= 1 + 1 + 2 + count_two
* ( 19 + 14 );
654 ptr_b
[0] = 2; /* Offset 0: packet type 2 */
655 ptr_b
[1] = (bucket
<< 8) >> OT_BUCKET_COUNT_BITS
; /* Offset 1: the shared prefix */
656 ptr_b
[2] = count_two
>> 8;
657 ptr_b
[3] = count_two
& 255;
660 count_def
+= count_two
;
/* Default (type-0) block header. */
663 ptr_c
[0] = 0; /* Offset 0: packet type 0 */
664 ptr_c
[1] = (bucket
<< 8) >> OT_BUCKET_COUNT_BITS
; /* Offset 1: the shared prefix */
665 ptr_c
[2] = count_def
>> 8;
666 ptr_c
[3] = count_def
& 255;
670 /* For each torrent in this bucket.. */
671 for( tor_offset
=0; tor_offset
<torrents_list
->size
; ++tor_offset
) {
672 /* Address torrents members */
673 ot_torrent
*torrent
= ((ot_torrent
*)(torrents_list
->data
)) + tor_offset
;
674 ot_peerlist
*peer_list
= torrent
->peer_list
;
675 ot_peer
*peers
= (ot_peer
*)(peer_list
->peers
.data
);
678 /* Determine destination slot */
679 count_peers
= peer_list
->peer_count
;
680 switch( count_peers
) {
682 case 1: dst
= mem_a
? &ptr_a
: &ptr_c
; break;
683 case 2: dst
= mem_b
? &ptr_b
: &ptr_c
; break;
684 default: dst
= &ptr_c
; break;
687 /* Copy tail of info_hash, advance pointer */
688 memcpy( *dst
, ((uint8_t*)torrent
->hash
) + 1, sizeof( ot_hash
) - 1);
689 *dst
+= sizeof( ot_hash
) - 1;
691 /* Encode peer count */
693 while( count_peers
) {
694 if( count_peers
<= 0x7f )
695 *(*dst
)++ = count_peers
;
697 *(*dst
)++ = 0x80 | ( count_peers
& 0x7f );
/* Copy the raw peer records, then drop this torrent's peer list. */
702 count_peers
= peer_list
->peer_count
;
703 while( count_peers
-- ) {
704 memcpy( *dst
, peers
++, OT_IP_SIZE
+ 3 );
705 *dst
+= OT_IP_SIZE
+ 3;
707 free_peerlist(peer_list
);
/* Bucket fully serialized: release its storage and unlock. */
710 free( torrents_list
->data
);
711 memset( torrents_list
, 0, sizeof(*torrents_list
) );
713 mutex_bucket_unlock( bucket
, 0 );
/* Find the furthest write pointer to size the payload actually produced. */
718 if( ptr_b
> ptr_c
) ptr_c
= ptr_b
;
719 if( ptr_a
> ptr_c
) ptr_c
= ptr_a
;
722 for( i
=0; i
< MAX_PEERS
; ++i
) {
723 if( PROXYPEER_ISCONNECTED(g_connections
[i
].state
) ) {
/* NOTE(review): the malloc result check for tmp is presumably on the line
   not visible between these two statements -- confirm before relying on it. */
724 void *tmp
= malloc( mem
);
726 memcpy( tmp
, ptr
, mem
);
727 iob_addbuf_free( &g_connections
[i
].outdata
, tmp
, mem
);
728 io_wantwrite( g_connections
[i
].fd
);
/* Pace the sweep so a full pass takes OT_SYNC_INTERVAL_MINUTES. */
735 usleep( OT_SYNC_SLEEP
);
/* Flush the accumulated live-sync buffer to the multicast group, then reset
   the write position to just past the packet header and re-arm the delay
   deadline. NOTE(review): LIVESYNC_PORT is referenced but its definition is
   not visible in this chunk. */
741 static void livesync_issue_peersync( ) {
742 socket_send4(g_socket_out
, (char*)g_peerbuffer_start
, g_peerbuffer_pos
- g_peerbuffer_start
,
743 groupip_1
, LIVESYNC_PORT
);
744 g_peerbuffer_pos
= g_peerbuffer_start
+ sizeof( g_tracker_id
) + sizeof( uint32_t );
745 g_next_packet_time
= time(NULL
) + LIVESYNC_MAXDELAY
;
748 void livesync_ticker( ) {
749 /* livesync_issue_peersync sets g_next_packet_time */
750 if( time(NULL
) > g_next_packet_time
&&
751 g_peerbuffer_pos
> g_peerbuffer_start
+ sizeof( g_tracker_id
) )
752 livesync_issue_peersync();
/* Append one (prefix, info_hash tail, peer) record to the outgoing live-sync
   buffer, dumping it to stdout for debugging, and flush when the buffer
   reaches the high-water mark. NOTE(review): the declaration of i and the
   separating printf between hash and peer dumps are on lines not visible in
   this chunk; the peer-copy size of sizeof(ot_peer)-1 vs the later advance
   by sizeof(ot_peer) should be confirmed against the wire format. */
755 static void livesync_proxytell( uint8_t prefix
, uint8_t *info_hash
, uint8_t *peer
) {
/* Record layout: 1 prefix byte + 19 hash-tail bytes + peer record. */
758 *g_peerbuffer_pos
= prefix
;
759 memcpy( g_peerbuffer_pos
+ 1, info_hash
, sizeof(ot_hash
) - 1 );
760 memcpy( g_peerbuffer_pos
+ sizeof(ot_hash
), peer
, sizeof(ot_peer
) - 1 );
/* Debug dump of the reconstructed hash bytes. */
764 for( i
=0; i
<sizeof(ot_hash
); ++i
)
765 printf( "%02X", g_peerbuffer_pos
[i
] );
768 g_peerbuffer_pos
+= sizeof(ot_hash
);
/* Debug dump of the peer: ip, port, flags. */
770 printf( "%hhu.%hhu.%hhu.%hhu:%hu (%02X %02X)\n", g_peerbuffer_pos
[0], g_peerbuffer_pos
[1], g_peerbuffer_pos
[2], g_peerbuffer_pos
[3],
771 g_peerbuffer_pos
[4] | ( g_peerbuffer_pos
[5] << 8 ), g_peerbuffer_pos
[6], g_peerbuffer_pos
[7] );
773 g_peerbuffer_pos
+= sizeof(ot_peer
);
775 if( g_peerbuffer_pos
>= g_peerbuffer_highwater
)
776 livesync_issue_peersync();
/* Consume as much buffered stream data for one proxy peer as possible:
   parses packet headers (type, shared prefix, torrent count), then per
   torrent the hash tail, an optional varint peer count (type 0), and the
   peer records, forwarding each to livesync_proxytell(). Leftover bytes are
   compacted to the front of indata for the next read.
   NOTE(review): the enclosing processing loop, the hash assignment before
   advancing data, the shift declaration/reset, and several break targets
   are on lines not visible in this chunk. */
779 static void process_indata( proxy_peer
* peer
) {
780 size_t consumed
, peers
;
781 uint8_t *data
= peer
->indata
, *hash
;
782 uint8_t *dataend
= data
+ peer
->indata_length
;
785 /* If we're not inside of a packet, make a new one */
786 if( !peer
->packet_tcount
) {
787 /* Ensure the header is complete or postpone processing */
788 if( data
+ 4 > dataend
) break;
789 peer
->packet_type
= data
[0];
790 peer
->packet_tprefix
= data
[1];
791 peer
->packet_tcount
= data
[2] * 256 + data
[3];
793 printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer
->packet_type
, peer
->packet_tprefix
, peer
->packet_tcount
);
796 /* Ensure size for a minimal torrent block */
797 if( data
+ sizeof(ot_hash
) + OT_IP_SIZE
+ 3 > dataend
) break;
799 /* Advance pointer to peer count or peers */
801 data
+= sizeof(ot_hash
) - 1;
803 /* Type 0 has peer count encoded before each peers */
804 peers
= peer
->packet_type
;
/* Varint decode: 7 data bits per byte, high bit marks continuation. */
807 do peers
|= ( 0x7f & *data
) << ( 7 * shift
);
808 while ( *(data
++) & 0x80 && shift
++ < 6 );
811 printf( "peers: %zd\n", peers
);
813 /* Ensure enough data being read to hold all peers */
814 if( data
+ (OT_IP_SIZE
+ 3) * peers
> dataend
) {
/* Forward each complete peer record upstream. */
819 livesync_proxytell( peer
->packet_tprefix
, hash
, data
);
820 data
+= OT_IP_SIZE
+ 3;
822 --peer
->packet_tcount
;
/* Compact unconsumed bytes to the buffer head. */
825 consumed
= data
- peer
->indata
;
826 memmove( peer
->indata
, data
, peer
->indata_length
- consumed
);
827 peer
->indata_length
-= consumed
;
830 static void * livesync_worker( void * args
) {
833 ot_ip6 in_ip
; uint16_t in_port
;
834 size_t datalen
= socket_recv4(g_socket_in
, (char*)g_inbuffer
, LIVESYNC_INCOMING_BUFFSIZE
, 12+(char*)in_ip
, &in_port
);
836 /* Expect at least tracker id and packet type */
837 if( datalen
<= (ssize_t
)(sizeof( g_tracker_id
) + sizeof( uint32_t )) )
839 if( !memcmp( g_inbuffer
, &g_tracker_id
, sizeof( g_tracker_id
) ) ) {
840 /* drop packet coming from ourselves */
843 switch( uint32_read_big( (char*)g_inbuffer
+ sizeof( g_tracker_id
) ) ) {
845 livesync_handle_peersync( datalen
, OT_PEER_SIZE4
);
848 livesync_handle_peersync( datalen
, OT_PEER_SIZE6
);
851 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );