/*
 * Threadpool for all concurrent GC work.
 *
 * Copyright (C) 2015 Xamarin Inc
 *
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
18 static mono_mutex_t lock
;
19 static mono_cond_t work_cond
;
20 static mono_cond_t done_cond
;
22 static int threads_num
;
23 static MonoNativeThreadId threads
[SGEN_THREADPOOL_MAX_NUM_THREADS
];
24 static int threads_context
[SGEN_THREADPOOL_MAX_NUM_THREADS
];
26 static volatile gboolean threadpool_shutdown
;
27 static volatile int threads_finished
;
29 static int contexts_num
;
30 static SgenThreadPoolContext pool_contexts
[SGEN_THREADPOOL_MAX_NUM_CONTEXTS
];
38 /* Assumes that the lock is held. */
39 static SgenThreadPoolJob
*
40 get_job_and_set_in_progress (SgenThreadPoolContext
*context
)
42 for (size_t i
= 0; i
< context
->job_queue
.next_slot
; ++i
) {
43 SgenThreadPoolJob
*job
= (SgenThreadPoolJob
*)context
->job_queue
.data
[i
];
44 if (job
->state
== STATE_WAITING
) {
45 job
->state
= STATE_IN_PROGRESS
;
52 /* Assumes that the lock is held. */
54 find_job_in_queue (SgenThreadPoolContext
*context
, SgenThreadPoolJob
*job
)
56 for (ssize_t i
= 0; i
< context
->job_queue
.next_slot
; ++i
) {
57 if (context
->job_queue
.data
[i
] == job
)
63 /* Assumes that the lock is held. */
65 remove_job (SgenThreadPoolContext
*context
, SgenThreadPoolJob
*job
)
68 SGEN_ASSERT (0, job
->state
== STATE_DONE
, "Why are we removing a job that's not done?");
69 index
= find_job_in_queue (context
, job
);
70 SGEN_ASSERT (0, index
>= 0, "Why is the job we're trying to remove not in the queue?");
71 context
->job_queue
.data
[index
] = NULL
;
72 sgen_pointer_queue_remove_nulls (&context
->job_queue
);
73 sgen_thread_pool_job_free (job
);
77 continue_idle_job (SgenThreadPoolContext
*context
, void *thread_data
)
79 if (!context
->continue_idle_job_func
)
81 return context
->continue_idle_job_func (thread_data
, context
- pool_contexts
);
85 should_work (SgenThreadPoolContext
*context
, void *thread_data
)
87 if (!context
->should_work_func
)
89 return context
->should_work_func (thread_data
);
93 * Tells whether we should lock and attempt to get work from
94 * a higher priority context.
97 has_priority_work (int worker_index
, int current_context
)
101 for (i
= 0; i
< current_context
; i
++) {
102 SgenThreadPoolContext
*context
= &pool_contexts
[i
];
105 if (worker_index
>= context
->num_threads
)
107 thread_data
= (context
->thread_datas
) ? context
->thread_datas
[worker_index
] : NULL
;
108 if (!should_work (context
, thread_data
))
110 if (context
->job_queue
.next_slot
> 0)
112 if (continue_idle_job (context
, thread_data
))
116 /* Return if job enqueued on current context. Jobs have priority over idle work */
117 if (pool_contexts
[current_context
].job_queue
.next_slot
> 0)
124 * Gets the highest priority work. If there is none, it waits
125 * for work_cond. Should always be called with lock held.
128 get_work (int worker_index
, int *work_context
, int *do_idle
, SgenThreadPoolJob
**job
)
130 while (!threadpool_shutdown
) {
133 for (i
= 0; i
< contexts_num
; i
++) {
134 SgenThreadPoolContext
*context
= &pool_contexts
[i
];
137 if (worker_index
>= context
->num_threads
)
139 thread_data
= (context
->thread_datas
) ? context
->thread_datas
[worker_index
] : NULL
;
141 if (!should_work (context
, thread_data
))
145 * It's important that we check the continue idle flag with the lock held.
146 * Suppose we didn't check with the lock held, and the result is FALSE. The
147 * main thread might then set continue idle and signal us before we can take
148 * the lock, and we'd lose the signal.
150 *do_idle
= continue_idle_job (context
, thread_data
);
151 *job
= get_job_and_set_in_progress (context
);
153 if (*job
|| *do_idle
) {
160 * Nothing to do on any context
161 * pthread_cond_wait() can return successfully despite the condition
162 * not being signalled, so we have to run this in a loop until we
163 * really have work to do.
165 mono_os_cond_wait (&work_cond
, &lock
);
169 static mono_native_thread_return_t
170 thread_func (void *data
)
172 int worker_index
= (int)(gsize
)data
;
174 void *thread_data
= NULL
;
176 sgen_client_thread_register_worker ();
178 for (current_context
= 0; current_context
< contexts_num
; current_context
++) {
179 if (worker_index
>= pool_contexts
[current_context
].num_threads
||
180 !pool_contexts
[current_context
].thread_init_func
)
183 thread_data
= (pool_contexts
[current_context
].thread_datas
) ? pool_contexts
[current_context
].thread_datas
[worker_index
] : NULL
;
184 pool_contexts
[current_context
].thread_init_func (thread_data
);
189 mono_os_mutex_lock (&lock
);
191 gboolean do_idle
= FALSE
;
192 SgenThreadPoolJob
*job
= NULL
;
193 SgenThreadPoolContext
*context
= NULL
;
195 threads_context
[worker_index
] = -1;
196 get_work (worker_index
, ¤t_context
, &do_idle
, &job
);
197 threads_context
[worker_index
] = current_context
;
199 if (!threadpool_shutdown
) {
200 context
= &pool_contexts
[current_context
];
201 thread_data
= (context
->thread_datas
) ? context
->thread_datas
[worker_index
] : NULL
;
204 mono_os_mutex_unlock (&lock
);
207 job
->func (thread_data
, job
);
209 mono_os_mutex_lock (&lock
);
211 SGEN_ASSERT (0, job
->state
== STATE_IN_PROGRESS
, "The job should still be in progress.");
212 job
->state
= STATE_DONE
;
213 remove_job (context
, job
);
215 * Only the main GC thread will ever wait on the done condition, so we don't
218 mono_os_cond_signal (&done_cond
);
219 } else if (do_idle
) {
220 SGEN_ASSERT (0, context
->idle_job_func
, "Why do we have idle work when there's no idle job function?");
222 context
->idle_job_func (thread_data
);
223 do_idle
= continue_idle_job (context
, thread_data
);
224 } while (do_idle
&& !has_priority_work (worker_index
, current_context
));
226 mono_os_mutex_lock (&lock
);
229 mono_os_cond_signal (&done_cond
);
231 SGEN_ASSERT (0, threadpool_shutdown
, "Why did we unlock if no jobs and not shutting down?");
232 mono_os_mutex_lock (&lock
);
234 mono_os_cond_signal (&done_cond
);
235 mono_os_mutex_unlock (&lock
);
244 sgen_thread_pool_create_context (int num_threads
, SgenThreadPoolThreadInitFunc init_func
, SgenThreadPoolIdleJobFunc idle_func
, SgenThreadPoolContinueIdleJobFunc continue_idle_func
, SgenThreadPoolShouldWorkFunc should_work_func
, void **thread_datas
)
246 int context_id
= contexts_num
;
248 SGEN_ASSERT (0, contexts_num
< SGEN_THREADPOOL_MAX_NUM_CONTEXTS
, "Maximum sgen thread pool contexts reached");
250 pool_contexts
[context_id
].thread_init_func
= init_func
;
251 pool_contexts
[context_id
].idle_job_func
= idle_func
;
252 pool_contexts
[context_id
].continue_idle_job_func
= continue_idle_func
;
253 pool_contexts
[context_id
].should_work_func
= should_work_func
;
254 pool_contexts
[context_id
].thread_datas
= thread_datas
;
256 SGEN_ASSERT (0, num_threads
<= SGEN_THREADPOOL_MAX_NUM_THREADS
, "Maximum sgen thread pool threads exceeded");
258 pool_contexts
[context_id
].num_threads
= num_threads
;
260 sgen_pointer_queue_init (&pool_contexts
[contexts_num
].job_queue
, 0);
268 sgen_thread_pool_start (void)
272 for (i
= 0; i
< contexts_num
; i
++) {
273 if (threads_num
< pool_contexts
[i
].num_threads
)
274 threads_num
= pool_contexts
[i
].num_threads
;
280 mono_os_mutex_init (&lock
);
281 mono_os_cond_init (&work_cond
);
282 mono_os_cond_init (&done_cond
);
284 threads_finished
= 0;
285 threadpool_shutdown
= FALSE
;
287 for (i
= 0; i
< threads_num
; i
++) {
288 mono_native_thread_create (&threads
[i
], (gpointer
)thread_func
, (void*)(gsize
)i
);
293 sgen_thread_pool_shutdown (void)
298 mono_os_mutex_lock (&lock
);
299 threadpool_shutdown
= TRUE
;
300 mono_os_cond_broadcast (&work_cond
);
301 while (threads_finished
< threads_num
)
302 mono_os_cond_wait (&done_cond
, &lock
);
303 mono_os_mutex_unlock (&lock
);
305 mono_os_mutex_destroy (&lock
);
306 mono_os_cond_destroy (&work_cond
);
307 mono_os_cond_destroy (&done_cond
);
309 for (int i
= 0; i
< threads_num
; i
++) {
310 mono_threads_add_joinable_thread ((gpointer
)(gsize
)threads
[i
]);
315 sgen_thread_pool_job_alloc (const char *name
, SgenThreadPoolJobFunc func
, size_t size
)
317 SgenThreadPoolJob
*job
= (SgenThreadPoolJob
*)sgen_alloc_internal_dynamic (size
, INTERNAL_MEM_THREAD_POOL_JOB
, TRUE
);
320 job
->state
= STATE_WAITING
;
326 sgen_thread_pool_job_free (SgenThreadPoolJob
*job
)
328 sgen_free_internal_dynamic (job
, job
->size
, INTERNAL_MEM_THREAD_POOL_JOB
);
332 sgen_thread_pool_job_enqueue (int context_id
, SgenThreadPoolJob
*job
)
334 mono_os_mutex_lock (&lock
);
336 sgen_pointer_queue_add (&pool_contexts
[context_id
].job_queue
, job
);
337 mono_os_cond_broadcast (&work_cond
);
339 mono_os_mutex_unlock (&lock
);
343 sgen_thread_pool_job_wait (int context_id
, SgenThreadPoolJob
*job
)
345 SGEN_ASSERT (0, job
, "Where's the job?");
347 mono_os_mutex_lock (&lock
);
349 while (find_job_in_queue (&pool_contexts
[context_id
], job
) >= 0)
350 mono_os_cond_wait (&done_cond
, &lock
);
352 mono_os_mutex_unlock (&lock
);
356 sgen_thread_pool_idle_signal (int context_id
)
358 SGEN_ASSERT (0, pool_contexts
[context_id
].idle_job_func
, "Why are we signaling idle without an idle function?");
360 mono_os_mutex_lock (&lock
);
362 if (pool_contexts
[context_id
].continue_idle_job_func (NULL
, context_id
))
363 mono_os_cond_broadcast (&work_cond
);
365 mono_os_mutex_unlock (&lock
);
369 sgen_thread_pool_idle_wait (int context_id
, SgenThreadPoolContinueIdleWaitFunc continue_wait
)
371 SGEN_ASSERT (0, pool_contexts
[context_id
].idle_job_func
, "Why are we waiting for idle without an idle function?");
373 mono_os_mutex_lock (&lock
);
375 while (continue_wait (context_id
, threads_context
))
376 mono_os_cond_wait (&done_cond
, &lock
);
378 mono_os_mutex_unlock (&lock
);
382 sgen_thread_pool_wait_for_all_jobs (int context_id
)
384 mono_os_mutex_lock (&lock
);
386 while (!sgen_pointer_queue_is_empty (&pool_contexts
[context_id
].job_queue
))
387 mono_os_cond_wait (&done_cond
, &lock
);
389 mono_os_mutex_unlock (&lock
);
392 /* Return 0 if is not a thread pool thread or the thread number otherwise */
394 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread
)
398 for (i
= 0; i
< threads_num
; i
++) {
399 if (some_thread
== threads
[i
])