1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
20 #include "trackerlogic.h"
26 #ifdef WANT_SYNC_BATCH
/* Size passed to iovec_increase / iovec_fix_increase_or_free for every
   output buffer sync_make requests; 512*1024 = 524288. sync_make also adds
   it to the returned pointer to compute the end of the current buffer. */
28 #define OT_SYNC_CHUNK_SIZE (512*1024)
30 /* Import Changeset from an external authority
31 format: d4:syncd[..]ee
32 [..]: ( 20:01234567890abcdefghij16:XXXXYYYY )+
/* Parse a bencoded sync changeset and hand its peers to the tracker.
   data: first byte of the changeset; it is compared against the literal
         "d4:syncd" and afterwards against the "20:" / "ee" markers.
   len:  byte length of the buffer (end = data + len); the variable is
         later reused for the number of characters scan_ulong consumed.
   Returns -1 if byte_diff( data, 8, "d4:syncd" ) is non-zero.
   NOTE(review): this listing skips source lines (the declaration of hash,
   the statements guarded by the "ee" test and by the sanity check, the
   closing braces), so the other return paths cannot be documented from
   here -- consult the complete file. */
34 int add_changeset_to_tracker( uint8_t *data
, size_t len
) {
36 uint8_t *end
= data
+ len
;
37 unsigned long peer_count
;
39 /* We do know, that the string is \n terminated, so it cannot
41 if( byte_diff( data
, 8, "d4:syncd" ) ) return -1;
45 if( byte_diff( data
, 3, "20:" ) ) {
46 if( byte_diff( data
, 2, "ee" ) )
51 hash
= (ot_hash
*)data
;
52 data
+= sizeof( ot_hash
);
54 /* Scan string length indicator */
55 data
+= ( len
= scan_ulong( (char*)data
, &peer_count
) );
57 /* If no long was scanned, it is not divisible by 8, it is not
58 followed by a colon or claims to need too much memory, we fail */
59 if( !len
|| !peer_count
|| ( peer_count
& 7 ) || ( *data
++ != ':' ) || ( data
+ peer_count
> end
) )
/* peer_count is a byte count here: each iteration passes one 8 byte
   record to add_peer_to_torrent and advances data by 8 */
62 while( peer_count
> 0 ) {
63 add_peer_to_torrent( hash
, (ot_peer
*)data
, 1 );
64 data
+= 8; peer_count
-= 8;
70 /* Proposed output format
71 d4:syncd20:<info_hash>8*N:(xxxxyyyy)*Nee
/* Serialise the changesets of all torrents into a list of output buffers.
   iovec_entries, iovector: handed unchanged to iovec_increase,
   iovec_fix_increase_or_free and iovec_fixlast, which manage the buffers.
   Output starts with "d4:syncd"; for every torrent in every bucket
   (0 .. OT_BUCKET_COUNT-1, each visited under mutex_bucket_lock) it appends
   "20:" + the info hash + "<byte_count>:" + the raw changeset bytes.
   NOTE(review): this listing skips source lines (declarations of r, re,
   bucket and tor_offset, the statement after the first allocation check,
   the code that closes the dictionary, closing braces). */
73 static void sync_make( int *iovec_entries
, struct iovec
**iovector
) {
77 /* Setup return vector... */
80 if( !( r
= iovec_increase( iovec_entries
, iovector
, OT_SYNC_CHUNK_SIZE
) ) )
83 /* ... and pointer to end of current output buffer.
84 This works as a low watermark */
85 re
= r
+ OT_SYNC_CHUNK_SIZE
;
87 memmove( r
, "d4:syncd", 8 ); r
+= 8;
89 /* For each bucket... */
90 for( bucket
=0; bucket
<OT_BUCKET_COUNT
; ++bucket
) {
91 /* Get exclusive access to that bucket */
92 ot_vector
*torrents_list
= mutex_bucket_lock( bucket
);
95 /* For each torrent in this bucket.. */
96 for( tor_offset
=0; tor_offset
<torrents_list
->size
; ++tor_offset
) {
97 /* Address torrents members */
98 ot_peerlist
*peer_list
= ( ((ot_torrent
*)(torrents_list
->data
))[tor_offset
] ).peer_list
;
99 ot_hash
*hash
=&( ((ot_torrent
*)(torrents_list
->data
))[tor_offset
] ).hash
;
100 const size_t byte_count
= sizeof(ot_peer
) * peer_list
->changeset
.size
;
102 /* If we reached our low watermark in buffer... */
/* NOTE(review): the replacement buffer is requested with the fixed size
   OT_SYNC_CHUNK_SIZE; whether a single record larger than that can occur
   is not visible here -- confirm the changeset size is bounded */
103 if( re
- r
<= (ssize_t
)(/* strlen( "20:" ) == */ 3 + sizeof( ot_hash
) + /* strlen_max( "%zd" ) == */ 12 + byte_count
) ) {
105 /* Allocate a fresh output buffer at the end of our buffers list;
106 release bucket and return, if that fails */
107 if( !( r
= iovec_fix_increase_or_free( iovec_entries
, iovector
, r
, OT_SYNC_CHUNK_SIZE
) ) )
/* NOTE(review): sync_make returns void; ISO C does not allow a return
   statement with an expression in a void function, this relies on a
   compiler extension */
108 return mutex_bucket_unlock( bucket
);
110 /* Adjust new end of output buffer */
111 re
= r
+ OT_SYNC_CHUNK_SIZE
;
/* Emit one record: "20:", the info hash, "<byte_count>:", changeset bytes */
114 *r
++ = '2'; *r
++ = '0'; *r
++ = ':';
115 memmove( r
, hash
, sizeof( ot_hash
) ); r
+= sizeof( ot_hash
);
/* NOTE(review): byte_count is a size_t; "%zu" is the matching conversion,
   "%zd" expects the corresponding signed type */
116 r
+= sprintf( r
, "%zd:", byte_count
);
117 memmove( r
, peer_list
->changeset
.data
, byte_count
); r
+= byte_count
;
120 /* All torrents done: release lock on current bucket */
121 mutex_bucket_unlock( bucket
);
124 /* Close bencoded sync dictionary */
127 /* Release unused memory in current output buffer */
128 iovec_fixlast( iovec_entries
, iovector
, r
);
131 /* This is the entry point into this worker thread
132 It grabs tasks from mutex_tasklist and delivers results back
/* Thread start routine for the sync worker (passed to pthread_create).
   Pops a task with tasktype preset to TASK_SYNC_OUT, builds the output with
   sync_make, reports iovec_length of the result through stats_issue_event
   and pushes the buffers with mutex_workqueue_pushresult; when that call
   returns non-zero the buffers are released with iovec_free.
   args is not referenced in the visible lines.
   NOTE(review): this listing skips source lines (declaration of
   iovec_entries, any enclosing loop, the return statement, closing
   braces). */
134 static void * sync_worker( void * args
) {
136 struct iovec
*iovector
;
141 ot_tasktype tasktype
= TASK_SYNC_OUT
;
142 ot_taskid taskid
= mutex_workqueue_poptask( &tasktype
);
143 sync_make( &iovec_entries
, &iovector
);
144 stats_issue_event( EVENT_SYNC_OUT
, FLAG_TCP
, iovec_length( &iovec_entries
, &iovector
) );
145 if( mutex_workqueue_pushresult( taskid
, iovec_entries
, iovector
) )
146 iovec_free( &iovec_entries
, &iovector
);
/* Handle of the sync worker thread: written by the pthread_create call
   below and read by sync_deinit for pthread_cancel. */
151 static pthread_t thread_id
;
/* Start sync_worker with default attributes and a NULL argument.
   NOTE(review): the header of the enclosing function is not part of this
   listing, and the return value of pthread_create is not checked in the
   visible lines. */
153 pthread_create( &thread_id
, NULL
, sync_worker
, NULL
);
/* Request cancellation of the worker thread stored in thread_id.
   No pthread_join follows in the visible lines. */
156 void sync_deinit( ) {
157 pthread_cancel( thread_id
);
/* Queue a TASK_SYNC_OUT task for the given socket via
   mutex_workqueue_pushtask; sync_worker pops tasks of that type. */
160 void sync_deliver( int64 socket
) {
161 mutex_workqueue_pushtask( socket
, TASK_SYNC_OUT
);
/* Version string for this file. $Source$ and $Revision$ look like
   version-control keyword placeholders -- TODO confirm they get expanded. */
166 const char *g_version_sync_c
= "$Source$: $Revision$\n";