[LoongArch64] Part-5:add loongarch support in some files for LoongArch64. (#21769)
[mono-project.git] / mono / sgen / sgen-thread-pool.c
blob1f77fd2c2d3fc0b552c9a64444e10fd2136048cb
/**
 * \file
 * Threadpool for all concurrent GC work.
 *
 * Copyright (C) 2015 Xamarin Inc
 *
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
19 #ifndef DISABLE_SGEN_MAJOR_MARKSWEEP_CONC
20 static mono_mutex_t lock;
21 static mono_cond_t work_cond;
22 static mono_cond_t done_cond;
24 static int threads_num;
25 static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
26 static int threads_context [SGEN_THREADPOOL_MAX_NUM_THREADS];
28 static volatile gboolean threadpool_shutdown;
29 static volatile int threads_finished;
31 static int contexts_num;
32 static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
/* Lifecycle states of a SgenThreadPoolJob while it sits in a context's job queue. */
enum {
	STATE_WAITING,		/* enqueued, not yet claimed by a worker */
	STATE_IN_PROGRESS,	/* a worker is currently running job->func */
	STATE_DONE		/* finished; about to be removed from the queue and freed */
};
40 /* Assumes that the lock is held. */
41 static SgenThreadPoolJob*
42 get_job_and_set_in_progress (SgenThreadPoolContext *context)
44 for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
45 SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
46 if (job->state == STATE_WAITING) {
47 job->state = STATE_IN_PROGRESS;
48 return job;
51 return NULL;
54 /* Assumes that the lock is held. */
55 static ssize_t
56 find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
58 for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
59 if (context->job_queue.data [i] == job)
60 return i;
62 return -1;
65 /* Assumes that the lock is held. */
66 static void
67 remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
69 ssize_t index;
70 SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
71 index = find_job_in_queue (context, job);
72 SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
73 context->job_queue.data [index] = NULL;
74 sgen_pointer_queue_remove_nulls (&context->job_queue);
75 sgen_thread_pool_job_free (job);
78 static gboolean
79 continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
81 if (!context->continue_idle_job_func)
82 return FALSE;
83 return context->continue_idle_job_func (thread_data, context - pool_contexts);
86 static gboolean
87 should_work (SgenThreadPoolContext *context, void *thread_data)
89 if (!context->should_work_func)
90 return TRUE;
91 return context->should_work_func (thread_data);
95 * Tells whether we should lock and attempt to get work from
96 * a higher priority context.
98 static gboolean
99 has_priority_work (int worker_index, int current_context)
101 int i;
103 for (i = 0; i < current_context; i++) {
104 SgenThreadPoolContext *context = &pool_contexts [i];
105 void *thread_data;
107 if (worker_index >= context->num_threads)
108 continue;
109 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
110 if (!should_work (context, thread_data))
111 continue;
112 if (context->job_queue.next_slot > 0)
113 return TRUE;
114 if (continue_idle_job (context, thread_data))
115 return TRUE;
118 /* Return if job enqueued on current context. Jobs have priority over idle work */
119 if (pool_contexts [current_context].job_queue.next_slot > 0)
120 return TRUE;
122 return FALSE;
126 * Gets the highest priority work. If there is none, it waits
127 * for work_cond. Should always be called with lock held.
129 static void
130 get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
132 while (!threadpool_shutdown) {
133 int i;
135 for (i = 0; i < contexts_num; i++) {
136 SgenThreadPoolContext *context = &pool_contexts [i];
137 void *thread_data;
139 if (worker_index >= context->num_threads)
140 continue;
141 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
143 if (!should_work (context, thread_data))
144 continue;
147 * It's important that we check the continue idle flag with the lock held.
148 * Suppose we didn't check with the lock held, and the result is FALSE. The
149 * main thread might then set continue idle and signal us before we can take
150 * the lock, and we'd lose the signal.
152 *do_idle = continue_idle_job (context, thread_data);
153 *job = get_job_and_set_in_progress (context);
155 if (*job || *do_idle) {
156 *work_context = i;
157 return;
162 * Nothing to do on any context
163 * pthread_cond_wait() can return successfully despite the condition
164 * not being signalled, so we have to run this in a loop until we
165 * really have work to do.
167 mono_os_cond_wait (&work_cond, &lock);
171 static mono_native_thread_return_t
172 thread_func (void *data)
174 int worker_index = (int)(gsize)data;
175 int current_context;
176 void *thread_data = NULL;
178 sgen_client_thread_register_worker ();
180 for (current_context = 0; current_context < contexts_num; current_context++) {
181 if (worker_index >= pool_contexts [current_context].num_threads ||
182 !pool_contexts [current_context].thread_init_func)
183 break;
185 thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
186 pool_contexts [current_context].thread_init_func (thread_data);
189 current_context = 0;
191 mono_os_mutex_lock (&lock);
192 for (;;) {
193 gboolean do_idle = FALSE;
194 SgenThreadPoolJob *job = NULL;
195 SgenThreadPoolContext *context = NULL;
197 threads_context [worker_index] = -1;
198 get_work (worker_index, &current_context, &do_idle, &job);
199 threads_context [worker_index] = current_context;
201 if (!threadpool_shutdown) {
202 context = &pool_contexts [current_context];
203 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
206 mono_os_mutex_unlock (&lock);
208 if (job) {
209 job->func (thread_data, job);
211 mono_os_mutex_lock (&lock);
213 SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
214 job->state = STATE_DONE;
215 remove_job (context, job);
217 * Only the main GC thread will ever wait on the done condition, so we don't
218 * have to broadcast.
220 mono_os_cond_signal (&done_cond);
221 } else if (do_idle) {
222 SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
223 do {
224 context->idle_job_func (thread_data);
225 do_idle = continue_idle_job (context, thread_data);
226 } while (do_idle && !has_priority_work (worker_index, current_context));
228 mono_os_mutex_lock (&lock);
230 if (!do_idle)
231 mono_os_cond_signal (&done_cond);
232 } else {
233 SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
234 mono_os_mutex_lock (&lock);
235 threads_finished++;
236 mono_os_cond_signal (&done_cond);
237 mono_os_mutex_unlock (&lock);
238 return 0;
242 return 0;
246 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
248 int context_id = contexts_num;
250 SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
252 pool_contexts [context_id].thread_init_func = init_func;
253 pool_contexts [context_id].idle_job_func = idle_func;
254 pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
255 pool_contexts [context_id].should_work_func = should_work_func;
256 pool_contexts [context_id].thread_datas = thread_datas;
258 SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
260 pool_contexts [context_id].num_threads = num_threads;
262 sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
264 // Job batches normally split into num_threads * 4 jobs. Make room for up to four job batches in the deferred queue (should reduce flushes during minor collections).
265 pool_contexts [context_id].deferred_jobs_len = (num_threads * 4 * 4) + 1;
266 pool_contexts [context_id].deferred_jobs = (void **)sgen_alloc_internal_dynamic (sizeof (void *) * pool_contexts [context_id].deferred_jobs_len, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
267 pool_contexts [context_id].deferred_jobs_count = 0;
269 contexts_num++;
271 return context_id;
274 void
275 sgen_thread_pool_start (void)
277 int i;
279 for (i = 0; i < contexts_num; i++) {
280 if (threads_num < pool_contexts [i].num_threads)
281 threads_num = pool_contexts [i].num_threads;
284 if (!threads_num)
285 return;
287 mono_os_mutex_init (&lock);
288 mono_os_cond_init (&work_cond);
289 mono_os_cond_init (&done_cond);
291 threads_finished = 0;
292 threadpool_shutdown = FALSE;
294 for (i = 0; i < threads_num; i++) {
295 mono_native_thread_create (&threads [i], (gpointer)thread_func, (void*)(gsize)i);
299 void
300 sgen_thread_pool_shutdown (void)
302 if (!threads_num)
303 return;
305 mono_os_mutex_lock (&lock);
306 threadpool_shutdown = TRUE;
307 mono_os_cond_broadcast (&work_cond);
308 while (threads_finished < threads_num)
309 mono_os_cond_wait (&done_cond, &lock);
310 mono_os_mutex_unlock (&lock);
312 mono_os_mutex_destroy (&lock);
313 mono_os_cond_destroy (&work_cond);
314 mono_os_cond_destroy (&done_cond);
316 for (int i = 0; i < threads_num; i++) {
317 mono_threads_add_joinable_thread ((gpointer)(gsize)threads [i]);
321 SgenThreadPoolJob*
322 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
324 SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
325 job->name = name;
326 job->size = size;
327 job->state = STATE_WAITING;
328 job->func = func;
329 return job;
332 void
333 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
335 sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
338 void
339 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
341 mono_os_mutex_lock (&lock);
343 sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
344 mono_os_cond_broadcast (&work_cond);
346 mono_os_mutex_unlock (&lock);
350 * LOCKING: Assumes the GC lock is held (or it will race with sgen_thread_pool_flush_deferred_jobs)
352 void
353 sgen_thread_pool_job_enqueue_deferred (int context_id, SgenThreadPoolJob *job)
355 // Fast path assumes the GC lock is held.
356 pool_contexts [context_id].deferred_jobs [pool_contexts [context_id].deferred_jobs_count++] = job;
357 if (pool_contexts [context_id].deferred_jobs_count >= pool_contexts [context_id].deferred_jobs_len) {
358 // Slow path, flush jobs into queue, but don't signal workers.
359 sgen_thread_pool_flush_deferred_jobs (context_id, FALSE);
364 * LOCKING: Assumes the GC lock is held (or it will race with sgen_thread_pool_job_enqueue_deferred).
366 void
367 sgen_thread_pool_flush_deferred_jobs (int context_id, gboolean signal)
369 if (!signal && !sgen_thread_pool_have_deferred_jobs (context_id))
370 return;
372 mono_os_mutex_lock (&lock);
373 for (int i = 0; i < pool_contexts [context_id].deferred_jobs_count; i++) {
374 sgen_pointer_queue_add (&pool_contexts[context_id].job_queue, pool_contexts [context_id].deferred_jobs [i]);
375 pool_contexts [context_id].deferred_jobs [i] = NULL;
377 pool_contexts [context_id].deferred_jobs_count = 0;
378 if (signal)
379 mono_os_cond_broadcast (&work_cond);
380 mono_os_mutex_unlock (&lock);
383 gboolean sgen_thread_pool_have_deferred_jobs (int context_id)
385 return pool_contexts [context_id].deferred_jobs_count != 0;
388 void
389 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
391 SGEN_ASSERT (0, job, "Where's the job?");
393 mono_os_mutex_lock (&lock);
395 while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
396 mono_os_cond_wait (&done_cond, &lock);
398 mono_os_mutex_unlock (&lock);
401 void
402 sgen_thread_pool_idle_signal (int context_id)
404 SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
406 mono_os_mutex_lock (&lock);
408 if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
409 mono_os_cond_broadcast (&work_cond);
411 mono_os_mutex_unlock (&lock);
414 void
415 sgen_thread_pool_idle_wait (int context_id, SgenThreadPoolContinueIdleWaitFunc continue_wait)
417 SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
419 mono_os_mutex_lock (&lock);
421 while (continue_wait (context_id, threads_context))
422 mono_os_cond_wait (&done_cond, &lock);
424 mono_os_mutex_unlock (&lock);
427 void
428 sgen_thread_pool_wait_for_all_jobs (int context_id)
430 mono_os_mutex_lock (&lock);
432 while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
433 mono_os_cond_wait (&done_cond, &lock);
435 mono_os_mutex_unlock (&lock);
438 /* Return 0 if is not a thread pool thread or the thread number otherwise */
440 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
442 int i;
444 for (i = 0; i < threads_num; i++) {
445 if (some_thread == threads [i])
446 return i + 1;
449 return 0;
451 #else
454 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
456 return 0;
459 void
460 sgen_thread_pool_start (void)
464 void
465 sgen_thread_pool_shutdown (void)
469 SgenThreadPoolJob*
470 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
472 SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
473 job->name = name;
474 job->size = size;
475 job->func = func;
476 return job;
479 void
480 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
482 sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
485 void
486 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
490 void
491 sgen_thread_pool_job_enqueue_deferred (int context_id, SgenThreadPoolJob *job)
495 void
496 sgen_thread_pool_flush_deferred_jobs (int context_id, gboolean signal)
500 gboolean sgen_thread_pool_have_deferred_jobs (int context_id)
502 return FALSE;
505 void
506 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
510 void
511 sgen_thread_pool_idle_signal (int context_id)
515 void
516 sgen_thread_pool_idle_wait (int context_id, SgenThreadPoolContinueIdleWaitFunc continue_wait)
520 void
521 sgen_thread_pool_wait_for_all_jobs (int context_id)
526 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
528 return 0;
531 #endif
533 #endif