2 * threadpool.c: global thread pool
5 * Dietmar Maurer (dietmar@ximian.com)
6 * Gonzalo Paniagua Javier (gonzalo@ximian.com)
8 * (C) 2001-2003 Ximian, Inc.
9 * (c) 2004,2005 Novell, Inc. (http://www.novell.com)
17 #define _WIN32_WINNT 0x0500
20 #define THREADS_PER_CPU 5 /* 20 + THREADS_PER_CPU * number of CPUs */
22 #include <mono/metadata/domain-internals.h>
23 #include <mono/metadata/tabledefs.h>
24 #include <mono/metadata/threads.h>
25 #include <mono/metadata/threads-types.h>
26 #include <mono/metadata/threadpool-internals.h>
27 #include <mono/metadata/exception.h>
28 #include <mono/metadata/file-io.h>
29 #include <mono/metadata/monitor.h>
30 #include <mono/metadata/mono-mlist.h>
31 #include <mono/metadata/marshal.h>
32 #include <mono/metadata/socket-io.h>
33 #include <mono/io-layer/io-layer.h>
34 #include <mono/os/gc_wrapper.h>
37 #include <sys/types.h>
42 #include <mono/utils/mono-poll.h>
44 #include <sys/epoll.h>
47 #include "mono/io-layer/socket-wrappers.h"
49 #include "threadpool.h"
51 #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
52 ThreadState_SuspendRequested)) != 0)
56 /* maximum number of worker threads */
57 static int mono_max_worker_threads
;
58 static int mono_min_worker_threads
;
59 static int mono_io_max_worker_threads
;
61 /* current number of worker threads */
62 static int mono_worker_threads
= 0;
63 static int io_worker_threads
= 0;
65 /* current number of busy threads */
66 static int busy_worker_threads
= 0;
67 static int busy_io_worker_threads
;
69 /* mono_thread_pool_init called */
72 /* we use this to store a reference to the AsyncResult to avoid GC */
73 static MonoGHashTable
*ares_htable
= NULL
;
75 static CRITICAL_SECTION ares_lock
;
76 static CRITICAL_SECTION io_queue_lock
;
77 static int pending_io_items
;
80 CRITICAL_SECTION io_lock
; /* access to sock_to_state */
83 MonoGHashTable
*sock_to_state
;
85 HANDLE new_sem
; /* access to newpfd and write side of the pipe */
87 gboolean epoll_disabled
;
93 static SocketIOData socket_io_data
;
96 static HANDLE job_added
;
97 static HANDLE io_job_added
;
99 /* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
102 MonoMethodMessage
*msg
;
103 MonoMethod
*cb_method
;
104 MonoDelegate
*cb_target
;
108 /* This is a HANDLE, we use guint64 so the managed object layout remains constant */
118 static void async_invoke_thread (gpointer data
);
119 static void append_job (CRITICAL_SECTION
*cs
, TPQueue
*list
, MonoObject
*ar
);
120 static void start_thread_or_queue (MonoAsyncResult
*ares
);
121 static void mono_async_invoke (MonoAsyncResult
*ares
);
122 static MonoObject
* dequeue_job (CRITICAL_SECTION
*cs
, TPQueue
*list
);
123 static void free_queue (TPQueue
*list
);
125 static TPQueue async_call_queue
= {NULL
, 0, 0};
126 static TPQueue async_io_queue
= {NULL
, 0, 0};
128 static MonoClass
*async_call_klass
;
129 static MonoClass
*socket_async_call_klass
;
130 static MonoClass
*process_async_call_klass
;
132 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
141 AIO_OP_RECV_JUST_CALLBACK
,
142 AIO_OP_SEND_JUST_CALLBACK
,
148 socket_io_cleanup (SocketIOData
*data
)
152 if (data
->inited
== 0)
155 EnterCriticalSection (&data
->io_lock
);
157 #ifdef PLATFORM_WIN32
158 closesocket (data
->pipe
[0]);
159 closesocket (data
->pipe
[1]);
161 close (data
->pipe
[0]);
162 close (data
->pipe
[1]);
167 CloseHandle (data
->new_sem
);
168 data
->new_sem
= NULL
;
169 mono_g_hash_table_destroy (data
->sock_to_state
);
170 data
->sock_to_state
= NULL
;
171 free_queue (&async_io_queue
);
172 release
= (gint
) InterlockedCompareExchange (&io_worker_threads
, 0, -1);
174 ReleaseSemaphore (io_job_added
, release
, NULL
);
175 g_free (data
->newpfd
);
178 if (FALSE
== data
->epoll_disabled
)
179 close (data
->epollfd
);
181 LeaveCriticalSection (&data
->io_lock
);
185 get_event_from_state (MonoSocketAsyncResult
*state
)
187 switch (state
->operation
) {
190 case AIO_OP_RECV_JUST_CALLBACK
:
191 case AIO_OP_RECEIVEFROM
:
192 case AIO_OP_READPIPE
:
195 case AIO_OP_SEND_JUST_CALLBACK
:
199 default: /* Should never happen */
200 g_print ("get_event_from_state: unknown value in switch!!!\n");
206 get_events_from_list (MonoMList
*list
)
208 MonoSocketAsyncResult
*state
;
211 while (list
&& (state
= (MonoSocketAsyncResult
*)mono_mlist_get_data (list
))) {
212 events
|= get_event_from_state (state
);
213 list
= mono_mlist_next (list
);
219 #define ICALL_RECV(x) ves_icall_System_Net_Sockets_Socket_Receive_internal (\
220 (SOCKET) x->handle, x->buffer, x->offset, x->size,\
221 x->socket_flags, &x->error);
223 #define ICALL_SEND(x) ves_icall_System_Net_Sockets_Socket_Send_internal (\
224 (SOCKET) x->handle, x->buffer, x->offset, x->size,\
225 x->socket_flags, &x->error);
228 async_invoke_io_thread (gpointer data
)
232 thread
= mono_thread_current ();
233 thread
->threadpool_thread
= TRUE
;
234 ves_icall_System_Threading_Thread_SetState (thread
, ThreadState_Background
);
237 MonoSocketAsyncResult
*state
;
240 state
= (MonoSocketAsyncResult
*) data
;
242 InterlockedDecrement (&pending_io_items
);
244 switch (state
->operation
) {
246 state
->total
= ICALL_RECV (state
);
249 state
->total
= ICALL_SEND (state
);
253 /* worker threads invokes methods in different domains,
254 * so we need to set the right domain here */
255 domain
= ((MonoObject
*)ar
)->vtable
->domain
;
256 mono_thread_push_appdomain_ref (domain
);
257 if (mono_domain_set (domain
, FALSE
)) {
260 mono_async_invoke (ar
);
261 ac
= (ASyncCall
*) ar
->object_data
;
263 if (ac->msg->exc != NULL)
264 mono_unhandled_exception (ac->msg->exc);
266 mono_domain_set (mono_get_root_domain (), TRUE
);
268 mono_thread_pop_appdomain_ref ();
269 InterlockedDecrement (&busy_io_worker_threads
);
272 data
= dequeue_job (&io_queue_lock
, &async_io_queue
);
277 guint32 start_time
= GetTickCount ();
280 wr
= WaitForSingleObjectEx (io_job_added
, (guint32
)timeout
, TRUE
);
281 if (THREAD_WANTS_A_BREAK (thread
))
282 mono_thread_interruption_checkpoint ();
284 timeout
-= GetTickCount () - start_time
;
286 if (wr
!= WAIT_TIMEOUT
)
287 data
= dequeue_job (&io_queue_lock
, &async_io_queue
);
289 while (!data
&& timeout
> 0);
293 if (InterlockedDecrement (&io_worker_threads
) < 2) {
294 /* If we have pending items, keep the thread alive */
295 if (InterlockedCompareExchange (&pending_io_items
, 0, 0) != 0) {
296 InterlockedIncrement (&io_worker_threads
);
303 InterlockedIncrement (&busy_io_worker_threads
);
306 g_assert_not_reached ();
310 start_io_thread_or_queue (MonoSocketAsyncResult
*ares
)
315 busy
= (int) InterlockedCompareExchange (&busy_io_worker_threads
, 0, -1);
316 worker
= (int) InterlockedCompareExchange (&io_worker_threads
, 0, -1);
317 if (worker
<= ++busy
&&
318 worker
< mono_io_max_worker_threads
) {
319 InterlockedIncrement (&busy_io_worker_threads
);
320 InterlockedIncrement (&io_worker_threads
);
321 domain
= ((ares
) ? ((MonoObject
*) ares
)->vtable
->domain
: mono_domain_get ());
322 mono_thread_create (mono_get_root_domain (), async_invoke_io_thread
, ares
);
324 append_job (&io_queue_lock
, &async_io_queue
, (MonoObject
*)ares
);
325 ReleaseSemaphore (io_job_added
, 1, NULL
);
330 process_io_event (MonoMList
*list
, int event
)
332 MonoSocketAsyncResult
*state
;
338 state
= (MonoSocketAsyncResult
*) mono_mlist_get_data (list
);
339 if (get_event_from_state (state
) == event
)
342 list
= mono_mlist_next (list
);
346 oldlist
= mono_mlist_remove_item (oldlist
, list
);
348 g_print ("Dispatching event %d on socket %d\n", event
, state
->handle
);
350 InterlockedIncrement (&pending_io_items
);
351 start_io_thread_or_queue (state
);
358 mark_bad_fds (mono_pollfd
*pfds
, int nfds
)
364 for (i
= 0; i
< nfds
; i
++) {
369 ret
= mono_poll (pfd
, 1, 0);
370 if (ret
== -1 && errno
== EBADF
) {
371 pfd
->revents
|= MONO_POLLNVAL
;
373 } else if (ret
== 1) {
382 socket_io_poll_main (gpointer p
)
384 #define INITIAL_POLLFD_SIZE 1024
385 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
386 SocketIOData
*data
= p
;
393 thread
= mono_thread_current ();
394 thread
->threadpool_thread
= TRUE
;
395 ves_icall_System_Threading_Thread_SetState (thread
, ThreadState_Background
);
397 allocated
= INITIAL_POLLFD_SIZE
;
398 pfds
= g_new0 (mono_pollfd
, allocated
);
399 INIT_POLLFD (pfds
, data
->pipe
[0], MONO_POLLIN
);
400 for (i
= 1; i
< allocated
; i
++)
401 INIT_POLLFD (&pfds
[i
], -1, 0);
411 if (THREAD_WANTS_A_BREAK (thread
))
412 mono_thread_interruption_checkpoint ();
415 nsock
= mono_poll (pfds
, maxfd
, -1);
416 } while (nsock
== -1 && errno
== EINTR
);
419 * Apart from EINTR, we only check EBADF, for the rest:
420 * EINVAL: mono_poll() 'protects' us from descriptor
421 * numbers above the limit if using select() by marking
422 * then as MONO_POLLERR. If a system poll() is being
423 * used, the number of descriptor we're passing will not
424 * be over sysconf(_SC_OPEN_MAX), as the error would have
425 * happened when opening.
427 * EFAULT: we own the memory pointed by pfds.
428 * ENOMEM: we're doomed anyway
432 if (nsock
== -1 && errno
== EBADF
) {
433 pfds
->revents
= 0; /* Just in case... */
434 nsock
= mark_bad_fds (pfds
, maxfd
);
437 if ((pfds
->revents
& POLL_ERRORS
) != 0) {
438 /* We're supposed to die now, as the pipe has been closed */
440 socket_io_cleanup (data
);
444 /* Got a new socket */
445 if ((pfds
->revents
& MONO_POLLIN
) != 0) {
448 for (i
= 1; i
< allocated
; i
++) {
450 if (pfd
->fd
== -1 || pfd
->fd
== data
->newpfd
->fd
)
454 if (i
== allocated
) {
459 allocated
= allocated
* 2;
460 pfds
= g_renew (mono_pollfd
, oldfd
, allocated
);
462 for (; i
< allocated
; i
++)
463 INIT_POLLFD (&pfds
[i
], -1, 0);
465 #ifndef PLATFORM_WIN32
466 nread
= read (data
->pipe
[0], one
, 1);
468 nread
= recv ((SOCKET
) data
->pipe
[0], one
, 1, 0);
472 return; /* we're closed */
475 INIT_POLLFD (&pfds
[i
], data
->newpfd
->fd
, data
->newpfd
->events
);
476 ReleaseSemaphore (data
->new_sem
, 1, NULL
);
485 EnterCriticalSection (&data
->io_lock
);
486 if (data
->inited
== 0) {
488 LeaveCriticalSection (&data
->io_lock
);
489 return; /* cleanup called */
492 for (i
= 1; i
< maxfd
&& nsock
> 0; i
++) {
494 if (pfd
->fd
== -1 || pfd
->revents
== 0)
498 list
= mono_g_hash_table_lookup (data
->sock_to_state
, GINT_TO_POINTER (pfd
->fd
));
499 if (list
!= NULL
&& (pfd
->revents
& (MONO_POLLIN
| POLL_ERRORS
)) != 0) {
500 list
= process_io_event (list
, MONO_POLLIN
);
503 if (list
!= NULL
&& (pfd
->revents
& (MONO_POLLOUT
| POLL_ERRORS
)) != 0) {
504 list
= process_io_event (list
, MONO_POLLOUT
);
508 mono_g_hash_table_replace (data
->sock_to_state
, GINT_TO_POINTER (pfd
->fd
), list
);
509 pfd
->events
= get_events_from_list (list
);
511 mono_g_hash_table_remove (data
->sock_to_state
, GINT_TO_POINTER (pfd
->fd
));
517 LeaveCriticalSection (&data
->io_lock
);
522 #define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
524 socket_io_epoll_main (gpointer p
)
529 struct epoll_event
*events
, *evt
;
530 const int nevents
= 512;
534 epollfd
= data
->epollfd
;
535 thread
= mono_thread_current ();
536 thread
->threadpool_thread
= TRUE
;
537 ves_icall_System_Threading_Thread_SetState (thread
, ThreadState_Background
);
538 events
= g_new0 (struct epoll_event
, nevents
);
543 if (THREAD_WANTS_A_BREAK (thread
))
544 mono_thread_interruption_checkpoint ();
547 g_print ("epoll_wait init\n");
549 ready
= epoll_wait (epollfd
, events
, nevents
, -1);
553 g_print ("epoll_wait end with %d ready sockets (%d %s).\n", ready
, err
, (err
) ? g_strerror (err
) : "");
557 } while (ready
== -1 && errno
== EINTR
);
563 g_warning ("epoll_wait: %d %s\n", err
, g_strerror (err
));
569 EnterCriticalSection (&data
->io_lock
);
570 if (data
->inited
== 0) {
572 g_print ("data->inited == 0\n");
576 return; /* cleanup called */
579 for (i
= 0; i
< ready
; i
++) {
585 list
= mono_g_hash_table_lookup (data
->sock_to_state
, GINT_TO_POINTER (fd
));
587 g_print ("Event %d on %d list length: %d\n", evt
->events
, fd
, mono_mlist_length (list
));
589 if (list
!= NULL
&& (evt
->events
& (EPOLLIN
| EPOLL_ERRORS
)) != 0) {
590 list
= process_io_event (list
, MONO_POLLIN
);
593 if (list
!= NULL
&& (evt
->events
& (EPOLLOUT
| EPOLL_ERRORS
)) != 0) {
594 list
= process_io_event (list
, MONO_POLLOUT
);
598 mono_g_hash_table_replace (data
->sock_to_state
, GINT_TO_POINTER (fd
), list
);
599 evt
->events
= get_events_from_list (list
);
601 g_print ("MOD %d to %d\n", fd
, evt
->events
);
603 if (epoll_ctl (epollfd
, EPOLL_CTL_MOD
, fd
, evt
)) {
604 if (epoll_ctl (epollfd
, EPOLL_CTL_ADD
, fd
, evt
) == -1) {
607 g_message ("epoll_ctl(MOD): %d %s fd: %d events: %d", err
, g_strerror (err
), fd
, evt
->events
);
613 mono_g_hash_table_remove (data
->sock_to_state
, GINT_TO_POINTER (fd
));
615 g_print ("DEL %d\n", fd
);
617 epoll_ctl (epollfd
, EPOLL_CTL_DEL
, fd
, evt
);
620 LeaveCriticalSection (&data
->io_lock
);
626 * select/poll wake up when a socket is closed, but epoll just removes
627 * the socket from its internal list without notification.
630 mono_thread_pool_remove_socket (int sock
)
633 MonoMList
*list
, *next
;
634 MonoSocketAsyncResult
*state
;
636 if (socket_io_data
.epoll_disabled
== TRUE
|| socket_io_data
.inited
== FALSE
)
639 EnterCriticalSection (&socket_io_data
.io_lock
);
640 list
= mono_g_hash_table_lookup (socket_io_data
.sock_to_state
, GINT_TO_POINTER (sock
));
642 mono_g_hash_table_remove (socket_io_data
.sock_to_state
, GINT_TO_POINTER (sock
));
644 LeaveCriticalSection (&socket_io_data
.io_lock
);
647 state
= (MonoSocketAsyncResult
*) mono_mlist_get_data (list
);
648 if (state
->operation
== AIO_OP_RECEIVE
)
649 state
->operation
= AIO_OP_RECV_JUST_CALLBACK
;
650 else if (state
->operation
== AIO_OP_SEND
)
651 state
->operation
= AIO_OP_SEND_JUST_CALLBACK
;
653 next
= mono_mlist_remove_item (list
, list
);
654 list
= process_io_event (list
, MONO_POLLIN
);
656 process_io_event (list
, MONO_POLLOUT
);
663 #ifdef PLATFORM_WIN32
665 connect_hack (gpointer x
)
667 struct sockaddr_in
*addr
= (struct sockaddr_in
*) x
;
670 while (connect ((SOCKET
) socket_io_data
.pipe
[1], (SOCKADDR
*) addr
, sizeof (struct sockaddr_in
))) {
673 g_warning ("Error initializing async. sockets %d.\n", WSAGetLastError ());
674 g_assert (WSAGetLastError ());
681 socket_io_init (SocketIOData
*data
)
683 #ifdef PLATFORM_WIN32
684 struct sockaddr_in server
;
685 struct sockaddr_in client
;
691 inited
= InterlockedCompareExchange (&data
->inited
, -1, -1);
695 EnterCriticalSection (&data
->io_lock
);
696 inited
= InterlockedCompareExchange (&data
->inited
, -1, -1);
698 LeaveCriticalSection (&data
->io_lock
);
703 data
->epoll_disabled
= (g_getenv ("MONO_DISABLE_AIO") != NULL
);
704 if (FALSE
== data
->epoll_disabled
) {
705 data
->epollfd
= epoll_create (256);
706 data
->epoll_disabled
= (data
->epollfd
== -1);
707 if (data
->epoll_disabled
&& g_getenv ("MONO_DEBUG"))
708 g_message ("epoll_create() failed. Using plain poll().");
713 data
->epoll_disabled
= TRUE
;
716 #ifndef PLATFORM_WIN32
717 if (data
->epoll_disabled
) {
718 if (pipe (data
->pipe
) != 0) {
728 srv
= socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
729 g_assert (srv
!= INVALID_SOCKET
);
730 data
->pipe
[1] = socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
731 g_assert (data
->pipe
[1] != INVALID_SOCKET
);
733 server
.sin_family
= AF_INET
;
734 server
.sin_addr
.s_addr
= inet_addr ("127.0.0.1");
736 if (bind (srv
, (SOCKADDR
*) &server
, sizeof (server
))) {
737 g_print ("%d\n", WSAGetLastError ());
741 len
= sizeof (server
);
742 getsockname (srv
, (SOCKADDR
*) &server
, &len
);
744 mono_thread_create (mono_get_root_domain (), connect_hack
, &server
);
745 len
= sizeof (server
);
746 data
->pipe
[0] = accept (srv
, (SOCKADDR
*) &client
, &len
);
747 g_assert (data
->pipe
[0] != INVALID_SOCKET
);
750 mono_io_max_worker_threads
= mono_max_worker_threads
/ 2;
751 if (mono_io_max_worker_threads
< 10)
752 mono_io_max_worker_threads
= 10;
754 data
->sock_to_state
= mono_g_hash_table_new_type (g_direct_hash
, g_direct_equal
, MONO_HASH_VALUE_GC
);
756 if (data
->epoll_disabled
) {
757 data
->new_sem
= CreateSemaphore (NULL
, 1, 1, NULL
);
758 g_assert (data
->new_sem
!= NULL
);
760 io_job_added
= CreateSemaphore (NULL
, 0, 0x7fffffff, NULL
);
761 g_assert (io_job_added
!= NULL
);
762 InitializeCriticalSection (&io_queue_lock
);
763 if (data
->epoll_disabled
) {
764 mono_thread_create (mono_get_root_domain (), socket_io_poll_main
, data
);
768 mono_thread_create (mono_get_root_domain (), socket_io_epoll_main
, data
);
771 InterlockedCompareExchange (&data
->inited
, 1, 0);
772 LeaveCriticalSection (&data
->io_lock
);
776 socket_io_add_poll (MonoSocketAsyncResult
*state
)
781 SocketIOData
*data
= &socket_io_data
;
783 #if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD6) || defined(PLATFORM_WIN32)
784 /* select() for connect() does not work well on the Mac. Bug #75436. */
785 /* Bug #77637 for the BSD 6 case */
786 /* Bug #78888 for the Windows case */
787 if (state
->operation
== AIO_OP_CONNECT
&& state
->blocking
== TRUE
) {
788 start_io_thread_or_queue (state
);
792 WaitForSingleObject (data
->new_sem
, INFINITE
);
793 if (data
->newpfd
== NULL
)
794 data
->newpfd
= g_new0 (mono_pollfd
, 1);
796 EnterCriticalSection (&data
->io_lock
);
797 /* FIXME: 64 bit issue: handle can be a pointer on windows? */
798 list
= mono_g_hash_table_lookup (data
->sock_to_state
, GINT_TO_POINTER (state
->handle
));
800 list
= mono_mlist_alloc ((MonoObject
*)state
);
802 list
= mono_mlist_append (list
, (MonoObject
*)state
);
805 events
= get_events_from_list (list
);
806 INIT_POLLFD (data
->newpfd
, GPOINTER_TO_INT (state
->handle
), events
);
807 mono_g_hash_table_replace (data
->sock_to_state
, GINT_TO_POINTER (state
->handle
), list
);
808 LeaveCriticalSection (&data
->io_lock
);
809 *msg
= (char) state
->operation
;
810 #ifndef PLATFORM_WIN32
811 write (data
->pipe
[1], msg
, 1);
813 send ((SOCKET
) data
->pipe
[1], msg
, 1, 0);
819 socket_io_add_epoll (MonoSocketAsyncResult
*state
)
822 SocketIOData
*data
= &socket_io_data
;
823 struct epoll_event event
;
827 memset (&event
, 0, sizeof (struct epoll_event
));
828 fd
= GPOINTER_TO_INT (state
->handle
);
829 EnterCriticalSection (&data
->io_lock
);
830 list
= mono_g_hash_table_lookup (data
->sock_to_state
, GINT_TO_POINTER (fd
));
832 list
= mono_mlist_alloc ((MonoObject
*)state
);
833 epoll_op
= EPOLL_CTL_ADD
;
835 list
= mono_mlist_append (list
, (MonoObject
*)state
);
836 epoll_op
= EPOLL_CTL_MOD
;
839 ievt
= get_events_from_list (list
);
840 if ((ievt
& MONO_POLLIN
) != 0)
841 event
.events
|= EPOLLIN
;
842 if ((ievt
& MONO_POLLOUT
) != 0)
843 event
.events
|= EPOLLOUT
;
845 mono_g_hash_table_replace (data
->sock_to_state
, state
->handle
, list
);
848 g_print ("%s %d with %d\n", epoll_op
== EPOLL_CTL_ADD
? "ADD" : "MOD", fd
, event
.events
);
850 if (epoll_ctl (data
->epollfd
, epoll_op
, fd
, &event
) == -1) {
852 if (epoll_op
== EPOLL_CTL_ADD
&& err
== EEXIST
) {
853 epoll_op
= EPOLL_CTL_MOD
;
854 if (epoll_ctl (data
->epollfd
, epoll_op
, fd
, &event
) == -1) {
855 g_message ("epoll_ctl(MOD): %d %s\n", err
, g_strerror (err
));
860 LeaveCriticalSection (&data
->io_lock
);
866 socket_io_add (MonoAsyncResult
*ares
, MonoSocketAsyncResult
*state
)
868 socket_io_init (&socket_io_data
);
869 MONO_OBJECT_SETREF (state
, ares
, ares
);
871 if (socket_io_data
.epoll_disabled
== FALSE
) {
872 if (socket_io_add_epoll (state
))
876 socket_io_add_poll (state
);
880 socket_io_filter (MonoObject
*target
, MonoObject
*state
)
883 MonoSocketAsyncResult
*sock_res
= (MonoSocketAsyncResult
*) state
;
886 if (target
== NULL
|| state
== NULL
)
889 if (socket_async_call_klass
== NULL
) {
890 klass
= target
->vtable
->klass
;
891 /* Check if it's SocketAsyncCall in System.Net.Sockets
892 * FIXME: check the assembly is signed correctly for extra care
894 if (klass
->name
[0] == 'S' && strcmp (klass
->name
, "SocketAsyncCall") == 0
895 && strcmp (mono_image_get_name (klass
->image
), "System") == 0
896 && klass
->nested_in
&& strcmp (klass
->nested_in
->name
, "Socket") == 0)
897 socket_async_call_klass
= klass
;
900 if (process_async_call_klass
== NULL
) {
901 klass
= target
->vtable
->klass
;
902 /* Check if it's AsyncReadHandler in System.Diagnostics.Process
903 * FIXME: check the assembly is signed correctly for extra care
905 if (klass
->name
[0] == 'A' && strcmp (klass
->name
, "AsyncReadHandler") == 0
906 && strcmp (mono_image_get_name (klass
->image
), "System") == 0
907 && klass
->nested_in
&& strcmp (klass
->nested_in
->name
, "Process") == 0)
908 process_async_call_klass
= klass
;
910 /* return both when socket_async_call_klass has not been seen yet and when
911 * the object is not an instance of the class.
913 if (target
->vtable
->klass
!= socket_async_call_klass
&& target
->vtable
->klass
!= process_async_call_klass
)
916 op
= sock_res
->operation
;
917 if (op
< AIO_OP_FIRST
|| op
>= AIO_OP_LAST
)
924 mono_async_invoke (MonoAsyncResult
*ares
)
926 ASyncCall
*ac
= (ASyncCall
*)ares
->object_data
;
927 MonoThread
*thread
= NULL
;
928 MonoObject
*res
, *exc
= NULL
;
929 MonoArray
*out_args
= NULL
;
931 if (ares
->execution_context
) {
932 /* use captured ExecutionContext (if available) */
933 thread
= mono_thread_current ();
934 MONO_OBJECT_SETREF (ares
, original_context
, thread
->execution_context
);
935 MONO_OBJECT_SETREF (thread
, execution_context
, ares
->execution_context
);
937 ares
->original_context
= NULL
;
941 res
= mono_message_invoke (ares
->async_delegate
, ac
->msg
, &exc
, &out_args
);
942 MONO_OBJECT_SETREF (ac
, res
, res
);
943 MONO_OBJECT_SETREF (ac
, msg
->exc
, exc
);
944 MONO_OBJECT_SETREF (ac
, out_args
, out_args
);
948 /* call async callback if cb_method != null*/
950 MonoObject
*exc
= NULL
;
952 mono_runtime_invoke (ac
->cb_method
, ac
->cb_target
, pa
, &exc
);
953 /* 'exc' will be the previous ac->msg->exc if not NULL and not
954 * catched. If catched, this will be set to NULL and the
955 * exception will not be printed. */
956 MONO_OBJECT_SETREF (ac
->msg
, exc
, exc
);
959 /* restore original thread execution context if flow isn't suppressed, i.e. non null */
960 if (ares
->original_context
) {
961 MONO_OBJECT_SETREF (thread
, execution_context
, ares
->original_context
);
962 ares
->original_context
= NULL
;
965 /* notify listeners */
966 mono_monitor_enter ((MonoObject
*) ares
);
967 if (ares
->handle
!= NULL
) {
968 ac
->wait_event
= (gsize
) mono_wait_handle_get_handle ((MonoWaitHandle
*) ares
->handle
);
969 SetEvent ((gpointer
)(gsize
)ac
->wait_event
);
971 mono_monitor_exit ((MonoObject
*) ares
);
973 EnterCriticalSection (&ares_lock
);
974 mono_g_hash_table_remove (ares_htable
, ares
);
975 LeaveCriticalSection (&ares_lock
);
979 mono_thread_pool_init ()
982 int threads_per_cpu
= THREADS_PER_CPU
;
984 if ((int) InterlockedCompareExchange (&tp_inited
, 1, 0) == 1)
987 MONO_GC_REGISTER_ROOT (ares_htable
);
988 MONO_GC_REGISTER_ROOT (socket_io_data
.sock_to_state
);
989 InitializeCriticalSection (&socket_io_data
.io_lock
);
990 InitializeCriticalSection (&ares_lock
);
991 ares_htable
= mono_g_hash_table_new_type (NULL
, NULL
, MONO_HASH_KEY_VALUE_GC
);
992 job_added
= CreateSemaphore (NULL
, 0, 0x7fffffff, NULL
);
993 g_assert (job_added
!= NULL
);
994 GetSystemInfo (&info
);
995 if (g_getenv ("MONO_THREADS_PER_CPU") != NULL
) {
996 threads_per_cpu
= atoi (g_getenv ("MONO_THREADS_PER_CPU"));
997 if (threads_per_cpu
<= 0)
998 threads_per_cpu
= THREADS_PER_CPU
;
1001 mono_max_worker_threads
= 20 + threads_per_cpu
* info
.dwNumberOfProcessors
;
1003 async_call_klass
= mono_class_from_name (mono_defaults
.corlib
, "System", "MonoAsyncCall");
1004 g_assert (async_call_klass
);
1008 mono_thread_pool_add (MonoObject
*target
, MonoMethodMessage
*msg
, MonoDelegate
*async_callback
,
1011 MonoDomain
*domain
= mono_domain_get ();
1012 MonoAsyncResult
*ares
;
1015 ac
= (ASyncCall
*)mono_object_new (mono_domain_get (), async_call_klass
);
1016 MONO_OBJECT_SETREF (ac
, msg
, msg
);
1017 MONO_OBJECT_SETREF (ac
, state
, state
);
1019 if (async_callback
) {
1020 ac
->cb_method
= mono_get_delegate_invoke (((MonoObject
*)async_callback
)->vtable
->klass
);
1021 MONO_OBJECT_SETREF (ac
, cb_target
, async_callback
);
1024 ares
= mono_async_result_new (domain
, NULL
, ac
->state
, NULL
, (MonoObject
*)ac
);
1025 MONO_OBJECT_SETREF (ares
, async_delegate
, target
);
1027 EnterCriticalSection (&ares_lock
);
1028 mono_g_hash_table_insert (ares_htable
, ares
, ares
);
1029 LeaveCriticalSection (&ares_lock
);
1031 if (socket_io_filter (target
, state
)) {
1032 socket_io_add (ares
, (MonoSocketAsyncResult
*) state
);
1036 start_thread_or_queue (ares
);
1041 start_thread_or_queue (MonoAsyncResult
*ares
)
1045 busy
= (int) InterlockedCompareExchange (&busy_worker_threads
, 0, -1);
1046 worker
= (int) InterlockedCompareExchange (&mono_worker_threads
, 0, -1);
1047 if (worker
<= ++busy
&&
1048 worker
< mono_max_worker_threads
) {
1049 InterlockedIncrement (&mono_worker_threads
);
1050 InterlockedIncrement (&busy_worker_threads
);
1051 mono_thread_create (mono_get_root_domain (), async_invoke_thread
, ares
);
1053 append_job (&mono_delegate_section
, &async_call_queue
, (MonoObject
*)ares
);
1054 ReleaseSemaphore (job_added
, 1, NULL
);
1059 mono_thread_pool_finish (MonoAsyncResult
*ares
, MonoArray
**out_args
, MonoObject
**exc
)
1066 /* check if already finished */
1067 mono_monitor_enter ((MonoObject
*) ares
);
1069 if (ares
->endinvoke_called
) {
1070 *exc
= (MonoObject
*)mono_exception_from_name (mono_defaults
.corlib
, "System",
1071 "InvalidOperationException");
1072 mono_monitor_exit ((MonoObject
*) ares
);
1076 ares
->endinvoke_called
= 1;
1077 ac
= (ASyncCall
*)ares
->object_data
;
1079 g_assert (ac
!= NULL
);
1081 /* wait until we are really finished */
1082 if (!ares
->completed
) {
1083 if (ares
->handle
== NULL
) {
1084 ac
->wait_event
= (gsize
)CreateEvent (NULL
, TRUE
, FALSE
, NULL
);
1085 g_assert(ac
->wait_event
!= 0);
1086 MONO_OBJECT_SETREF (ares
, handle
, (MonoObject
*) mono_wait_handle_new (mono_object_domain (ares
), (gpointer
)(gsize
)ac
->wait_event
));
1088 mono_monitor_exit ((MonoObject
*) ares
);
1089 WaitForSingleObjectEx ((gpointer
)(gsize
)ac
->wait_event
, INFINITE
, TRUE
);
1091 mono_monitor_exit ((MonoObject
*) ares
);
1094 *exc
= ac
->msg
->exc
; /* FIXME: GC add write barrier */
1095 *out_args
= ac
->out_args
;
1101 mono_thread_pool_cleanup (void)
1105 EnterCriticalSection (&mono_delegate_section
);
1106 free_queue (&async_call_queue
);
1107 release
= (gint
) InterlockedCompareExchange (&mono_worker_threads
, 0, -1);
1108 LeaveCriticalSection (&mono_delegate_section
);
1110 ReleaseSemaphore (job_added
, release
, NULL
);
1112 socket_io_cleanup (&socket_io_data
);
1116 append_job (CRITICAL_SECTION
*cs
, TPQueue
*list
, MonoObject
*ar
)
1118 EnterCriticalSection (cs
);
1119 if (list
->array
&& (list
->next_elem
< mono_array_length (list
->array
))) {
1120 mono_array_setref (list
->array
, list
->next_elem
, ar
);
1122 LeaveCriticalSection (cs
);
1126 MONO_GC_REGISTER_ROOT (list
->array
);
1127 list
->array
= mono_array_new (mono_get_root_domain (), mono_defaults
.object_class
, 16);
1129 int count
= list
->next_elem
- list
->first_elem
;
1130 /* slide the array or create a larger one if it's full */
1131 if (list
->first_elem
) {
1132 mono_array_memcpy_refs (list
->array
, 0, list
->array
, list
->first_elem
, count
);
1134 MonoArray
*newa
= mono_array_new (mono_get_root_domain (), mono_defaults
.object_class
, mono_array_length (list
->array
) * 2);
1135 mono_array_memcpy_refs (newa
, 0, list
->array
, list
->first_elem
, count
);
1138 list
->first_elem
= 0;
1139 list
->next_elem
= count
;
1141 mono_array_setref (list
->array
, list
->next_elem
, ar
);
1143 LeaveCriticalSection (cs
);
1147 dequeue_job (CRITICAL_SECTION
*cs
, TPQueue
*list
)
1152 EnterCriticalSection (cs
);
1153 if (!list
->array
|| list
->first_elem
== list
->next_elem
) {
1154 LeaveCriticalSection (cs
);
1157 ar
= mono_array_get (list
->array
, MonoObject
*, list
->first_elem
);
1159 count
= list
->next_elem
- list
->first_elem
;
1160 /* reduce the size of the array if it's mostly empty */
1161 if (mono_array_length (list
->array
) > 16 && count
< (mono_array_length (list
->array
) / 3)) {
1162 MonoArray
*newa
= mono_array_new (mono_get_root_domain (), mono_defaults
.object_class
, mono_array_length (list
->array
) / 2);
1163 mono_array_memcpy_refs (newa
, 0, list
->array
, list
->first_elem
, count
);
1165 list
->first_elem
= 0;
1166 list
->next_elem
= count
;
1168 LeaveCriticalSection (cs
);
1174 free_queue (TPQueue
*list
)
1177 list
->first_elem
= list
->next_elem
= 0;
1181 async_invoke_thread (gpointer data
)
1187 thread
= mono_thread_current ();
1188 thread
->threadpool_thread
= TRUE
;
1189 ves_icall_System_Threading_Thread_SetState (thread
, ThreadState_Background
);
1192 MonoAsyncResult
*ar
;
1194 ar
= (MonoAsyncResult
*) data
;
1196 /* worker threads invokes methods in different domains,
1197 * so we need to set the right domain here */
1198 domain
= ((MonoObject
*)ar
)->vtable
->domain
;
1199 mono_thread_push_appdomain_ref (domain
);
1200 if (mono_domain_set (domain
, FALSE
)) {
1203 mono_async_invoke (ar
);
1204 ac
= (ASyncCall
*) ar
->object_data
;
1206 if (ac->msg->exc != NULL)
1207 mono_unhandled_exception (ac->msg->exc);
1209 mono_domain_set (mono_get_root_domain (), TRUE
);
1211 mono_thread_pop_appdomain_ref ();
1212 InterlockedDecrement (&busy_worker_threads
);
1215 data
= dequeue_job (&mono_delegate_section
, &async_call_queue
);
1219 int timeout
= 10000;
1220 guint32 start_time
= GetTickCount ();
1223 wr
= WaitForSingleObjectEx (job_added
, (guint32
)timeout
, TRUE
);
1224 if (THREAD_WANTS_A_BREAK (thread
))
1225 mono_thread_interruption_checkpoint ();
1227 timeout
-= GetTickCount () - start_time
;
1229 if (wr
!= WAIT_TIMEOUT
)
1230 data
= dequeue_job (&mono_delegate_section
, &async_call_queue
);
1232 while (!data
&& timeout
> 0);
1236 workers
= (int) InterlockedCompareExchange (&mono_worker_threads
, 0, -1);
1237 min
= (int) InterlockedCompareExchange (&mono_min_worker_threads
, 0, -1);
1239 while (!data
&& workers
<= min
) {
1240 WaitForSingleObjectEx (job_added
, INFINITE
, TRUE
);
1241 if (THREAD_WANTS_A_BREAK (thread
))
1242 mono_thread_interruption_checkpoint ();
1244 data
= dequeue_job (&mono_delegate_section
, &async_call_queue
);
1245 workers
= (int) InterlockedCompareExchange (&mono_worker_threads
, 0, -1);
1246 min
= (int) InterlockedCompareExchange (&mono_min_worker_threads
, 0, -1);
1251 InterlockedDecrement (&mono_worker_threads
);
1255 InterlockedIncrement (&busy_worker_threads
);
1258 g_assert_not_reached ();
1262 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint
*workerThreads
, gint
*completionPortThreads
)
1266 MONO_ARCH_SAVE_REGS
;
1268 busy
= (gint
) InterlockedCompareExchange (&busy_worker_threads
, 0, -1);
1269 *workerThreads
= mono_max_worker_threads
- busy
;
1270 *completionPortThreads
= 0;
1274 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint
*workerThreads
, gint
*completionPortThreads
)
1276 MONO_ARCH_SAVE_REGS
;
1278 *workerThreads
= mono_max_worker_threads
;
1279 *completionPortThreads
= 0;
1283 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint
*workerThreads
, gint
*completionPortThreads
)
1287 MONO_ARCH_SAVE_REGS
;
1289 workers
= (gint
) InterlockedCompareExchange (&mono_min_worker_threads
, 0, -1);
1290 *workerThreads
= workers
;
1291 *completionPortThreads
= 0;
1295 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads
, gint completionPortThreads
)
1297 MONO_ARCH_SAVE_REGS
;
1299 if (workerThreads
< 0 || workerThreads
> mono_max_worker_threads
)
1301 InterlockedExchange (&mono_min_worker_threads
, workerThreads
);
1302 /* FIXME: should actually start the idle threads if needed */