Make chunked transfers use gzip also
[opentracker.git] / ot_fullscrape.c
blobb147b6a743a2fe3c4daa69c3ea67518fee2b29c8
1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
4 $id$ */
6 #ifdef WANT_FULLSCRAPE
8 /* System */
9 #include <sys/param.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <pthread.h>
13 #include <arpa/inet.h>
14 #ifdef WANT_COMPRESSION_GZIP
15 #include <zlib.h>
16 #endif
18 /* Libowfat */
19 #include "byte.h"
20 #include "io.h"
21 #include "textcode.h"
23 /* Opentracker */
24 #include "trackerlogic.h"
25 #include "ot_mutex.h"
26 #include "ot_iovec.h"
27 #include "ot_fullscrape.h"
29 /* Fetch full scrape info for all torrents
30 Full scrapes usually are huge and one does not want to
31 allocate more memory. So lets get them in 512k units
33 #define OT_SCRAPE_CHUNK_SIZE (1024*1024)
35 /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
36 #define OT_SCRAPE_MAXENTRYLEN 256
38 /* Forward declaration */
39 static void fullscrape_make( int taskid, ot_tasktype mode);
40 #ifdef WANT_COMPRESSION_GZIP
41 static void fullscrape_make_gzip( int taskid, ot_tasktype mode);
42 #endif
44 /* Converter function from memory to human readable hex strings
45 XXX - Duplicated from ot_stats. Needs fix. */
46 static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;}
48 /* This is the entry point into this worker thread
49 It grabs tasks from mutex_tasklist and delivers results back
51 static void * fullscrape_worker( void * args ) {
52 (void) args;
54 while( g_opentracker_running ) {
55 ot_tasktype tasktype = TASK_FULLSCRAPE;
56 ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
57 #ifdef WANT_COMPRESSION_GZIP
58 if (tasktype & TASK_FLAG_GZIP)
59 fullscrape_make_gzip( taskid, tasktype );
60 else
61 #endif
62 fullscrape_make( taskid, tasktype );
63 mutex_workqueue_pushchunked( taskid, NULL );
65 return NULL;
68 static pthread_t thread_id;
69 void fullscrape_init( ) {
70 pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
73 void fullscrape_deinit( ) {
74 pthread_cancel( thread_id );
77 void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
78 mutex_workqueue_pushtask( sock, tasktype );
81 static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torrent, ot_hash *hash ) {
82 size_t seed_count = torrent->peer_list6->seed_count + torrent->peer_list4->seed_count;
83 size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count;
84 size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count;
86 switch( mode & TASK_TASK_MASK ) {
87 case TASK_FULLSCRAPE:
88 default:
89 /* push hash as bencoded string */
90 *r++='2'; *r++='0'; *r++=':';
91 memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
92 /* push rest of the scrape string */
93 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count-seed_count );
95 break;
96 case TASK_FULLSCRAPE_TPB_ASCII:
97 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
98 r += sprintf( r, ":%zd:%zd\n", seed_count, peer_count-seed_count );
99 break;
100 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
101 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
102 r += sprintf( r, ":%zd:%zd:%zd\n", seed_count, peer_count-seed_count, down_count );
103 break;
104 case TASK_FULLSCRAPE_TPB_BINARY:
105 memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
106 *(uint32_t*)(r+0) = htonl( (uint32_t) seed_count );
107 *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_count-seed_count) );
108 r+=8;
109 break;
110 case TASK_FULLSCRAPE_TPB_URLENCODED:
111 r += fmt_urlencoded( r, (char *)*hash, 20 );
112 r += sprintf( r, ":%zd:%zd\n", seed_count, peer_count-seed_count );
113 break;
114 case TASK_FULLSCRAPE_TRACKERSTATE:
115 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
116 r += sprintf( r, ":%zd:%zd\n", torrent->peer_list6->base, down_count );
117 break;
119 return r;
122 static void fullscrape_make( int taskid, ot_tasktype mode ) {
123 int bucket;
124 char *r, *re;
125 struct iovec iovector = { NULL, 0 };
127 /* Setup return vector... */
128 r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
129 if( !r )
130 return;
132 /* re points to low watermark */
133 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
135 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
136 r += sprintf( r, "d5:filesd" );
138 /* For each bucket... */
139 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
140 /* Get exclusive access to that bucket */
141 ot_vector *torrents_list = mutex_bucket_lock( bucket );
142 ot_torrent *torrents = (ot_torrent*)(torrents_list->data);
143 size_t i;
145 /* For each torrent in this bucket.. */
146 for( i=0; i<torrents_list->size; ++i ) {
147 r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash );
149 if( r > re) {
150 iovector.iov_len = r - (char *)iovector.iov_base;
152 if (mutex_workqueue_pushchunked(taskid, &iovector) ) {
153 free(iovector.iov_base);
154 return mutex_bucket_unlock( bucket, 0 );
156 /* Allocate a fresh output buffer */
157 r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
158 if( !r )
159 return mutex_bucket_unlock( bucket, 0 );
161 /* re points to low watermark */
162 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
166 /* All torrents done: release lock on current bucket */
167 mutex_bucket_unlock( bucket, 0 );
169 /* Parent thread died? */
170 if( !g_opentracker_running )
171 return;
174 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
175 r += sprintf( r, "ee" );
177 /* Send rest of data */
178 iovector.iov_len = r - (char *)iovector.iov_base;
179 if( mutex_workqueue_pushchunked(taskid, &iovector) )
180 free(iovector.iov_base);
183 #ifdef WANT_COMPRESSION_GZIP
185 static void fullscrape_make_gzip( int taskid, ot_tasktype mode) {
186 int bucket;
187 char *r;
188 struct iovec iovector = { NULL, 0 };
189 int zres;
190 z_stream strm;
191 fprintf(stderr, "GZIP path\n");
192 /* Setup return vector... */
193 iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
194 if( !iovector.iov_base )
195 return;
197 byte_zero( &strm, sizeof(strm) );
198 strm.next_out = (uint8_t*)iovector.iov_base;
199 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
200 if( deflateInit2(&strm,7,Z_DEFLATED,31,9,Z_DEFAULT_STRATEGY) != Z_OK )
201 fprintf( stderr, "not ok.\n" );
203 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
204 strm.next_in = (uint8_t*)"d5:filesd";
205 strm.avail_in = strlen("d5:filesd");
206 zres = deflate( &strm, Z_NO_FLUSH );
209 /* For each bucket... */
210 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
211 /* Get exclusive access to that bucket */
212 ot_vector *torrents_list = mutex_bucket_lock( bucket );
213 ot_torrent *torrents = (ot_torrent*)(torrents_list->data);
214 size_t i;
216 /* For each torrent in this bucket.. */
217 for( i=0; i<torrents_list->size; ++i ) {
218 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
219 r = fullscrape_write_one( mode, compress_buffer, torrents+i, &torrents[i].hash );
220 strm.next_in = (uint8_t*)compress_buffer;
221 strm.avail_in = r - compress_buffer;
222 zres = deflate( &strm, Z_NO_FLUSH );
223 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) )
224 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" );
226 /* Check if there still is enough buffer left */
227 while( !strm.avail_out ) {
228 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
230 if (mutex_workqueue_pushchunked(taskid, &iovector) ) {
231 free(iovector.iov_base);
232 return mutex_bucket_unlock( bucket, 0 );
234 /* Allocate a fresh output buffer */
235 iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
236 if( !iovector.iov_base ) {
237 fprintf( stderr, "Out of memory trying to claim ouput buffer\n" );
238 deflateEnd(&strm);
239 return mutex_bucket_unlock( bucket, 0 );
241 strm.next_out = (uint8_t*)iovector.iov_base;
242 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
243 zres = deflate( &strm, Z_NO_FLUSH );
244 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) )
245 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" );
249 /* All torrents done: release lock on current bucket */
250 mutex_bucket_unlock( bucket, 0 );
252 /* Parent thread died? */
253 if( !g_opentracker_running )
254 return;
257 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
258 strm.next_in = (uint8_t*)"ee";
259 strm.avail_in = strlen("ee");
262 if( deflate( &strm, Z_FINISH ) < Z_OK )
263 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
265 if( !strm.avail_out ) {
266 unsigned int pending;
267 int bits;
268 deflatePending( &strm, &pending, &bits);
269 pending += ( bits ? 1 : 0 );
271 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
272 if (mutex_workqueue_pushchunked(taskid, &iovector) ) {
273 free(iovector.iov_base);
274 return mutex_bucket_unlock( bucket, 0 );
276 /* Allocate a fresh output buffer */
277 iovector.iov_base = malloc( pending );
278 iovector.iov_len = pending;
280 if( !iovector.iov_base ) {
281 fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" );
282 deflateEnd(&strm);
283 return mutex_bucket_unlock( bucket, 0 );
285 strm.next_out = iovector.iov_base;
286 strm.avail_out = pending;
287 if( deflate( &strm, Z_FINISH ) < Z_OK )
288 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
290 if( mutex_workqueue_pushchunked(taskid, &iovector) )
291 free(iovector.iov_base);
294 deflateEnd(&strm);
296 /* WANT_COMPRESSION_GZIP */
297 #endif
299 /* WANT_FULLSCRAPE */
300 #endif
301 const char *g_version_fullscrape_c = "$Source$: $Revision$\n";