Make chunked transfers use gzip also
[opentracker.git] / ot_livesync.c
blob335cce5ce402cdb67222ed85d2f62facaf5abb0f
1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
4 $id$ */
6 /* System */
7 #include <sys/types.h>
8 #include <sys/uio.h>
9 #include <string.h>
10 #include <pthread.h>
11 #include <unistd.h>
12 #include <stdlib.h>
14 /* Libowfat */
15 #include "socket.h"
16 #include "ndelay.h"
17 #include "byte.h"
18 #include "ip6.h"
20 /* Opentracker */
21 #include "trackerlogic.h"
22 #include "ot_livesync.h"
23 #include "ot_accesslist.h"
24 #include "ot_stats.h"
25 #include "ot_mutex.h"
27 #ifdef WANT_SYNC_LIVE
29 char groupip_1[4] = { 224,0,23,5 };
31 #define LIVESYNC_INCOMING_BUFFSIZE (256*256)
33 #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
34 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
36 #define LIVESYNC_MAXDELAY 15 /* seconds */
38 enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
40 /* Forward declaration */
41 static void * livesync_worker( void * args );
43 /* For outgoing packets */
44 static int64 g_socket_in = -1;
46 /* For incoming packets */
47 static int64 g_socket_out = -1;
49 static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
50 typedef struct {
51 uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
52 size_t fill;
53 ot_time next_packet_time;
54 } sync_buffer;
56 static sync_buffer g_v6_buf;
57 static sync_buffer g_v4_buf;
59 static pthread_t thread_id;
60 void livesync_init( ) {
62 if( g_socket_in == -1 )
63 exerr( "No socket address for live sync specified." );
65 /* Prepare outgoing peers buffer */
66 memcpy( g_v6_buf.data, &g_tracker_id, sizeof( g_tracker_id ) );
67 memcpy( g_v4_buf.data, &g_tracker_id, sizeof( g_tracker_id ) );
69 uint32_pack_big( (char*)g_v6_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER6);
70 uint32_pack_big( (char*)g_v4_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER4);
72 g_v6_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
73 g_v4_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
75 g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
76 g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
78 pthread_create( &thread_id, NULL, livesync_worker, NULL );
81 void livesync_deinit() {
82 if( g_socket_in != -1 )
83 close( g_socket_in );
84 if( g_socket_out != -1 )
85 close( g_socket_out );
87 pthread_cancel( thread_id );
90 void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
91 char tmpip[4] = {0,0,0,0};
92 char *v4ip;
94 if( !ip6_isv4mapped(ip))
95 exerr("v6 mcast support not yet available.");
96 v4ip = ip+12;
98 if( g_socket_in != -1 )
99 exerr("Error: Livesync listen ip specified twice.");
101 if( ( g_socket_in = socket_udp4( )) < 0)
102 exerr("Error: Cant create live sync incoming socket." );
103 ndelay_off(g_socket_in);
105 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 )
106 exerr("Error: Cant bind live sync incoming socket." );
108 if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) )
109 exerr("Error: Cant make live sync incoming socket join mcast group.");
111 if( ( g_socket_out = socket_udp4()) < 0)
112 exerr("Error: Cant create live sync outgoing socket." );
113 if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 )
114 exerr("Error: Cant bind live sync outgoing socket." );
116 socket_mcttl4(g_socket_out, 1);
117 socket_mcloop4(g_socket_out, 0);
120 /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
121 static void livesync_issue_peersync( sync_buffer *buf ) {
122 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
123 size_t fill = buf->fill;
125 memcpy( mycopy, buf->data, fill );
126 buf->fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
127 buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
129 /* From now this thread has a local copy of the buffer and
130 has modified the protected element */
131 pthread_mutex_unlock(&g_outbuf_mutex);
133 socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT);
136 static void livesync_handle_peersync( struct ot_workstruct *ws, size_t peer_size ) {
137 size_t off = sizeof( g_tracker_id ) + sizeof( uint32_t );
139 /* Now basic sanity checks have been done on the live sync packet
140 We might add more testing and logging. */
141 while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= ws->request_size ) {
142 memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), peer_size );
143 ws->hash = (ot_hash*)(ws->request + off);
145 if( !g_opentracker_running ) return;
147 if( OT_PEERFLAG(ws->peer) & PEER_FLAG_STOPPED )
148 remove_peer_from_torrent( FLAG_MCA, ws );
149 else
150 add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 );
152 off += sizeof( ot_hash ) + peer_size;
155 stats_issue_event(EVENT_SYNC, 0,
156 (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
157 ((ssize_t)sizeof( ot_hash ) + peer_size));
160 /* Tickle the live sync module from time to time, so no events get
161 stuck when there's not enough traffic to fill udp packets fast
162 enough */
163 void livesync_ticker( ) {
164 /* livesync_issue_peersync sets g_next_packet_time */
165 pthread_mutex_lock(&g_outbuf_mutex);
166 if( g_now_seconds > g_v6_buf.next_packet_time &&
167 g_v6_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
168 livesync_issue_peersync(&g_v6_buf);
169 else
170 pthread_mutex_unlock(&g_outbuf_mutex);
172 pthread_mutex_lock(&g_outbuf_mutex);
173 if( g_now_seconds > g_v4_buf.next_packet_time &&
174 g_v4_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
175 livesync_issue_peersync(&g_v4_buf);
176 else
177 pthread_mutex_unlock(&g_outbuf_mutex);
180 /* Inform live sync about whats going on. */
181 void livesync_tell( struct ot_workstruct *ws ) {
182 size_t peer_size; /* initialized in next line */
183 ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size);
184 sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf;
186 pthread_mutex_lock(&g_outbuf_mutex);
188 memcpy( dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash) );
189 dest_buf->fill += sizeof(ot_hash);
191 memcpy( dest_buf->data + dest_buf->fill, peer_src, peer_size );
192 dest_buf->fill += peer_size;
194 if( dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
195 livesync_issue_peersync(dest_buf);
196 else
197 pthread_mutex_unlock(&g_outbuf_mutex);
200 static void * livesync_worker( void * args ) {
201 struct ot_workstruct ws;
202 ot_ip6 in_ip; uint16_t in_port;
204 (void)args;
206 /* Initialize our "thread local storage" */
207 ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE );
208 ws.outbuf = ws.reply = 0;
210 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) );
212 while( 1 ) {
213 ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
215 /* Expect at least tracker id and packet type */
216 if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
217 continue;
218 if( !accesslist_is_blessed(in_ip, OT_PERMISSION_MAY_LIVESYNC))
219 continue;
220 if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
221 /* TODO: log packet coming from ourselves */
222 continue;
225 switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) {
226 case OT_SYNC_PEER6:
227 livesync_handle_peersync( &ws, OT_PEER_SIZE6 );
228 break;
229 case OT_SYNC_PEER4:
230 livesync_handle_peersync( &ws, OT_PEER_SIZE4 );
231 break;
232 default:
233 break;
237 /* Never returns. */
238 return NULL;
241 #endif
242 const char *g_version_livesync_c = "$Source$: $Revision$\n";