/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2019, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 * out to worker threads.
 *
 * The multithreading backend for this module is in workqueue.c; this module
 * specializes workqueue.c.
 *
 * Right now, we use this infrastructure
 *  <ul><li>for processing onionskins in onion.c
 *    <li>for compressing consensuses in consdiffmgr.c,
 *    <li>and for calculating diffs and compressing them in consdiffmgr.c.
 *  </ul>
 **/
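
/*
 * Rough sketch of the flow implemented below, added for orientation (the
 * functions themselves are authoritative):
 *
 *   main thread                         worker thread
 *   -----------                         -------------
 *   assign_onionskin_to_cpuworker()
 *     packages a cpuworker_request_t
 *     into a cpuworker_job_t and
 *     queues it on the threadpool  -->  cpuworker_onion_handshake_threadfn()
 *                                         runs onion_skin_server_handshake()
 *                                         and stores a cpuworker_reply_t in
 *                                         the job
 *   cpuworker_onion_handshake_replyfn()
 *     records timing statistics, calls
 *     onionskin_answer(), and frees
 *     the job
 */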

#include "core/or/or.h"
#include "core/or/channel.h"
#include "core/or/circuitbuild.h"
#include "core/or/circuitlist.h"
#include "core/or/connection_or.h"
#include "app/config/config.h"
#include "core/mainloop/cpuworker.h"
#include "lib/crypt_ops/crypto_rand.h"
#include "lib/crypt_ops/crypto_util.h"
#include "core/or/onion.h"
#include "feature/relay/onion_queue.h"
#include "feature/stats/rephist.h"
#include "feature/relay/router.h"
#include "lib/evloop/workqueue.h"
#include "core/crypto/onion_crypto.h"

#include "core/or/or_circuit_st.h"
#include "lib/intmath/weakrng.h"

static void queue_pending_tasks(void);
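
/** State kept by each worker thread: the server-side onion keys to use for
 * handshakes, plus a generation counter that update_state_threadfn() bumps
 * each time the keys are rotated. */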
typedef struct worker_state_s {
  int generation;
  server_onion_keys_t *onion_keys;
} worker_state_t;

static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}

#define worker_state_free(ws) \
  FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))

static void
worker_state_free_(worker_state_t *ws)
{
  if (!ws)
    return;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

static void
worker_state_free_void(void *arg)
{
  worker_state_free_(arg);
}

static replyqueue_t *replyqueue = NULL;
static threadpool_t *threadpool = NULL;

static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;

static int total_pending_tasks = 0;
static int max_pending_tasks = 128;

/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime.
 */
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!threadpool) {
    /*
      In our threadpool implementation, half the threads are permissive and
      half are strict (when it comes to running lower-priority tasks). So we
      always make sure we have at least two threads, so that there will be at
      least one thread of each kind.
    */
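    /* For example, with get_num_cpus() == 1 we still launch 1 + 1 == 2
     * threads (one permissive, one strict); a 4-core relay gets 5 threads,
     * all sharing the single replyqueue created above. */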
    const int n_threads = get_num_cpus(get_options()) + 1;
    threadpool = threadpool_new(n_threads,
                                replyqueue,
                                worker_state_new,
                                worker_state_free_void,
                                NULL);

    int r = threadpool_register_reply_event(threadpool, NULL);

    tor_assert(r == 0);
  }

  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
  crypto_seed_weak_rng(&request_sample_rng);
}

/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;

typedef struct cpuworker_job_u {
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;
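
/* Note on cpuworker_job_u: one heap allocation serves both directions of
 * the exchange.  The main thread fills in u.request before queueing the
 * job, the worker thread overwrites the same union with u.reply, and the
 * reply handler on the main thread reads the reply and frees the job. */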

static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}

/** Called when the onion key has changed: queue an update so that every CPU
 * worker thread regenerates its state (and hence its copy of the onion
 * keys). */
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free_void,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}

/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip it to this time
 * (in microseconds; two seconds). */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

/** Return true iff we'd like to measure a handshake of type
 * <b>onionskin_type</b>. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;
  /* Otherwise, measure with P=1/128. We avoid doing this for every
   * handshake, since the measurement itself can take a little time. */
  return tor_weak_random_one_in_n(&request_sample_rng, 128);
}

/** Return an estimate of how many microseconds we will need for a single
 * cpuworker to process <b>n_requests</b> onionskins of type
 * <b>onionskin_type</b>. */
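/* Illustrative arithmetic (numbers invented for the example): if we have
 * measured 2,000 handshakes of some type taking a total of 1,000,000 usec
 * inside the workers, the per-handshake average is 500 usec, so the
 * estimate for n_requests == 10 is (1,000,000 * 10) / 2,000 = 5,000 usec. */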
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskins_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * UINT64_MAX. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}

/** Compute the absolute and relative overhead of using the cpuworker
 * framework for onionskins of type <b>onionskin_type</b>.*/
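/* Illustrative arithmetic (numbers invented for the example): if 1,000
 * onionskins of a type took 2.0 seconds of in-worker time
 * (onionskins_usec_internal) but 2.5 seconds of round-trip time
 * (onionskins_usec_roundtrip), the overhead is 0.5 seconds total, so
 * *usec_out becomes 500 usec per onionskin and *frac_out becomes 0.25,
 * i.e. 25% relative overhead. */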
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}

/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 * log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r<0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code ",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
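        /* Halving the count and both totals by the same factor leaves their
         * ratios, and hence estimated_usec_for_onionskins() and
         * get_overhead_for_onionskins(), essentially unchanged; it only
         * keeps the counters from growing without bound. */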
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR,"circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys, sizeof(rpl.keys),
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR,"onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }

  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}

/** Implementation function for onion handshake requests. */
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  const create_cell_t *cc = &req.create_cell;
  created_cell_t *cell_out = &rpl.created_cell;
  struct timeval tv_start = {0,0}, tv_end;
  int n;
  rpl.timed = req.timed;
  rpl.started_at = req.started_at;
  rpl.handshake_type = cc->handshake_type;
  if (req.timed)
    tor_gettimeofday(&tv_start);
  n = onion_skin_server_handshake(cc->handshake_type,
                                  cc->onionskin, cc->handshake_len,
                                  onion_keys,
                                  cell_out->reply,
                                  rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                  rpl.rend_auth_material);
  if (n < 0) {
    /* failure */
    log_debug(LD_OR,"onion_skin_server_handshake failed.");
    memset(&rpl, 0, sizeof(rpl));
    rpl.success = 0;
  } else {
    /* success */
    log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
    cell_out->handshake_len = n;
    switch (cc->cell_type) {
    case CELL_CREATE:
      cell_out->cell_type = CELL_CREATED; break;
    case CELL_CREATE2:
      cell_out->cell_type = CELL_CREATED2; break;
    case CELL_CREATE_FAST:
      cell_out->cell_type = CELL_CREATED_FAST; break;
    default:
      tor_assert(0);
      return WQ_RPL_SHUTDOWN;
    }
    rpl.success = 1;
  }
  rpl.magic = CPUWORKER_REPLY_MAGIC;
  if (req.timed) {
    struct timeval tv_diff;
    int64_t usec;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &tv_start, &tv_diff);
    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    else
      rpl.n_usec = (uint32_t) usec;
  }

  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}

/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
      log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
  }
}

/** Queue a job for the cpuworker threadpool at the given <b>priority</b>:
 * run <b>fn</b>(state, arg) in a worker thread, and then <b>reply_fn</b>(arg)
 * on the main thread once it has finished.  Return the new workqueue entry
 * on success, or NULL on failure. */
MOCK_IMPL(workqueue_entry_t *,
cpuworker_queue_work,(workqueue_priority_t priority,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg))
{
  tor_assert(threadpool);

  return threadpool_queue_work_priority(threadpool,
                                        priority,
                                        fn,
                                        reply_fn,
                                        arg);
}
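
/* Example of how a caller might use cpuworker_queue_work().  This is an
 * illustrative sketch only: my_job_threadfn, my_job_replyfn, and my_job_t
 * are hypothetical names standing in for whatever the caller defines (the
 * consensus-diff code in consdiffmgr.c queues its background work through
 * this interface).
 *
 *   static workqueue_reply_t
 *   my_job_threadfn(void *state, void *arg)
 *   {
 *     (void)state;              // per-thread worker_state_t; unused here
 *     my_job_t *job = arg;
 *     do_expensive_computation(job);
 *     return WQ_RPL_REPLY;      // ask for my_job_replyfn to run next
 *   }
 *
 *   static void
 *   my_job_replyfn(void *arg)   // runs back on the main thread
 *   {
 *     my_job_t *job = arg;
 *     publish_result(job);
 *     tor_free(job);
 *   }
 *
 *   ...
 *   if (!cpuworker_queue_work(WQ_PRI_LOW, my_job_threadfn,
 *                             my_job_replyfn, job)) {
 *     // handle failure to queue
 *   }
 */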

/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  tor_assert(threadpool);

  if (!circ->p_chan) {
    log_info(LD_OR,"circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    log_debug(LD_OR,"No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  if (!channel_is_client(circ->p_chan))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work_priority(threadpool,
                      WQ_PRI_HIGH,
                      cpuworker_onion_handshake_threadfn,
                      cpuworker_onion_handshake_replyfn,
                      job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  circ->workqueue_entry = queue_entry;

  return 0;
}

/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}