/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2016, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 * out to worker threads.
 *
 * The multithreading backend for this module is in workqueue.c; this module
 * specializes workqueue.c.
 *
 * Right now, we only use this for processing onionskins, and invoke it mostly
 * from onion.c.
 **/
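
/* A rough sketch of the round trip this module implements (the function
 * names are the ones defined below; the flow is inferred from their call
 * graph):
 *
 *   main thread                          worker thread
 *   -----------                          -------------
 *   assign_onionskin_to_cpuworker()
 *     -> threadpool_queue_work() ......> cpuworker_onion_handshake_threadfn()
 *                                          onion_skin_server_handshake()
 *   replyqueue_process_cb()      <...... (reply posted to the replyqueue)
 *     -> cpuworker_onion_handshake_replyfn()
 */
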
17 #include "or.h"
18 #include "channel.h"
19 #include "circuitbuild.h"
20 #include "circuitlist.h"
21 #include "connection_or.h"
22 #include "config.h"
23 #include "cpuworker.h"
24 #include "main.h"
25 #include "onion.h"
26 #include "rephist.h"
27 #include "router.h"
28 #include "workqueue.h"
30 #include <event2/event.h>
32 static void queue_pending_tasks(void);
34 typedef struct worker_state_s {
35 int generation;
36 server_onion_keys_t *onion_keys;
37 } worker_state_t;

/** Allocate and return a fresh worker state holding newly fetched server
 * onion keys. */
static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}
/** Release a worker_state_t and the onion keys it holds. */
static void
worker_state_free(void *arg)
{
  worker_state_t *ws = arg;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

/** The queue on which worker threads post replies for the main thread. */
static replyqueue_t *replyqueue = NULL;
/** The pool of worker threads that run onion handshakes. */
static threadpool_t *threadpool = NULL;
/** Libevent event that fires when the reply queue's socket is readable. */
static struct event *reply_event = NULL;

/** Weak RNG used to decide which requests to time; see
 * should_time_request(). */
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;

/** How many handshake jobs are currently queued or in flight? */
static int total_pending_tasks = 0;
/** Upper bound on total_pending_tasks; recomputed in cpu_init(). */
static int max_pending_tasks = 128;

/** Libevent callback: the reply queue's socket is readable, so drain the
 * queue and run each reply function in the main thread. */
static void
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
  replyqueue_t *rq = arg;
  (void) sock;
  (void) events;
  replyqueue_process(rq);
}

/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime.
 */
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!reply_event) {
    reply_event = tor_event_new(tor_libevent_get_base(),
                                replyqueue_get_socket(replyqueue),
                                EV_READ|EV_PERSIST,
                                replyqueue_process_cb,
                                replyqueue);
    event_add(reply_event, NULL);
  }
  if (!threadpool) {
    threadpool = threadpool_new(get_num_cpus(get_options()),
                                replyqueue,
                                worker_state_new,
                                worker_state_free,
                                NULL);
  }
  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
  crypto_seed_weak_rng(&request_sample_rng);
}
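
/* Illustrative numbers only: on a hypothetical 8-core relay, the pool above
 * gets 8 workers and the cap becomes
 *
 *   max_pending_tasks = 8 * 64 = 512;
 *
 * and because every branch checks whether its object already exists, calling
 * cpu_init() a second time only recomputes that cap and reseeds
 * request_sample_rng. */
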
/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing.) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
  /** @} */
} cpuworker_reply_t;

/** All the state for one onion handshake job: the circuit it pertains to,
 * plus a union holding the request (before processing) or the reply
 * (after). */
typedef struct cpuworker_job_u {
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;
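
/* A sketch of the lifecycle implied by the union above: only one arm is live
 * at a time, so the same allocation carries the request out and the reply
 * back:
 *
 *   memcpy(&job->u.request, &req, sizeof(req));   // main thread, on assign
 *   memcpy(&job->u.reply, &rpl, sizeof(rpl));     // worker, overwrites req
 *   memcpy(&rpl, &job->u.reply, sizeof(rpl));     // main thread, in replyfn
 */
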
/** Worker-thread body for a key-rotation update: free the state's old onion
 * keys, adopt the fresh ones from <b>work_</b>, and bump the generation
 * counter. */
static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}

/** Called when the onion key has changed: queue an update for every worker
 * thread, so that each one generates a fresh state with the new keys.
 */
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}
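
/* Illustrative caller (an assumption about the rotation code, not a quote of
 * it): whatever installs fresh onion keys should follow up with this call so
 * the workers stop using the old ones:
 *
 *   rotate_onion_key();           // hypothetical rotation step elsewhere
 *   cpuworkers_rotate_keyinfo();  // hand each worker a freshly built state
 */
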
/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip it to this value when
 * recording its timing. (2*1000*1000 microseconds = 2 seconds.) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

/** Return true iff we'd like to measure a handshake of type
 * <b>onionskin_type</b>. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample. */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;
  /* Otherwise, measure with P=1/128. We avoid doing this for every
   * handshake, since the measurement itself can take a little time. */
  return tor_weak_random_one_in_n(&request_sample_rng, 128);
}
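
/* Worked example with the constants above: the first 4096 handshakes of each
 * type are always timed; after that we sample at P=1/128.  Over a
 * hypothetical million handshakes of one type, that is roughly
 *
 *   4096 + (1000000 - 4096) / 128  ~=  11877 timed requests,
 *
 * i.e. the timing machinery runs on about 1.2% of traffic once warmed up. */
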
/** Return an estimate of how many microseconds we will need for a single
 * cpuworker to process <b>n_requests</b> onionskins of type
 * <b>onionskin_type</b>. */
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskin_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * UINT64_MAX. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}
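
/* Worked example (hypothetical counters): if a handshake type has
 * onionskins_n_processed = 200000 and onionskins_usec_internal = 48000000,
 * the measured average is 240 usec per onionskin, so
 *
 *   estimated_usec_for_onionskins(100, type)
 *     == (48000000 * 100) / 200000 == 24000 usec (24 msec). */
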
/** Compute the absolute and relative overhead of using the cpuworker
 * framework for onionskins of type <b>onionskin_type</b>. */
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = U64_TO_DBL(overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}
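
/* Worked example (hypothetical counters): with 1000 onionskins measured,
 * usec_roundtrip = 900000 and usec_internal = 600000, the overhead is
 * 300000 usec in total, giving *usec_out = 300 (usec per onionskin) and
 * *frac_out = 300000.0 / 600000 = 0.5: queueing and thread hand-off added
 * 50% on top of the handshake computation itself. */
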
/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 * log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r<0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code.",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR,"circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys,
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR,"onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }

  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}

/** Implementation function for onion handshake requests. */
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  const create_cell_t *cc = &req.create_cell;
  created_cell_t *cell_out = &rpl.created_cell;
  struct timeval tv_start = {0,0}, tv_end;
  int n;
  rpl.timed = req.timed;
  rpl.started_at = req.started_at;
  rpl.handshake_type = cc->handshake_type;
  if (req.timed)
    tor_gettimeofday(&tv_start);
  n = onion_skin_server_handshake(cc->handshake_type,
                                  cc->onionskin, cc->handshake_len,
                                  onion_keys,
                                  cell_out->reply,
                                  rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                  rpl.rend_auth_material);
  if (n < 0) {
    /* failure */
    log_debug(LD_OR,"onion_skin_server_handshake failed.");
    memset(&rpl, 0, sizeof(rpl));
    rpl.success = 0;
  } else {
    /* success */
    log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
    cell_out->handshake_len = n;
    switch (cc->cell_type) {
    case CELL_CREATE:
      cell_out->cell_type = CELL_CREATED; break;
    case CELL_CREATE2:
      cell_out->cell_type = CELL_CREATED2; break;
    case CELL_CREATE_FAST:
      cell_out->cell_type = CELL_CREATED_FAST; break;
    default:
      tor_assert(0);
      return WQ_RPL_SHUTDOWN;
    }
    rpl.success = 1;
  }
  rpl.magic = CPUWORKER_REPLY_MAGIC;
  if (req.timed) {
    struct timeval tv_diff;
    int64_t usec;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &tv_start, &tv_diff);
    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    else
      rpl.n_usec = (uint32_t) usec;
  }

  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}

/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin))
      log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
  }
}

/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  tor_assert(threadpool);

  if (!circ->p_chan) {
    log_info(LD_OR,"circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    log_debug(LD_OR,"No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work(threadpool,
                                      cpuworker_onion_handshake_threadfn,
                                      cpuworker_onion_handshake_replyfn,
                                      job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    --total_pending_tasks; /* The task never actually made it in. */
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  circ->workqueue_entry = queue_entry;

  return 0;
}
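
/* Sketch of the ownership contract above (the caller shown is illustrative,
 * not a quote of onion.c): this function always takes ownership of
 * <b>onionskin</b>, whether by copying and freeing it, handing it to the
 * pending queue, or freeing it on failure, so callers must not touch it
 * afterwards:
 *
 *   create_cell_t *cc = tor_malloc_zero(sizeof(create_cell_t));
 *   // ... parse an incoming CREATE cell into cc ...
 *   if (assign_onionskin_to_cpuworker(circ, cc) < 0) {
 *     // cc is already freed or queued; just mark the circuit and move on.
 *   }
 */
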
/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}
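
/* Illustrative use (an assumption about the teardown code, not a quote of
 * it): circuit-freeing code should cancel any pending handshake first, so a
 * worker never replies into freed memory:
 *
 *   cpuworker_cancel_circ_handshake(circ);  // drops the queued job, if any
 *
 * If the cancel loses the race because a worker already took the job, the
 * DEAD_CIRCUIT_MAGIC path in cpuworker_onion_handshake_replyfn() cleans up
 * instead. */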