3 * Microsoft IO threadpool runtime support
6 * Ludovic Henry (ludovic.henry@xamarin.com)
8 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
9 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
14 #ifndef DISABLE_SOCKETS
18 #if defined(HOST_WIN32)
25 #include <mono/metadata/gc-internals.h>
26 #include <mono/metadata/mono-mlist.h>
27 #include <mono/metadata/threadpool.h>
28 #include <mono/metadata/threadpool-io.h>
29 #include <mono/utils/atomic.h>
30 #include <mono/utils/mono-threads.h>
31 #include <mono/utils/mono-lazy-init.h>
32 #include <mono/utils/mono-logger-internals.h>
33 #include <mono/utils/w32api.h>
36 gboolean (*init
) (gint wakeup_pipe_fd
);
37 void (*register_fd
) (gint fd
, gint events
, gboolean is_new
);
38 void (*remove_fd
) (gint fd
);
39 gint (*event_wait
) (void (*callback
) (gint fd
, gint events
, gpointer user_data
), gpointer user_data
);
40 } ThreadPoolIOBackend
;
42 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
43 enum MonoIOOperation
{
46 EVENT_ERR
= 1 << 2, /* not in managed */
49 #include "threadpool-io-epoll.c"
50 #include "threadpool-io-kqueue.c"
51 #include "threadpool-io-poll.c"
53 #define UPDATES_CAPACITY 128
55 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
56 struct _MonoIOSelectorJob
{
68 } ThreadPoolIOUpdateType
;
72 MonoIOSelectorJob
*job
;
73 } ThreadPoolIOUpdate_Add
;
77 } ThreadPoolIOUpdate_RemoveSocket
;
81 } ThreadPoolIOUpdate_RemoveDomain
;
84 ThreadPoolIOUpdateType type
;
86 ThreadPoolIOUpdate_Add add
;
87 ThreadPoolIOUpdate_RemoveSocket remove_socket
;
88 ThreadPoolIOUpdate_RemoveDomain remove_domain
;
93 ThreadPoolIOBackend backend
;
95 ThreadPoolIOUpdate updates
[UPDATES_CAPACITY
];
97 MonoCoopMutex updates_lock
;
98 MonoCoopCond updates_cond
;
100 #if !defined(HOST_WIN32)
101 gint wakeup_pipes
[2];
103 SOCKET wakeup_pipes
[2];
107 static mono_lazy_init_t io_status
= MONO_LAZY_INIT_STATUS_NOT_INITIALIZED
;
109 static gboolean io_selector_running
= FALSE
;
111 static ThreadPoolIO
* threadpool_io
;
113 static MonoIOSelectorJob
*
114 get_job_for_event (MonoMList
**list
, gint32 event
)
120 for (current
= *list
; current
; current
= mono_mlist_next (current
)) {
121 MonoIOSelectorJob
*job
= (MonoIOSelectorJob
*) mono_mlist_get_data (current
);
122 if (job
->operation
== event
) {
123 *list
= mono_mlist_remove_item (*list
, current
);
124 mono_mlist_set_data (current
, NULL
);
133 get_operations_for_jobs (MonoMList
*list
)
138 for (current
= list
; current
; current
= mono_mlist_next (current
))
139 operations
|= ((MonoIOSelectorJob
*) mono_mlist_get_data (current
))->operation
;
145 selector_thread_wakeup (void)
151 #if !defined(HOST_WIN32)
152 written
= write (threadpool_io
->wakeup_pipes
[1], &msg
, 1);
156 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno
, g_strerror (errno
));
160 written
= send (threadpool_io
->wakeup_pipes
[1], &msg
, 1, 0);
163 if (written
== SOCKET_ERROR
) {
164 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
172 selector_thread_wakeup_drain_pipes (void)
178 #if !defined(HOST_WIN32)
179 received
= read (threadpool_io
->wakeup_pipes
[0], buffer
, sizeof (buffer
));
182 if (received
== -1) {
185 * some unices (like AIX) send ERESTART, which doesn't
186 * exist on some other OSes errno
188 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= ERESTART
)
190 if (errno
!= EINTR
&& errno
!= EAGAIN
)
192 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno
, g_strerror (errno
));
196 received
= recv (threadpool_io
->wakeup_pipes
[0], buffer
, sizeof (buffer
), 0);
199 if (received
== SOCKET_ERROR
) {
200 if (WSAGetLastError () != WSAEINTR
&& WSAGetLastError () != WSAEWOULDBLOCK
)
201 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
210 MonoGHashTable
*states
;
211 } FilterSockaresForDomainData
;
214 filter_jobs_for_domain (gpointer key
, gpointer value
, gpointer user_data
)
216 FilterSockaresForDomainData
*data
;
217 MonoMList
*list
= (MonoMList
*)value
, *element
;
219 MonoGHashTable
*states
;
221 g_assert (user_data
);
222 data
= (FilterSockaresForDomainData
*)user_data
;
223 domain
= data
->domain
;
224 states
= data
->states
;
226 for (element
= list
; element
; element
= mono_mlist_next (element
)) {
227 MonoIOSelectorJob
*job
= (MonoIOSelectorJob
*) mono_mlist_get_data (element
);
228 if (mono_object_domain (job
) == domain
)
229 mono_mlist_set_data (element
, NULL
);
232 /* we skip all the first elements which are NULL */
233 for (; list
; list
= mono_mlist_next (list
)) {
234 if (mono_mlist_get_data (list
))
239 g_assert (mono_mlist_get_data (list
));
241 /* we delete all the NULL elements after the first one */
242 for (element
= list
; element
;) {
244 if (!(next
= mono_mlist_next (element
)))
246 if (mono_mlist_get_data (next
))
249 mono_mlist_set_next (element
, mono_mlist_next (next
));
253 mono_g_hash_table_replace (states
, key
, list
);
257 wait_callback (gint fd
, gint events
, gpointer user_data
)
261 if (mono_runtime_is_shutting_down ())
264 if (fd
== threadpool_io
->wakeup_pipes
[0]) {
265 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: wke");
266 selector_thread_wakeup_drain_pipes ();
268 MonoGHashTable
*states
;
269 MonoMList
*list
= NULL
;
271 gboolean remove_fd
= FALSE
;
274 g_assert (user_data
);
275 states
= (MonoGHashTable
*)user_data
;
277 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
278 fd
, (events
& EVENT_IN
) ? "RD" : "..", (events
& EVENT_OUT
) ? "WR" : "..", (events
& EVENT_ERR
) ? "ERR" : "...");
280 if (!mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
))
281 g_error ("wait_callback: fd %d not found in states table", fd
);
283 if (list
&& (events
& EVENT_IN
) != 0) {
284 MonoIOSelectorJob
*job
= get_job_for_event (&list
, EVENT_IN
);
286 mono_threadpool_enqueue_work_item (((MonoObject
*) job
)->vtable
->domain
, (MonoObject
*) job
, error
);
287 mono_error_assert_ok (error
);
291 if (list
&& (events
& EVENT_OUT
) != 0) {
292 MonoIOSelectorJob
*job
= get_job_for_event (&list
, EVENT_OUT
);
294 mono_threadpool_enqueue_work_item (((MonoObject
*) job
)->vtable
->domain
, (MonoObject
*) job
, error
);
295 mono_error_assert_ok (error
);
299 remove_fd
= (events
& EVENT_ERR
) == EVENT_ERR
;
301 mono_g_hash_table_replace (states
, GINT_TO_POINTER (fd
), list
);
303 operations
= get_operations_for_jobs (list
);
305 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
306 fd
, (operations
& EVENT_IN
) ? "RD" : "..", (operations
& EVENT_OUT
) ? "WR" : "..", (operations
& EVENT_ERR
) ? "ERR" : "...");
308 threadpool_io
->backend
.register_fd (fd
, operations
, FALSE
);
310 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: err fd %d", fd
);
312 mono_g_hash_table_remove (states
, GINT_TO_POINTER (fd
));
314 threadpool_io
->backend
.remove_fd (fd
);
320 selector_thread_interrupt (gpointer unused
)
322 selector_thread_wakeup ();
326 selector_thread (gpointer data
)
329 MonoGHashTable
*states
;
331 MonoString
*thread_name
= mono_string_new_checked (mono_get_root_domain (), "Thread Pool I/O Selector", error
);
332 mono_error_assert_ok (error
);
333 mono_thread_set_name_internal (mono_thread_internal_current (), thread_name
, FALSE
, TRUE
, error
);
334 mono_error_assert_ok (error
);
336 if (mono_runtime_is_shutting_down ()) {
337 io_selector_running
= FALSE
;
341 states
= mono_g_hash_table_new_type (g_direct_hash
, NULL
, MONO_HASH_VALUE_GC
, MONO_ROOT_SOURCE_THREAD_POOL
, NULL
, "Thread Pool I/O State Table");
343 while (!mono_runtime_is_shutting_down ()) {
346 gboolean interrupted
= FALSE
;
348 if (mono_thread_interruption_checkpoint_bool ())
351 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
353 for (i
= 0; i
< threadpool_io
->updates_size
; ++i
) {
354 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[i
];
356 switch (update
->type
) {
364 MonoMList
*list
= NULL
;
365 MonoIOSelectorJob
*job
;
367 fd
= update
->data
.add
.fd
;
370 job
= update
->data
.add
.job
;
373 exists
= mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
);
374 list
= mono_mlist_append_checked (list
, (MonoObject
*) job
, error
);
375 mono_error_assert_ok (error
);
376 mono_g_hash_table_replace (states
, GINT_TO_POINTER (fd
), list
);
378 operations
= get_operations_for_jobs (list
);
380 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
381 exists
? "mod" : "add", fd
, (operations
& EVENT_IN
) ? "RD" : "..", (operations
& EVENT_OUT
) ? "WR" : "..", (operations
& EVENT_ERR
) ? "ERR" : "...");
383 threadpool_io
->backend
.register_fd (fd
, operations
, !exists
);
387 case UPDATE_REMOVE_SOCKET
: {
390 MonoMList
*list
= NULL
;
392 fd
= update
->data
.remove_socket
.fd
;
395 if (mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
)) {
396 mono_g_hash_table_remove (states
, GINT_TO_POINTER (fd
));
398 for (j
= i
+ 1; j
< threadpool_io
->updates_size
; ++j
) {
399 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[j
];
400 if (update
->type
== UPDATE_ADD
&& update
->data
.add
.fd
== fd
)
401 memset (update
, 0, sizeof (ThreadPoolIOUpdate
));
404 for (; list
; list
= mono_mlist_remove_item (list
, list
)) {
405 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list
)), mono_mlist_get_data (list
), error
);
406 mono_mlist_set_data (list
, NULL
);
407 mono_error_assert_ok (error
);
410 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: del fd %3d", fd
);
411 threadpool_io
->backend
.remove_fd (fd
);
416 case UPDATE_REMOVE_DOMAIN
: {
419 domain
= update
->data
.remove_domain
.domain
;
422 FilterSockaresForDomainData user_data
;
423 memset (&user_data
, 0, sizeof (user_data
));
424 user_data
.domain
= domain
;
425 user_data
.states
= states
;
426 mono_g_hash_table_foreach (states
, filter_jobs_for_domain
, &user_data
);
428 for (j
= i
+ 1; j
< threadpool_io
->updates_size
; ++j
) {
429 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[j
];
430 if (update
->type
== UPDATE_ADD
&& mono_object_domain (update
->data
.add
.job
) == domain
)
431 memset (update
, 0, sizeof (ThreadPoolIOUpdate
));
437 g_assert_not_reached ();
441 mono_coop_cond_broadcast (&threadpool_io
->updates_cond
);
443 if (threadpool_io
->updates_size
> 0) {
444 threadpool_io
->updates_size
= 0;
445 memset (&threadpool_io
->updates
, 0, UPDATES_CAPACITY
* sizeof (ThreadPoolIOUpdate
));
448 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
450 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: wai");
452 mono_thread_info_install_interrupt (selector_thread_interrupt
, NULL
, &interrupted
);
456 res
= threadpool_io
->backend
.event_wait (wait_callback
, states
);
460 mono_thread_info_uninstall_interrupt (&interrupted
);
463 mono_g_hash_table_destroy (states
);
465 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
467 io_selector_running
= FALSE
;
468 mono_coop_cond_broadcast (&threadpool_io
->updates_cond
);
470 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
475 /* Locking: threadpool_io->updates_lock must be held */
476 static ThreadPoolIOUpdate
*
477 update_get_new (void)
479 ThreadPoolIOUpdate
*update
= NULL
;
480 g_assert (threadpool_io
->updates_size
<= UPDATES_CAPACITY
);
482 while (threadpool_io
->updates_size
== UPDATES_CAPACITY
) {
483 /* we wait for updates to be applied in the selector_thread and we loop
484 * as long as none are available. if it happends too much, then we need
485 * to increase UPDATES_CAPACITY */
486 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
489 g_assert (threadpool_io
->updates_size
< UPDATES_CAPACITY
);
491 update
= &threadpool_io
->updates
[threadpool_io
->updates_size
++];
497 wakeup_pipes_init (void)
499 #if !defined(HOST_WIN32)
500 if (pipe (threadpool_io
->wakeup_pipes
) == -1)
501 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno
, g_strerror (errno
));
502 if (fcntl (threadpool_io
->wakeup_pipes
[0], F_SETFL
, O_NONBLOCK
) == -1)
503 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno
, g_strerror (errno
));
505 struct sockaddr_in client
;
506 struct sockaddr_in server
;
511 server_sock
= socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
512 g_assert (server_sock
!= INVALID_SOCKET
);
513 threadpool_io
->wakeup_pipes
[1] = socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
514 g_assert (threadpool_io
->wakeup_pipes
[1] != INVALID_SOCKET
);
516 server
.sin_family
= AF_INET
;
517 server
.sin_addr
.s_addr
= inet_addr ("127.0.0.1");
519 if (bind (server_sock
, (SOCKADDR
*) &server
, sizeof (server
)) == SOCKET_ERROR
) {
520 closesocket (server_sock
);
521 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
524 size
= sizeof (server
);
525 if (getsockname (server_sock
, (SOCKADDR
*) &server
, &size
) == SOCKET_ERROR
) {
526 closesocket (server_sock
);
527 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
529 if (listen (server_sock
, 1024) == SOCKET_ERROR
) {
530 closesocket (server_sock
);
531 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
533 if (connect ((SOCKET
) threadpool_io
->wakeup_pipes
[1], (SOCKADDR
*) &server
, sizeof (server
)) == SOCKET_ERROR
) {
534 closesocket (server_sock
);
535 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
538 size
= sizeof (client
);
539 threadpool_io
->wakeup_pipes
[0] = accept (server_sock
, (SOCKADDR
*) &client
, &size
);
540 g_assert (threadpool_io
->wakeup_pipes
[0] != INVALID_SOCKET
);
543 if (ioctlsocket (threadpool_io
->wakeup_pipes
[0], FIONBIO
, &arg
) == SOCKET_ERROR
) {
544 closesocket (threadpool_io
->wakeup_pipes
[0]);
545 closesocket (server_sock
);
546 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
549 closesocket (server_sock
);
556 g_assert (!threadpool_io
);
557 threadpool_io
= g_new0 (ThreadPoolIO
, 1);
558 g_assert (threadpool_io
);
560 mono_coop_mutex_init (&threadpool_io
->updates_lock
);
561 mono_coop_cond_init (&threadpool_io
->updates_cond
);
562 mono_gc_register_root ((char *)&threadpool_io
->updates
[0], sizeof (threadpool_io
->updates
), MONO_GC_DESCRIPTOR_NULL
, MONO_ROOT_SOURCE_THREAD_POOL
, NULL
, "Thread Pool I/O Update List");
564 threadpool_io
->updates_size
= 0;
566 threadpool_io
->backend
= backend_poll
;
567 if (g_hasenv ("MONO_ENABLE_AIO")) {
568 #if defined(HAVE_EPOLL)
569 threadpool_io
->backend
= backend_epoll
;
570 #elif defined(HAVE_KQUEUE)
571 threadpool_io
->backend
= backend_kqueue
;
575 wakeup_pipes_init ();
577 if (!threadpool_io
->backend
.init (threadpool_io
->wakeup_pipes
[0]))
578 g_error ("initialize: backend->init () failed");
580 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
582 io_selector_running
= TRUE
;
585 if (!mono_thread_create_internal (mono_get_root_domain (), (gpointer
)selector_thread
, NULL
, (MonoThreadCreateFlags
)(MONO_THREAD_CREATE_FLAGS_THREADPOOL
| MONO_THREAD_CREATE_FLAGS_SMALL_STACK
), error
))
586 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (error
));
588 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
594 // FIXME destroy everything
598 mono_threadpool_io_cleanup (void)
600 mono_lazy_cleanup (&io_status
, cleanup
);
604 ves_icall_System_IOSelector_Add (gpointer handle
, MonoIOSelectorJob
*job
)
606 ThreadPoolIOUpdate
*update
;
610 g_assert ((job
->operation
== EVENT_IN
) ^ (job
->operation
== EVENT_OUT
));
611 g_assert (job
->callback
);
613 if (mono_runtime_is_shutting_down ())
615 if (mono_domain_is_unloading (mono_object_domain (job
)))
618 mono_lazy_initialize (&io_status
, initialize
);
620 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
622 if (!io_selector_running
) {
623 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
627 update
= update_get_new ();
628 update
->type
= UPDATE_ADD
;
629 update
->data
.add
.fd
= GPOINTER_TO_INT (handle
);
630 update
->data
.add
.job
= job
;
631 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
633 selector_thread_wakeup ();
635 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
639 ves_icall_System_IOSelector_Remove (gpointer handle
)
641 mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle
));
645 mono_threadpool_io_remove_socket (int fd
)
647 ThreadPoolIOUpdate
*update
;
649 if (!mono_lazy_is_initialized (&io_status
))
652 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
654 if (!io_selector_running
) {
655 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
659 update
= update_get_new ();
660 update
->type
= UPDATE_REMOVE_SOCKET
;
661 update
->data
.add
.fd
= fd
;
662 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
664 selector_thread_wakeup ();
666 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
668 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
672 mono_threadpool_io_remove_domain_jobs (MonoDomain
*domain
)
674 ThreadPoolIOUpdate
*update
;
676 if (!mono_lazy_is_initialized (&io_status
))
679 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
681 if (!io_selector_running
) {
682 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
686 update
= update_get_new ();
687 update
->type
= UPDATE_REMOVE_DOMAIN
;
688 update
->data
.remove_domain
.domain
= domain
;
689 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
691 selector_thread_wakeup ();
693 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
695 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
701 ves_icall_System_IOSelector_Add (gpointer handle
, MonoIOSelectorJob
*job
)
703 g_assert_not_reached ();
707 ves_icall_System_IOSelector_Remove (gpointer handle
)
709 g_assert_not_reached ();
713 mono_threadpool_io_cleanup (void)
715 g_assert_not_reached ();
719 mono_threadpool_io_remove_socket (int fd
)
721 g_assert_not_reached ();
725 mono_threadpool_io_remove_domain_jobs (MonoDomain
*domain
)
727 g_assert_not_reached ();