/* copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file workqueue.c
 *
 * \brief Implements worker threads, queues of work for them, and mechanisms
 * for them to send answers back to the main thread.
 *
 * The main structure here is a threadpool_t : it manages a set of worker
 * threads, a queue of pending work, and a reply queue.  Every piece of work
 * is a workqueue_entry_t, containing data to process and a function to
 * process it with.
 *
 * The main thread informs the worker threads of pending work by using a
 * condition variable.  The workers inform the main process of completed work
 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
 *
 * The main thread can also queue an "update" that will be handled by all the
 * workers.  This is useful for updating state that all the workers share.
 *
 * In Tor today, there is currently only one thread pool, used in cpuworker.c.
 */
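/* Example (illustrative sketch, not part of the original file): a typical
 * lifecycle for a pool, assuming hypothetical caller-supplied helpers
 * my_state_new(), my_state_free(), my_work_fn(), my_reply_fn(), and my_job:
 *
 *     replyqueue_t *rq = replyqueue_new(0);
 *     threadpool_t *tp = threadpool_new(4, rq,
 *                                       my_state_new, my_state_free, NULL);
 *     threadpool_register_reply_event(tp, NULL);
 *     threadpool_queue_work(tp, my_work_fn, my_reply_fn, my_job);
 *
 * The work function runs in a worker thread; the reply function later runs
 * in the main thread from replyqueue_process().
 */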
#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"

#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"

#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;
struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool.  Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  /** Callback to run in the main thread after replies are processed. */
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  void *new_thread_state_arg;
};
/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2
struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};
struct replyqueue_t {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};
/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this? In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  ent->priority = WQ_PRI_HIGH;
  return ent;
}
#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
/**
 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
 * a queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}
/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function returns the argument passed to the
 * work function.  It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item.  In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}
/** Return true iff <b>thread</b> has either queued work in its pool or a
 * pending update to run.  The caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
      return 1;
  }
  return thread->generation != thread->in_pool->generation;
}
/** Extract the next workqueue_entry_t from the thread's pool, removing
 * it from the relevant queues and marking it as non-pending.
 *
 * The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;

  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work. But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }

  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}
/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->lock);
  while (1) {
    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
          thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        if (r != WQ_RPL_REPLY) {
          return;
        }

        tor_mutex_acquire(&pool->lock);
        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY) {
        return;
      }

      tor_mutex_acquire(&pool->lock);
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }
}
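/* Illustrative sketch (not part of the original file): a work/reply function
 * pair matching the contract above.  Returning WQ_RPL_REPLY keeps the worker
 * alive; any other value (e.g. WQ_RPL_SHUTDOWN) makes the worker thread exit
 * after the reply is queued.  my_job_t, my_job_process(), and my_job_free()
 * are hypothetical.
 *
 *     static workqueue_reply_t
 *     my_work_fn(void *state, void *arg)
 *     {
 *       my_job_t *job = arg;
 *       my_job_process(state, job);   // runs in the worker thread
 *       return WQ_RPL_REPLY;
 *     }
 *
 *     static void
 *     my_reply_fn(void *arg)
 *     {
 *       my_job_free(arg);             // runs in the main thread; frees arg
 *     }
 */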
/** Put a reply on the reply queue. The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}
/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    tor_free(thr);
    return NULL;
  }

  return thr;
}
/**
 * Queue an item of work for a thread in a thread pool.  The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>. It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object.  It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest priority, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
  ent->on_pool = pool;
  ent->pending = 1;
  ent->priority = prio;

  tor_mutex_acquire(&pool->lock);

  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);

  tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->lock);

  return ent;
}
/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}
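/* Example (illustrative sketch, not part of the original file): queueing
 * background work at low priority, so that it only competes weakly with
 * high-priority items.  my_scrub_fn, my_scrub_done, and my_buf are
 * hypothetical caller-supplied names.
 *
 *     workqueue_entry_t *ent =
 *       threadpool_queue_work_priority(pool, WQ_PRI_LOW,
 *                                      my_scrub_fn, my_scrub_done, my_buf);
 *     if (!ent)
 *       log_warn(LD_GENERAL, "Unable to queue work.");
 */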
/**
 * Queue a copy of a work item for every thread in a pool.  This can be used,
 * for example, to tell the threads to update some parameter in their states.
 *
 * Arguments are as for <b>threadpool_queue_work</b>, except that the
 * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
 * make a copy of it.
 *
 * UPDATE FUNCTIONS MUST BE IDEMPOTENT.  We do not guarantee that every update
 * will be run.  If a new update is scheduled before the old update finishes
 * running, then the new will replace the old in any threads that haven't run
 * it yet.
 *
 * Return 0 on success, -1 on failure.
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}
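/* Example (illustrative sketch, not part of the original file): telling every
 * worker to reload a shared configuration value.  Because updates may be
 * coalesced, my_update_fn must be idempotent.  my_conf_dup, my_update_fn,
 * my_conf_free, and new_conf are hypothetical.
 *
 *     if (threadpool_queue_update(pool, my_conf_dup, my_update_fn,
 *                                 my_conf_free, new_conf) < 0)
 *       log_warn(LD_GENERAL, "Unable to queue update.");
 */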
/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values. Both are chosen somewhat arbitrarily. If
 * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
 * stalling forever.  If it's too high, we have a risk of low-priority tasks
 * grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX
/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      tor_mutex_release(&pool->lock);
      return -1;
    }

    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }

  tor_mutex_release(&pool->lock);

  return 0;
}
/**
 * Construct a new thread pool with <b>n_threads</b> worker threads,
 * configured to send their output to <b>replyqueue</b>.  The threads' states
 * will be constructed with the <b>new_thread_state_fn</b> call, receiving
 * <b>arg</b> as its argument.  When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  unsigned i;

  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    tor_assert_nonfatal_unreached();
    tor_cond_uninit(&pool->condition);
    tor_mutex_uninit(&pool->lock);
    tor_free(pool);
    return NULL;
  }

  return pool;
}
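/* Example (illustrative sketch, not part of the original file): per-thread
 * state.  Each worker receives its own state object from new_thread_state_fn,
 * and that object is passed as the first argument to every work function the
 * worker runs.  my_state_t, my_state_new(), and my_state_free() are
 * hypothetical.
 *
 *     static void *
 *     my_state_new(void *arg)
 *     {
 *       (void) arg;
 *       return tor_malloc_zero(sizeof(my_state_t));
 *     }
 *
 *     static void
 *     my_state_free(void *state)
 *     {
 *       tor_free(state);
 *     }
 *
 *     threadpool_t *tp =
 *       threadpool_new(2, replyqueue_new(0), my_state_new, my_state_free,
 *                      NULL);
 */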
/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  return tp->reply_queue;
}
/** Allocate a new reply queue.  Reply queues are used to pass results from
 * worker threads to the main thread.  Since the main thread is running an
 * IO-centric event loop, it needs to get woken up with means other than a
 * condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    tor_free(rq);
    return NULL;
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}
/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue handler. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  if (tp->reply_cb)
    tp->reply_cb(tp);
}
/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop. If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue. Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}
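/* Example (illustrative sketch, not part of the original file): registering
 * the reply event with a callback that runs in the main thread after each
 * batch of replies has been processed.  my_after_replies() is hypothetical.
 *
 *     static void
 *     my_after_replies(threadpool_t *tp)
 *     {
 *       (void) tp;
 *       // e.g. schedule more work, or update bookkeeping counters.
 *     }
 *
 *     if (threadpool_register_reply_event(tp, my_after_replies) < 0)
 *       log_warn(LD_GENERAL, "Unable to register reply event.");
 */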
/**
 * Process all pending replies on a reply queue. The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                   "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}