1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
13 #include <sys/param.h>
14 #ifdef WANT_COMPRESSION_GZIP
17 #ifdef WANT_COMPRESSION_ZSTD
28 #include "ot_fullscrape.h"
31 #include "trackerlogic.h"
33 /* Fetch full scrape info for all torrents
34 Full scrapes usually are huge and one does not want to
35 allocate more memory. So lets get them in 512k units
37 #define OT_SCRAPE_CHUNK_SIZE (1024 * 1024)
39 /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
40 #define OT_SCRAPE_MAXENTRYLEN 256
42 /* Forward declaration */
43 static void fullscrape_make(int taskid
, ot_tasktype mode
);
44 #ifdef WANT_COMPRESSION_GZIP
45 static void fullscrape_make_gzip(int taskid
, ot_tasktype mode
);
47 #ifdef WANT_COMPRESSION_ZSTD
48 static void fullscrape_make_zstd(int taskid
, ot_tasktype mode
);
51 /* Converter function from memory to human readable hex strings
52 XXX - Duplicated from ot_stats. Needs fix. */
53 static char *to_hex(char *d
, uint8_t *s
) {
54 char *m
= "0123456789ABCDEF";
65 /* This is the entry point into this worker thread
66 It grabs tasks from mutex_tasklist and delivers results back
68 static void *fullscrape_worker(void *args
) {
71 while (g_opentracker_running
) {
72 ot_tasktype tasktype
= TASK_FULLSCRAPE
;
73 ot_taskid taskid
= mutex_workqueue_poptask(&tasktype
);
74 #ifdef WANT_COMPRESSION_ZSTD
75 if (tasktype
& TASK_FLAG_ZSTD
)
76 fullscrape_make_zstd(taskid
, tasktype
);
79 #ifdef WANT_COMPRESSION_GZIP
80 if (tasktype
& TASK_FLAG_GZIP
)
81 fullscrape_make_gzip(taskid
, tasktype
);
84 fullscrape_make(taskid
, tasktype
);
85 mutex_workqueue_pushchunked(taskid
, NULL
);
90 static pthread_t thread_id
;
91 void fullscrape_init( ) {
92 pthread_create( &thread_id
, NULL
, fullscrape_worker
, NULL
);
95 void fullscrape_deinit( ) {
96 pthread_cancel( thread_id
);
99 void fullscrape_deliver( int64 sock
, ot_tasktype tasktype
) {
100 mutex_workqueue_pushtask( sock
, tasktype
);
103 static char * fullscrape_write_one( ot_tasktype mode
, char *r
, ot_torrent
*torrent
, ot_hash
*hash
) {
104 size_t seed_count
= torrent
->peer_list6
->seed_count
+ torrent
->peer_list4
->seed_count
;
105 size_t peer_count
= torrent
->peer_list6
->peer_count
+ torrent
->peer_list4
->peer_count
;
106 size_t down_count
= torrent
->peer_list6
->down_count
+ torrent
->peer_list4
->down_count
;
108 switch (mode
& TASK_TASK_MASK
) {
109 case TASK_FULLSCRAPE
:
111 /* push hash as bencoded string */
115 memcpy(r
, hash
, sizeof(ot_hash
));
116 r
+= sizeof(ot_hash
);
117 /* push rest of the scrape string */
118 r
+= sprintf(r
, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count
, down_count
, peer_count
- seed_count
);
121 case TASK_FULLSCRAPE_TPB_ASCII
:
123 r
+= 2 * sizeof(ot_hash
);
124 r
+= sprintf(r
, ":%zd:%zd\n", seed_count
, peer_count
- seed_count
);
126 case TASK_FULLSCRAPE_TPB_ASCII_PLUS
:
128 r
+= 2 * sizeof(ot_hash
);
129 r
+= sprintf(r
, ":%zd:%zd:%zd\n", seed_count
, peer_count
- seed_count
, down_count
);
131 case TASK_FULLSCRAPE_TPB_BINARY
:
132 memcpy(r
, *hash
, sizeof(ot_hash
));
133 r
+= sizeof(ot_hash
);
134 *(uint32_t *)(r
+ 0) = htonl((uint32_t)seed_count
);
135 *(uint32_t *)(r
+ 4) = htonl((uint32_t)(peer_count
- seed_count
));
138 case TASK_FULLSCRAPE_TPB_URLENCODED
:
139 r
+= fmt_urlencoded(r
, (char *)*hash
, 20);
140 r
+= sprintf(r
, ":%zd:%zd\n", seed_count
, peer_count
- seed_count
);
142 case TASK_FULLSCRAPE_TRACKERSTATE
:
144 r
+= 2 * sizeof(ot_hash
);
145 r
+= sprintf(r
, ":%zd:%zd\n", torrent
->peer_list6
->base
, down_count
);
151 static void fullscrape_make(int taskid
, ot_tasktype mode
) {
154 struct iovec iovector
= {NULL
, 0};
156 /* Setup return vector... */
157 r
= iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
161 /* re points to low watermark */
162 re
= r
+ OT_SCRAPE_CHUNK_SIZE
- OT_SCRAPE_MAXENTRYLEN
;
164 if ((mode
& TASK_TASK_MASK
) == TASK_FULLSCRAPE
)
165 r
+= sprintf(r
, "d5:filesd");
167 /* For each bucket... */
168 for (bucket
= 0; bucket
< OT_BUCKET_COUNT
; ++bucket
) {
169 /* Get exclusive access to that bucket */
170 ot_vector
*torrents_list
= mutex_bucket_lock(bucket
);
171 ot_torrent
*torrents
= (ot_torrent
*)(torrents_list
->data
);
174 /* For each torrent in this bucket.. */
175 for (i
= 0; i
< torrents_list
->size
; ++i
) {
176 r
= fullscrape_write_one(mode
, r
, torrents
+ i
, &torrents
[i
].hash
);
179 iovector
.iov_len
= r
- (char *)iovector
.iov_base
;
181 if (mutex_workqueue_pushchunked(taskid
, &iovector
)) {
182 free(iovector
.iov_base
);
183 return mutex_bucket_unlock(bucket
, 0);
185 /* Allocate a fresh output buffer */
186 r
= iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
188 return mutex_bucket_unlock(bucket
, 0);
190 /* re points to low watermark */
191 re
= r
+ OT_SCRAPE_CHUNK_SIZE
- OT_SCRAPE_MAXENTRYLEN
;
195 /* All torrents done: release lock on current bucket */
196 mutex_bucket_unlock(bucket
, 0);
198 /* Parent thread died? */
199 if (!g_opentracker_running
)
203 if ((mode
& TASK_TASK_MASK
) == TASK_FULLSCRAPE
)
204 r
+= sprintf(r
, "ee");
206 /* Send rest of data */
207 iovector
.iov_len
= r
- (char *)iovector
.iov_base
;
208 if (mutex_workqueue_pushchunked(taskid
, &iovector
))
209 free(iovector
.iov_base
);
212 #ifdef WANT_COMPRESSION_GZIP
214 static void fullscrape_make_gzip(int taskid
, ot_tasktype mode
) {
217 struct iovec iovector
= {NULL
, 0};
220 /* Setup return vector... */
221 iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
222 if (!iovector
.iov_base
)
225 byte_zero(&strm
, sizeof(strm
));
226 strm
.next_out
= (uint8_t *)iovector
.iov_base
;
227 strm
.avail_out
= OT_SCRAPE_CHUNK_SIZE
;
228 if (deflateInit2(&strm
, 7, Z_DEFLATED
, 31, 9, Z_DEFAULT_STRATEGY
) != Z_OK
)
229 fprintf(stderr
, "not ok.\n");
231 if ((mode
& TASK_TASK_MASK
) == TASK_FULLSCRAPE
) {
232 strm
.next_in
= (uint8_t *)"d5:filesd";
233 strm
.avail_in
= strlen("d5:filesd");
234 zres
= deflate(&strm
, Z_NO_FLUSH
);
237 /* For each bucket... */
238 for (bucket
= 0; bucket
< OT_BUCKET_COUNT
; ++bucket
) {
239 /* Get exclusive access to that bucket */
240 ot_vector
*torrents_list
= mutex_bucket_lock(bucket
);
241 ot_torrent
*torrents
= (ot_torrent
*)(torrents_list
->data
);
244 /* For each torrent in this bucket.. */
245 for (i
= 0; i
< torrents_list
->size
; ++i
) {
246 char compress_buffer
[OT_SCRAPE_MAXENTRYLEN
];
247 r
= fullscrape_write_one(mode
, compress_buffer
, torrents
+ i
, &torrents
[i
].hash
);
248 strm
.next_in
= (uint8_t *)compress_buffer
;
249 strm
.avail_in
= r
- compress_buffer
;
250 zres
= deflate(&strm
, Z_NO_FLUSH
);
251 if ((zres
< Z_OK
) && (zres
!= Z_BUF_ERROR
))
252 fprintf(stderr
, "deflate() failed while in fullscrape_make().\n");
254 /* Check if there still is enough buffer left */
255 while (!strm
.avail_out
) {
256 iovector
.iov_len
= (char *)strm
.next_out
- (char *)iovector
.iov_base
;
258 if (mutex_workqueue_pushchunked(taskid
, &iovector
)) {
259 free(iovector
.iov_base
);
260 return mutex_bucket_unlock(bucket
, 0);
262 /* Allocate a fresh output buffer */
263 iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
264 if (!iovector
.iov_base
) {
265 fprintf(stderr
, "Out of memory trying to claim ouput buffer\n");
267 return mutex_bucket_unlock(bucket
, 0);
269 strm
.next_out
= (uint8_t *)iovector
.iov_base
;
270 strm
.avail_out
= OT_SCRAPE_CHUNK_SIZE
;
271 zres
= deflate(&strm
, Z_NO_FLUSH
);
272 if ((zres
< Z_OK
) && (zres
!= Z_BUF_ERROR
))
273 fprintf(stderr
, "deflate() failed while in fullscrape_make().\n");
277 /* All torrents done: release lock on current bucket */
278 mutex_bucket_unlock(bucket
, 0);
280 /* Parent thread died? */
281 if (!g_opentracker_running
) {
287 if ((mode
& TASK_TASK_MASK
) == TASK_FULLSCRAPE
) {
288 strm
.next_in
= (uint8_t *)"ee";
289 strm
.avail_in
= strlen("ee");
292 if (deflate(&strm
, Z_FINISH
) < Z_OK
)
293 fprintf(stderr
, "deflate() failed while in fullscrape_make()'s endgame.\n");
295 iovector
.iov_len
= (char *)strm
.next_out
- (char *)iovector
.iov_base
;
296 if (mutex_workqueue_pushchunked(taskid
, &iovector
)) {
297 free(iovector
.iov_base
);
302 /* Check if there's a last batch of data in the zlib buffer */
303 if (!strm
.avail_out
) {
304 /* Allocate a fresh output buffer */
305 iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
307 if (!iovector
.iov_base
) {
308 fprintf(stderr
, "Problem with iovec_fix_increase_or_free\n");
312 strm
.next_out
= iovector
.iov_base
;
313 strm
.avail_out
= OT_SCRAPE_CHUNK_SIZE
;
314 if (deflate(&strm
, Z_FINISH
) < Z_OK
)
315 fprintf(stderr
, "deflate() failed while in fullscrape_make()'s endgame.\n");
317 /* Only pass the new buffer if there actually was some data left in the buffer */
318 iovector
.iov_len
= (char *)strm
.next_out
- (char *)iovector
.iov_base
;
319 if (!iovector
.iov_len
|| mutex_workqueue_pushchunked(taskid
, &iovector
))
320 free(iovector
.iov_base
);
325 /* WANT_COMPRESSION_GZIP */
328 #ifdef WANT_COMPRESSION_ZSTD
330 static void fullscrape_make_zstd(int taskid
, ot_tasktype mode
) {
333 struct iovec iovector
= {NULL
, 0};
334 ZSTD_CCtx
*zstream
= ZSTD_createCCtx();
336 ZSTD_outBuffer outbuf
;
342 /* Setup return vector... */
343 iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
344 if (!iovector
.iov_base
) {
345 ZSTD_freeCCtx(zstream
);
349 /* Working with a compression level 6 is half as fast as level 3, but
350 seems to be the last reasonable bump that's worth extra cpu */
351 ZSTD_CCtx_setParameter(zstream
, ZSTD_c_compressionLevel
, 6);
353 outbuf
.dst
= iovector
.iov_base
;
354 outbuf
.size
= OT_SCRAPE_CHUNK_SIZE
;
357 if ((mode
& TASK_TASK_MASK
) == TASK_FULLSCRAPE
) {
358 inbuf
.src
= (const void *)"d5:filesd";
359 inbuf
.size
= strlen("d5:filesd");
361 ZSTD_compressStream2(zstream
, &outbuf
, &inbuf
, ZSTD_e_continue
);
364 /* For each bucket... */
365 for (bucket
= 0; bucket
< OT_BUCKET_COUNT
; ++bucket
) {
366 /* Get exclusive access to that bucket */
367 ot_vector
*torrents_list
= mutex_bucket_lock(bucket
);
368 ot_torrent
*torrents
= (ot_torrent
*)(torrents_list
->data
);
371 /* For each torrent in this bucket.. */
372 for (i
= 0; i
< torrents_list
->size
; ++i
) {
373 char compress_buffer
[OT_SCRAPE_MAXENTRYLEN
];
374 r
= fullscrape_write_one(mode
, compress_buffer
, torrents
+ i
, &torrents
[i
].hash
);
375 inbuf
.src
= compress_buffer
;
376 inbuf
.size
= r
- compress_buffer
;
378 ZSTD_compressStream2(zstream
, &outbuf
, &inbuf
, ZSTD_e_continue
);
380 /* Check if there still is enough buffer left */
381 while (outbuf
.pos
+ OT_SCRAPE_MAXENTRYLEN
> outbuf
.size
) {
382 iovector
.iov_len
= outbuf
.size
;
384 if (mutex_workqueue_pushchunked(taskid
, &iovector
)) {
385 free(iovector
.iov_base
);
386 ZSTD_freeCCtx(zstream
);
387 return mutex_bucket_unlock(bucket
, 0);
389 /* Allocate a fresh output buffer */
390 iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
391 if (!iovector
.iov_base
) {
392 fprintf(stderr
, "Out of memory trying to claim ouput buffer\n");
393 ZSTD_freeCCtx(zstream
);
394 return mutex_bucket_unlock(bucket
, 0);
397 outbuf
.dst
= iovector
.iov_base
;
398 outbuf
.size
= OT_SCRAPE_CHUNK_SIZE
;
401 ZSTD_compressStream2(zstream
, &outbuf
, &inbuf
, ZSTD_e_continue
);
405 /* All torrents done: release lock on current bucket */
406 mutex_bucket_unlock(bucket
, 0);
408 /* Parent thread died? */
409 if (!g_opentracker_running
)
413 if ((mode
& TASK_TASK_MASK
) == TASK_FULLSCRAPE
) {
414 inbuf
.src
= (const void *)"ee";
415 inbuf
.size
= strlen("ee");
419 more_bytes
= ZSTD_compressStream2(zstream
, &outbuf
, &inbuf
, ZSTD_e_end
);
421 iovector
.iov_len
= outbuf
.pos
;
422 if (mutex_workqueue_pushchunked(taskid
, &iovector
)) {
423 free(iovector
.iov_base
);
424 ZSTD_freeCCtx(zstream
);
428 /* Check if there's a last batch of data in the zlib buffer */
430 /* Allocate a fresh output buffer */
431 iovector
.iov_base
= malloc(OT_SCRAPE_CHUNK_SIZE
);
433 if (!iovector
.iov_base
) {
434 fprintf(stderr
, "Problem with iovec_fix_increase_or_free\n");
435 ZSTD_freeCCtx(zstream
);
439 outbuf
.dst
= iovector
.iov_base
;
440 outbuf
.size
= OT_SCRAPE_CHUNK_SIZE
;
443 ZSTD_compressStream2(zstream
, &outbuf
, &inbuf
, ZSTD_e_end
);
445 /* Only pass the new buffer if there actually was some data left in the buffer */
446 iovector
.iov_len
= outbuf
.pos
;
447 if (!iovector
.iov_len
|| mutex_workqueue_pushchunked(taskid
, &iovector
))
448 free(iovector
.iov_base
);
451 ZSTD_freeCCtx(zstream
);
453 /* WANT_COMPRESSION_ZSTD */
456 /* WANT_FULLSCRAPE */