/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2016, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 * out to worker threads.
 *
 * The multithreading backend for this module is in workqueue.c; this module
 * specializes workqueue.c.
 *
 * Right now, we only use this for processing onionskins, and invoke it mostly
 * from onion.c.
 **/

#include "or.h"
#include "channel.h"
#include "circuitbuild.h"
#include "circuitlist.h"
#include "connection_or.h"
#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
#include "workqueue.h"

#include <event2/event.h>

static void queue_pending_tasks(void);
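
/** State object for a single worker thread: each worker keeps its own copy
 * of the server-side onion keys, so handshakes can be computed without
 * taking a lock on the main thread's key material.  The main thread swaps
 * in fresh keys via cpuworkers_rotate_keyinfo(). */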

typedef struct worker_state_s {
  /** Incremented each time the keys are replaced. */
  int generation;
  /** This worker's private copy of the server onion keys. */
  server_onion_keys_t *onion_keys;
} worker_state_t;

static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}

static void
worker_state_free(void *arg)
{
  worker_state_t *ws = arg;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

/** The queue on which worker threads post completed jobs back to the main
 * thread. */
static replyqueue_t *replyqueue = NULL;
/** The pool of worker threads that process onion handshakes. */
static threadpool_t *threadpool = NULL;
/** Libevent event that fires on the main thread when the reply queue's
 * socket becomes readable. */
static struct event *reply_event = NULL;

/** Weak RNG used to decide which handshake requests to time; see
 * should_time_request(). */
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;

/** How many handshake jobs are currently queued or running in the
 * threadpool? */
static int total_pending_tasks = 0;
/** Cap on total_pending_tasks; recomputed from the CPU count in
 * cpu_init(). */
static int max_pending_tasks = 128;

static void
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
  replyqueue_t *rq = arg;
  (void) sock;
  (void) events;
  replyqueue_process(rq);
}
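
/* Worker threads never touch circuits or libevent directly; they only fill
 * in job->u.reply.  Completed jobs are posted to the replyqueue, whose
 * socket becomes readable and fires the callback above, which in turn runs
 * each job's reply function on the main thread. */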

/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime.
 */
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!reply_event) {
    reply_event = tor_event_new(tor_libevent_get_base(),
                                replyqueue_get_socket(replyqueue),
                                EV_READ|EV_PERSIST,
                                replyqueue_process_cb,
                                replyqueue);
    event_add(reply_event, NULL);
  }
  if (!threadpool) {
    threadpool = threadpool_new(get_num_cpus(get_options()),
                                replyqueue,
                                worker_state_new,
                                worker_state_free,
                                NULL);
  }
  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
  crypto_seed_weak_rng(&request_sample_rng);
}
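
/* Repeated calls are cheap: the replyqueue, reply event, and threadpool are
 * each created only once, while max_pending_tasks is recomputed so that a
 * changed CPU count takes effect on reconfiguration. */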

/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing.) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell.
   * @{ */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
  /** @} */
} cpuworker_reply_t;

typedef struct cpuworker_job_u {
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;
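
/* A job carries the request to a worker thread in u.request and carries the
 * answer back in u.reply, reusing the same storage in both directions.
 * Worker threads touch nothing but u; only the main thread ever
 * dereferences circ. */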

static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}

/** Called when the onion key has changed: queue an update so that every CPU
 * worker replaces its state, and thus its onion keys, with a freshly
 * generated one. */
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}

/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip it to this
 * time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

/** Return true iff we'd like to measure a handshake of type
 * <b>onionskin_type</b>. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample. */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;
  /* Otherwise, measure with P=1/128. We avoid doing this for every
   * handshake, since the measurement itself can take a little time. */
  return tor_weak_random_one_in_n(&request_sample_rng, 128);
}
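
/* In other words: the first 4096 handshakes of each type are always timed;
 * after that, roughly 0.8% of them (1 in 128) are sampled, so the
 * statistics stay fresh without timing every handshake on a busy relay. */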

/** Return an estimate of how many microseconds we will need for a single
 * cpuworker to process <b>n_requests</b> onionskins of type
 * <b>onionskin_type</b>. */
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskin_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * 2^64. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}
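
/* Checking the arithmetic above: 5e5 * 1e6 * 1e6 = 5e17, while UINT64_MAX
 * is about 1.8e19, so even that worst case leaves more than a 30x margin
 * before the multiplication could overflow. */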

/** Compute the absolute and relative overhead of using the cpuworker
 * framework for onionskins of type <b>onionskin_type</b>.*/
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = U64_TO_DBL(overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}
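
/* Worked example: if 1000 handshakes of one type spent 800000 usec inside
 * the workers but 1000000 usec round trip, the total overhead is 200000
 * usec; *usec_out is then 200 usec per handshake and *frac_out is
 * 200000/800000 = 25%. */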

/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 * log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r < 0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code.",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this copy, but it doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR, "circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys,
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR, "onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }
  log_debug(LD_OR, "onionskin_answer succeeded. Yay.");

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}
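
/* The reply function above also keeps the pipeline moving: every reply
 * frees one slot under max_pending_tasks, and the final
 * queue_pending_tasks() call immediately refills it from the queue of
 * onionskins waiting in onion.c. */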

/** Implementation function for onion handshake requests. */
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  const create_cell_t *cc = &req.create_cell;
  created_cell_t *cell_out = &rpl.created_cell;
  struct timeval tv_start = {0,0}, tv_end;
  int n;
  rpl.timed = req.timed;
  rpl.started_at = req.started_at;
  rpl.handshake_type = cc->handshake_type;
  if (req.timed)
    tor_gettimeofday(&tv_start);
  n = onion_skin_server_handshake(cc->handshake_type,
                                  cc->onionskin, cc->handshake_len,
                                  onion_keys,
                                  cell_out->reply,
                                  rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                  rpl.rend_auth_material);
  if (n < 0) {
    /* failure */
    log_debug(LD_OR, "onion_skin_server_handshake failed.");
    memset(&rpl, 0, sizeof(rpl));
    rpl.success = 0;
  } else {
    /* success */
    log_debug(LD_OR, "onion_skin_server_handshake succeeded.");
    cell_out->handshake_len = n;
    switch (cc->cell_type) {
    case CELL_CREATE:
      cell_out->cell_type = CELL_CREATED; break;
    case CELL_CREATE2:
      cell_out->cell_type = CELL_CREATED2; break;
    case CELL_CREATE_FAST:
      cell_out->cell_type = CELL_CREATED_FAST; break;
    default:
      tor_assert(0);
      return WQ_RPL_SHUTDOWN;
    }
    rpl.success = 1;
  }
  rpl.magic = CPUWORKER_REPLY_MAGIC;
  if (req.timed) {
    struct timeval tv_diff;
    int64_t usec;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &tv_start, &tv_diff);
    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    else
      rpl.n_usec = (uint32_t) usec;
  }

  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}
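
/* Both stack copies are wiped before the worker returns: once the reply has
 * been copied into job->u.reply, the job object is the only place the
 * freshly derived key material lives until the main thread consumes and
 * wipes it in the reply function. */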

/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin))
      log_warn(LD_OR, "assign_to_cpuworker failed. Ignoring.");
  }
}

/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time = 0;

  tor_assert(threadpool);

  if (!circ->p_chan) {
    log_info(LD_OR, "circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    log_debug(LD_OR, "No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work(threadpool,
                                      cpuworker_onion_handshake_threadfn,
                                      cpuworker_onion_handshake_replyfn,
                                      job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    /* Roll back the counter we just incremented: no reply will ever
     * arrive for this job. */
    --total_pending_tasks;
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  circ->workqueue_entry = queue_entry;

  return 0;
}
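
/* The handle stored in circ->workqueue_entry is what lets
 * cpuworker_cancel_circ_handshake() below withdraw the job if the circuit
 * is torn down before a worker thread claims it. */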

/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}