Whittle away at warnings: (#18874)
[mono-project.git] / mono / sgen / sgen-thread-pool.c
blob656e561856791cf18f5dcf7fd4242d113f86962c
1 /**
2 * \file
3 * Threadpool for all concurrent GC work.
5 * Copyright (C) 2015 Xamarin Inc
7 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
8 */
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
18 static mono_mutex_t lock;
19 static mono_cond_t work_cond;
20 static mono_cond_t done_cond;
22 static int threads_num;
23 static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
24 static int threads_context [SGEN_THREADPOOL_MAX_NUM_THREADS];
26 static volatile gboolean threadpool_shutdown;
27 static volatile int threads_finished;
29 static int contexts_num;
30 static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
32 enum {
33 STATE_WAITING,
34 STATE_IN_PROGRESS,
35 STATE_DONE
38 /* Assumes that the lock is held. */
39 static SgenThreadPoolJob*
40 get_job_and_set_in_progress (SgenThreadPoolContext *context)
42 for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
43 SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
44 if (job->state == STATE_WAITING) {
45 job->state = STATE_IN_PROGRESS;
46 return job;
49 return NULL;
52 /* Assumes that the lock is held. */
53 static ssize_t
54 find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
56 for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
57 if (context->job_queue.data [i] == job)
58 return i;
60 return -1;
63 /* Assumes that the lock is held. */
64 static void
65 remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
67 ssize_t index;
68 SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
69 index = find_job_in_queue (context, job);
70 SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
71 context->job_queue.data [index] = NULL;
72 sgen_pointer_queue_remove_nulls (&context->job_queue);
73 sgen_thread_pool_job_free (job);
76 static gboolean
77 continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
79 if (!context->continue_idle_job_func)
80 return FALSE;
81 return context->continue_idle_job_func (thread_data, context - pool_contexts);
84 static gboolean
85 should_work (SgenThreadPoolContext *context, void *thread_data)
87 if (!context->should_work_func)
88 return TRUE;
89 return context->should_work_func (thread_data);
93 * Tells whether we should lock and attempt to get work from
94 * a higher priority context.
96 static gboolean
97 has_priority_work (int worker_index, int current_context)
99 int i;
101 for (i = 0; i < current_context; i++) {
102 SgenThreadPoolContext *context = &pool_contexts [i];
103 void *thread_data;
105 if (worker_index >= context->num_threads)
106 continue;
107 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
108 if (!should_work (context, thread_data))
109 continue;
110 if (context->job_queue.next_slot > 0)
111 return TRUE;
112 if (continue_idle_job (context, thread_data))
113 return TRUE;
116 /* Return if job enqueued on current context. Jobs have priority over idle work */
117 if (pool_contexts [current_context].job_queue.next_slot > 0)
118 return TRUE;
120 return FALSE;
124 * Gets the highest priority work. If there is none, it waits
125 * for work_cond. Should always be called with lock held.
127 static void
128 get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
130 while (!threadpool_shutdown) {
131 int i;
133 for (i = 0; i < contexts_num; i++) {
134 SgenThreadPoolContext *context = &pool_contexts [i];
135 void *thread_data;
137 if (worker_index >= context->num_threads)
138 continue;
139 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
141 if (!should_work (context, thread_data))
142 continue;
145 * It's important that we check the continue idle flag with the lock held.
146 * Suppose we didn't check with the lock held, and the result is FALSE. The
147 * main thread might then set continue idle and signal us before we can take
148 * the lock, and we'd lose the signal.
150 *do_idle = continue_idle_job (context, thread_data);
151 *job = get_job_and_set_in_progress (context);
153 if (*job || *do_idle) {
154 *work_context = i;
155 return;
160 * Nothing to do on any context
161 * pthread_cond_wait() can return successfully despite the condition
162 * not being signalled, so we have to run this in a loop until we
163 * really have work to do.
165 mono_os_cond_wait (&work_cond, &lock);
169 static mono_native_thread_return_t
170 thread_func (void *data)
172 int worker_index = (int)(gsize)data;
173 int current_context;
174 void *thread_data = NULL;
176 sgen_client_thread_register_worker ();
178 for (current_context = 0; current_context < contexts_num; current_context++) {
179 if (worker_index >= pool_contexts [current_context].num_threads ||
180 !pool_contexts [current_context].thread_init_func)
181 break;
183 thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
184 pool_contexts [current_context].thread_init_func (thread_data);
187 current_context = 0;
189 mono_os_mutex_lock (&lock);
190 for (;;) {
191 gboolean do_idle = FALSE;
192 SgenThreadPoolJob *job = NULL;
193 SgenThreadPoolContext *context = NULL;
195 threads_context [worker_index] = -1;
196 get_work (worker_index, &current_context, &do_idle, &job);
197 threads_context [worker_index] = current_context;
199 if (!threadpool_shutdown) {
200 context = &pool_contexts [current_context];
201 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
204 mono_os_mutex_unlock (&lock);
206 if (job) {
207 job->func (thread_data, job);
209 mono_os_mutex_lock (&lock);
211 SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
212 job->state = STATE_DONE;
213 remove_job (context, job);
215 * Only the main GC thread will ever wait on the done condition, so we don't
216 * have to broadcast.
218 mono_os_cond_signal (&done_cond);
219 } else if (do_idle) {
220 SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
221 do {
222 context->idle_job_func (thread_data);
223 do_idle = continue_idle_job (context, thread_data);
224 } while (do_idle && !has_priority_work (worker_index, current_context));
226 mono_os_mutex_lock (&lock);
228 if (!do_idle)
229 mono_os_cond_signal (&done_cond);
230 } else {
231 SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
232 mono_os_mutex_lock (&lock);
233 threads_finished++;
234 mono_os_cond_signal (&done_cond);
235 mono_os_mutex_unlock (&lock);
236 return 0;
240 return 0;
244 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
246 int context_id = contexts_num;
248 SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
250 pool_contexts [context_id].thread_init_func = init_func;
251 pool_contexts [context_id].idle_job_func = idle_func;
252 pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
253 pool_contexts [context_id].should_work_func = should_work_func;
254 pool_contexts [context_id].thread_datas = thread_datas;
256 SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
258 pool_contexts [context_id].num_threads = num_threads;
260 sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
262 contexts_num++;
264 return context_id;
267 void
268 sgen_thread_pool_start (void)
270 int i;
272 for (i = 0; i < contexts_num; i++) {
273 if (threads_num < pool_contexts [i].num_threads)
274 threads_num = pool_contexts [i].num_threads;
277 if (!threads_num)
278 return;
280 mono_os_mutex_init (&lock);
281 mono_os_cond_init (&work_cond);
282 mono_os_cond_init (&done_cond);
284 threads_finished = 0;
285 threadpool_shutdown = FALSE;
287 for (i = 0; i < threads_num; i++) {
288 mono_native_thread_create (&threads [i], (gpointer)thread_func, (void*)(gsize)i);
292 void
293 sgen_thread_pool_shutdown (void)
295 if (!threads_num)
296 return;
298 mono_os_mutex_lock (&lock);
299 threadpool_shutdown = TRUE;
300 mono_os_cond_broadcast (&work_cond);
301 while (threads_finished < threads_num)
302 mono_os_cond_wait (&done_cond, &lock);
303 mono_os_mutex_unlock (&lock);
305 mono_os_mutex_destroy (&lock);
306 mono_os_cond_destroy (&work_cond);
307 mono_os_cond_destroy (&done_cond);
309 for (int i = 0; i < threads_num; i++) {
310 mono_threads_add_joinable_thread ((gpointer)(gsize)threads [i]);
314 SgenThreadPoolJob*
315 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
317 SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
318 job->name = name;
319 job->size = size;
320 job->state = STATE_WAITING;
321 job->func = func;
322 return job;
325 void
326 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
328 sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
331 void
332 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
334 mono_os_mutex_lock (&lock);
336 sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
337 mono_os_cond_broadcast (&work_cond);
339 mono_os_mutex_unlock (&lock);
342 void
343 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
345 SGEN_ASSERT (0, job, "Where's the job?");
347 mono_os_mutex_lock (&lock);
349 while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
350 mono_os_cond_wait (&done_cond, &lock);
352 mono_os_mutex_unlock (&lock);
355 void
356 sgen_thread_pool_idle_signal (int context_id)
358 SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
360 mono_os_mutex_lock (&lock);
362 if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
363 mono_os_cond_broadcast (&work_cond);
365 mono_os_mutex_unlock (&lock);
368 void
369 sgen_thread_pool_idle_wait (int context_id, SgenThreadPoolContinueIdleWaitFunc continue_wait)
371 SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
373 mono_os_mutex_lock (&lock);
375 while (continue_wait (context_id, threads_context))
376 mono_os_cond_wait (&done_cond, &lock);
378 mono_os_mutex_unlock (&lock);
381 void
382 sgen_thread_pool_wait_for_all_jobs (int context_id)
384 mono_os_mutex_lock (&lock);
386 while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
387 mono_os_cond_wait (&done_cond, &lock);
389 mono_os_mutex_unlock (&lock);
392 /* Return 0 if is not a thread pool thread or the thread number otherwise */
394 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
396 int i;
398 for (i = 0; i < threads_num; i++) {
399 if (some_thread == threads [i])
400 return i + 1;
403 return 0;
406 #endif