2005-01-24 Ben Maurer <bmaurer@ximian.com>
[mono-project.git] / mono / metadata / threadpool.c
blobc4a764e02ea9e5d95274677f58aedacb1a03de6f
1 /*
2 * threadpool.c: global thread pool
4 * Authors:
5 * Dietmar Maurer (dietmar@ximian.com)
6 * Gonzalo Paniagua Javier (gonzalo@ximian.com)
8 * (C) 2001-2003 Ximian, Inc.
9 * (c) 2004 Novell, Inc. (http://www.novell.com)
12 #include <config.h>
13 #include <glib.h>
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #define THREADS_PER_CPU 25
19 #else
20 #define THREADS_PER_CPU 50
21 #endif
23 #include <mono/metadata/domain-internals.h>
24 #include <mono/metadata/tabledefs.h>
25 #include <mono/metadata/threads.h>
26 #include <mono/metadata/threads-types.h>
27 #include <mono/metadata/exception.h>
28 #include <mono/metadata/file-io.h>
29 #include <mono/metadata/monitor.h>
30 #include <mono/metadata/marshal.h>
31 #include <mono/io-layer/io-layer.h>
32 #include <mono/os/gc_wrapper.h>
34 #include "threadpool.h"
36 /* maximum number of worker threads */
37 int mono_max_worker_threads = THREADS_PER_CPU;
38 static int mono_min_worker_threads = 0;
40 /* current number of worker threads */
41 static int mono_worker_threads = 0;
43 /* current number of busy threads */
44 static int busy_worker_threads = 0;
46 /* mono_thread_pool_init called */
47 static int tp_inited;
49 /* we use this to store a reference to the AsyncResult to avoid GC */
50 static MonoGHashTable *ares_htable = NULL;
52 static CRITICAL_SECTION ares_lock;
54 /* we append a job */
55 static HANDLE job_added;
57 typedef struct {
58 MonoMethodMessage *msg;
59 HANDLE wait_event;
60 MonoMethod *cb_method;
61 MonoDelegate *cb_target;
62 MonoObject *state;
63 MonoObject *res;
64 MonoArray *out_args;
65 } ASyncCall;
67 static void async_invoke_thread (gpointer data);
68 static void append_job (MonoAsyncResult *ar);
70 static GList *async_call_queue = NULL;
72 static void
73 mono_async_invoke (MonoAsyncResult *ares)
75 ASyncCall *ac = (ASyncCall *)ares->data;
77 ac->msg->exc = NULL;
78 ac->res = mono_message_invoke (ares->async_delegate, ac->msg,
79 &ac->msg->exc, &ac->out_args);
81 ares->completed = 1;
83 /* call async callback if cb_method != null*/
84 if (ac->cb_method) {
85 MonoObject *exc = NULL;
86 void *pa = &ares;
87 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
88 if (!ac->msg->exc)
89 ac->msg->exc = exc;
92 /* notify listeners */
93 mono_monitor_enter ((MonoObject *) ares);
95 if (ares->handle != NULL) {
96 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
97 SetEvent (ac->wait_event);
99 mono_monitor_exit ((MonoObject *) ares);
101 EnterCriticalSection (&ares_lock);
102 mono_g_hash_table_remove (ares_htable, ares);
103 LeaveCriticalSection (&ares_lock);
106 void
107 mono_thread_pool_init ()
109 SYSTEM_INFO info;
110 int threads_per_cpu = THREADS_PER_CPU;
112 if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
113 return;
115 MONO_GC_REGISTER_ROOT (ares_htable);
116 InitializeCriticalSection (&ares_lock);
117 ares_htable = mono_g_hash_table_new (NULL, NULL);
118 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
119 GetSystemInfo (&info);
120 if (getenv ("MONO_THREADS_PER_CPU") != NULL) {
121 threads_per_cpu = atoi (getenv ("MONO_THREADS_PER_CPU"));
122 if (threads_per_cpu <= 0)
123 threads_per_cpu = THREADS_PER_CPU;
126 mono_max_worker_threads = threads_per_cpu * info.dwNumberOfProcessors;
129 MonoAsyncResult *
130 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
131 MonoObject *state)
133 MonoDomain *domain = mono_domain_get ();
134 MonoAsyncResult *ares;
135 ASyncCall *ac;
136 int busy, worker;
138 #ifdef HAVE_BOEHM_GC
139 ac = GC_MALLOC (sizeof (ASyncCall));
140 #else
141 /* We'll leak the event if creaated... */
142 ac = g_new0 (ASyncCall, 1);
143 #endif
144 ac->wait_event = NULL;
145 ac->msg = msg;
146 ac->state = state;
148 if (async_callback) {
149 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
150 ac->cb_target = async_callback;
153 ares = mono_async_result_new (domain, NULL, ac->state, ac);
154 ares->async_delegate = target;
156 EnterCriticalSection (&ares_lock);
157 mono_g_hash_table_insert (ares_htable, ares, ares);
158 LeaveCriticalSection (&ares_lock);
160 busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
161 worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
162 if (worker <= ++busy &&
163 worker < mono_max_worker_threads) {
164 InterlockedIncrement (&mono_worker_threads);
165 InterlockedIncrement (&busy_worker_threads);
166 mono_thread_create (domain, async_invoke_thread, ares);
167 } else {
168 append_job (ares);
169 ReleaseSemaphore (job_added, 1, NULL);
172 return ares;
175 MonoObject *
176 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
178 ASyncCall *ac;
180 *exc = NULL;
181 *out_args = NULL;
183 /* check if already finished */
184 mono_monitor_enter ((MonoObject *) ares);
186 if (ares->endinvoke_called) {
187 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System",
188 "InvalidOperationException");
189 mono_monitor_exit ((MonoObject *) ares);
190 return NULL;
193 ares->endinvoke_called = 1;
194 ac = (ASyncCall *)ares->data;
196 g_assert (ac != NULL);
198 /* wait until we are really finished */
199 if (!ares->completed) {
200 if (ares->handle == NULL) {
201 ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
202 ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
204 mono_monitor_exit ((MonoObject *) ares);
205 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
206 } else {
207 mono_monitor_exit ((MonoObject *) ares);
210 *exc = ac->msg->exc;
211 *out_args = ac->out_args;
213 return ac->res;
216 void
217 mono_thread_pool_cleanup (void)
219 gint release;
221 EnterCriticalSection (&mono_delegate_section);
222 g_list_free (async_call_queue);
223 async_call_queue = NULL;
224 release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
225 LeaveCriticalSection (&mono_delegate_section);
226 if (job_added)
227 ReleaseSemaphore (job_added, release, NULL);
230 static void
231 append_job (MonoAsyncResult *ar)
233 GList *tmp;
235 EnterCriticalSection (&mono_delegate_section);
236 if (async_call_queue == NULL) {
237 async_call_queue = g_list_append (async_call_queue, ar);
238 } else {
239 for (tmp = async_call_queue; tmp && tmp->data != NULL; tmp = tmp->next);
240 if (tmp == NULL) {
241 async_call_queue = g_list_append (async_call_queue, ar);
242 } else {
243 tmp->data = ar;
246 LeaveCriticalSection (&mono_delegate_section);
249 static MonoAsyncResult *
250 dequeue_job (void)
252 MonoAsyncResult *ar = NULL;
253 GList *tmp, *tmp2;
255 EnterCriticalSection (&mono_delegate_section);
256 tmp = async_call_queue;
257 if (tmp) {
258 ar = (MonoAsyncResult *) tmp->data;
259 tmp->data = NULL;
260 tmp2 = tmp;
261 for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
262 if (tmp2 != tmp) {
263 async_call_queue = tmp->next;
264 tmp->next = NULL;
265 tmp2->next = tmp;
266 tmp->prev = tmp2;
269 LeaveCriticalSection (&mono_delegate_section);
271 return ar;
274 static void
275 async_invoke_thread (gpointer data)
277 MonoDomain *domain;
278 MonoThread *thread;
279 int workers, min;
281 thread = mono_thread_current ();
282 thread->threadpool_thread = TRUE;
283 thread->state |= ThreadState_Background;
285 for (;;) {
286 MonoAsyncResult *ar;
288 ar = (MonoAsyncResult *) data;
289 if (ar) {
290 /* worker threads invokes methods in different domains,
291 * so we need to set the right domain here */
292 domain = ((MonoObject *)ar)->vtable->domain;
293 if (mono_domain_set (domain, FALSE)) {
294 mono_thread_push_appdomain_ref (domain);
295 mono_async_invoke (ar);
296 mono_thread_pop_appdomain_ref ();
298 InterlockedDecrement (&busy_worker_threads);
301 data = dequeue_job ();
303 if (!data) {
304 guint32 wr;
305 int timeout = 10000;
306 guint32 start_time = GetTickCount ();
308 do {
309 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
310 if ((thread->state & ThreadState_StopRequested)!=0)
311 mono_thread_interruption_checkpoint ();
313 timeout -= GetTickCount () - start_time;
315 if (wr != WAIT_TIMEOUT)
316 data = dequeue_job ();
318 while (!data && timeout > 0);
321 if (!data) {
322 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
323 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
325 while (!data && workers <= min) {
326 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
327 if ((thread->state & ThreadState_StopRequested)!=0)
328 mono_thread_interruption_checkpoint ();
330 data = dequeue_job ();
331 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
332 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
336 if (!data) {
337 InterlockedDecrement (&mono_worker_threads);
338 return;
341 InterlockedIncrement (&busy_worker_threads);
344 g_assert_not_reached ();
347 void
348 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
350 gint busy;
352 MONO_ARCH_SAVE_REGS;
354 busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
355 *workerThreads = mono_max_worker_threads - busy;
356 *completionPortThreads = 0;
359 void
360 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
362 MONO_ARCH_SAVE_REGS;
364 *workerThreads = mono_max_worker_threads;
365 *completionPortThreads = 0;
368 void
369 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
371 gint workers;
373 MONO_ARCH_SAVE_REGS;
375 workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
376 *workerThreads = workers;
377 *completionPortThreads = 0;
380 MonoBoolean
381 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
383 MONO_ARCH_SAVE_REGS;
385 if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
386 return FALSE;
387 InterlockedExchange (&mono_min_worker_threads, workerThreads);
388 /* FIXME: should actually start the idle threads if needed */
389 return TRUE;
392 static void
393 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
395 MonoFSAsyncResult *ares;
396 MonoThread *thread;
398 MONO_ARCH_SAVE_REGS;
400 ares = (MonoFSAsyncResult *) overlapped->handle1;
401 ares->completed = TRUE;
402 if (ares->bytes_read != -1)
403 ares->bytes_read = numbytes;
404 else
405 ares->count = numbytes;
407 thread = mono_thread_attach (mono_object_domain (ares));
408 if (ares->async_callback != NULL) {
409 gpointer p [1];
411 *p = ares;
412 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
415 SetEvent (ares->wait_handle->handle);
416 mono_thread_detach (thread);
417 g_free (overlapped);
420 MonoBoolean
421 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
423 MONO_ARCH_SAVE_REGS;
425 #ifdef PLATFORM_WIN32
426 return FALSE;
427 #else
428 if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
429 gint error = GetLastError ();
430 MonoException *exc;
431 gchar *msg;
433 if (error == ERROR_INVALID_PARAMETER) {
434 exc = mono_get_exception_argument (NULL, "Invalid parameter.");
435 } else {
436 msg = g_strdup_printf ("Win32 error %d.", error);
437 exc = mono_exception_from_name_msg (mono_defaults.corlib,
438 "System",
439 "ApplicationException", msg);
440 g_free (msg);
443 mono_raise_exception (exc);
446 return TRUE;
447 #endif