src/lib/evloop/workqueue.c
/* Copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file workqueue.c
 *
 * \brief Implements worker threads, queues of work for them, and mechanisms
 * for them to send answers back to the main thread.
 *
 * The main structure here is a threadpool_t: it manages a set of worker
 * threads, a queue of pending work, and a reply queue.  Every piece of work
 * is a workqueue_entry_t, containing data to process and a function to
 * process it with.
 *
 * The main thread informs the worker threads of pending work by using a
 * condition variable.  The workers inform the main process of completed work
 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
 *
 * The main thread can also queue an "update" that will be handled by all the
 * workers.  This is useful for updating state that all the workers share.
 *
 * In Tor today, there is currently only one thread pool, used in cpuworker.c.
 * (A usage sketch follows the includes below.)
 */
#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"

#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"

#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>
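/* Illustrative only: a minimal lifecycle sketch of the API implemented in
 * this file.  The state type, its allocator/deallocator, and the 0 passed
 * as alertsocks flags are hypothetical stand-ins, not names defined here.
 *
 *   static void *my_state_new(void *arg) { (void)arg; return tor_malloc_zero(sizeof(int)); }
 *   static void my_state_free(void *state) { tor_free(state); }
 *
 *   replyqueue_t *rq = replyqueue_new(0);
 *   threadpool_t *tp = threadpool_new(4, rq, my_state_new, my_state_free, NULL);
 *   threadpool_register_reply_event(tp, NULL);
 *   // ... then hand work to the pool with threadpool_queue_work().
 */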
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)

TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;
struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do.  The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool.  Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  void *new_thread_state_arg;
};
/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2
struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned.  This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};
struct replyqueue_t {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};
/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this?  In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread.  See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  ent->priority = WQ_PRI_HIGH;
  return ent;
}
#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))

/**
 * Release all storage held in <b>ent</b>.  Call only when <b>ent</b> is not
 * on any queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}
/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function returns the argument passed to the
 * work function.  It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item.  In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}
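/* Illustrative sketch of the cancel contract above; `job_fn`, `job_reply`,
 * `job`, and `job_free` are hypothetical caller-side names.  If the entry
 * was still pending, the caller gets the argument back and must free it;
 * if a worker already picked it up, cancel returns NULL and the reply
 * function will still run later.
 *
 *   workqueue_entry_t *ent =
 *     threadpool_queue_work(pool, job_fn, job_reply, job);
 *   ...
 *   void *arg = workqueue_entry_cancel(ent);
 *   if (arg) {
 *     job_free(arg);   // cancelled before any worker started it
 *   }                  // else: too late; job_reply(job) runs as usual
 */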
/** Return true iff <b>thread</b> has something to do: either pending work in
 * one of its pool's queues, or an update it hasn't run yet.  The caller must
 * hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
      return 1;
  }
  return thread->generation != thread->in_pool->generation;
}
/** Extract the next workqueue_entry_t from the thread's pool, removing
 * it from the relevant queues and marking it as non-pending.
 *
 * The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work.  But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }
  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}
/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->lock);
  while (1) {
    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
          thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        if (r != WQ_RPL_REPLY) {
          return;
        }

        tor_mutex_acquire(&pool->lock);
        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock.  This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY) {
        return;
      }
      tor_mutex_acquire(&pool->lock);
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }
}
/** Put a reply on the reply queue.  The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}
/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    tor_free(thr);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return thr;
}
/**
 * Queue an item of work for a thread in a thread pool.  The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>.  It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object.  It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel().  On failure, return NULL.  (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest priority, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
  ent->on_pool = pool;
  ent->pending = 1;
  ent->priority = prio;

  tor_mutex_acquire(&pool->lock);

  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);

  tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->lock);

  return ent;
}
/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}
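/* A sketch of one work item's round trip under the contract documented
 * above.  The `request_t` type and the helper functions are hypothetical
 * examples, not part of this file.  `compute_fn` runs on a worker thread
 * with that thread's state; `handle_reply` later runs on the main thread
 * and frees the argument, as the contract requires.
 *
 *   static workqueue_reply_t
 *   compute_fn(void *state, void *arg)
 *   {
 *     request_t *req = arg;
 *     req->answer = expensive_computation(state, req->input);
 *     return WQ_RPL_REPLY;   // keep this worker thread running
 *   }
 *
 *   static void
 *   handle_reply(void *arg)
 *   {
 *     request_t *req = arg;
 *     deliver_answer(req->answer);
 *     tor_free(req);         // the reply function owns the work object
 *   }
 *
 *   request_t *req = request_new(input);   // hypothetical constructor
 *   threadpool_queue_work_priority(pool, WQ_PRI_LOW, compute_fn,
 *                                  handle_reply, req);
 */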
/**
 * Queue a copy of a work item for every thread in a pool.  This can be used,
 * for example, to tell the threads to update some parameter in their states.
 *
 * Arguments are as for <b>threadpool_queue_work</b>, except that the
 * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
 * make a copy of it.
 *
 * UPDATE FUNCTIONS MUST BE IDEMPOTENT.  We do not guarantee that every update
 * will be run.  If a new update is scheduled before the old update finishes
 * running, then the new will replace the old in any threads that haven't run
 * it yet.
 *
 * Return 0 on success, -1 on failure.
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}
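/* Sketch of an idempotent update, per the warning above.  The copy and
 * free helpers and the config functions are hypothetical.  Each thread
 * receives its own copy of the new configuration, so the update function
 * can consume it without coordinating with other threads; `free_config`
 * only runs on copies that were replaced before a thread could use them.
 *
 *   static void *dup_config(void *arg) { return config_copy(arg); }
 *   static void free_config(void *arg) { config_free_(arg); }
 *
 *   static workqueue_reply_t
 *   apply_config(void *state, void *arg)
 *   {
 *     thread_state_set_config(state, arg);  // idempotent: safe to re-run
 *     return WQ_RPL_REPLY;
 *   }
 *
 *   threadpool_queue_update(pool, dup_config, apply_config,
 *                           free_config, new_config);
 */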
/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values.  Both are chosen somewhat arbitrarily.  If
 * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
 * stalling forever.  If it's too high, we have a risk of low-priority tasks
 * grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX
/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      //LCOV_EXCL_START
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      tor_mutex_release(&pool->lock);
      return -1;
      //LCOV_EXCL_STOP
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }
  tor_mutex_release(&pool->lock);

  return 0;
}
/**
 * Construct a new thread pool with <b>n</b> worker threads, configured to
 * send their output to <b>replyqueue</b>.  The threads' states will be
 * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
 * as its argument.  When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    tor_cond_uninit(&pool->condition);
    tor_mutex_uninit(&pool->lock);
    tor_free(pool);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return pool;
}
/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  return tp->reply_queue;
}
/** Allocate a new reply queue.  Reply queues are used to pass results from
 * worker threads to the main thread.  Since the main thread is running an
 * IO-centric event loop, it needs to get woken up with means other than a
 * condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    //LCOV_EXCL_START
    tor_free(rq);
    return NULL;
    //LCOV_EXCL_STOP
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}
/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue handler. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  if (tp->reply_cb)
    tp->reply_cb(tp);
}
/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop.  If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue.  Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}
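/* Sketch of registering the reply event with a post-processing hook; the
 * callback and helper names are hypothetical.  The callback runs on the
 * main thread after replyqueue_process() has drained the queue, which is a
 * convenient place to, e.g., refill the work queue.
 *
 *   static void
 *   after_replies(threadpool_t *tp)
 *   {
 *     (void) tp;
 *     schedule_more_work_if_needed();  // hypothetical helper
 *   }
 *
 *   if (threadpool_register_reply_event(tp, after_replies) < 0)
 *     log_warn(LD_GENERAL, "Couldn't register reply event");
 */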
/**
 * Process all pending replies on a reply queue.  The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    //LCOV_EXCL_START
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                   "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
    //LCOV_EXCL_STOP
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}