2 * threadpool.c: Microsoft threadpool runtime support
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
12 // Copyright (c) Microsoft. All rights reserved.
13 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
16 // - src/vm/comthreadpool.cpp
17 // - src/vm/win32threadpool.cpp
18 // - src/vm/threadpoolrequest.cpp
19 // - src/vm/hillclimbing.cpp
21 // Ported from C++ to C and adjusted to Mono runtime
24 #define _USE_MATH_DEFINES // needed by MSVC to define math constants
29 #include <mono/metadata/class-internals.h>
30 #include <mono/metadata/exception.h>
31 #include <mono/metadata/gc-internals.h>
32 #include <mono/metadata/object.h>
33 #include <mono/metadata/object-internals.h>
34 #include <mono/metadata/threadpool.h>
35 #include <mono/metadata/threadpool-worker.h>
36 #include <mono/metadata/threadpool-io.h>
37 #include <mono/metadata/w32event.h>
38 #include <mono/utils/atomic.h>
39 #include <mono/utils/mono-compiler.h>
40 #include <mono/utils/mono-complex.h>
41 #include <mono/utils/mono-lazy-init.h>
42 #include <mono/utils/mono-logger.h>
43 #include <mono/utils/mono-logger-internals.h>
44 #include <mono/utils/mono-proclib.h>
45 #include <mono/utils/mono-threads.h>
46 #include <mono/utils/mono-time.h>
47 #include <mono/utils/refcount.h>
51 /* Number of outstanding jobs */
52 gint32 outstanding_request
;
53 /* Number of currently executing jobs */
54 gint32 threadpool_jobs
;
55 /* Signalled when threadpool_jobs + outstanding_request is 0 */
56 /* Protected by threadpool->domains_lock */
57 MonoCoopCond cleanup_cond
;
62 gint16 starting
; /* starting, but not yet in worker_callback */
63 gint16 working
; /* executing worker_callback */
71 GPtrArray
*domains
; // ThreadPoolDomain* []
72 MonoCoopMutex domains_lock
;
74 GPtrArray
*threads
; // MonoInternalThread* []
75 MonoCoopMutex threads_lock
;
76 MonoCoopCond threads_exit_cond
;
78 ThreadPoolCounter counters
;
83 MonoThreadPoolWorker
*worker
;
86 static mono_lazy_init_t status
= MONO_LAZY_INIT_STATUS_NOT_INITIALIZED
;
88 static ThreadPool
* threadpool
;
90 #define COUNTER_ATOMIC(threadpool,var,block) \
92 ThreadPoolCounter __old; \
94 g_assert (threadpool); \
95 (var) = __old = COUNTER_READ (threadpool); \
97 if (!(counter._.starting >= 0)) \
98 g_error ("%s: counter._.starting = %d, but should be >= 0", __func__, counter._.starting); \
99 if (!(counter._.working >= 0)) \
100 g_error ("%s: counter._.working = %d, but should be >= 0", __func__, counter._.working); \
101 } while (InterlockedCompareExchange (&threadpool->counters.as_gint32, (var).as_gint32, __old.as_gint32) != __old.as_gint32); \
104 static inline ThreadPoolCounter
105 COUNTER_READ (ThreadPool
*threadpool
)
107 ThreadPoolCounter counter
;
108 counter
.as_gint32
= InterlockedRead (&threadpool
->counters
.as_gint32
);
115 mono_coop_mutex_lock (&threadpool
->domains_lock
);
119 domains_unlock (void)
121 mono_coop_mutex_unlock (&threadpool
->domains_lock
);
125 destroy (gpointer unused
)
127 g_ptr_array_free (threadpool
->domains
, TRUE
);
128 mono_coop_mutex_destroy (&threadpool
->domains_lock
);
130 g_ptr_array_free (threadpool
->threads
, TRUE
);
131 mono_coop_mutex_destroy (&threadpool
->threads_lock
);
132 mono_coop_cond_destroy (&threadpool
->threads_exit_cond
);
140 g_assert (!threadpool
);
141 threadpool
= g_new0 (ThreadPool
, 1);
142 g_assert (threadpool
);
144 g_assert (sizeof (ThreadPoolCounter
) == sizeof (gint32
));
146 mono_refcount_init (threadpool
, destroy
);
148 threadpool
->domains
= g_ptr_array_new ();
149 mono_coop_mutex_init (&threadpool
->domains_lock
);
151 threadpool
->threads
= g_ptr_array_new ();
152 mono_coop_mutex_init (&threadpool
->threads_lock
);
153 mono_coop_cond_init (&threadpool
->threads_exit_cond
);
155 threadpool
->limit_io_min
= mono_cpu_count ();
156 threadpool
->limit_io_max
= CLAMP (threadpool
->limit_io_min
* 100, MIN (threadpool
->limit_io_min
, 200), MAX (threadpool
->limit_io_min
, 200));
158 mono_threadpool_worker_init (&threadpool
->worker
);
165 MonoInternalThread
*current
;
167 /* we assume throughout the code that we are
168 * cleaning up only if the runtime is shutting down */
169 g_assert (mono_runtime_is_shutting_down ());
171 current
= mono_thread_internal_current ();
173 mono_coop_mutex_lock (&threadpool
->threads_lock
);
175 /* stop all threadpool->threads */
176 for (i
= 0; i
< threadpool
->threads
->len
; ++i
) {
177 MonoInternalThread
*thread
= (MonoInternalThread
*) g_ptr_array_index (threadpool
->threads
, i
);
178 if (thread
!= current
)
179 mono_thread_internal_abort (thread
);
182 mono_coop_mutex_unlock (&threadpool
->threads_lock
);
184 /* give a chance to the other threads to exit */
185 mono_thread_info_yield ();
187 mono_coop_mutex_lock (&threadpool
->threads_lock
);
190 if (threadpool
->threads
->len
== 0)
193 if (threadpool
->threads
->len
== 1 && g_ptr_array_index (threadpool
->threads
, 0) == current
) {
194 /* We are waiting on ourselves */
198 mono_coop_cond_wait (&threadpool
->threads_exit_cond
, &threadpool
->threads_lock
);
201 mono_coop_mutex_unlock (&threadpool
->threads_lock
);
203 mono_threadpool_worker_cleanup (threadpool
->worker
);
205 mono_refcount_dec (threadpool
);
209 mono_threadpool_enqueue_work_item (MonoDomain
*domain
, MonoObject
*work_item
, MonoError
*error
)
211 static MonoClass
*threadpool_class
= NULL
;
212 static MonoMethod
*unsafe_queue_custom_work_item_method
= NULL
;
213 MonoDomain
*current_domain
;
217 mono_error_init (error
);
218 g_assert (work_item
);
220 if (!threadpool_class
)
221 threadpool_class
= mono_class_load_from_name (mono_defaults
.corlib
, "System.Threading", "ThreadPool");
223 if (!unsafe_queue_custom_work_item_method
)
224 unsafe_queue_custom_work_item_method
= mono_class_get_method_from_name (threadpool_class
, "UnsafeQueueCustomWorkItem", 2);
225 g_assert (unsafe_queue_custom_work_item_method
);
229 args
[0] = (gpointer
) work_item
;
230 args
[1] = (gpointer
) &f
;
232 current_domain
= mono_domain_get ();
233 if (current_domain
== domain
) {
234 mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method
, NULL
, args
, error
);
235 return_val_if_nok (error
, FALSE
);
237 mono_thread_push_appdomain_ref (domain
);
238 if (mono_domain_set (domain
, FALSE
)) {
239 mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method
, NULL
, args
, error
);
240 if (!is_ok (error
)) {
241 mono_thread_pop_appdomain_ref ();
244 mono_domain_set (current_domain
, TRUE
);
246 mono_thread_pop_appdomain_ref ();
251 /* LOCKING: domains_lock must be held */
253 tpdomain_add (ThreadPoolDomain
*tpdomain
)
259 len
= threadpool
->domains
->len
;
260 for (i
= 0; i
< len
; ++i
) {
261 if (g_ptr_array_index (threadpool
->domains
, i
) == tpdomain
)
266 g_ptr_array_add (threadpool
->domains
, tpdomain
);
269 /* LOCKING: domains_lock must be held. */
271 tpdomain_remove (ThreadPoolDomain
*tpdomain
)
274 return g_ptr_array_remove (threadpool
->domains
, tpdomain
);
277 /* LOCKING: domains_lock must be held */
278 static ThreadPoolDomain
*
279 tpdomain_get (MonoDomain
*domain
, gboolean create
)
282 ThreadPoolDomain
*tpdomain
;
286 for (i
= 0; i
< threadpool
->domains
->len
; ++i
) {
287 ThreadPoolDomain
*tpdomain
;
289 tpdomain
= (ThreadPoolDomain
*)g_ptr_array_index (threadpool
->domains
, i
);
290 if (tpdomain
->domain
== domain
)
297 tpdomain
= g_new0 (ThreadPoolDomain
, 1);
298 tpdomain
->domain
= domain
;
299 mono_coop_cond_init (&tpdomain
->cleanup_cond
);
301 tpdomain_add (tpdomain
);
307 tpdomain_free (ThreadPoolDomain
*tpdomain
)
312 /* LOCKING: domains_lock must be held */
313 static ThreadPoolDomain
*
314 tpdomain_get_next (ThreadPoolDomain
*current
)
316 ThreadPoolDomain
*tpdomain
= NULL
;
319 len
= threadpool
->domains
->len
;
321 guint i
, current_idx
= -1;
323 for (i
= 0; i
< len
; ++i
) {
324 if (current
== g_ptr_array_index (threadpool
->domains
, i
)) {
329 g_assert (current_idx
!= (guint
)-1);
331 for (i
= current_idx
+ 1; i
< len
+ current_idx
+ 1; ++i
) {
332 ThreadPoolDomain
*tmp
= (ThreadPoolDomain
*)g_ptr_array_index (threadpool
->domains
, i
% len
);
333 if (tmp
->outstanding_request
> 0) {
344 worker_callback (gpointer unused
)
347 ThreadPoolDomain
*tpdomain
, *previous_tpdomain
;
348 ThreadPoolCounter counter
;
349 MonoInternalThread
*thread
;
351 thread
= mono_thread_internal_current ();
353 COUNTER_ATOMIC (threadpool
, counter
, {
354 if (!(counter
._
.working
< 32767 /* G_MAXINT16 */))
355 g_error ("%s: counter._.working = %d, but should be < 32767", __func__
, counter
._
.working
);
357 counter
._
.starting
--;
358 counter
._
.working
++;
361 if (mono_runtime_is_shutting_down ()) {
362 COUNTER_ATOMIC (threadpool
, counter
, {
363 counter
._
.working
--;
366 mono_refcount_dec (threadpool
);
370 mono_coop_mutex_lock (&threadpool
->threads_lock
);
371 g_ptr_array_add (threadpool
->threads
, thread
);
372 mono_coop_mutex_unlock (&threadpool
->threads_lock
);
375 * This is needed so there is always an lmf frame in the runtime invoke call below,
376 * so ThreadAbortExceptions are caught even if the thread is in native code.
378 mono_defaults
.threadpool_perform_wait_callback_method
->save_lmf
= TRUE
;
382 previous_tpdomain
= NULL
;
384 while (!mono_runtime_is_shutting_down ()) {
385 gboolean retire
= FALSE
;
387 if ((thread
->state
& (ThreadState_AbortRequested
| ThreadState_SuspendRequested
)) != 0) {
389 mono_thread_interruption_checkpoint ();
393 tpdomain
= tpdomain_get_next (previous_tpdomain
);
397 tpdomain
->outstanding_request
--;
398 g_assert (tpdomain
->outstanding_request
>= 0);
400 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] worker running in domain %p (outstanding requests %d)",
401 mono_native_thread_id_get (), tpdomain
->domain
, tpdomain
->outstanding_request
);
403 g_assert (tpdomain
->threadpool_jobs
>= 0);
404 tpdomain
->threadpool_jobs
++;
408 mono_thread_clr_state (thread
, (MonoThreadState
)~ThreadState_Background
);
409 if (!mono_thread_test_state (thread
, ThreadState_Background
))
410 ves_icall_System_Threading_Thread_SetState (thread
, ThreadState_Background
);
412 mono_thread_push_appdomain_ref (tpdomain
->domain
);
413 if (mono_domain_set (tpdomain
->domain
, FALSE
)) {
414 MonoObject
*exc
= NULL
, *res
;
416 res
= mono_runtime_try_invoke (mono_defaults
.threadpool_perform_wait_callback_method
, NULL
, NULL
, &exc
, &error
);
417 if (exc
|| !mono_error_ok(&error
)) {
419 exc
= (MonoObject
*) mono_error_convert_to_exception (&error
);
421 mono_error_cleanup (&error
);
422 mono_thread_internal_unhandled_exception (exc
);
423 } else if (res
&& *(MonoBoolean
*) mono_object_unbox (res
) == FALSE
) {
427 mono_domain_set (mono_get_root_domain (), TRUE
);
429 mono_thread_pop_appdomain_ref ();
433 tpdomain
->threadpool_jobs
--;
434 g_assert (tpdomain
->threadpool_jobs
>= 0);
436 if (tpdomain
->outstanding_request
+ tpdomain
->threadpool_jobs
== 0 && mono_domain_is_unloading (tpdomain
->domain
)) {
439 removed
= tpdomain_remove (tpdomain
);
442 mono_coop_cond_signal (&tpdomain
->cleanup_cond
);
449 previous_tpdomain
= tpdomain
;
454 mono_coop_mutex_lock (&threadpool
->threads_lock
);
456 g_ptr_array_remove_fast (threadpool
->threads
, thread
);
458 mono_coop_cond_signal (&threadpool
->threads_exit_cond
);
460 mono_coop_mutex_unlock (&threadpool
->threads_lock
);
462 COUNTER_ATOMIC (threadpool
, counter
, {
463 counter
._
.working
--;
466 mono_refcount_dec (threadpool
);
470 mono_threadpool_cleanup (void)
472 #ifndef DISABLE_SOCKETS
473 mono_threadpool_io_cleanup ();
475 mono_lazy_cleanup (&status
, cleanup
);
479 mono_threadpool_begin_invoke (MonoDomain
*domain
, MonoObject
*target
, MonoMethod
*method
, gpointer
*params
, MonoError
*error
)
481 static MonoClass
*async_call_klass
= NULL
;
482 MonoMethodMessage
*message
;
483 MonoAsyncResult
*async_result
;
484 MonoAsyncCall
*async_call
;
485 MonoDelegate
*async_callback
= NULL
;
486 MonoObject
*state
= NULL
;
488 if (!async_call_klass
)
489 async_call_klass
= mono_class_load_from_name (mono_defaults
.corlib
, "System", "MonoAsyncCall");
491 mono_lazy_initialize (&status
, initialize
);
493 mono_error_init (error
);
495 message
= mono_method_call_message_new (method
, params
, mono_get_delegate_invoke (method
->klass
), (params
!= NULL
) ? (&async_callback
) : NULL
, (params
!= NULL
) ? (&state
) : NULL
, error
);
496 return_val_if_nok (error
, NULL
);
498 async_call
= (MonoAsyncCall
*) mono_object_new_checked (domain
, async_call_klass
, error
);
499 return_val_if_nok (error
, NULL
);
501 MONO_OBJECT_SETREF (async_call
, msg
, message
);
502 MONO_OBJECT_SETREF (async_call
, state
, state
);
504 if (async_callback
) {
505 MONO_OBJECT_SETREF (async_call
, cb_method
, mono_get_delegate_invoke (((MonoObject
*) async_callback
)->vtable
->klass
));
506 MONO_OBJECT_SETREF (async_call
, cb_target
, async_callback
);
509 async_result
= mono_async_result_new (domain
, NULL
, async_call
->state
, NULL
, (MonoObject
*) async_call
, error
);
510 return_val_if_nok (error
, NULL
);
511 MONO_OBJECT_SETREF (async_result
, async_delegate
, target
);
513 mono_threadpool_enqueue_work_item (domain
, (MonoObject
*) async_result
, error
);
514 return_val_if_nok (error
, NULL
);
520 mono_threadpool_end_invoke (MonoAsyncResult
*ares
, MonoArray
**out_args
, MonoObject
**exc
, MonoError
*error
)
524 mono_error_init (error
);
531 /* check if already finished */
532 mono_monitor_enter ((MonoObject
*) ares
);
534 if (ares
->endinvoke_called
) {
535 mono_error_set_invalid_operation(error
, "Delegate EndInvoke method called more than once");
536 mono_monitor_exit ((MonoObject
*) ares
);
540 ares
->endinvoke_called
= 1;
542 /* wait until we are really finished */
543 if (ares
->completed
) {
544 mono_monitor_exit ((MonoObject
*) ares
);
548 wait_event
= mono_wait_handle_get_handle ((MonoWaitHandle
*) ares
->handle
);
550 wait_event
= mono_w32event_create (TRUE
, FALSE
);
551 g_assert(wait_event
);
552 MonoWaitHandle
*wait_handle
= mono_wait_handle_new (mono_object_domain (ares
), wait_event
, error
);
553 if (!is_ok (error
)) {
554 CloseHandle (wait_event
);
557 MONO_OBJECT_SETREF (ares
, handle
, (MonoObject
*) wait_handle
);
559 mono_monitor_exit ((MonoObject
*) ares
);
562 WaitForSingleObjectEx (wait_event
, INFINITE
, TRUE
);
564 mono_w32handle_wait_one (wait_event
, MONO_INFINITE_WAIT
, TRUE
);
569 ac
= (MonoAsyncCall
*) ares
->object_data
;
572 *exc
= ac
->msg
->exc
; /* FIXME: GC add write barrier */
573 *out_args
= ac
->out_args
;
578 mono_threadpool_remove_domain_jobs (MonoDomain
*domain
, int timeout
)
581 ThreadPoolDomain
*tpdomain
;
585 g_assert (timeout
>= -1);
587 g_assert (mono_domain_is_unloading (domain
));
590 end
= mono_msec_ticks () + timeout
;
592 #ifndef DISABLE_SOCKETS
593 mono_threadpool_io_remove_domain_jobs (domain
);
595 if (mono_msec_ticks () > end
)
601 * Wait for all threads which execute jobs in the domain to exit.
602 * The mono_domain_is_unloading () check in ves_icall_System_Threading_ThreadPool_RequestWorkerThread () ensures that
603 * no new jobs are added after we enter the lock below.
606 if (!mono_lazy_is_initialized (&status
))
609 mono_refcount_inc (threadpool
);
613 tpdomain
= tpdomain_get (domain
, FALSE
);
616 mono_refcount_dec (threadpool
);
622 while (tpdomain
->outstanding_request
+ tpdomain
->threadpool_jobs
> 0) {
624 mono_coop_cond_wait (&tpdomain
->cleanup_cond
, &threadpool
->domains_lock
);
629 now
= mono_msec_ticks();
635 res
= mono_coop_cond_timedwait (&tpdomain
->cleanup_cond
, &threadpool
->domains_lock
, end
- now
);
643 /* Remove from the list the worker threads look at */
644 tpdomain_remove (tpdomain
);
648 mono_coop_cond_destroy (&tpdomain
->cleanup_cond
);
649 tpdomain_free (tpdomain
);
651 mono_refcount_dec (threadpool
);
657 mono_threadpool_suspend (void)
660 mono_threadpool_worker_set_suspended (threadpool
->worker
, TRUE
);
664 mono_threadpool_resume (void)
667 mono_threadpool_worker_set_suspended (threadpool
->worker
, FALSE
);
671 ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32
*worker_threads
, gint32
*completion_port_threads
)
673 ThreadPoolCounter counter
;
675 if (!worker_threads
|| !completion_port_threads
)
678 mono_lazy_initialize (&status
, initialize
);
680 counter
= COUNTER_READ (threadpool
);
682 *worker_threads
= MAX (0, mono_threadpool_worker_get_max (threadpool
->worker
) - counter
._
.working
);
683 *completion_port_threads
= threadpool
->limit_io_max
;
687 ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32
*worker_threads
, gint32
*completion_port_threads
)
689 if (!worker_threads
|| !completion_port_threads
)
692 mono_lazy_initialize (&status
, initialize
);
694 *worker_threads
= mono_threadpool_worker_get_min (threadpool
->worker
);
695 *completion_port_threads
= threadpool
->limit_io_min
;
699 ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32
*worker_threads
, gint32
*completion_port_threads
)
701 if (!worker_threads
|| !completion_port_threads
)
704 mono_lazy_initialize (&status
, initialize
);
706 *worker_threads
= mono_threadpool_worker_get_max (threadpool
->worker
);
707 *completion_port_threads
= threadpool
->limit_io_max
;
711 ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads
, gint32 completion_port_threads
)
713 mono_lazy_initialize (&status
, initialize
);
715 if (completion_port_threads
<= 0 || completion_port_threads
> threadpool
->limit_io_max
)
718 if (!mono_threadpool_worker_set_min (threadpool
->worker
, worker_threads
))
721 threadpool
->limit_io_min
= completion_port_threads
;
727 ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads
, gint32 completion_port_threads
)
729 gint cpu_count
= mono_cpu_count ();
731 mono_lazy_initialize (&status
, initialize
);
733 if (completion_port_threads
< threadpool
->limit_io_min
|| completion_port_threads
< cpu_count
)
736 if (!mono_threadpool_worker_set_max (threadpool
->worker
, worker_threads
))
739 threadpool
->limit_io_max
= completion_port_threads
;
745 ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean
*enable_worker_tracking
)
747 if (enable_worker_tracking
) {
748 // TODO implement some kind of switch to have the possibility to use it
749 *enable_worker_tracking
= FALSE
;
752 mono_lazy_initialize (&status
, initialize
);
756 ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
758 if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
761 return mono_threadpool_worker_notify_completed (threadpool
->worker
);
765 ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
767 mono_threadpool_worker_notify_completed (threadpool
->worker
);
771 ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working
)
775 mono_error_set_not_implemented (&error
, "");
776 mono_error_set_pending_exception (&error
);
780 ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
783 ThreadPoolDomain
*tpdomain
;
784 ThreadPoolCounter counter
;
786 domain
= mono_domain_get ();
787 if (mono_domain_is_unloading (domain
))
792 /* synchronize with mono_threadpool_remove_domain_jobs */
793 if (mono_domain_is_unloading (domain
)) {
798 tpdomain
= tpdomain_get (domain
, TRUE
);
801 tpdomain
->outstanding_request
++;
802 g_assert (tpdomain
->outstanding_request
>= 1);
806 COUNTER_ATOMIC (threadpool
, counter
, {
807 if (counter
._
.starting
== 16)
810 counter
._
.starting
++;
813 mono_refcount_inc (threadpool
);
815 mono_threadpool_worker_enqueue (threadpool
->worker
, worker_callback
, NULL
);
820 MonoBoolean G_GNUC_UNUSED
821 ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped
*native_overlapped
)
823 /* This copies the behavior of the current Mono implementation */
825 mono_error_set_not_implemented (&error
, "");
826 mono_error_set_pending_exception (&error
);
830 MonoBoolean G_GNUC_UNUSED
831 ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle
)
833 /* This copies the behavior of the current Mono implementation */
837 MonoBoolean G_GNUC_UNUSED
838 ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)