/*
 * Threadpool for all concurrent GC work.
 *
 * Copyright (C) 2015 Xamarin Inc
 *
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
19 #ifndef DISABLE_SGEN_MAJOR_MARKSWEEP_CONC
20 static mono_mutex_t lock
;
21 static mono_cond_t work_cond
;
22 static mono_cond_t done_cond
;
24 static int threads_num
;
25 static MonoNativeThreadId threads
[SGEN_THREADPOOL_MAX_NUM_THREADS
];
26 static int threads_context
[SGEN_THREADPOOL_MAX_NUM_THREADS
];
28 static volatile gboolean threadpool_shutdown
;
29 static volatile int threads_finished
;
31 static int contexts_num
;
32 static SgenThreadPoolContext pool_contexts
[SGEN_THREADPOOL_MAX_NUM_CONTEXTS
];
40 /* Assumes that the lock is held. */
41 static SgenThreadPoolJob
*
42 get_job_and_set_in_progress (SgenThreadPoolContext
*context
)
44 for (size_t i
= 0; i
< context
->job_queue
.next_slot
; ++i
) {
45 SgenThreadPoolJob
*job
= (SgenThreadPoolJob
*)context
->job_queue
.data
[i
];
46 if (job
->state
== STATE_WAITING
) {
47 job
->state
= STATE_IN_PROGRESS
;
54 /* Assumes that the lock is held. */
56 find_job_in_queue (SgenThreadPoolContext
*context
, SgenThreadPoolJob
*job
)
58 for (ssize_t i
= 0; i
< context
->job_queue
.next_slot
; ++i
) {
59 if (context
->job_queue
.data
[i
] == job
)
65 /* Assumes that the lock is held. */
67 remove_job (SgenThreadPoolContext
*context
, SgenThreadPoolJob
*job
)
70 SGEN_ASSERT (0, job
->state
== STATE_DONE
, "Why are we removing a job that's not done?");
71 index
= find_job_in_queue (context
, job
);
72 SGEN_ASSERT (0, index
>= 0, "Why is the job we're trying to remove not in the queue?");
73 context
->job_queue
.data
[index
] = NULL
;
74 sgen_pointer_queue_remove_nulls (&context
->job_queue
);
75 sgen_thread_pool_job_free (job
);
79 continue_idle_job (SgenThreadPoolContext
*context
, void *thread_data
)
81 if (!context
->continue_idle_job_func
)
83 return context
->continue_idle_job_func (thread_data
, context
- pool_contexts
);
87 should_work (SgenThreadPoolContext
*context
, void *thread_data
)
89 if (!context
->should_work_func
)
91 return context
->should_work_func (thread_data
);
95 * Tells whether we should lock and attempt to get work from
96 * a higher priority context.
99 has_priority_work (int worker_index
, int current_context
)
103 for (i
= 0; i
< current_context
; i
++) {
104 SgenThreadPoolContext
*context
= &pool_contexts
[i
];
107 if (worker_index
>= context
->num_threads
)
109 thread_data
= (context
->thread_datas
) ? context
->thread_datas
[worker_index
] : NULL
;
110 if (!should_work (context
, thread_data
))
112 if (context
->job_queue
.next_slot
> 0)
114 if (continue_idle_job (context
, thread_data
))
118 /* Return if job enqueued on current context. Jobs have priority over idle work */
119 if (pool_contexts
[current_context
].job_queue
.next_slot
> 0)
126 * Gets the highest priority work. If there is none, it waits
127 * for work_cond. Should always be called with lock held.
130 get_work (int worker_index
, int *work_context
, int *do_idle
, SgenThreadPoolJob
**job
)
132 while (!threadpool_shutdown
) {
135 for (i
= 0; i
< contexts_num
; i
++) {
136 SgenThreadPoolContext
*context
= &pool_contexts
[i
];
139 if (worker_index
>= context
->num_threads
)
141 thread_data
= (context
->thread_datas
) ? context
->thread_datas
[worker_index
] : NULL
;
143 if (!should_work (context
, thread_data
))
147 * It's important that we check the continue idle flag with the lock held.
148 * Suppose we didn't check with the lock held, and the result is FALSE. The
149 * main thread might then set continue idle and signal us before we can take
150 * the lock, and we'd lose the signal.
152 *do_idle
= continue_idle_job (context
, thread_data
);
153 *job
= get_job_and_set_in_progress (context
);
155 if (*job
|| *do_idle
) {
162 * Nothing to do on any context
163 * pthread_cond_wait() can return successfully despite the condition
164 * not being signalled, so we have to run this in a loop until we
165 * really have work to do.
167 mono_os_cond_wait (&work_cond
, &lock
);
171 static mono_native_thread_return_t
172 thread_func (void *data
)
174 int worker_index
= (int)(gsize
)data
;
176 void *thread_data
= NULL
;
178 sgen_client_thread_register_worker ();
180 for (current_context
= 0; current_context
< contexts_num
; current_context
++) {
181 if (worker_index
>= pool_contexts
[current_context
].num_threads
||
182 !pool_contexts
[current_context
].thread_init_func
)
185 thread_data
= (pool_contexts
[current_context
].thread_datas
) ? pool_contexts
[current_context
].thread_datas
[worker_index
] : NULL
;
186 pool_contexts
[current_context
].thread_init_func (thread_data
);
191 mono_os_mutex_lock (&lock
);
193 gboolean do_idle
= FALSE
;
194 SgenThreadPoolJob
*job
= NULL
;
195 SgenThreadPoolContext
*context
= NULL
;
197 threads_context
[worker_index
] = -1;
198 get_work (worker_index
, ¤t_context
, &do_idle
, &job
);
199 threads_context
[worker_index
] = current_context
;
201 if (!threadpool_shutdown
) {
202 context
= &pool_contexts
[current_context
];
203 thread_data
= (context
->thread_datas
) ? context
->thread_datas
[worker_index
] : NULL
;
206 mono_os_mutex_unlock (&lock
);
209 job
->func (thread_data
, job
);
211 mono_os_mutex_lock (&lock
);
213 SGEN_ASSERT (0, job
->state
== STATE_IN_PROGRESS
, "The job should still be in progress.");
214 job
->state
= STATE_DONE
;
215 remove_job (context
, job
);
217 * Only the main GC thread will ever wait on the done condition, so we don't
220 mono_os_cond_signal (&done_cond
);
221 } else if (do_idle
) {
222 SGEN_ASSERT (0, context
->idle_job_func
, "Why do we have idle work when there's no idle job function?");
224 context
->idle_job_func (thread_data
);
225 do_idle
= continue_idle_job (context
, thread_data
);
226 } while (do_idle
&& !has_priority_work (worker_index
, current_context
));
228 mono_os_mutex_lock (&lock
);
231 mono_os_cond_signal (&done_cond
);
233 SGEN_ASSERT (0, threadpool_shutdown
, "Why did we unlock if no jobs and not shutting down?");
234 mono_os_mutex_lock (&lock
);
236 mono_os_cond_signal (&done_cond
);
237 mono_os_mutex_unlock (&lock
);
246 sgen_thread_pool_create_context (int num_threads
, SgenThreadPoolThreadInitFunc init_func
, SgenThreadPoolIdleJobFunc idle_func
, SgenThreadPoolContinueIdleJobFunc continue_idle_func
, SgenThreadPoolShouldWorkFunc should_work_func
, void **thread_datas
)
248 int context_id
= contexts_num
;
250 SGEN_ASSERT (0, contexts_num
< SGEN_THREADPOOL_MAX_NUM_CONTEXTS
, "Maximum sgen thread pool contexts reached");
252 pool_contexts
[context_id
].thread_init_func
= init_func
;
253 pool_contexts
[context_id
].idle_job_func
= idle_func
;
254 pool_contexts
[context_id
].continue_idle_job_func
= continue_idle_func
;
255 pool_contexts
[context_id
].should_work_func
= should_work_func
;
256 pool_contexts
[context_id
].thread_datas
= thread_datas
;
258 SGEN_ASSERT (0, num_threads
<= SGEN_THREADPOOL_MAX_NUM_THREADS
, "Maximum sgen thread pool threads exceeded");
260 pool_contexts
[context_id
].num_threads
= num_threads
;
262 sgen_pointer_queue_init (&pool_contexts
[contexts_num
].job_queue
, 0);
264 // Job batches normally split into num_threads * 4 jobs. Make room for up to four job batches in the deferred queue (should reduce flushes during minor collections).
265 pool_contexts
[context_id
].deferred_jobs_len
= (num_threads
* 4 * 4) + 1;
266 pool_contexts
[context_id
].deferred_jobs
= (void **)sgen_alloc_internal_dynamic (sizeof (void *) * pool_contexts
[context_id
].deferred_jobs_len
, INTERNAL_MEM_THREAD_POOL_JOB
, TRUE
);
267 pool_contexts
[context_id
].deferred_jobs_count
= 0;
275 sgen_thread_pool_start (void)
279 for (i
= 0; i
< contexts_num
; i
++) {
280 if (threads_num
< pool_contexts
[i
].num_threads
)
281 threads_num
= pool_contexts
[i
].num_threads
;
287 mono_os_mutex_init (&lock
);
288 mono_os_cond_init (&work_cond
);
289 mono_os_cond_init (&done_cond
);
291 threads_finished
= 0;
292 threadpool_shutdown
= FALSE
;
294 for (i
= 0; i
< threads_num
; i
++) {
295 mono_native_thread_create (&threads
[i
], (gpointer
)thread_func
, (void*)(gsize
)i
);
300 sgen_thread_pool_shutdown (void)
305 mono_os_mutex_lock (&lock
);
306 threadpool_shutdown
= TRUE
;
307 mono_os_cond_broadcast (&work_cond
);
308 while (threads_finished
< threads_num
)
309 mono_os_cond_wait (&done_cond
, &lock
);
310 mono_os_mutex_unlock (&lock
);
312 mono_os_mutex_destroy (&lock
);
313 mono_os_cond_destroy (&work_cond
);
314 mono_os_cond_destroy (&done_cond
);
316 for (int i
= 0; i
< threads_num
; i
++) {
317 mono_threads_add_joinable_thread ((gpointer
)(gsize
)threads
[i
]);
322 sgen_thread_pool_job_alloc (const char *name
, SgenThreadPoolJobFunc func
, size_t size
)
324 SgenThreadPoolJob
*job
= (SgenThreadPoolJob
*)sgen_alloc_internal_dynamic (size
, INTERNAL_MEM_THREAD_POOL_JOB
, TRUE
);
327 job
->state
= STATE_WAITING
;
333 sgen_thread_pool_job_free (SgenThreadPoolJob
*job
)
335 sgen_free_internal_dynamic (job
, job
->size
, INTERNAL_MEM_THREAD_POOL_JOB
);
339 sgen_thread_pool_job_enqueue (int context_id
, SgenThreadPoolJob
*job
)
341 mono_os_mutex_lock (&lock
);
343 sgen_pointer_queue_add (&pool_contexts
[context_id
].job_queue
, job
);
344 mono_os_cond_broadcast (&work_cond
);
346 mono_os_mutex_unlock (&lock
);
350 * LOCKING: Assumes the GC lock is held (or it will race with sgen_thread_pool_flush_deferred_jobs)
353 sgen_thread_pool_job_enqueue_deferred (int context_id
, SgenThreadPoolJob
*job
)
355 // Fast path assumes the GC lock is held.
356 pool_contexts
[context_id
].deferred_jobs
[pool_contexts
[context_id
].deferred_jobs_count
++] = job
;
357 if (pool_contexts
[context_id
].deferred_jobs_count
>= pool_contexts
[context_id
].deferred_jobs_len
) {
358 // Slow path, flush jobs into queue, but don't signal workers.
359 sgen_thread_pool_flush_deferred_jobs (context_id
, FALSE
);
364 * LOCKING: Assumes the GC lock is held (or it will race with sgen_thread_pool_job_enqueue_deferred).
367 sgen_thread_pool_flush_deferred_jobs (int context_id
, gboolean signal
)
369 if (!signal
&& !sgen_thread_pool_have_deferred_jobs (context_id
))
372 mono_os_mutex_lock (&lock
);
373 for (int i
= 0; i
< pool_contexts
[context_id
].deferred_jobs_count
; i
++) {
374 sgen_pointer_queue_add (&pool_contexts
[context_id
].job_queue
, pool_contexts
[context_id
].deferred_jobs
[i
]);
375 pool_contexts
[context_id
].deferred_jobs
[i
] = NULL
;
377 pool_contexts
[context_id
].deferred_jobs_count
= 0;
379 mono_os_cond_broadcast (&work_cond
);
380 mono_os_mutex_unlock (&lock
);
383 gboolean
sgen_thread_pool_have_deferred_jobs (int context_id
)
385 return pool_contexts
[context_id
].deferred_jobs_count
!= 0;
389 sgen_thread_pool_job_wait (int context_id
, SgenThreadPoolJob
*job
)
391 SGEN_ASSERT (0, job
, "Where's the job?");
393 mono_os_mutex_lock (&lock
);
395 while (find_job_in_queue (&pool_contexts
[context_id
], job
) >= 0)
396 mono_os_cond_wait (&done_cond
, &lock
);
398 mono_os_mutex_unlock (&lock
);
402 sgen_thread_pool_idle_signal (int context_id
)
404 SGEN_ASSERT (0, pool_contexts
[context_id
].idle_job_func
, "Why are we signaling idle without an idle function?");
406 mono_os_mutex_lock (&lock
);
408 if (pool_contexts
[context_id
].continue_idle_job_func (NULL
, context_id
))
409 mono_os_cond_broadcast (&work_cond
);
411 mono_os_mutex_unlock (&lock
);
415 sgen_thread_pool_idle_wait (int context_id
, SgenThreadPoolContinueIdleWaitFunc continue_wait
)
417 SGEN_ASSERT (0, pool_contexts
[context_id
].idle_job_func
, "Why are we waiting for idle without an idle function?");
419 mono_os_mutex_lock (&lock
);
421 while (continue_wait (context_id
, threads_context
))
422 mono_os_cond_wait (&done_cond
, &lock
);
424 mono_os_mutex_unlock (&lock
);
428 sgen_thread_pool_wait_for_all_jobs (int context_id
)
430 mono_os_mutex_lock (&lock
);
432 while (!sgen_pointer_queue_is_empty (&pool_contexts
[context_id
].job_queue
))
433 mono_os_cond_wait (&done_cond
, &lock
);
435 mono_os_mutex_unlock (&lock
);
438 /* Return 0 if is not a thread pool thread or the thread number otherwise */
440 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread
)
444 for (i
= 0; i
< threads_num
; i
++) {
445 if (some_thread
== threads
[i
])
/*
 * NOTE(review): second definition of sgen_thread_pool_create_context.  Only
 * the signature survives in this listing; the body is missing, and so is the
 * preprocessor branch that presumably separates this fallback from the
 * DISABLE_SGEN_MAJOR_MARKSWEEP_CONC-guarded code above — confirm against the
 * original file.
 */
454 sgen_thread_pool_create_context (int num_threads
, SgenThreadPoolThreadInitFunc init_func
, SgenThreadPoolIdleJobFunc idle_func
, SgenThreadPoolContinueIdleJobFunc continue_idle_func
, SgenThreadPoolShouldWorkFunc should_work_func
, void **thread_datas
)
/*
 * NOTE(review): second definition of sgen_thread_pool_start; only the
 * signature survives in this listing, the body is missing — confirm against
 * the original file.
 */
460 sgen_thread_pool_start (void)
/*
 * NOTE(review): second definition of sgen_thread_pool_shutdown; only the
 * signature survives in this listing, the body is missing — confirm against
 * the original file.
 */
465 sgen_thread_pool_shutdown (void)
470 sgen_thread_pool_job_alloc (const char *name
, SgenThreadPoolJobFunc func
, size_t size
)
472 SgenThreadPoolJob
*job
= (SgenThreadPoolJob
*)sgen_alloc_internal_dynamic (size
, INTERNAL_MEM_THREAD_POOL_JOB
, TRUE
);
480 sgen_thread_pool_job_free (SgenThreadPoolJob
*job
)
482 sgen_free_internal_dynamic (job
, job
->size
, INTERNAL_MEM_THREAD_POOL_JOB
);
/*
 * NOTE(review): second definition of sgen_thread_pool_job_enqueue; only the
 * signature survives in this listing, the body is missing — confirm against
 * the original file.
 */
486 sgen_thread_pool_job_enqueue (int context_id
, SgenThreadPoolJob
*job
)
/*
 * NOTE(review): second definition of sgen_thread_pool_job_enqueue_deferred;
 * only the signature survives in this listing, the body is missing — confirm
 * against the original file.
 */
491 sgen_thread_pool_job_enqueue_deferred (int context_id
, SgenThreadPoolJob
*job
)
/*
 * NOTE(review): second definition of sgen_thread_pool_flush_deferred_jobs;
 * only the signature survives in this listing, the body is missing — confirm
 * against the original file.
 */
496 sgen_thread_pool_flush_deferred_jobs (int context_id
, gboolean signal
)
/*
 * NOTE(review): second definition of sgen_thread_pool_have_deferred_jobs;
 * only the signature survives in this listing, the body (and therefore the
 * returned value) is missing — confirm against the original file.
 */
500 gboolean
sgen_thread_pool_have_deferred_jobs (int context_id
)
/*
 * NOTE(review): second definition of sgen_thread_pool_job_wait; only the
 * signature survives in this listing, the body is missing — confirm against
 * the original file.
 */
506 sgen_thread_pool_job_wait (int context_id
, SgenThreadPoolJob
*job
)
/*
 * NOTE(review): second definition of sgen_thread_pool_idle_signal; only the
 * signature survives in this listing, the body is missing — confirm against
 * the original file.
 */
511 sgen_thread_pool_idle_signal (int context_id
)
/*
 * NOTE(review): second definition of sgen_thread_pool_idle_wait; only the
 * signature survives in this listing, the body is missing — confirm against
 * the original file.
 */
516 sgen_thread_pool_idle_wait (int context_id
, SgenThreadPoolContinueIdleWaitFunc continue_wait
)
/*
 * NOTE(review): second definition of sgen_thread_pool_wait_for_all_jobs; only
 * the signature survives in this listing, the body is missing — confirm
 * against the original file.
 */
521 sgen_thread_pool_wait_for_all_jobs (int context_id
)
/*
 * NOTE(review): second definition of sgen_thread_pool_is_thread_pool_thread;
 * only the signature survives in this listing, the body (and therefore the
 * returned value) is missing — confirm against the original file.
 */
526 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread
)