2 * threadpool.c: global thread pool
5 * Dietmar Maurer (dietmar@ximian.com)
6 * Gonzalo Paniagua Javier (gonzalo@ximian.com)
8 * (C) 2001-2003 Ximian, Inc.
9 * (c) 2004 Novell, Inc. (http://www.novell.com)
17 #define _WIN32_WINNT 0x0500
18 #define THREADS_PER_CPU 25
20 #define THREADS_PER_CPU 50
23 #include <mono/metadata/domain-internals.h>
24 #include <mono/metadata/tabledefs.h>
25 #include <mono/metadata/threads.h>
26 #include <mono/metadata/threads-types.h>
27 #include <mono/metadata/exception.h>
28 #include <mono/metadata/file-io.h>
29 #include <mono/metadata/monitor.h>
30 #include <mono/metadata/marshal.h>
31 #include <mono/io-layer/io-layer.h>
32 #include <mono/os/gc_wrapper.h>
34 #include "threadpool.h"
36 /* maximum number of worker threads */
37 int mono_max_worker_threads
= THREADS_PER_CPU
;
38 static int mono_min_worker_threads
= 0;
40 /* current number of worker threads */
41 static int mono_worker_threads
= 0;
43 /* current number of busy threads */
44 static int busy_worker_threads
= 0;
46 /* mono_thread_pool_init called */
49 /* we use this to store a reference to the AsyncResult to avoid GC */
50 static MonoGHashTable
*ares_htable
= NULL
;
52 static CRITICAL_SECTION ares_lock
;
55 static HANDLE job_added
;
58 MonoMethodMessage
*msg
;
60 MonoMethod
*cb_method
;
61 MonoDelegate
*cb_target
;
67 static void async_invoke_thread (gpointer data
);
68 static void append_job (MonoAsyncResult
*ar
);
70 static GList
*async_call_queue
= NULL
;
73 mono_async_invoke (MonoAsyncResult
*ares
)
75 ASyncCall
*ac
= (ASyncCall
*)ares
->data
;
78 ac
->res
= mono_message_invoke (ares
->async_delegate
, ac
->msg
,
79 &ac
->msg
->exc
, &ac
->out_args
);
83 /* call async callback if cb_method != null*/
85 MonoObject
*exc
= NULL
;
87 mono_runtime_invoke (ac
->cb_method
, ac
->cb_target
, pa
, &exc
);
92 /* notify listeners */
93 mono_monitor_enter ((MonoObject
*) ares
);
95 if (ares
->handle
!= NULL
) {
96 ac
->wait_event
= ((MonoWaitHandle
*) ares
->handle
)->handle
;
97 SetEvent (ac
->wait_event
);
99 mono_monitor_exit ((MonoObject
*) ares
);
101 EnterCriticalSection (&ares_lock
);
102 mono_g_hash_table_remove (ares_htable
, ares
);
103 LeaveCriticalSection (&ares_lock
);
107 mono_thread_pool_init ()
110 int threads_per_cpu
= THREADS_PER_CPU
;
112 if ((int) InterlockedCompareExchange (&tp_inited
, 1, 0) == 1)
115 MONO_GC_REGISTER_ROOT (ares_htable
);
116 InitializeCriticalSection (&ares_lock
);
117 ares_htable
= mono_g_hash_table_new (NULL
, NULL
);
118 job_added
= CreateSemaphore (NULL
, 0, 0x7fffffff, NULL
);
119 GetSystemInfo (&info
);
120 if (getenv ("MONO_THREADS_PER_CPU") != NULL
) {
121 threads_per_cpu
= atoi (getenv ("MONO_THREADS_PER_CPU"));
122 if (threads_per_cpu
<= 0)
123 threads_per_cpu
= THREADS_PER_CPU
;
126 mono_max_worker_threads
= threads_per_cpu
* info
.dwNumberOfProcessors
;
130 mono_thread_pool_add (MonoObject
*target
, MonoMethodMessage
*msg
, MonoDelegate
*async_callback
,
133 MonoDomain
*domain
= mono_domain_get ();
134 MonoAsyncResult
*ares
;
139 ac
= GC_MALLOC (sizeof (ASyncCall
));
141 /* We'll leak the event if created... */
142 ac
= g_new0 (ASyncCall
, 1);
144 ac
->wait_event
= NULL
;
148 if (async_callback
) {
149 ac
->cb_method
= mono_get_delegate_invoke (((MonoObject
*)async_callback
)->vtable
->klass
);
150 ac
->cb_target
= async_callback
;
153 ares
= mono_async_result_new (domain
, NULL
, ac
->state
, ac
);
154 ares
->async_delegate
= target
;
156 EnterCriticalSection (&ares_lock
);
157 mono_g_hash_table_insert (ares_htable
, ares
, ares
);
158 LeaveCriticalSection (&ares_lock
);
160 busy
= (int) InterlockedCompareExchange (&busy_worker_threads
, 0, -1);
161 worker
= (int) InterlockedCompareExchange (&mono_worker_threads
, 0, -1);
162 if (worker
<= ++busy
&&
163 worker
< mono_max_worker_threads
) {
164 InterlockedIncrement (&mono_worker_threads
);
165 InterlockedIncrement (&busy_worker_threads
);
166 mono_thread_create (domain
, async_invoke_thread
, ares
);
169 ReleaseSemaphore (job_added
, 1, NULL
);
176 mono_thread_pool_finish (MonoAsyncResult
*ares
, MonoArray
**out_args
, MonoObject
**exc
)
183 /* check if already finished */
184 mono_monitor_enter ((MonoObject
*) ares
);
186 if (ares
->endinvoke_called
) {
187 *exc
= (MonoObject
*)mono_exception_from_name (mono_defaults
.corlib
, "System",
188 "InvalidOperationException");
189 mono_monitor_exit ((MonoObject
*) ares
);
193 ares
->endinvoke_called
= 1;
194 ac
= (ASyncCall
*)ares
->data
;
196 g_assert (ac
!= NULL
);
198 /* wait until we are really finished */
199 if (!ares
->completed
) {
200 if (ares
->handle
== NULL
) {
201 ac
->wait_event
= CreateEvent (NULL
, TRUE
, FALSE
, NULL
);
202 ares
->handle
= (MonoObject
*) mono_wait_handle_new (mono_object_domain (ares
), ac
->wait_event
);
204 mono_monitor_exit ((MonoObject
*) ares
);
205 WaitForSingleObjectEx (ac
->wait_event
, INFINITE
, TRUE
);
207 mono_monitor_exit ((MonoObject
*) ares
);
211 *out_args
= ac
->out_args
;
217 mono_thread_pool_cleanup (void)
221 EnterCriticalSection (&mono_delegate_section
);
222 g_list_free (async_call_queue
);
223 async_call_queue
= NULL
;
224 release
= (gint
) InterlockedCompareExchange (&busy_worker_threads
, 0, -1);
225 LeaveCriticalSection (&mono_delegate_section
);
227 ReleaseSemaphore (job_added
, release
, NULL
);
231 append_job (MonoAsyncResult
*ar
)
235 EnterCriticalSection (&mono_delegate_section
);
236 if (async_call_queue
== NULL
) {
237 async_call_queue
= g_list_append (async_call_queue
, ar
);
239 for (tmp
= async_call_queue
; tmp
&& tmp
->data
!= NULL
; tmp
= tmp
->next
);
241 async_call_queue
= g_list_append (async_call_queue
, ar
);
246 LeaveCriticalSection (&mono_delegate_section
);
249 static MonoAsyncResult
*
252 MonoAsyncResult
*ar
= NULL
;
255 EnterCriticalSection (&mono_delegate_section
);
256 tmp
= async_call_queue
;
258 ar
= (MonoAsyncResult
*) tmp
->data
;
261 for (tmp2
= tmp
; tmp2
->next
!= NULL
; tmp2
= tmp2
->next
);
263 async_call_queue
= tmp
->next
;
269 LeaveCriticalSection (&mono_delegate_section
);
275 async_invoke_thread (gpointer data
)
281 thread
= mono_thread_current ();
282 thread
->threadpool_thread
= TRUE
;
283 thread
->state
|= ThreadState_Background
;
288 ar
= (MonoAsyncResult
*) data
;
290 /* worker threads invoke methods in different domains,
291 * so we need to set the right domain here */
292 domain
= ((MonoObject
*)ar
)->vtable
->domain
;
293 if (mono_domain_set (domain
, FALSE
)) {
294 mono_thread_push_appdomain_ref (domain
);
295 mono_async_invoke (ar
);
296 mono_thread_pop_appdomain_ref ();
298 InterlockedDecrement (&busy_worker_threads
);
301 data
= dequeue_job ();
306 guint32 start_time
= GetTickCount ();
309 wr
= WaitForSingleObjectEx (job_added
, (guint32
)timeout
, TRUE
);
310 if ((thread
->state
& ThreadState_StopRequested
)!=0)
311 mono_thread_interruption_checkpoint ();
313 timeout
-= GetTickCount () - start_time
;
315 if (wr
!= WAIT_TIMEOUT
)
316 data
= dequeue_job ();
318 while (!data
&& timeout
> 0);
322 workers
= (int) InterlockedCompareExchange (&mono_worker_threads
, 0, -1);
323 min
= (int) InterlockedCompareExchange (&mono_min_worker_threads
, 0, -1);
325 while (!data
&& workers
<= min
) {
326 WaitForSingleObjectEx (job_added
, INFINITE
, TRUE
);
327 if ((thread
->state
& ThreadState_StopRequested
)!=0)
328 mono_thread_interruption_checkpoint ();
330 data
= dequeue_job ();
331 workers
= (int) InterlockedCompareExchange (&mono_worker_threads
, 0, -1);
332 min
= (int) InterlockedCompareExchange (&mono_min_worker_threads
, 0, -1);
337 InterlockedDecrement (&mono_worker_threads
);
341 InterlockedIncrement (&busy_worker_threads
);
344 g_assert_not_reached ();
348 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint
*workerThreads
, gint
*completionPortThreads
)
354 busy
= (gint
) InterlockedCompareExchange (&busy_worker_threads
, 0, -1);
355 *workerThreads
= mono_max_worker_threads
- busy
;
356 *completionPortThreads
= 0;
360 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint
*workerThreads
, gint
*completionPortThreads
)
364 *workerThreads
= mono_max_worker_threads
;
365 *completionPortThreads
= 0;
369 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint
*workerThreads
, gint
*completionPortThreads
)
375 workers
= (gint
) InterlockedCompareExchange (&mono_min_worker_threads
, 0, -1);
376 *workerThreads
= workers
;
377 *completionPortThreads
= 0;
381 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads
, gint completionPortThreads
)
385 if (workerThreads
< 0 || workerThreads
> mono_max_worker_threads
)
387 InterlockedExchange (&mono_min_worker_threads
, workerThreads
);
388 /* FIXME: should actually start the idle threads if needed */
393 overlapped_callback (guint32 error
, guint32 numbytes
, WapiOverlapped
*overlapped
)
395 MonoFSAsyncResult
*ares
;
400 ares
= (MonoFSAsyncResult
*) overlapped
->handle1
;
401 ares
->completed
= TRUE
;
402 if (ares
->bytes_read
!= -1)
403 ares
->bytes_read
= numbytes
;
405 ares
->count
= numbytes
;
407 thread
= mono_thread_attach (mono_object_domain (ares
));
408 if (ares
->async_callback
!= NULL
) {
412 mono_runtime_invoke (ares
->async_callback
->method_info
->method
, NULL
, p
, NULL
);
415 SetEvent (ares
->wait_handle
->handle
);
416 mono_thread_detach (thread
);
421 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle
)
425 #ifdef PLATFORM_WIN32
428 if (!BindIoCompletionCallback (handle
, overlapped_callback
, 0)) {
429 gint error
= GetLastError ();
433 if (error
== ERROR_INVALID_PARAMETER
) {
434 exc
= mono_get_exception_argument (NULL
, "Invalid parameter.");
436 msg
= g_strdup_printf ("Win32 error %d.", error
);
437 exc
= mono_exception_from_name_msg (mono_defaults
.corlib
,
439 "ApplicationException", msg
);
443 mono_raise_exception (exc
);