Disable forced gzip by default
[opentracker.git] / ot_fullscrape.c
blob6fd6d1cde30b19e1aa954c2cbb4ed57cf745926a
1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
4 $id$ */
6 #ifdef WANT_FULLSCRAPE
8 /* System */
9 #include <arpa/inet.h>
10 #include <pthread.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/param.h>
14 #ifdef WANT_COMPRESSION_GZIP
15 #include <zlib.h>
16 #endif
17 #ifdef WANT_COMPRESSION_ZSTD
18 #include <zstd.h>
19 #endif
22 /* Libowfat */
23 #include "byte.h"
24 #include "io.h"
25 #include "textcode.h"
27 /* Opentracker */
28 #include "ot_fullscrape.h"
29 #include "ot_iovec.h"
30 #include "ot_mutex.h"
31 #include "trackerlogic.h"
/* Fetch full scrape info for all torrents
   Full scrapes usually are huge and one does not want to
   allocate more memory. So let's get them in 1 MiB units.
*/
#define OT_SCRAPE_CHUNK_SIZE (1024 * 1024)

/* Upper bound for a single rendered entry, e.g. the bencoded
   "20:<hash>d8:completei%zde10:downloadedi%zde10:incompletei%zdee".
   Entries are written whole before the remaining buffer space is checked,
   so this must exceed the longest entry fullscrape_write_one() can emit
   (including the NUL that sprintf() appends). */
#define OT_SCRAPE_MAXENTRYLEN 256
42 /* Forward declaration */
43 static void fullscrape_make(int taskid, ot_tasktype mode);
44 #ifdef WANT_COMPRESSION_GZIP
45 static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
46 #endif
47 #ifdef WANT_COMPRESSION_ZSTD
48 static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
49 #endif
/* Converter function from memory to human readable hex strings
   XXX - Duplicated from ot_stats. Needs fix.

   Writes the 20 bytes at `s` as 40 upper-case hex digits followed by a
   terminating NUL into `d` (which must hold at least 41 bytes).
   Returns `d`. */
static char *to_hex(char *d, const uint8_t *s) {
  static const char hexdigits[] = "0123456789ABCDEF";
  char             *out         = d;
  size_t            i;

  for (i = 0; i < 20; ++i) {
    *out++ = hexdigits[s[i] >> 4];
    *out++ = hexdigits[s[i] & 15];
  }
  *out = 0;
  return d;
}
65 /* This is the entry point into this worker thread
66 It grabs tasks from mutex_tasklist and delivers results back
68 static void *fullscrape_worker(void *args) {
69 (void)args;
71 while (g_opentracker_running) {
72 ot_tasktype tasktype = TASK_FULLSCRAPE;
73 ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
74 #ifdef WANT_COMPRESSION_ZSTD
75 if (tasktype & TASK_FLAG_ZSTD)
76 fullscrape_make_zstd(taskid, tasktype);
77 else
78 #endif
79 #ifdef WANT_COMPRESSION_GZIP
80 if (tasktype & TASK_FLAG_GZIP)
81 fullscrape_make_gzip(taskid, tasktype);
82 else
83 #endif
84 fullscrape_make(taskid, tasktype);
85 mutex_workqueue_pushchunked(taskid, NULL);
87 return NULL;
90 static pthread_t thread_id;
91 void fullscrape_init( ) {
92 pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
95 void fullscrape_deinit( ) {
96 pthread_cancel( thread_id );
99 void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
100 mutex_workqueue_pushtask( sock, tasktype );
103 static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torrent, ot_hash *hash ) {
104 size_t seed_count = torrent->peer_list6->seed_count + torrent->peer_list4->seed_count;
105 size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count;
106 size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count;
108 switch (mode & TASK_TASK_MASK) {
109 case TASK_FULLSCRAPE:
110 default:
111 /* push hash as bencoded string */
112 *r++ = '2';
113 *r++ = '0';
114 *r++ = ':';
115 memcpy(r, hash, sizeof(ot_hash));
116 r += sizeof(ot_hash);
117 /* push rest of the scrape string */
118 r += sprintf(r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count - seed_count);
120 break;
121 case TASK_FULLSCRAPE_TPB_ASCII:
122 to_hex(r, *hash);
123 r += 2 * sizeof(ot_hash);
124 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
125 break;
126 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
127 to_hex(r, *hash);
128 r += 2 * sizeof(ot_hash);
129 r += sprintf(r, ":%zd:%zd:%zd\n", seed_count, peer_count - seed_count, down_count);
130 break;
131 case TASK_FULLSCRAPE_TPB_BINARY:
132 memcpy(r, *hash, sizeof(ot_hash));
133 r += sizeof(ot_hash);
134 *(uint32_t *)(r + 0) = htonl((uint32_t)seed_count);
135 *(uint32_t *)(r + 4) = htonl((uint32_t)(peer_count - seed_count));
136 r += 8;
137 break;
138 case TASK_FULLSCRAPE_TPB_URLENCODED:
139 r += fmt_urlencoded(r, (char *)*hash, 20);
140 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
141 break;
142 case TASK_FULLSCRAPE_TRACKERSTATE:
143 to_hex(r, *hash);
144 r += 2 * sizeof(ot_hash);
145 r += sprintf(r, ":%zd:%zd\n", torrent->peer_list6->base, down_count);
146 break;
148 return r;
151 static void fullscrape_make(int taskid, ot_tasktype mode) {
152 int bucket;
153 char *r, *re;
154 struct iovec iovector = {NULL, 0};
156 /* Setup return vector... */
157 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
158 if (!r)
159 return;
161 /* re points to low watermark */
162 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
164 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
165 r += sprintf(r, "d5:filesd");
167 /* For each bucket... */
168 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
169 /* Get exclusive access to that bucket */
170 ot_vector *torrents_list = mutex_bucket_lock(bucket);
171 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
172 size_t i;
174 /* For each torrent in this bucket.. */
175 for (i = 0; i < torrents_list->size; ++i) {
176 r = fullscrape_write_one(mode, r, torrents + i, &torrents[i].hash);
178 if (r > re) {
179 iovector.iov_len = r - (char *)iovector.iov_base;
181 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
182 free(iovector.iov_base);
183 return mutex_bucket_unlock(bucket, 0);
185 /* Allocate a fresh output buffer */
186 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
187 if (!r)
188 return mutex_bucket_unlock(bucket, 0);
190 /* re points to low watermark */
191 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
195 /* All torrents done: release lock on current bucket */
196 mutex_bucket_unlock(bucket, 0);
198 /* Parent thread died? */
199 if (!g_opentracker_running)
200 return;
203 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
204 r += sprintf(r, "ee");
206 /* Send rest of data */
207 iovector.iov_len = r - (char *)iovector.iov_base;
208 if (mutex_workqueue_pushchunked(taskid, &iovector))
209 free(iovector.iov_base);
212 #ifdef WANT_COMPRESSION_GZIP
214 static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
215 int bucket;
216 char *r;
217 struct iovec iovector = {NULL, 0};
218 int zres;
219 z_stream strm;
220 /* Setup return vector... */
221 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
222 if (!iovector.iov_base)
223 return;
225 byte_zero(&strm, sizeof(strm));
226 strm.next_out = (uint8_t *)iovector.iov_base;
227 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
228 if (deflateInit2(&strm, 7, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY) != Z_OK)
229 fprintf(stderr, "not ok.\n");
231 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
232 strm.next_in = (uint8_t *)"d5:filesd";
233 strm.avail_in = strlen("d5:filesd");
234 zres = deflate(&strm, Z_NO_FLUSH);
237 /* For each bucket... */
238 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
239 /* Get exclusive access to that bucket */
240 ot_vector *torrents_list = mutex_bucket_lock(bucket);
241 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
242 size_t i;
244 /* For each torrent in this bucket.. */
245 for (i = 0; i < torrents_list->size; ++i) {
246 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
247 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
248 strm.next_in = (uint8_t *)compress_buffer;
249 strm.avail_in = r - compress_buffer;
250 zres = deflate(&strm, Z_NO_FLUSH);
251 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
252 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
254 /* Check if there still is enough buffer left */
255 while (!strm.avail_out) {
256 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
258 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
259 free(iovector.iov_base);
260 return mutex_bucket_unlock(bucket, 0);
262 /* Allocate a fresh output buffer */
263 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
264 if (!iovector.iov_base) {
265 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
266 deflateEnd(&strm);
267 return mutex_bucket_unlock(bucket, 0);
269 strm.next_out = (uint8_t *)iovector.iov_base;
270 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
271 zres = deflate(&strm, Z_NO_FLUSH);
272 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
273 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
277 /* All torrents done: release lock on current bucket */
278 mutex_bucket_unlock(bucket, 0);
280 /* Parent thread died? */
281 if (!g_opentracker_running) {
282 deflateEnd(&strm);
283 return;
287 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
288 strm.next_in = (uint8_t *)"ee";
289 strm.avail_in = strlen("ee");
292 if (deflate(&strm, Z_FINISH) < Z_OK)
293 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
295 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
296 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
297 free(iovector.iov_base);
298 deflateEnd(&strm);
299 return;
302 /* Check if there's a last batch of data in the zlib buffer */
303 if (!strm.avail_out) {
304 /* Allocate a fresh output buffer */
305 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
307 if (!iovector.iov_base) {
308 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
309 deflateEnd(&strm);
310 return;
312 strm.next_out = iovector.iov_base;
313 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
314 if (deflate(&strm, Z_FINISH) < Z_OK)
315 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
317 /* Only pass the new buffer if there actually was some data left in the buffer */
318 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
319 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
320 free(iovector.iov_base);
323 deflateEnd(&strm);
325 /* WANT_COMPRESSION_GZIP */
326 #endif
328 #ifdef WANT_COMPRESSION_ZSTD
330 static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
331 int bucket;
332 char *r;
333 struct iovec iovector = {NULL, 0};
334 ZSTD_CCtx *zstream = ZSTD_createCCtx();
335 ZSTD_inBuffer inbuf;
336 ZSTD_outBuffer outbuf;
337 size_t more_bytes;
339 if (!zstream)
340 return;
342 /* Setup return vector... */
343 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
344 if (!iovector.iov_base) {
345 ZSTD_freeCCtx(zstream);
346 return;
349 /* Working with a compression level 6 is half as fast as level 3, but
350 seems to be the last reasonable bump that's worth extra cpu */
351 ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
353 outbuf.dst = iovector.iov_base;
354 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
355 outbuf.pos = 0;
357 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
358 inbuf.src = (const void *)"d5:filesd";
359 inbuf.size = strlen("d5:filesd");
360 inbuf.pos = 0;
361 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
364 /* For each bucket... */
365 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
366 /* Get exclusive access to that bucket */
367 ot_vector *torrents_list = mutex_bucket_lock(bucket);
368 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
369 size_t i;
371 /* For each torrent in this bucket.. */
372 for (i = 0; i < torrents_list->size; ++i) {
373 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
374 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
375 inbuf.src = compress_buffer;
376 inbuf.size = r - compress_buffer;
377 inbuf.pos = 0;
378 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
380 /* Check if there still is enough buffer left */
381 while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
382 iovector.iov_len = outbuf.size;
384 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
385 free(iovector.iov_base);
386 ZSTD_freeCCtx(zstream);
387 return mutex_bucket_unlock(bucket, 0);
389 /* Allocate a fresh output buffer */
390 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
391 if (!iovector.iov_base) {
392 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
393 ZSTD_freeCCtx(zstream);
394 return mutex_bucket_unlock(bucket, 0);
397 outbuf.dst = iovector.iov_base;
398 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
399 outbuf.pos = 0;
401 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
405 /* All torrents done: release lock on current bucket */
406 mutex_bucket_unlock(bucket, 0);
408 /* Parent thread died? */
409 if (!g_opentracker_running)
410 return;
413 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
414 inbuf.src = (const void *)"ee";
415 inbuf.size = strlen("ee");
416 inbuf.pos = 0;
419 more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
421 iovector.iov_len = outbuf.pos;
422 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
423 free(iovector.iov_base);
424 ZSTD_freeCCtx(zstream);
425 return;
428 /* Check if there's a last batch of data in the zlib buffer */
429 if (more_bytes) {
430 /* Allocate a fresh output buffer */
431 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
433 if (!iovector.iov_base) {
434 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
435 ZSTD_freeCCtx(zstream);
436 return;
439 outbuf.dst = iovector.iov_base;
440 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
441 outbuf.pos = 0;
443 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
445 /* Only pass the new buffer if there actually was some data left in the buffer */
446 iovector.iov_len = outbuf.pos;
447 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
448 free(iovector.iov_base);
451 ZSTD_freeCCtx(zstream);
453 /* WANT_COMPRESSION_ZSTD */
454 #endif
456 /* WANT_FULLSCRAPE */
457 #endif