/* This software was written by Dirk Engling <erdgeist@erdgeist.org>
   It is considered beerware. Prost. Skol. Cheers or whatever. */
/* System */
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Opentracker */
#include "trackerlogic.h"
#include "ot_livesync.h"
#include "ot_accesslist.h"
/* IPv4 multicast group (224.0.23.5) that sync packets are sent to and that
   the incoming socket joins. 224 does not fit a signed char, so the
   narrowing is spelled out instead of left implicit. */
char groupip_1[4] = { (char)224, 0, 23, 5 };

/* Size of the buffer the worker thread receives sync packets into */
#define LIVESYNC_INCOMING_BUFFSIZE (256*256)

/* Capacity of one outgoing peer sync packet */
#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
/* Space one more hash+peer record may need. An outgoing buffer is sent as
   soon as its fill level gets within this distance of its capacity. */
#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))

/* How far in the future a buffer's next_packet_time is set whenever the
   buffer is (re)initialised; livesync_ticker sends overdue buffers. */
#define LIVESYNC_MAXDELAY 15 /* seconds */

/* Packet type, stored as big endian uint32 right behind the tracker id */
enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };

/* Forward declaration */
static void * livesync_worker( void * args );
43 /* For outgoing packets */
44 static int64 g_socket_in
= -1;
46 /* For incoming packets */
47 static int64 g_socket_out
= -1;
49 static pthread_mutex_t g_outbuf_mutex
= PTHREAD_MUTEX_INITIALIZER
;
51 uint8_t data
[LIVESYNC_OUTGOING_BUFFSIZE_PEERS
];
53 ot_time next_packet_time
;
56 static sync_buffer g_v6_buf
;
57 static sync_buffer g_v4_buf
;
59 static pthread_t thread_id
;
60 void livesync_init( ) {
62 if( g_socket_in
== -1 )
63 exerr( "No socket address for live sync specified." );
65 /* Prepare outgoing peers buffer */
66 memcpy( g_v6_buf
.data
, &g_tracker_id
, sizeof( g_tracker_id
) );
67 memcpy( g_v4_buf
.data
, &g_tracker_id
, sizeof( g_tracker_id
) );
69 uint32_pack_big( (char*)g_v6_buf
.data
+ sizeof( g_tracker_id
), OT_SYNC_PEER6
);
70 uint32_pack_big( (char*)g_v4_buf
.data
+ sizeof( g_tracker_id
), OT_SYNC_PEER4
);
72 g_v6_buf
.fill
= sizeof( g_tracker_id
) + sizeof( uint32_t );
73 g_v4_buf
.fill
= sizeof( g_tracker_id
) + sizeof( uint32_t );
75 g_v6_buf
.next_packet_time
= g_now_seconds
+ LIVESYNC_MAXDELAY
;
76 g_v4_buf
.next_packet_time
= g_now_seconds
+ LIVESYNC_MAXDELAY
;
78 pthread_create( &thread_id
, NULL
, livesync_worker
, NULL
);
81 void livesync_deinit() {
82 if( g_socket_in
!= -1 )
84 if( g_socket_out
!= -1 )
85 close( g_socket_out
);
87 pthread_cancel( thread_id
);
90 void livesync_bind_mcast( ot_ip6 ip
, uint16_t port
) {
91 char tmpip
[4] = {0,0,0,0};
94 if( !ip6_isv4mapped(ip
))
95 exerr("v6 mcast support not yet available.");
98 if( g_socket_in
!= -1 )
99 exerr("Error: Livesync listen ip specified twice.");
101 if( ( g_socket_in
= socket_udp4( )) < 0)
102 exerr("Error: Cant create live sync incoming socket." );
103 ndelay_off(g_socket_in
);
105 if( socket_bind4_reuse( g_socket_in
, tmpip
, port
) == -1 )
106 exerr("Error: Cant bind live sync incoming socket." );
108 if( socket_mcjoin4( g_socket_in
, groupip_1
, v4ip
) )
109 exerr("Error: Cant make live sync incoming socket join mcast group.");
111 if( ( g_socket_out
= socket_udp4()) < 0)
112 exerr("Error: Cant create live sync outgoing socket." );
113 if( socket_bind4_reuse( g_socket_out
, v4ip
, port
) == -1 )
114 exerr("Error: Cant bind live sync outgoing socket." );
116 socket_mcttl4(g_socket_out
, 1);
117 socket_mcloop4(g_socket_out
, 0);
120 /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
121 static void livesync_issue_peersync( sync_buffer
*buf
) {
122 char mycopy
[LIVESYNC_OUTGOING_BUFFSIZE_PEERS
];
123 size_t fill
= buf
->fill
;
125 memcpy( mycopy
, buf
->data
, fill
);
126 buf
->fill
= sizeof( g_tracker_id
) + sizeof( uint32_t );
127 buf
->next_packet_time
= g_now_seconds
+ LIVESYNC_MAXDELAY
;
129 /* From now this thread has a local copy of the buffer and
130 has modified the protected element */
131 pthread_mutex_unlock(&g_outbuf_mutex
);
133 socket_send4(g_socket_out
, mycopy
, fill
, groupip_1
, LIVESYNC_PORT
);
136 static void livesync_handle_peersync( struct ot_workstruct
*ws
, size_t peer_size
) {
137 size_t off
= sizeof( g_tracker_id
) + sizeof( uint32_t );
139 /* Now basic sanity checks have been done on the live sync packet
140 We might add more testing and logging. */
141 while( (ssize_t
)(off
+ sizeof( ot_hash
) + peer_size
) <= ws
->request_size
) {
142 memcpy( &ws
->peer
, ws
->request
+ off
+ sizeof(ot_hash
), peer_size
);
143 ws
->hash
= (ot_hash
*)(ws
->request
+ off
);
145 if( !g_opentracker_running
) return;
147 if( OT_PEERFLAG(ws
->peer
) & PEER_FLAG_STOPPED
)
148 remove_peer_from_torrent( FLAG_MCA
, ws
);
150 add_peer_to_torrent_and_return_peers( FLAG_MCA
, ws
, /* amount = */ 0 );
152 off
+= sizeof( ot_hash
) + peer_size
;
155 stats_issue_event(EVENT_SYNC
, 0,
156 (ws
->request_size
- sizeof( g_tracker_id
) - sizeof( uint32_t ) ) /
157 ((ssize_t
)sizeof( ot_hash
) + peer_size
));
160 /* Tickle the live sync module from time to time, so no events get
161 stuck when there's not enough traffic to fill udp packets fast
163 void livesync_ticker( ) {
164 /* livesync_issue_peersync sets g_next_packet_time */
165 pthread_mutex_lock(&g_outbuf_mutex
);
166 if( g_now_seconds
> g_v6_buf
.next_packet_time
&&
167 g_v6_buf
.fill
> sizeof( g_tracker_id
) + sizeof( uint32_t ) )
168 livesync_issue_peersync(&g_v6_buf
);
170 pthread_mutex_unlock(&g_outbuf_mutex
);
172 pthread_mutex_lock(&g_outbuf_mutex
);
173 if( g_now_seconds
> g_v4_buf
.next_packet_time
&&
174 g_v4_buf
.fill
> sizeof( g_tracker_id
) + sizeof( uint32_t ) )
175 livesync_issue_peersync(&g_v4_buf
);
177 pthread_mutex_unlock(&g_outbuf_mutex
);
180 /* Inform live sync about whats going on. */
181 void livesync_tell( struct ot_workstruct
*ws
) {
182 size_t peer_size
; /* initialized in next line */
183 ot_peer
*peer_src
= peer_from_peer6(&ws
->peer
, &peer_size
);
184 sync_buffer
*dest_buf
= peer_size
== OT_PEER_SIZE6
? &g_v6_buf
: &g_v4_buf
;
186 pthread_mutex_lock(&g_outbuf_mutex
);
188 memcpy( dest_buf
->data
+ dest_buf
->fill
, ws
->hash
, sizeof(ot_hash
) );
189 dest_buf
->fill
+= sizeof(ot_hash
);
191 memcpy( dest_buf
->data
+ dest_buf
->fill
, peer_src
, peer_size
);
192 dest_buf
->fill
+= peer_size
;
194 if( dest_buf
->fill
>= LIVESYNC_OUTGOING_BUFFSIZE_PEERS
- LIVESYNC_OUTGOING_WATERMARK_PEERS
)
195 livesync_issue_peersync(dest_buf
);
197 pthread_mutex_unlock(&g_outbuf_mutex
);
200 static void * livesync_worker( void * args
) {
201 struct ot_workstruct ws
;
202 ot_ip6 in_ip
; uint16_t in_port
;
206 /* Initialize our "thread local storage" */
207 ws
.inbuf
= ws
.request
= malloc( LIVESYNC_INCOMING_BUFFSIZE
);
208 ws
.outbuf
= ws
.reply
= 0;
210 memcpy( in_ip
, V4mappedprefix
, sizeof( V4mappedprefix
) );
213 ws
.request_size
= socket_recv4(g_socket_in
, (char*)ws
.inbuf
, LIVESYNC_INCOMING_BUFFSIZE
, 12+(char*)in_ip
, &in_port
);
215 /* Expect at least tracker id and packet type */
216 if( ws
.request_size
<= (ssize_t
)(sizeof( g_tracker_id
) + sizeof( uint32_t )) )
218 if( !accesslist_is_blessed(in_ip
, OT_PERMISSION_MAY_LIVESYNC
))
220 if( !memcmp( ws
.inbuf
, &g_tracker_id
, sizeof( g_tracker_id
) ) ) {
221 /* TODO: log packet coming from ourselves */
225 switch( uint32_read_big( sizeof( g_tracker_id
) + (char *)ws
.inbuf
) ) {
227 livesync_handle_peersync( &ws
, OT_PEER_SIZE6
);
230 livesync_handle_peersync( &ws
, OT_PEER_SIZE4
);
/* Version tag of this file. NOTE(review): $Source$ and $Revision$ look like
   version control keywords that are meant to be expanded - confirm. */
const char *g_version_livesync_c = "$Source$: $Revision$\n";