Let our fullscrapes have a binary content-type
[opentracker.git] / ot_fullscrape.c
blobaed2ad976c6e90ce319f4f3def73628130021a6f
1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
4 $id$ */
6 #ifdef WANT_FULLSCRAPE
8 /* System */
9 #include <arpa/inet.h>
10 #include <pthread.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/param.h>
14 #ifdef WANT_COMPRESSION_GZIP
15 #include <zlib.h>
16 #endif
18 /* Libowfat */
19 #include "byte.h"
20 #include "io.h"
21 #include "textcode.h"
23 /* Opentracker */
24 #include "ot_fullscrape.h"
25 #include "ot_iovec.h"
26 #include "ot_mutex.h"
27 #include "trackerlogic.h"
/* Fetch full scrape info for all torrents
   Full scrapes usually are huge and one does not want to
   allocate more memory. So let's get them in 1 MiB units
   (see OT_SCRAPE_CHUNK_SIZE)
*/
33 #define OT_SCRAPE_CHUNK_SIZE (1024 * 1024)
35 /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
36 #define OT_SCRAPE_MAXENTRYLEN 256
38 /* Forward declaration */
39 static void fullscrape_make(int taskid, ot_tasktype mode);
40 #ifdef WANT_COMPRESSION_GZIP
41 static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
42 #endif
/* Render a 20 byte digest as 40 upper case hex characters.
   d  receives the 40 digits plus a terminating NUL (41 bytes in total)
   s  points at the 20 raw bytes to convert
   Returns d, so the call can be used directly as a string argument.
   XXX - Duplicated from ot_stats. Needs fix. */
static char *to_hex(char *d, uint8_t *s) {
  static const char digits[] = "0123456789ABCDEF";
  enum { DIGEST_BYTES = 20 };
  int               pos;

  for (pos = 0; pos < DIGEST_BYTES; ++pos) {
    uint8_t octet  = s[pos];
    d[2 * pos]     = digits[octet >> 4];  /* high nibble first */
    d[2 * pos + 1] = digits[octet & 15];
  }
  d[2 * DIGEST_BYTES] = 0;

  return d;
}
58 /* This is the entry point into this worker thread
59 It grabs tasks from mutex_tasklist and delivers results back
61 static void *fullscrape_worker(void *args) {
62 (void)args;
64 while (g_opentracker_running) {
65 ot_tasktype tasktype = TASK_FULLSCRAPE;
66 ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
67 #ifdef WANT_COMPRESSION_GZIP
68 if (tasktype & TASK_FLAG_GZIP)
69 fullscrape_make_gzip(taskid, tasktype);
70 else
71 #endif
72 fullscrape_make(taskid, tasktype);
73 mutex_workqueue_pushchunked(taskid, NULL);
75 return NULL;
78 static pthread_t thread_id;
79 void fullscrape_init( ) {
80 pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
83 void fullscrape_deinit( ) {
84 pthread_cancel( thread_id );
87 void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
88 mutex_workqueue_pushtask( sock, tasktype );
91 static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torrent, ot_hash *hash ) {
92 size_t seed_count = torrent->peer_list6->seed_count + torrent->peer_list4->seed_count;
93 size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count;
94 size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count;
96 switch (mode & TASK_TASK_MASK) {
97 case TASK_FULLSCRAPE:
98 default:
99 /* push hash as bencoded string */
100 *r++ = '2';
101 *r++ = '0';
102 *r++ = ':';
103 memcpy(r, hash, sizeof(ot_hash));
104 r += sizeof(ot_hash);
105 /* push rest of the scrape string */
106 r += sprintf(r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count - seed_count);
108 break;
109 case TASK_FULLSCRAPE_TPB_ASCII:
110 to_hex(r, *hash);
111 r += 2 * sizeof(ot_hash);
112 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
113 break;
114 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
115 to_hex(r, *hash);
116 r += 2 * sizeof(ot_hash);
117 r += sprintf(r, ":%zd:%zd:%zd\n", seed_count, peer_count - seed_count, down_count);
118 break;
119 case TASK_FULLSCRAPE_TPB_BINARY:
120 memcpy(r, *hash, sizeof(ot_hash));
121 r += sizeof(ot_hash);
122 *(uint32_t *)(r + 0) = htonl((uint32_t)seed_count);
123 *(uint32_t *)(r + 4) = htonl((uint32_t)(peer_count - seed_count));
124 r += 8;
125 break;
126 case TASK_FULLSCRAPE_TPB_URLENCODED:
127 r += fmt_urlencoded(r, (char *)*hash, 20);
128 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
129 break;
130 case TASK_FULLSCRAPE_TRACKERSTATE:
131 to_hex(r, *hash);
132 r += 2 * sizeof(ot_hash);
133 r += sprintf(r, ":%zd:%zd\n", torrent->peer_list6->base, down_count);
134 break;
136 return r;
139 static void fullscrape_make(int taskid, ot_tasktype mode) {
140 int bucket;
141 char *r, *re;
142 struct iovec iovector = {NULL, 0};
144 /* Setup return vector... */
145 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
146 if (!r)
147 return;
149 /* re points to low watermark */
150 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
152 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
153 r += sprintf(r, "d5:filesd");
155 /* For each bucket... */
156 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
157 /* Get exclusive access to that bucket */
158 ot_vector *torrents_list = mutex_bucket_lock(bucket);
159 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
160 size_t i;
162 /* For each torrent in this bucket.. */
163 for (i = 0; i < torrents_list->size; ++i) {
164 r = fullscrape_write_one(mode, r, torrents + i, &torrents[i].hash);
166 if (r > re) {
167 iovector.iov_len = r - (char *)iovector.iov_base;
169 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
170 free(iovector.iov_base);
171 return mutex_bucket_unlock(bucket, 0);
173 /* Allocate a fresh output buffer */
174 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
175 if (!r)
176 return mutex_bucket_unlock(bucket, 0);
178 /* re points to low watermark */
179 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
183 /* All torrents done: release lock on current bucket */
184 mutex_bucket_unlock(bucket, 0);
186 /* Parent thread died? */
187 if (!g_opentracker_running)
188 return;
191 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
192 r += sprintf(r, "ee");
194 /* Send rest of data */
195 iovector.iov_len = r - (char *)iovector.iov_base;
196 if (mutex_workqueue_pushchunked(taskid, &iovector))
197 free(iovector.iov_base);
200 #ifdef WANT_COMPRESSION_GZIP
202 static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
203 int bucket;
204 char *r;
205 struct iovec iovector = {NULL, 0};
206 int zres;
207 z_stream strm;
208 fprintf(stderr, "GZIP path\n");
209 /* Setup return vector... */
210 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
211 if (!iovector.iov_base)
212 return;
214 byte_zero(&strm, sizeof(strm));
215 strm.next_out = (uint8_t *)iovector.iov_base;
216 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
217 if (deflateInit2(&strm, 7, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY) != Z_OK)
218 fprintf(stderr, "not ok.\n");
220 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
221 strm.next_in = (uint8_t *)"d5:filesd";
222 strm.avail_in = strlen("d5:filesd");
223 zres = deflate(&strm, Z_NO_FLUSH);
226 /* For each bucket... */
227 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
228 /* Get exclusive access to that bucket */
229 ot_vector *torrents_list = mutex_bucket_lock(bucket);
230 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
231 size_t i;
233 /* For each torrent in this bucket.. */
234 for (i = 0; i < torrents_list->size; ++i) {
235 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
236 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
237 strm.next_in = (uint8_t *)compress_buffer;
238 strm.avail_in = r - compress_buffer;
239 zres = deflate(&strm, Z_NO_FLUSH);
240 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
241 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
243 /* Check if there still is enough buffer left */
244 while (!strm.avail_out) {
245 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
247 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
248 free(iovector.iov_base);
249 return mutex_bucket_unlock(bucket, 0);
251 /* Allocate a fresh output buffer */
252 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
253 if (!iovector.iov_base) {
254 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
255 deflateEnd(&strm);
256 return mutex_bucket_unlock(bucket, 0);
258 strm.next_out = (uint8_t *)iovector.iov_base;
259 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
260 zres = deflate(&strm, Z_NO_FLUSH);
261 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
262 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
266 /* All torrents done: release lock on current bucket */
267 mutex_bucket_unlock(bucket, 0);
269 /* Parent thread died? */
270 if (!g_opentracker_running)
271 return;
274 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
275 strm.next_in = (uint8_t *)"ee";
276 strm.avail_in = strlen("ee");
279 if (deflate(&strm, Z_FINISH) < Z_OK)
280 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
282 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
283 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
284 free(iovector.iov_base);
285 return mutex_bucket_unlock(bucket, 0);
288 /* Check if there's a last batch of data in the zlib buffer */
289 if (!strm.avail_out) {
290 /* Allocate a fresh output buffer */
291 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
293 if (!iovector.iov_base) {
294 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
295 deflateEnd(&strm);
296 return mutex_bucket_unlock(bucket, 0);
298 strm.next_out = iovector.iov_base;
299 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
300 if (deflate(&strm, Z_FINISH) < Z_OK)
301 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
303 /* Only pass the new buffer if there actually was some data left in the buffer */
304 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
305 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
306 free(iovector.iov_base);
309 deflateEnd(&strm);
311 /* WANT_COMPRESSION_GZIP */
312 #endif
314 /* WANT_FULLSCRAPE */
315 #endif