1 /* Copyright (c) 2003-2004, Roger Dingledine.
2 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
3 * Copyright (c) 2007-2017, The Tor Project, Inc. */
4 /* See LICENSE for licensing information */
8 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
11 * The multithreading backend for this module is in workqueue.c; this module
12 * specializes workqueue.c.
14 * Right now, we use this infrastructure
15 * <ul><li>for processing onionskins in onion.c
16 * <li>for compressing consensuses in consdiffmgr.c,
17 * <li>and for calculating diffs and compressing them in consdiffmgr.c.
22 #include "circuitbuild.h"
23 #include "circuitlist.h"
24 #include "connection_or.h"
26 #include "cpuworker.h"
31 #include "workqueue.h"
33 #include <event2/event.h>
/* Forward declaration: drains queued onionskins into the threadpool;
 * defined near the bottom of this file. */
35 static void queue_pending_tasks(void);
/** Per-worker-thread state: each worker thread owns its own copy of the
 * server onion keys.  (NOTE(review): the closing "} worker_state_t;" of
 * this typedef is not visible in this extraction.) */
37 typedef struct worker_state_s
{
/** This worker's onion keys; swapped out on key rotation by
 * update_state_threadfn(). */
39 server_onion_keys_t
*onion_keys
;
/** Threadpool per-thread-state constructor: allocate a zeroed
 * worker_state_t and populate its onion keys via server_onion_keys_new().
 * <b>arg</b> is unused.  (NOTE(review): the return-type line and the
 * trailing "return ws;" are not visible in this extraction.) */
43 worker_state_new(void *arg
)
47 ws
= tor_malloc_zero(sizeof(worker_state_t
));
48 ws
->onion_keys
= server_onion_keys_new();
/** Free <b>ws</b> via worker_state_free_() and set the pointer to NULL
 * (standard FREE_AND_NULL pattern used throughout this codebase). */
52 #define worker_state_free(ws) \
53 FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))
/** Release the resources held by <b>ws</b>: frees its onion keys.
 * (NOTE(review): the return-type line and any trailing "tor_free(ws)"
 * are not visible in this extraction — confirm against the full file.) */
56 worker_state_free_(worker_state_t
*ws
)
60 server_onion_keys_free(ws
->onion_keys
);
/** Type-erased wrapper around worker_state_free_(), suitable for use as a
 * threadpool state-free callback (see cpu_init() and
 * cpuworkers_rotate_keyinfo() below). */
65 worker_state_free_void(void *arg
)
67 worker_state_free_(arg
);
/** Queue on which worker threads post their completed jobs back to the
 * main thread. */
70 static replyqueue_t
*replyqueue
= NULL
;
/** The thread pool in which all cpuworker jobs run. */
71 static threadpool_t
*threadpool
= NULL
;
/** Libevent event that fires when the reply queue's socket becomes
 * readable; dispatches to replyqueue_process_cb(). */
72 static struct event
*reply_event
= NULL
;
/** Weak RNG used by should_time_request() to sample which handshakes get
 * timed. */
74 static tor_weak_rng_t request_sample_rng
= TOR_WEAK_RNG_INIT
;
/** How many tasks are currently outstanding on the threadpool? */
76 static int total_pending_tasks
= 0;
/** Cap on total_pending_tasks; recomputed in cpu_init() from the CPU
 * count (the 128 here is only a pre-init default). */
77 static int max_pending_tasks
= 128;
/** Libevent callback: invoked when <b>arg</b> (really a replyqueue_t)
 * has completed jobs waiting; drains them all via replyqueue_process().
 * <b>sock</b> and <b>events</b> come from libevent and are not used in
 * the visible body. */
80 replyqueue_process_cb(evutil_socket_t sock
, short events
, void *arg
)
82 replyqueue_t
*rq
= arg
;
85 replyqueue_process(rq
);
88 /** Initialize the cpuworker subsystem. It is OK to call this more than once
89 * during Tor's lifetime.
/* Create the reply queue and hook its notification socket into the main
 * event loop, so completed jobs get processed on the main thread. */
95 replyqueue
= replyqueue_new(0);
98 reply_event
= tor_event_new(tor_libevent_get_base(),
99 replyqueue_get_socket(replyqueue
),
101 replyqueue_process_cb
,
103 event_add(reply_event
, NULL
);
107 In our threadpool implementation, half the threads are permissive and
108 half are strict (when it comes to running lower-priority tasks). So we
109 always make sure we have at least two threads, so that there will be at
110 least one thread of each kind.
112 const int n_threads
= get_num_cpus(get_options()) + 1;
113 threadpool
= threadpool_new(n_threads
,
116 worker_state_free_void
,
119 /* Total voodoo. Can we make this more sensible? */
120 max_pending_tasks
= get_num_cpus(get_options()) * 64;
/* Seed the weak RNG that should_time_request() uses for sampling. */
121 crypto_seed_weak_rng(&request_sample_rng
);
124 /** Magic numbers to make sure our cpuworker_requests don't grow any
125 * mis-framing bugs.  (Both sides tor_assert() on these when unpacking
 * a request or reply.) */
126 #define CPUWORKER_REQUEST_MAGIC 0xda4afeed
127 #define CPUWORKER_REPLY_MAGIC 0x5eedf00d
129 /** A request sent to a cpuworker. */
130 typedef struct cpuworker_request_t
{
131 /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
/* NOTE(review): the declarations of the "magic" member and of the
 * "timed" bitfield under the next comment are not visible in this
 * extraction — confirm their types against the full file. */
134 /** Flag: Are we timing this request? */
136 /** If we're timing this request, when was it sent to the cpuworker? */
137 struct timeval started_at
;
139 /** A create cell for the cpuworker to process. */
140 create_cell_t create_cell
;
142 /* Turn the above into a tagged union if needed. */
143 } cpuworker_request_t
;
145 /** A reply sent by a cpuworker. */
146 typedef struct cpuworker_reply_t
{
147 /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
/* NOTE(review): the declarations of the "magic", "success", and "n_usec"
 * members, and the closing "} cpuworker_reply_t;", are not visible in
 * this extraction — confirm against the full file. */
150 /** True iff we got a successful request. */
153 /** Are we timing this request? */
154 unsigned int timed
: 1;
155 /** What handshake type was the request? (Used for timing) */
156 uint16_t handshake_type
;
157 /** When did we send the request to the cpuworker? */
158 struct timeval started_at
;
159 /** Once the cpuworker received the request, how many microseconds did it
160 * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
161 * and we'll never have an onion handshake that takes so long.) */
164 /** Output of processing a create cell
168 /** The created cell to send back. */
169 created_cell_t created_cell
;
170 /** The keys to use on this circuit. */
171 uint8_t keys
[CPATH_KEY_MATERIAL_LEN
];
172 /** Input to use for authenticating introduce1 cells. */
173 uint8_t rend_auth_material
[DIGEST_LEN
];
176 typedef struct cpuworker_job_u
{
/* NOTE(review): a "circ" member and the "union u {" wrapper around the
 * request/reply pair are not visible in this extraction; elsewhere in
 * this file these fields are accessed as job->circ, job->u.request and
 * job->u.reply, so the two members below share storage in a union. */
/** The request handed to the worker thread. */
179 cpuworker_request_t request
;
/** The worker thread's answer. */
180 cpuworker_reply_t reply
;
/** Worker-thread callback queued by cpuworkers_rotate_keyinfo(): replace
 * this thread's onion keys with the fresh ones carried in <b>work_</b>
 * (another worker_state_t), then free the emptied update object.
 * (NOTE(review): the opening brace and the trailing return statement are
 * not visible in this extraction.) */
184 static workqueue_reply_t
185 update_state_threadfn(void *state_
, void *work_
)
187 worker_state_t
*state
= state_
;
188 worker_state_t
*update
= work_
;
/* Drop the old keys... */
189 server_onion_keys_free(state
->onion_keys
);
/* ...steal the new ones from the update object... */
190 state
->onion_keys
= update
->onion_keys
;
191 update
->onion_keys
= NULL
;
/* ...and discard the now-empty husk. */
192 worker_state_free(update
);
197 /** Called when the onion key has changed so update all CPU worker(s) with
198 * new function pointers with which a new state will be generated.
201 cpuworkers_rotate_keyinfo(void)
204 /* If we're a client, then we won't have cpuworkers, and we won't need
205 * to tell them to rotate their state.
/* Queue a state-update job (update_state_threadfn) on every worker
 * thread.  (NOTE(review): the remaining arguments of this call are not
 * visible in this extraction.) */
209 if (threadpool_queue_update(threadpool
,
211 update_state_threadfn
,
212 worker_state_free_void
,
214 log_warn(LD_OR
, "Failed to queue key update for worker threads.");
218 /** Indexed by handshake type: how many onionskins have we processed and
219 * counted of that type? */
220 static uint64_t onionskins_n_processed
[MAX_ONION_HANDSHAKE_TYPE
+1];
221 /** Indexed by handshake type, corresponding to the onionskins counted in
222 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
223 * processing that kind of onionskin? */
224 static uint64_t onionskins_usec_internal
[MAX_ONION_HANDSHAKE_TYPE
+1];
225 /** Indexed by handshake type, corresponding to onionskins counted in
226 * onionskins_n_processed: how many microseconds have we spent waiting for
227 * cpuworkers to give us answers for that kind of onionskin?
229 static uint64_t onionskins_usec_roundtrip
[MAX_ONION_HANDSHAKE_TYPE
+1];
/* The three arrays above are halved together every 500000 processed
 * handshakes (see cpuworker_onion_handshake_replyfn) so that the
 * averages track recent behavior rather than all of history. */
231 /** If any onionskin takes longer than this, we clip them to this
232 * time. (microseconds) */
233 #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
235 /** Return true iff we'd like to measure a handshake of type
236 * <b>onionskin_type</b>. Call only from the main thread. */
238 should_time_request(uint16_t onionskin_type
)
240 /* If we've never heard of this type, we shouldn't even be here. */
241 if (onionskin_type
> MAX_ONION_HANDSHAKE_TYPE
)
243 /* Measure the first N handshakes of each type, to ensure we have a
245 if (onionskins_n_processed
[onionskin_type
] < 4096)
247 /* Otherwise, measure with P=1/128. We avoid doing this for every
248 * handshake, since the measurement itself can take a little time. */
249 return tor_weak_random_one_in_n(&request_sample_rng
, 128);
252 /** Return an estimate of how many microseconds we will need for a single
253 * cpuworker to process <b>n_requests</b> onionskins of type
254 * <b>onionskin_type</b>. */
256 estimated_usec_for_onionskins(uint32_t n_requests
, uint16_t onionskin_type
)
258 if (onionskin_type
> MAX_ONION_HANDSHAKE_TYPE
) /* should be impossible */
259 return 1000 * (uint64_t)n_requests
;
260 if (PREDICT_UNLIKELY(onionskins_n_processed
[onionskin_type
] < 100)) {
261 /* Until we have 100 data points, just assume everything takes 1 msec. */
262 return 1000 * (uint64_t)n_requests
;
264 /* This can't overflow: we'll never have more than 500000 onionskins
265 * measured in onionskin_usec_internal, and they won't take anything near
266 * 1 sec each, and we won't have anything like 1 million queued
267 * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
/* Estimate = (mean measured internal time per handshake) * n_requests. */
269 return (onionskins_usec_internal
[onionskin_type
] * n_requests
) /
270 onionskins_n_processed
[onionskin_type
];
274 /** Compute the absolute and relative overhead of using the cpuworker
275 * framework for onionskins of type <b>onionskin_type</b>.
 *
 * On success, stores the mean per-handshake overhead in microseconds in
 * *<b>usec_out</b> and the overhead as a fraction of the internal
 * processing time in *<b>frac_out</b>.  (NOTE(review): the return
 * statements are not visible in this extraction; the caller treats a
 * negative return as failure.) */
277 get_overhead_for_onionskins(uint32_t *usec_out
, double *frac_out
,
278 uint16_t onionskin_type
)
285 if (onionskin_type
> MAX_ONION_HANDSHAKE_TYPE
) /* should be impossible */
/* Refuse to divide by zero: we need at least one measurement of each
 * kind before we can compute an overhead. */
287 if (onionskins_n_processed
[onionskin_type
] == 0 ||
288 onionskins_usec_internal
[onionskin_type
] == 0 ||
289 onionskins_usec_roundtrip
[onionskin_type
] == 0)
/* Total roundtrip time minus time actually spent inside the worker
 * equals the time the framework itself cost us. */
292 overhead
= onionskins_usec_roundtrip
[onionskin_type
] -
293 onionskins_usec_internal
[onionskin_type
];
295 *usec_out
= (uint32_t)(overhead
/ onionskins_n_processed
[onionskin_type
]);
296 *frac_out
= U64_TO_DBL(overhead
) / onionskins_usec_internal
[onionskin_type
];
301 /** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
/* NOTE(review): the rest of this doc comment (original lines 302-303) is
 * not visible in this extraction; the body logs the measured overhead at
 * <b>severity</b>, labelled with <b>onionskin_type_name</b>. */
304 cpuworker_log_onionskin_overhead(int severity
, int onionskin_type
,
305 const char *onionskin_type_name
)
308 double relative_overhead
;
311 r
= get_overhead_for_onionskins(&overhead
, &relative_overhead
,
/* Nothing measured yet, or the computation failed: stay quiet. */
313 if (!overhead
|| r
<0)
316 log_fn(severity
, LD_OR
,
317 "%s onionskins have averaged %u usec overhead (%.2f%%) in "
319 onionskin_type_name
, (unsigned)overhead
, relative_overhead
*100);
322 /** Handle a reply from the worker threads. */
/* Runs on the main thread.  Unpacks the cpuworker_reply_t from the job,
 * folds timing samples into the onionskin statistics, then either frees
 * a dead circuit, aborts on failure, or sends the created cell with
 * onionskin_answer().  (NOTE(review): several original lines are missing
 * from this extraction, including the "circ = job->circ;" assignment,
 * the "done_processing:" label, and the freeing of the job.) */
324 cpuworker_onion_handshake_replyfn(void *work_
)
326 cpuworker_job_t
*job
= work_
;
327 cpuworker_reply_t rpl
;
328 or_circuit_t
*circ
= NULL
;
/* A reply just came back, so one fewer task is outstanding; this may
 * open a slot for queue_pending_tasks() below. */
330 tor_assert(total_pending_tasks
> 0);
331 --total_pending_tasks
;
333 /* Could avoid this, but doesn't matter. */
334 memcpy(&rpl
, &job
->u
.reply
, sizeof(rpl
));
336 tor_assert(rpl
.magic
== CPUWORKER_REPLY_MAGIC
);
338 if (rpl
.timed
&& rpl
.success
&&
339 rpl
.handshake_type
<= MAX_ONION_HANDSHAKE_TYPE
) {
340 /* Time how long this request took. The handshake_type check should be
341 needless, but let's leave it in to be safe. */
342 struct timeval tv_end
, tv_diff
;
343 int64_t usec_roundtrip
;
344 tor_gettimeofday(&tv_end
);
345 timersub(&tv_end
, &rpl
.started_at
, &tv_diff
);
346 usec_roundtrip
= ((int64_t)tv_diff
.tv_sec
)*1000000 + tv_diff
.tv_usec
;
/* Discard negative or absurdly large samples (clock jumps). */
347 if (usec_roundtrip
>= 0 &&
348 usec_roundtrip
< MAX_BELIEVABLE_ONIONSKIN_DELAY
) {
349 ++onionskins_n_processed
[rpl
.handshake_type
];
350 onionskins_usec_internal
[rpl
.handshake_type
] += rpl
.n_usec
;
351 onionskins_usec_roundtrip
[rpl
.handshake_type
] += usec_roundtrip
;
352 if (onionskins_n_processed
[rpl
.handshake_type
] >= 500000) {
353 /* Scale down every 500000 handshakes. On a busy server, that's
354 * less impressive than it sounds. */
355 onionskins_n_processed
[rpl
.handshake_type
] /= 2;
356 onionskins_usec_internal
[rpl
.handshake_type
] /= 2;
357 onionskins_usec_roundtrip
[rpl
.handshake_type
] /= 2;
365 "Unpacking cpuworker reply %p, circ=%p, success=%d",
366 job
, circ
, rpl
.success
);
368 if (circ
->base_
.magic
== DEAD_CIRCUIT_MAGIC
) {
369 /* The circuit was supposed to get freed while the reply was
370 * pending. Instead, it got left for us to free so that we wouldn't freak
371 * out when the job->circ field wound up pointing to nothing. */
372 log_debug(LD_OR
, "Circuit died while reply was pending. Freeing memory.");
373 circ
->base_
.magic
= 0;
375 goto done_processing
;
/* The reply is in hand, so the circuit no longer has a cancellable
 * workqueue entry. */
378 circ
->workqueue_entry
= NULL
;
380 if (TO_CIRCUIT(circ
)->marked_for_close
) {
381 /* We already marked this circuit; we can't call it open. */
382 log_debug(LD_OR
,"circuit is already marked.");
383 goto done_processing
;
386 if (rpl
.success
== 0) {
388 "decoding onionskin failed. "
389 "(Old key or bad software.) Closing.");
390 circuit_mark_for_close(TO_CIRCUIT(circ
), END_CIRC_REASON_TORPROTOCOL
);
391 goto done_processing
;
/* Success: send the created cell and install the circuit keys. */
394 if (onionskin_answer(circ
,
396 (const char*)rpl
.keys
, sizeof(rpl
.keys
),
397 rpl
.rend_auth_material
) < 0) {
398 log_warn(LD_OR
,"onionskin_answer failed. Closing.");
399 circuit_mark_for_close(TO_CIRCUIT(circ
), END_CIRC_REASON_INTERNAL
);
400 goto done_processing
;
402 log_debug(LD_OR
,"onionskin_answer succeeded. Yay.");
/* Both the local copy and the job hold key material; wipe them. */
405 memwipe(&rpl
, 0, sizeof(rpl
));
406 memwipe(job
, 0, sizeof(*job
));
/* A task slot just freed up: pull in any queued onionskins. */
408 queue_pending_tasks();
411 /** Implementation function for onion handshake requests. */
412 static workqueue_reply_t
413 cpuworker_onion_handshake_threadfn(void *state_
, void *work_
)
415 worker_state_t
*state
= state_
;
416 cpuworker_job_t
*job
= work_
;
418 /* variables for onion processing */
419 server_onion_keys_t
*onion_keys
= state
->onion_keys
;
420 cpuworker_request_t req
;
421 cpuworker_reply_t rpl
;
423 memcpy(&req
, &job
->u
.request
, sizeof(req
));
425 tor_assert(req
.magic
== CPUWORKER_REQUEST_MAGIC
);
426 memset(&rpl
, 0, sizeof(rpl
));
428 const create_cell_t
*cc
= &req
.create_cell
;
429 created_cell_t
*cell_out
= &rpl
.created_cell
;
430 struct timeval tv_start
= {0,0}, tv_end
;
432 rpl
.timed
= req
.timed
;
433 rpl
.started_at
= req
.started_at
;
434 rpl
.handshake_type
= cc
->handshake_type
;
436 tor_gettimeofday(&tv_start
);
437 n
= onion_skin_server_handshake(cc
->handshake_type
,
438 cc
->onionskin
, cc
->handshake_len
,
441 rpl
.keys
, CPATH_KEY_MATERIAL_LEN
,
442 rpl
.rend_auth_material
);
445 log_debug(LD_OR
,"onion_skin_server_handshake failed.");
446 memset(&rpl
, 0, sizeof(rpl
));
450 log_debug(LD_OR
,"onion_skin_server_handshake succeeded.");
451 cell_out
->handshake_len
= n
;
452 switch (cc
->cell_type
) {
454 cell_out
->cell_type
= CELL_CREATED
; break;
456 cell_out
->cell_type
= CELL_CREATED2
; break;
457 case CELL_CREATE_FAST
:
458 cell_out
->cell_type
= CELL_CREATED_FAST
; break;
461 return WQ_RPL_SHUTDOWN
;
465 rpl
.magic
= CPUWORKER_REPLY_MAGIC
;
467 struct timeval tv_diff
;
469 tor_gettimeofday(&tv_end
);
470 timersub(&tv_end
, &tv_start
, &tv_diff
);
471 usec
= ((int64_t)tv_diff
.tv_sec
)*1000000 + tv_diff
.tv_usec
;
472 if (usec
< 0 || usec
> MAX_BELIEVABLE_ONIONSKIN_DELAY
)
473 rpl
.n_usec
= MAX_BELIEVABLE_ONIONSKIN_DELAY
;
475 rpl
.n_usec
= (uint32_t) usec
;
478 memcpy(&job
->u
.reply
, &rpl
, sizeof(rpl
));
480 memwipe(&req
, 0, sizeof(req
));
481 memwipe(&rpl
, 0, sizeof(req
));
485 /** Take pending tasks from the queue and assign them to cpuworkers. */
/* NOTE(review): the declaration of the local "circ" and the loop-exit
 * when onion_next_task() finds nothing are not visible in this
 * extraction. */
487 queue_pending_tasks(void)
490 create_cell_t
*onionskin
= NULL
;
/* Keep pulling queued onionskins until the threadpool is saturated. */
492 while (total_pending_tasks
< max_pending_tasks
) {
493 circ
= onion_next_task(&onionskin
);
/* Best-effort: a failed assignment is logged and skipped, not fatal. */
498 if (assign_onionskin_to_cpuworker(circ
, onionskin
) < 0)
499 log_info(LD_OR
,"assign_to_cpuworker failed. Ignoring.");
/** Queue an arbitrary job (<b>fn</b>, answered on the main thread by
 * <b>reply_fn</b>) on the cpuworker threadpool at <b>priority</b>; see
 * the file header for the non-onionskin users of this entry point.
 * (NOTE(review): the trailing argument-list lines of the signature and
 * of the threadpool_queue_work_priority() call are not visible in this
 * extraction.) */
504 MOCK_IMPL(workqueue_entry_t
*,
505 cpuworker_queue_work
,(workqueue_priority_t priority
,
506 workqueue_reply_t (*fn
)(void *, void *),
507 void (*reply_fn
)(void *),
/* cpu_init() must have run before anyone queues work. */
510 tor_assert(threadpool
);
512 return threadpool_queue_work_priority(threadpool
,
519 /** Try to tell a cpuworker to perform the public key operations necessary to
520 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
522 * Return 0 if we successfully assign the task, or -1 on failure.
/* NOTE(review): several original lines are not visible in this
 * extraction, including the p_chan NULL check preceding the "p_chan
 * gone" log message, the error-handling branches, the priority
 * computation, and the return statements. */
525 assign_onionskin_to_cpuworker(or_circuit_t
*circ
,
526 create_cell_t
*onionskin
)
528 workqueue_entry_t
*queue_entry
;
529 cpuworker_job_t
*job
;
530 cpuworker_request_t req
;
533 tor_assert(threadpool
);
536 log_info(LD_OR
,"circ->p_chan gone. Failing circ.");
/* The pool is full: park this onionskin on the pending-onion queue;
 * queue_pending_tasks() will retry it when a reply frees a slot. */
541 if (total_pending_tasks
>= max_pending_tasks
) {
542 log_debug(LD_OR
,"No idle cpuworkers. Queuing.");
543 if (onion_pending_add(circ
, onionskin
) < 0) {
/* Only count handshakes arriving from relays in the stats. */
550 if (!channel_is_client(circ
->p_chan
))
551 rep_hist_note_circuit_handshake_assigned(onionskin
->handshake_type
);
553 should_time
= should_time_request(onionskin
->handshake_type
);
/* Build the request that the worker thread will consume. */
554 memset(&req
, 0, sizeof(req
));
555 req
.magic
= CPUWORKER_REQUEST_MAGIC
;
556 req
.timed
= should_time
;
558 memcpy(&req
.create_cell
, onionskin
, sizeof(create_cell_t
));
563 tor_gettimeofday(&req
.started_at
);
565 job
= tor_malloc_zero(sizeof(cpuworker_job_t
));
567 memcpy(&job
->u
.request
, &req
, sizeof(req
));
/* The stack copy holds onionskin material; wipe it once handed off. */
568 memwipe(&req
, 0, sizeof(req
));
570 ++total_pending_tasks
;
571 queue_entry
= threadpool_queue_work_priority(threadpool
,
573 cpuworker_onion_handshake_threadfn
,
574 cpuworker_onion_handshake_replyfn
,
577 log_warn(LD_BUG
, "Couldn't queue work on threadpool");
582 log_debug(LD_OR
, "Queued task %p (qe=%p, circ=%p)",
583 job
, queue_entry
, job
->circ
);
/* Remember the entry so cpuworker_cancel_circ_handshake() can cancel
 * it if the circuit dies before the worker picks it up. */
585 circ
->workqueue_entry
= queue_entry
;
590 /** If <b>circ</b> has a pending handshake that hasn't been processed yet,
591 * remove it from the worker queue. */
/* NOTE(review): the "if (job) {" guard around the memwipe/decrement and
 * the freeing of the cancelled job are not visible in this
 * extraction. */
593 cpuworker_cancel_circ_handshake(or_circuit_t
*circ
)
595 cpuworker_job_t
*job
;
/* Nothing outstanding for this circuit: nothing to cancel. */
596 if (circ
->workqueue_entry
== NULL
)
599 job
= workqueue_entry_cancel(circ
->workqueue_entry
);
601 /* It successfully cancelled. */
/* The job holds onionskin material; poison it before releasing.  Since
 * the replyfn will never run for a cancelled job, the pending-task
 * count must be decremented here instead. */
602 memwipe(job
, 0xe0, sizeof(*job
));
604 tor_assert(total_pending_tasks
> 0);
605 --total_pending_tasks
;
606 /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
607 circ
->workqueue_entry
= NULL
;