Make chunked transfers use gzip also
[opentracker.git] / proxy.c
blobc25611be943bc08ebac6b2f2b173883c739a25ce
1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
4 $Id$ */
6 /* System */
7 #include <stdint.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <arpa/inet.h>
11 #include <sys/socket.h>
12 #include <unistd.h>
13 #include <errno.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <pwd.h>
17 #include <ctype.h>
18 #include <pthread.h>
20 /* Libowfat */
21 #include "socket.h"
22 #include "io.h"
23 #include "iob.h"
24 #include "byte.h"
25 #include "scan.h"
26 #include "ip6.h"
27 #include "ndelay.h"
29 /* Opentracker */
30 #include "trackerlogic.h"
31 #include "ot_vector.h"
32 #include "ot_mutex.h"
33 #include "ot_stats.h"
35 #ifndef WANT_SYNC_LIVE
36 #define WANT_SYNC_LIVE
37 #endif
38 #include "ot_livesync.h"
40 ot_ip6 g_serverip;
41 uint16_t g_serverport = 9009;
42 uint32_t g_tracker_id;
43 char groupip_1[4] = { 224,0,23,5 };
44 int g_self_pipe[2];
46 /* If you have more than 10 peers, don't use this proxy
47 Use 20 slots for 10 peers to have room for 10 incoming connection slots
49 #define MAX_PEERS 20
51 #define LIVESYNC_INCOMING_BUFFSIZE (256*256)
52 #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256)
54 #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
55 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
56 #define LIVESYNC_MAXDELAY 15 /* seconds */
58 /* The amount of time a complete sync cycle should take */
59 #define OT_SYNC_INTERVAL_MINUTES 2
61 /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
62 #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
64 enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
65 enum { FLAG_SERVERSOCKET = 1 };
67 /* For incoming packets */
68 static int64 g_socket_in = -1;
69 static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
71 /* For outgoing packets */
72 static int64 g_socket_out = -1;
73 static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
74 static uint8_t *g_peerbuffer_pos;
75 static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
76 static ot_time g_next_packet_time;
78 static void * livesync_worker( void * args );
79 static void * streamsync_worker( void * args );
80 static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer );
82 void exerr( char * message ) {
83 fprintf( stderr, "%s\n", message );
84 exit( 111 );
87 void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) {
88 (void) event;
89 (void) proto;
90 (void) event_data;
93 void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
94 char tmpip[4] = {0,0,0,0};
95 char *v4ip;
97 if( !ip6_isv4mapped(ip))
98 exerr("v6 mcast support not yet available.");
99 v4ip = ip+12;
101 if( g_socket_in != -1 )
102 exerr("Error: Livesync listen ip specified twice.");
104 if( ( g_socket_in = socket_udp4( )) < 0)
105 exerr("Error: Cant create live sync incoming socket." );
106 ndelay_off(g_socket_in);
108 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 )
109 exerr("Error: Cant bind live sync incoming socket." );
111 if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) )
112 exerr("Error: Cant make live sync incoming socket join mcast group.");
114 if( ( g_socket_out = socket_udp4()) < 0)
115 exerr("Error: Cant create live sync outgoing socket." );
116 if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 )
117 exerr("Error: Cant bind live sync outgoing socket." );
119 socket_mcttl4(g_socket_out, 1);
120 socket_mcloop4(g_socket_out, 1);
123 size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) {
124 int exactmatch;
125 ot_torrent *torrent;
126 ot_peerlist *peer_list;
127 ot_peer *peer_dest;
128 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
129 size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size);
131 torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), compare_size, &exactmatch );
132 if( !torrent )
133 return -1;
135 if( !exactmatch ) {
136 /* Create a new torrent entry, then */
137 memcpy( torrent->hash, hash, sizeof(ot_hash) );
139 if( !( torrent->peer_list6 = malloc( sizeof (ot_peerlist) ) ) ||
140 !( torrent->peer_list4 = malloc( sizeof (ot_peerlist) ) ) ) {
141 vector_remove_torrent( torrents_list, torrent );
142 mutex_bucket_unlock_by_hash( hash, 0 );
143 return -1;
146 byte_zero( torrent->peer_list6, sizeof( ot_peerlist ) );
147 byte_zero( torrent->peer_list4, sizeof( ot_peerlist ) );
150 peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
152 /* Check for peer in torrent */
153 peer_dest = vector_find_or_insert_peer( &(peer_list->peers), peer, peer_size, &exactmatch );
154 if( !peer_dest ) {
155 mutex_bucket_unlock_by_hash( hash, 0 );
156 return -1;
158 /* Tell peer that it's fresh */
159 OT_PEERTIME( peer, peer_size ) = 0;
161 /* If we hadn't had a match create peer there */
162 if( !exactmatch ) {
163 peer_list->peer_count++;
164 if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING )
165 peer_list->seed_count++;
167 memcpy( peer_dest, peer, peer_size );
168 mutex_bucket_unlock_by_hash( hash, 0 );
169 return 0;
172 size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) {
173 int exactmatch;
174 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
175 ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
177 if( exactmatch ) {
178 ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
179 switch( vector_remove_peer( &peer_list->peers, peer, peer_size ) ) {
180 case 2: peer_list->seed_count--; /* Intentional fallthrough */
181 case 1: peer_list->peer_count--; /* Intentional fallthrough */
182 default: break;
186 mutex_bucket_unlock_by_hash( hash, 0 );
187 return 0;
190 void free_peerlist( ot_peerlist *peer_list ) {
191 if( peer_list->peers.data ) {
192 if( OT_PEERLIST_HASBUCKETS( peer_list ) ) {
193 ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data);
195 while( peer_list->peers.size-- )
196 free( bucket_list++->data );
198 free( peer_list->peers.data );
200 free( peer_list );
203 static void livesync_handle_peersync( ssize_t datalen, size_t peer_size ) {
204 int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
206 fprintf( stderr, "." );
208 while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= datalen ) {
209 ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash));
210 ot_hash *hash = (ot_hash*)(g_inbuffer + off);
212 if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED )
213 remove_peer_from_torrent_proxy( *hash, peer, peer_size );
214 else
215 add_peer_to_torrent_proxy( *hash, peer, peer_size );
217 off += sizeof( ot_hash ) + peer_size;
221 int usage( char *self ) {
222 fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self );
223 return 0;
226 enum {
227 FLAG_OUTGOING = 0x80,
229 FLAG_DISCONNECTED = 0x00,
230 FLAG_CONNECTING = 0x01,
231 FLAG_WAITTRACKERID = 0x02,
232 FLAG_CONNECTED = 0x03,
234 FLAG_MASK = 0x07
237 #define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING)
238 #define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED)
239 #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED)
240 #define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING)
241 #define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID)
242 #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
244 typedef struct {
245 int state; /* Whether we want to connect, how far our handshake is, etc. */
246 ot_ip6 ip; /* The peer to connect to */
247 uint16_t port; /* The peers port */
248 uint8_t indata[8192*16]; /* Any data not processed yet */
249 size_t indata_length; /* Length of unprocessed data */
250 uint32_t tracker_id; /* How the other end greeted */
251 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
252 io_batch outdata; /* The iobatch containing our sync data */
254 size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
255 uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
256 uint8_t packet_type; /* Type of current packet */
257 uint32_t packet_tid; /* Tracker id for current packet */
259 } proxy_peer;
260 static void process_indata( proxy_peer * peer );
262 void reset_info_block( proxy_peer * peer ) {
263 peer->indata_length = 0;
264 peer->tracker_id = 0;
265 peer->fd = -1;
266 peer->packet_tcount = 0;
267 iob_reset( &peer->outdata );
268 PROXYPEER_SETDISCONNECTED( peer->state );
271 /* Number of connections to peers
272 * If a peer's IP is set, we try to reconnect, when the connection drops
273 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
274 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
275 * Reconnect attempts occur only twice a minute
277 static int g_connection_count;
278 static ot_time g_connection_reconn;
279 static proxy_peer g_connections[MAX_PEERS];
281 static void handle_reconnects( void ) {
282 int i;
283 for( i=0; i<g_connection_count; ++i )
284 if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
285 int64 newfd = socket_tcp6( );
286 fprintf( stderr, "(Re)connecting to peer..." );
287 if( newfd < 0 ) continue; /* No socket for you */
288 io_fd(newfd);
289 if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
290 io_close( newfd );
291 continue;
293 if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 &&
294 errno != EINPROGRESS && errno != EWOULDBLOCK ) {
295 close(newfd);
296 continue;
298 io_wantwrite(newfd); /* So we will be informed when it is connected */
299 io_setcookie(newfd,g_connections+i);
301 /* Prepare connection info block */
302 reset_info_block( g_connections+i );
303 g_connections[i].fd = newfd;
304 PROXYPEER_SETCONNECTING( g_connections[i].state );
306 g_connection_reconn = time(NULL) + 30;
309 /* Handle incoming connection requests, check against whitelist */
310 static void handle_accept( int64 serversocket ) {
311 int64 newfd;
312 ot_ip6 ip;
313 uint16 port;
315 while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) {
317 /* XXX some access control */
319 /* Put fd into a non-blocking mode */
320 io_nonblock( newfd );
322 if( !io_fd( newfd ) )
323 io_close( newfd );
324 else {
325 /* Find a new home for our incoming connection */
326 int i;
327 for( i=0; i<MAX_PEERS; ++i )
328 if( g_connections[i].state == FLAG_DISCONNECTED )
329 break;
330 if( i == MAX_PEERS ) {
331 fprintf( stderr, "No room for incoming connection." );
332 close( newfd );
333 continue;
336 /* Prepare connection info block */
337 reset_info_block( g_connections+i );
338 PROXYPEER_SETCONNECTING( g_connections[i].state );
339 g_connections[i].port = port;
340 g_connections[i].fd = newfd;
342 io_setcookie( newfd, g_connections + i );
344 /* We expect the connecting side to begin with its tracker_id */
345 io_wantread( newfd );
349 return;
352 /* New sync data on the stream */
353 static void handle_read( int64 peersocket ) {
354 int i;
355 int64 datalen;
356 uint32_t tracker_id;
357 proxy_peer *peer = io_getcookie( peersocket );
359 if( !peer ) {
360 /* Can't happen ;) */
361 io_close( peersocket );
362 return;
364 switch( peer->state & FLAG_MASK ) {
365 case FLAG_DISCONNECTED:
366 io_close( peersocket );
367 break; /* Shouldnt happen */
368 case FLAG_CONNECTING:
369 case FLAG_WAITTRACKERID:
370 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
371 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
372 if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
373 goto close_socket;
375 /* See, if we already have a connection to that peer */
376 for( i=0; i<MAX_PEERS; ++i )
377 if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED &&
378 g_connections[i].tracker_id == tracker_id ) {
379 fprintf( stderr, "Peer already connected. Closing connection.\n" );
380 goto close_socket;
383 /* Also no need for soliloquy */
384 if( tracker_id == g_tracker_id )
385 goto close_socket;
387 /* The new connection is good, send our tracker_id on incoming connections */
388 if( peer->state == FLAG_CONNECTING )
389 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) )
390 goto close_socket;
392 peer->tracker_id = tracker_id;
393 PROXYPEER_SETCONNECTED( peer->state );
395 if( peer->state & FLAG_OUTGOING )
396 fprintf( stderr, "succeeded.\n" );
397 else
398 fprintf( stderr, "Incoming connection successful.\n" );
400 break;
401 close_socket:
402 fprintf( stderr, "Handshake incomplete, closing socket\n" );
403 io_close( peersocket );
404 reset_info_block( peer );
405 break;
406 case FLAG_CONNECTED:
407 /* Here we acutally expect data from peer
408 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
409 datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
410 if( !datalen || datalen < -1 ) {
411 fprintf( stderr, "Connection closed by remote peer.\n" );
412 io_close( peersocket );
413 reset_info_block( peer );
414 } else if( datalen > 0 ) {
415 peer->indata_length += datalen;
416 process_indata( peer );
418 break;
422 /* Can write new sync data to the stream */
423 static void handle_write( int64 peersocket ) {
424 proxy_peer *peer = io_getcookie( peersocket );
426 if( !peer ) {
427 /* Can't happen ;) */
428 io_close( peersocket );
429 return;
432 switch( peer->state & FLAG_MASK ) {
433 case FLAG_DISCONNECTED:
434 default: /* Should not happen */
435 io_close( peersocket );
436 break;
437 case FLAG_CONNECTING:
438 /* Ensure that the connection is established and handle connection error */
439 if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
440 fprintf( stderr, "failed\n" );
441 reset_info_block( peer );
442 io_close( peersocket );
443 break;
446 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) {
447 PROXYPEER_SETWAITTRACKERID( peer->state );
448 io_dontwantwrite( peersocket );
449 io_wantread( peersocket );
450 } else {
451 fprintf( stderr, "Handshake incomplete, closing socket\n" );
452 io_close( peersocket );
453 reset_info_block( peer );
455 break;
456 case FLAG_CONNECTED:
457 switch( iob_send( peersocket, &peer->outdata ) ) {
458 case 0: /* all data sent */
459 io_dontwantwrite( peersocket );
460 break;
461 case -3: /* an error occured */
462 io_close( peersocket );
463 reset_info_block( peer );
464 break;
465 default: /* Normal operation or eagain */
466 break;
468 break;
471 return;
474 static void server_mainloop() {
475 int64 sock;
477 /* inlined livesync_init() */
478 memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
479 g_peerbuffer_pos = g_peerbuffer_start;
480 memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
481 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER);
482 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
483 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
485 while(1) {
486 /* See if we need to connect to anyone */
487 if( time(NULL) > g_connection_reconn )
488 handle_reconnects( );
490 /* Wait for io events until next approx reconn check time */
491 io_waituntil2( 30*1000 );
493 /* Loop over readable sockets */
494 while( ( sock = io_canread( ) ) != -1 ) {
495 const void *cookie = io_getcookie( sock );
496 if( (uintptr_t)cookie == FLAG_SERVERSOCKET )
497 handle_accept( sock );
498 else
499 handle_read( sock );
502 /* Loop over writable sockets */
503 while( ( sock = io_canwrite( ) ) != -1 )
504 handle_write( sock );
506 livesync_ticker( );
510 static void panic( const char *routine ) {
511 fprintf( stderr, "%s: %s\n", routine, strerror(errno) );
512 exit( 111 );
515 static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) {
516 int64 sock = socket_tcp6( );
518 if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 )
519 panic( "socket_bind6_reuse" );
521 if( socket_listen( sock, SOMAXCONN) == -1 )
522 panic( "socket_listen" );
524 if( !io_fd( sock ) )
525 panic( "io_fd" );
527 io_setcookie( sock, (void*)FLAG_SERVERSOCKET );
528 io_wantread( sock );
529 return sock;
533 static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
534 const char *s = src;
535 int off, bracket = 0;
536 while( isspace(*s) ) ++s;
537 if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */
538 if( !(off = scan_ip6( s, ip ) ) )
539 return 0;
540 s += off;
541 if( *s == 0 || isspace(*s)) return s-src;
542 if( *s == ']' && bracket ) ++s;
543 if( !ip6_isv4mapped(ip)){
544 if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0;
545 s++;
546 } else {
547 if( *(s++) != ':' ) return 0;
549 if( !(off = scan_ushort (s, port ) ) )
550 return 0;
551 return off+s-src;
554 int main( int argc, char **argv ) {
555 static pthread_t sync_in_thread_id;
556 static pthread_t sync_out_thread_id;
557 ot_ip6 serverip;
558 uint16_t tmpport;
559 int scanon = 1, lbound = 0, sbound = 0;
561 srandom( time(NULL) );
562 #ifdef WANT_ARC4RANDOM
563 g_tracker_id = arc4random();
564 #else
565 g_tracker_id = random();
566 #endif
568 while( scanon ) {
569 switch( getopt( argc, argv, ":l:c:L:h" ) ) {
570 case -1: scanon = 0; break;
571 case 'l':
572 tmpport = 0;
573 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
574 ot_try_bind( serverip, tmpport );
575 ++sbound;
576 break;
577 case 'c':
578 if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" );
579 tmpport = 0;
580 if( !scan_ip6_port( optarg,
581 g_connections[g_connection_count].ip,
582 &g_connections[g_connection_count].port ) ||
583 !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); }
584 g_connections[g_connection_count++].state = FLAG_OUTGOING;
585 break;
586 case 'L':
587 tmpport = 9696;
588 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
589 livesync_bind_mcast( serverip, tmpport); ++lbound; break;
590 default:
591 case '?': usage( argv[0] ); exit( 1 );
595 if( !lbound ) exerr( "No livesync port bound." );
596 if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." );
597 pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
598 pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
600 server_mainloop();
601 return 0;
604 static void * streamsync_worker( void * args ) {
605 (void)args;
606 while( 1 ) {
607 int bucket;
608 /* For each bucket... */
609 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
610 /* Get exclusive access to that bucket */
611 ot_vector *torrents_list = mutex_bucket_lock( bucket );
612 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
613 size_t mem, mem_a = 0, mem_b = 0;
614 uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
616 if( !torrents_list->size ) goto unlock_continue;
618 /* For each torrent in this bucket.. */
619 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
620 /* Address torrents members */
621 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
622 switch( peer_list->peer_count ) {
623 case 2: count_two++; break;
624 case 1: count_one++; break;
625 case 0: break;
626 default: count_def++;
627 count_peers += peer_list->peer_count;
631 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
632 mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) +
633 ( count_one + 2 * count_two + count_peers ) * 7;
635 fprintf( stderr, "Mem: %zd\n", mem );
637 ptr = ptr_a = ptr_b = ptr_c = malloc( mem );
638 if( !ptr ) goto unlock_continue;
640 if( count_one > 4 || !count_def ) {
641 mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 );
642 ptr_b += mem_a; ptr_c += mem_a;
643 ptr_a[0] = 1; /* Offset 0: packet type 1 */
644 ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
645 ptr_a[2] = count_one >> 8;
646 ptr_a[3] = count_one & 255;
647 ptr_a += 4;
648 } else
649 count_def += count_one;
651 if( count_two > 4 || !count_def ) {
652 mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 );
653 ptr_c += mem_b;
654 ptr_b[0] = 2; /* Offset 0: packet type 2 */
655 ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
656 ptr_b[2] = count_two >> 8;
657 ptr_b[3] = count_two & 255;
658 ptr_b += 4;
659 } else
660 count_def += count_two;
662 if( count_def ) {
663 ptr_c[0] = 0; /* Offset 0: packet type 0 */
664 ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
665 ptr_c[2] = count_def >> 8;
666 ptr_c[3] = count_def & 255;
667 ptr_c += 4;
670 /* For each torrent in this bucket.. */
671 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
672 /* Address torrents members */
673 ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset;
674 ot_peerlist *peer_list = torrent->peer_list;
675 ot_peer *peers = (ot_peer*)(peer_list->peers.data);
676 uint8_t **dst;
678 /* Determine destination slot */
679 count_peers = peer_list->peer_count;
680 switch( count_peers ) {
681 case 0: continue;
682 case 1: dst = mem_a ? &ptr_a : &ptr_c; break;
683 case 2: dst = mem_b ? &ptr_b : &ptr_c; break;
684 default: dst = &ptr_c; break;
687 /* Copy tail of info_hash, advance pointer */
688 memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1);
689 *dst += sizeof( ot_hash ) - 1;
691 /* Encode peer count */
692 if( dst == &ptr_c )
693 while( count_peers ) {
694 if( count_peers <= 0x7f )
695 *(*dst)++ = count_peers;
696 else
697 *(*dst)++ = 0x80 | ( count_peers & 0x7f );
698 count_peers >>= 7;
701 /* Copy peers */
702 count_peers = peer_list->peer_count;
703 while( count_peers-- ) {
704 memcpy( *dst, peers++, OT_IP_SIZE + 3 );
705 *dst += OT_IP_SIZE + 3;
707 free_peerlist(peer_list);
710 free( torrents_list->data );
711 memset( torrents_list, 0, sizeof(*torrents_list ) );
712 unlock_continue:
713 mutex_bucket_unlock( bucket, 0 );
715 if( ptr ) {
716 int i;
718 if( ptr_b > ptr_c ) ptr_c = ptr_b;
719 if( ptr_a > ptr_c ) ptr_c = ptr_a;
720 mem = ptr_c - ptr;
722 for( i=0; i < MAX_PEERS; ++i ) {
723 if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) {
724 void *tmp = malloc( mem );
725 if( tmp ) {
726 memcpy( tmp, ptr, mem );
727 iob_addbuf_free( &g_connections[i].outdata, tmp, mem );
728 io_wantwrite( g_connections[i].fd );
733 free( ptr );
735 usleep( OT_SYNC_SLEEP );
738 return 0;
741 static void livesync_issue_peersync( ) {
742 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
743 groupip_1, LIVESYNC_PORT);
744 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
745 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
748 void livesync_ticker( ) {
749 /* livesync_issue_peersync sets g_next_packet_time */
750 if( time(NULL) > g_next_packet_time &&
751 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
752 livesync_issue_peersync();
755 static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) {
756 // unsigned int i;
758 *g_peerbuffer_pos = prefix;
759 memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 );
760 memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 );
762 #if 0
763 /* Dump info_hash */
764 for( i=0; i<sizeof(ot_hash); ++i )
765 printf( "%02X", g_peerbuffer_pos[i] );
766 putchar( ':' );
767 #endif
768 g_peerbuffer_pos += sizeof(ot_hash);
769 #if 0
770 printf( "%hhu.%hhu.%hhu.%hhu:%hu (%02X %02X)\n", g_peerbuffer_pos[0], g_peerbuffer_pos[1], g_peerbuffer_pos[2], g_peerbuffer_pos[3],
771 g_peerbuffer_pos[4] | ( g_peerbuffer_pos[5] << 8 ), g_peerbuffer_pos[6], g_peerbuffer_pos[7] );
772 #endif
773 g_peerbuffer_pos += sizeof(ot_peer);
775 if( g_peerbuffer_pos >= g_peerbuffer_highwater )
776 livesync_issue_peersync();
779 static void process_indata( proxy_peer * peer ) {
780 size_t consumed, peers;
781 uint8_t *data = peer->indata, *hash;
782 uint8_t *dataend = data + peer->indata_length;
784 while( 1 ) {
785 /* If we're not inside of a packet, make a new one */
786 if( !peer->packet_tcount ) {
787 /* Ensure the header is complete or postpone processing */
788 if( data + 4 > dataend ) break;
789 peer->packet_type = data[0];
790 peer->packet_tprefix = data[1];
791 peer->packet_tcount = data[2] * 256 + data[3];
792 data += 4;
793 printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount );
796 /* Ensure size for a minimal torrent block */
797 if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break;
799 /* Advance pointer to peer count or peers */
800 hash = data;
801 data += sizeof(ot_hash) - 1;
803 /* Type 0 has peer count encoded before each peers */
804 peers = peer->packet_type;
805 if( !peers ) {
806 int shift = 0;
807 do peers |= ( 0x7f & *data ) << ( 7 * shift );
808 while ( *(data++) & 0x80 && shift++ < 6 );
810 #if 0
811 printf( "peers: %zd\n", peers );
812 #endif
813 /* Ensure enough data being read to hold all peers */
814 if( data + (OT_IP_SIZE + 3) * peers > dataend ) {
815 data = hash;
816 break;
818 while( peers-- ) {
819 livesync_proxytell( peer->packet_tprefix, hash, data );
820 data += OT_IP_SIZE + 3;
822 --peer->packet_tcount;
825 consumed = data - peer->indata;
826 memmove( peer->indata, data, peer->indata_length - consumed );
827 peer->indata_length -= consumed;
830 static void * livesync_worker( void * args ) {
831 (void)args;
832 while( 1 ) {
833 ot_ip6 in_ip; uint16_t in_port;
834 size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
836 /* Expect at least tracker id and packet type */
837 if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
838 continue;
839 if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
840 /* drop packet coming from ourselves */
841 continue;
843 switch( uint32_read_big( (char*)g_inbuffer + sizeof( g_tracker_id ) ) ) {
844 case OT_SYNC_PEER4:
845 livesync_handle_peersync( datalen, OT_PEER_SIZE4 );
846 break;
847 case OT_SYNC_PEER6:
848 livesync_handle_peersync( datalen, OT_PEER_SIZE6 );
849 break;
850 default:
851 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );
852 break;
855 return 0;