/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2017, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 * out to worker threads.
 *
 * The multithreading backend for this module is in workqueue.c; this module
 * specializes workqueue.c.
 *
 * Right now, we use this infrastructure
 *  <ul><li>for processing onionskins in onion.c
 *      <li>for compressing consensuses in consdiffmgr.c,
 *      <li>and for calculating diffs and compressing them in consdiffmgr.c.
 *  </ul>
 **/

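/* Rough life cycle of an onionskin job, as implemented below: the main thread
 * packs a cpuworker_request_t into a cpuworker_job_t and queues it on the
 * threadpool (assign_onionskin_to_cpuworker); a worker thread runs
 * cpuworker_onion_handshake_threadfn and fills in a cpuworker_reply_t; the
 * reply queue then wakes the main thread, which runs
 * cpuworker_onion_handshake_replyfn to answer or close the circuit. */
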
#include "or.h"
#include "channel.h"
#include "circuitbuild.h"
#include "circuitlist.h"
#include "connection_or.h"
#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
#include "workqueue.h"

#include <event2/event.h>

static void queue_pending_tasks(void);

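/** Per-thread state for a cpuworker: the onion keys this worker should use,
 * plus a generation counter that is incremented each time the keys are
 * rotated via an update job. */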
typedef struct worker_state_s {
  int generation;
  server_onion_keys_t *onion_keys;
} worker_state_t;

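/** Allocate and return a fresh worker_state_t, with its own copy of the
 * server's current onion keys (via server_onion_keys_new()). The argument is
 * ignored. */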
static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}

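/** Release all storage held in <b>ws</b>. Callers normally use the
 * worker_state_free() macro, which also sets the pointer to NULL. */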
#define worker_state_free(ws) \
  FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))

static void
worker_state_free_(worker_state_t *ws)
{
  if (!ws)
    return;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

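/** Wrapper for worker_state_free_() that takes a void pointer, so it can be
 * handed to the threadpool code as a free function. */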
static void
worker_state_free_void(void *arg)
{
  worker_state_free_(arg);
}

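/* Module-level state: the reply queue and its libevent read handler (used to
 * run reply functions on the main thread), the worker threadpool itself, a
 * weak RNG for sampling which requests to time, and counters bounding how
 * many onionskin jobs may be outstanding at once. */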
static replyqueue_t *replyqueue = NULL;
static threadpool_t *threadpool = NULL;
static struct event *reply_event = NULL;

static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;

static int total_pending_tasks = 0;
static int max_pending_tasks = 128;

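/** Libevent callback: the reply queue's socket has become readable, so run
 * the reply functions of all completed jobs on the main thread. */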
static void
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
  replyqueue_t *rq = arg;
  (void) sock;
  (void) events;
  replyqueue_process(rq);
}

/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime.
 */
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!reply_event) {
    reply_event = tor_event_new(tor_libevent_get_base(),
                                replyqueue_get_socket(replyqueue),
                                EV_READ|EV_PERSIST,
                                replyqueue_process_cb,
                                replyqueue);
    event_add(reply_event, NULL);
  }
  if (!threadpool) {
    /*
      In our threadpool implementation, half the threads are permissive and
      half are strict (when it comes to running lower-priority tasks). So we
      always make sure we have at least two threads, so that there will be at
      least one thread of each kind.
    */
    const int n_threads = get_num_cpus(get_options()) + 1;
    threadpool = threadpool_new(n_threads,
                                replyqueue,
                                worker_state_new,
                                worker_state_free_void,
                                NULL);
  }
  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
  crypto_seed_weak_rng(&request_sample_rng);
}

/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;

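/** A unit of work for the onion-handshake cpuworkers: the circuit the create
 * cell arrived on, together with either the request (on the way to a worker)
 * or the reply (on the way back). */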
typedef struct cpuworker_job_u {
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;

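/** Worker-thread body for a key-rotation update: adopt the onion keys from
 * the freshly built state in <b>work_</b>, free the rest of it, and bump this
 * thread's generation count. */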
static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}

/** Called when the onion key has changed: queue an update job so that every
 * CPU worker thread replaces its onion keys with freshly generated state. */
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free_void,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}

/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip them to this
 * time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

/** Return true iff we'd like to measure a handshake of type
 * <b>onionskin_type</b>. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;
  /* Otherwise, measure with P=1/128. We avoid doing this for every
   * handshake, since the measurement itself can take a little time. */
  return tor_weak_random_one_in_n(&request_sample_rng, 128);
}

/** Return an estimate of how many microseconds we will need for a single
 * cpuworker to process <b>n_requests</b> onionskins of type
 * <b>onionskin_type</b>. */
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskin_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * UINT64_MAX. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}

/** Compute the absolute and relative overhead of using the cpuworker
 * framework for onionskins of type <b>onionskin_type</b>.*/
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = U64_TO_DBL(overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}

/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 * log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r<0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code.",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR,"circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys, sizeof(rpl.keys),
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR,"onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }
  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}

/** Implementation function for onion handshake requests. */
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  const create_cell_t *cc = &req.create_cell;
  created_cell_t *cell_out = &rpl.created_cell;
  struct timeval tv_start = {0,0}, tv_end;
  int n;
  rpl.timed = req.timed;
  rpl.started_at = req.started_at;
  rpl.handshake_type = cc->handshake_type;
  if (req.timed)
    tor_gettimeofday(&tv_start);
  n = onion_skin_server_handshake(cc->handshake_type,
                                  cc->onionskin, cc->handshake_len,
                                  onion_keys,
                                  cell_out->reply,
                                  rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                  rpl.rend_auth_material);
  if (n < 0) {
    /* failure */
    log_debug(LD_OR,"onion_skin_server_handshake failed.");
    memset(&rpl, 0, sizeof(rpl));
    rpl.success = 0;
  } else {
    /* success */
    log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
    cell_out->handshake_len = n;
    switch (cc->cell_type) {
    case CELL_CREATE:
      cell_out->cell_type = CELL_CREATED; break;
    case CELL_CREATE2:
      cell_out->cell_type = CELL_CREATED2; break;
    case CELL_CREATE_FAST:
      cell_out->cell_type = CELL_CREATED_FAST; break;
    default:
      tor_assert(0);
      return WQ_RPL_SHUTDOWN;
    }
    rpl.success = 1;
  }
  rpl.magic = CPUWORKER_REPLY_MAGIC;
  if (req.timed) {
    struct timeval tv_diff;
    int64_t usec;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &tv_start, &tv_diff);
    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    else
      rpl.n_usec = (uint32_t) usec;
  }

  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}

/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
      log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
  }
}

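/* Example: a sketch of how another module might hand generic work to this
 * pool through cpuworker_queue_work() below. The compress_* names and the
 * caller's job pointer are hypothetical, not part of this file; the WQ_PRI_*
 * priority constants are assumed to come from workqueue.h.
 *
 *   static workqueue_reply_t
 *   compress_threadfn(void *state_, void *job_)
 *   {
 *     (void)state_;
 *     // ... do the CPU-heavy work on job_ in the worker thread ...
 *     return WQ_RPL_REPLY;
 *   }
 *
 *   static void
 *   compress_replyfn(void *job_)
 *   {
 *     // ... runs on the main thread once the worker thread is done ...
 *   }
 *
 *   if (!cpuworker_queue_work(WQ_PRI_LOW, compress_threadfn,
 *                             compress_replyfn, job))
 *     log_warn(LD_BUG, "Couldn't queue work on threadpool");
 */
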
/** Queue a job on the cpuworker threadpool at priority <b>priority</b>:
 * <b>fn</b> runs in a worker thread (receiving that thread's state and
 * <b>arg</b>), and <b>reply_fn</b> then runs in the main thread with
 * <b>arg</b>. Return the queued work entry, or NULL on error. */
MOCK_IMPL(workqueue_entry_t *,
cpuworker_queue_work,(workqueue_priority_t priority,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg))
{
  tor_assert(threadpool);

  return threadpool_queue_work_priority(threadpool,
                                        priority,
                                        fn,
                                        reply_fn,
                                        arg);
}

/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  tor_assert(threadpool);

  if (!circ->p_chan) {
    log_info(LD_OR,"circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    log_debug(LD_OR,"No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  if (!channel_is_client(circ->p_chan))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work_priority(threadpool,
                                               WQ_PRI_HIGH,
                                               cpuworker_onion_handshake_threadfn,
                                               cpuworker_onion_handshake_replyfn,
                                               job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  circ->workqueue_entry = queue_entry;

  return 0;
}

/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}