3 * Microsoft IO threadpool runtime support
6 * Ludovic Henry (ludovic.henry@xamarin.com)
8 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
9 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
14 #include <mono/metadata/threadpool-io.h>
16 #ifndef DISABLE_SOCKETS
18 #if defined(HOST_WIN32)
25 #include <mono/metadata/gc-internals.h>
26 #include <mono/metadata/mono-hash-internals.h>
27 #include <mono/metadata/mono-mlist.h>
28 #include <mono/metadata/threadpool.h>
29 #include <mono/utils/atomic.h>
30 #include <mono/utils/mono-threads.h>
31 #include <mono/utils/mono-lazy-init.h>
32 #include <mono/utils/mono-logger-internals.h>
33 #include <mono/utils/w32api.h>
36 gboolean (*init
) (gint wakeup_pipe_fd
);
37 gboolean (*can_register_fd
) (int fd
);
38 void (*register_fd
) (gint fd
, gint events
, gboolean is_new
);
39 void (*remove_fd
) (gint fd
);
40 gint (*event_wait
) (void (*callback
) (gint fd
, gint events
, gpointer user_data
), gpointer user_data
);
41 } ThreadPoolIOBackend
;
43 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
44 enum MonoIOOperation
{
47 EVENT_ERR
= 1 << 2, /* not in managed */
50 #include "threadpool-io-epoll.c"
51 #include "threadpool-io-kqueue.c"
52 #include "threadpool-io-poll.c"
54 #define UPDATES_CAPACITY 128
56 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
57 struct _MonoIOSelectorJob
{
69 } ThreadPoolIOUpdateType
;
73 MonoIOSelectorJob
*job
;
74 } ThreadPoolIOUpdate_Add
;
78 } ThreadPoolIOUpdate_RemoveSocket
;
82 } ThreadPoolIOUpdate_RemoveDomain
;
85 ThreadPoolIOUpdateType type
;
87 ThreadPoolIOUpdate_Add add
;
88 ThreadPoolIOUpdate_RemoveSocket remove_socket
;
89 ThreadPoolIOUpdate_RemoveDomain remove_domain
;
94 ThreadPoolIOBackend backend
;
96 ThreadPoolIOUpdate updates
[UPDATES_CAPACITY
];
98 MonoCoopMutex updates_lock
;
99 MonoCoopCond updates_cond
;
101 #if !defined(HOST_WIN32)
102 gint wakeup_pipes
[2];
104 SOCKET wakeup_pipes
[2];
108 static mono_lazy_init_t io_status
= MONO_LAZY_INIT_STATUS_NOT_INITIALIZED
;
110 static gboolean io_selector_running
= FALSE
;
112 static ThreadPoolIO
* threadpool_io
;
114 static MonoIOSelectorJob
*
115 get_job_for_event (MonoMList
**list
, gint32 event
)
121 for (current
= *list
; current
; current
= mono_mlist_next (current
)) {
122 MonoIOSelectorJob
*job
= (MonoIOSelectorJob
*) mono_mlist_get_data (current
);
123 if (job
->operation
== event
) {
124 *list
= mono_mlist_remove_item (*list
, current
);
125 mono_mlist_set_data (current
, NULL
);
134 get_operations_for_jobs (MonoMList
*list
)
139 for (current
= list
; current
; current
= mono_mlist_next (current
))
140 operations
|= ((MonoIOSelectorJob
*) mono_mlist_get_data (current
))->operation
;
146 selector_thread_wakeup (void)
152 #if !defined(HOST_WIN32)
153 written
= write (threadpool_io
->wakeup_pipes
[1], &msg
, 1);
157 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno
, g_strerror (errno
));
161 written
= send (threadpool_io
->wakeup_pipes
[1], &msg
, 1, 0);
164 if (written
== SOCKET_ERROR
) {
165 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
173 selector_thread_wakeup_drain_pipes (void)
179 #if !defined(HOST_WIN32)
180 received
= read (threadpool_io
->wakeup_pipes
[0], buffer
, sizeof (buffer
));
183 if (received
== -1) {
186 * some unices (like AIX) send ERESTART, which doesn't
187 * exist on some other OSes errno
189 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= ERESTART
)
191 if (errno
!= EINTR
&& errno
!= EAGAIN
)
193 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno
, g_strerror (errno
));
197 received
= recv (threadpool_io
->wakeup_pipes
[0], buffer
, sizeof (buffer
), 0);
200 if (received
== SOCKET_ERROR
) {
201 if (WSAGetLastError () != WSAEINTR
&& WSAGetLastError () != WSAEWOULDBLOCK
)
202 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d)\n", WSAGetLastError ());
211 MonoGHashTable
*states
;
212 } FilterSockaresForDomainData
;
215 filter_jobs_for_domain (gpointer key
, gpointer value
, gpointer user_data
)
217 FilterSockaresForDomainData
*data
;
218 MonoMList
*list
= (MonoMList
*)value
, *element
;
220 MonoGHashTable
*states
;
222 g_assert (user_data
);
223 data
= (FilterSockaresForDomainData
*)user_data
;
224 domain
= data
->domain
;
225 states
= data
->states
;
227 for (element
= list
; element
; element
= mono_mlist_next (element
)) {
228 MonoIOSelectorJob
*job
= (MonoIOSelectorJob
*) mono_mlist_get_data (element
);
229 if (mono_object_domain (job
) == domain
)
230 mono_mlist_set_data (element
, NULL
);
233 /* we skip all the first elements which are NULL */
234 for (; list
; list
= mono_mlist_next (list
)) {
235 if (mono_mlist_get_data (list
))
240 g_assert (mono_mlist_get_data (list
));
242 /* we delete all the NULL elements after the first one */
243 for (element
= list
; element
;) {
245 if (!(next
= mono_mlist_next (element
)))
247 if (mono_mlist_get_data (next
))
250 mono_mlist_set_next (element
, mono_mlist_next (next
));
254 mono_g_hash_table_replace (states
, key
, list
);
258 wait_callback (gint fd
, gint events
, gpointer user_data
)
262 if (mono_runtime_is_shutting_down ())
265 if (fd
== threadpool_io
->wakeup_pipes
[0]) {
266 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: wke");
267 selector_thread_wakeup_drain_pipes ();
269 MonoGHashTable
*states
;
270 MonoMList
*list
= NULL
;
272 gboolean remove_fd
= FALSE
;
275 g_assert (user_data
);
276 states
= (MonoGHashTable
*)user_data
;
278 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
279 fd
, (events
& EVENT_IN
) ? "RD" : "..", (events
& EVENT_OUT
) ? "WR" : "..", (events
& EVENT_ERR
) ? "ERR" : "...");
281 if (!mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
))
282 g_error ("wait_callback: fd %d not found in states table", fd
);
284 if (list
&& (events
& EVENT_IN
) != 0) {
285 MonoIOSelectorJob
*job
= get_job_for_event (&list
, EVENT_IN
);
287 mono_threadpool_enqueue_work_item (((MonoObject
*) job
)->vtable
->domain
, (MonoObject
*) job
, error
);
288 mono_error_assert_ok (error
);
292 if (list
&& (events
& EVENT_OUT
) != 0) {
293 MonoIOSelectorJob
*job
= get_job_for_event (&list
, EVENT_OUT
);
295 mono_threadpool_enqueue_work_item (((MonoObject
*) job
)->vtable
->domain
, (MonoObject
*) job
, error
);
296 mono_error_assert_ok (error
);
300 remove_fd
= (events
& EVENT_ERR
) == EVENT_ERR
;
302 mono_g_hash_table_replace (states
, GINT_TO_POINTER (fd
), list
);
304 operations
= get_operations_for_jobs (list
);
306 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
307 fd
, (operations
& EVENT_IN
) ? "RD" : "..", (operations
& EVENT_OUT
) ? "WR" : "..", (operations
& EVENT_ERR
) ? "ERR" : "...");
309 threadpool_io
->backend
.register_fd (fd
, operations
, FALSE
);
311 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: err fd %d", fd
);
313 mono_g_hash_table_remove (states
, GINT_TO_POINTER (fd
));
315 threadpool_io
->backend
.remove_fd (fd
);
321 selector_thread_interrupt (gpointer unused
)
323 selector_thread_wakeup ();
327 selector_thread (gpointer data
)
329 MonoGHashTable
*states
;
331 mono_thread_set_name_constant_ignore_error (mono_thread_internal_current (), "Thread Pool I/O Selector", MonoSetThreadNameFlag_Reset
);
335 if (mono_runtime_is_shutting_down ()) {
336 io_selector_running
= FALSE
;
340 states
= mono_g_hash_table_new_type_internal (g_direct_hash
, NULL
, MONO_HASH_VALUE_GC
, MONO_ROOT_SOURCE_THREAD_POOL
, NULL
, "Thread Pool I/O State Table");
342 while (!mono_runtime_is_shutting_down ()) {
345 gboolean interrupted
= FALSE
;
347 if (mono_thread_interruption_checkpoint_bool ())
350 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
352 for (i
= 0; i
< threadpool_io
->updates_size
; ++i
) {
353 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[i
];
355 switch (update
->type
) {
363 MonoMList
*list
= NULL
;
364 MonoIOSelectorJob
*job
;
366 fd
= update
->data
.add
.fd
;
369 job
= update
->data
.add
.job
;
372 exists
= mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
);
373 list
= mono_mlist_append_checked (list
, (MonoObject
*) job
, error
);
374 mono_error_assert_ok (error
);
375 mono_g_hash_table_replace (states
, GINT_TO_POINTER (fd
), list
);
377 operations
= get_operations_for_jobs (list
);
379 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
380 exists
? "mod" : "add", fd
, (operations
& EVENT_IN
) ? "RD" : "..", (operations
& EVENT_OUT
) ? "WR" : "..", (operations
& EVENT_ERR
) ? "ERR" : "...");
382 threadpool_io
->backend
.register_fd (fd
, operations
, !exists
);
386 case UPDATE_REMOVE_SOCKET
: {
389 MonoMList
*list
= NULL
;
391 fd
= update
->data
.remove_socket
.fd
;
394 if (mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
)) {
395 mono_g_hash_table_remove (states
, GINT_TO_POINTER (fd
));
397 for (j
= i
+ 1; j
< threadpool_io
->updates_size
; ++j
) {
398 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[j
];
399 if (update
->type
== UPDATE_ADD
&& update
->data
.add
.fd
== fd
)
400 memset (update
, 0, sizeof (ThreadPoolIOUpdate
));
403 for (; list
; list
= mono_mlist_remove_item (list
, list
)) {
404 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list
)), mono_mlist_get_data (list
), error
);
405 mono_mlist_set_data (list
, NULL
);
406 mono_error_assert_ok (error
);
409 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: del fd %3d", fd
);
410 threadpool_io
->backend
.remove_fd (fd
);
415 case UPDATE_REMOVE_DOMAIN
: {
418 domain
= update
->data
.remove_domain
.domain
;
421 FilterSockaresForDomainData user_data
;
422 memset (&user_data
, 0, sizeof (user_data
));
423 user_data
.domain
= domain
;
424 user_data
.states
= states
;
425 mono_g_hash_table_foreach (states
, filter_jobs_for_domain
, &user_data
);
427 for (j
= i
+ 1; j
< threadpool_io
->updates_size
; ++j
) {
428 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[j
];
429 if (update
->type
== UPDATE_ADD
&& mono_object_domain (update
->data
.add
.job
) == domain
)
430 memset (update
, 0, sizeof (ThreadPoolIOUpdate
));
436 g_assert_not_reached ();
440 mono_coop_cond_broadcast (&threadpool_io
->updates_cond
);
442 if (threadpool_io
->updates_size
> 0) {
443 threadpool_io
->updates_size
= 0;
444 memset (&threadpool_io
->updates
, 0, UPDATES_CAPACITY
* sizeof (ThreadPoolIOUpdate
));
447 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
449 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_SELECTOR
, "io threadpool: wai");
451 mono_thread_info_install_interrupt (selector_thread_interrupt
, NULL
, &interrupted
);
455 res
= threadpool_io
->backend
.event_wait (wait_callback
, states
);
459 mono_thread_info_uninstall_interrupt (&interrupted
);
462 mono_g_hash_table_destroy (states
);
464 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
466 io_selector_running
= FALSE
;
467 mono_coop_cond_broadcast (&threadpool_io
->updates_cond
);
469 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
474 /* Locking: threadpool_io->updates_lock must be held */
475 static ThreadPoolIOUpdate
*
476 update_get_new (void)
478 ThreadPoolIOUpdate
*update
= NULL
;
479 g_assert (threadpool_io
->updates_size
<= UPDATES_CAPACITY
);
481 while (threadpool_io
->updates_size
== UPDATES_CAPACITY
) {
482 /* we wait for updates to be applied in the selector_thread and we loop
483 * as long as none are available. if it happends too much, then we need
484 * to increase UPDATES_CAPACITY */
485 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
488 g_assert (threadpool_io
->updates_size
< UPDATES_CAPACITY
);
490 update
= &threadpool_io
->updates
[threadpool_io
->updates_size
++];
496 wakeup_pipes_init (void)
498 #if !defined(HOST_WIN32)
499 if (pipe (threadpool_io
->wakeup_pipes
) == -1)
500 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno
, g_strerror (errno
));
501 if (fcntl (threadpool_io
->wakeup_pipes
[0], F_SETFL
, O_NONBLOCK
) == -1)
502 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno
, g_strerror (errno
));
504 struct sockaddr_in client
;
505 struct sockaddr_in server
;
510 server_sock
= socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
511 g_assert (server_sock
!= INVALID_SOCKET
);
512 threadpool_io
->wakeup_pipes
[1] = socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
513 g_assert (threadpool_io
->wakeup_pipes
[1] != INVALID_SOCKET
);
515 server
.sin_family
= AF_INET
;
516 server
.sin_addr
.s_addr
= inet_addr ("127.0.0.1");
518 if (bind (server_sock
, (SOCKADDR
*) &server
, sizeof (server
)) == SOCKET_ERROR
) {
519 closesocket (server_sock
);
520 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
523 size
= sizeof (server
);
524 if (getsockname (server_sock
, (SOCKADDR
*) &server
, &size
) == SOCKET_ERROR
) {
525 closesocket (server_sock
);
526 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
528 if (listen (server_sock
, 1024) == SOCKET_ERROR
) {
529 closesocket (server_sock
);
530 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
532 if (connect ((SOCKET
) threadpool_io
->wakeup_pipes
[1], (SOCKADDR
*) &server
, sizeof (server
)) == SOCKET_ERROR
) {
533 closesocket (server_sock
);
534 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
537 size
= sizeof (client
);
538 threadpool_io
->wakeup_pipes
[0] = accept (server_sock
, (SOCKADDR
*) &client
, &size
);
539 g_assert (threadpool_io
->wakeup_pipes
[0] != INVALID_SOCKET
);
542 if (ioctlsocket (threadpool_io
->wakeup_pipes
[0], FIONBIO
, &arg
) == SOCKET_ERROR
) {
543 closesocket (threadpool_io
->wakeup_pipes
[0]);
544 closesocket (server_sock
);
545 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
548 closesocket (server_sock
);
555 g_assert (!threadpool_io
);
556 threadpool_io
= g_new0 (ThreadPoolIO
, 1);
557 g_assert (threadpool_io
);
559 mono_coop_mutex_init (&threadpool_io
->updates_lock
);
560 mono_coop_cond_init (&threadpool_io
->updates_cond
);
561 mono_gc_register_root ((char *)&threadpool_io
->updates
[0], sizeof (threadpool_io
->updates
), MONO_GC_DESCRIPTOR_NULL
, MONO_ROOT_SOURCE_THREAD_POOL
, NULL
, "Thread Pool I/O Update List");
563 threadpool_io
->updates_size
= 0;
565 threadpool_io
->backend
= backend_poll
;
566 if (g_hasenv ("MONO_ENABLE_AIO")) {
567 #if defined(HAVE_EPOLL)
568 threadpool_io
->backend
= backend_epoll
;
569 #elif defined(HAVE_KQUEUE)
570 threadpool_io
->backend
= backend_kqueue
;
574 wakeup_pipes_init ();
576 if (!threadpool_io
->backend
.init (threadpool_io
->wakeup_pipes
[0]))
577 g_error ("initialize: backend->init () failed");
579 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
581 io_selector_running
= TRUE
;
584 if (!mono_thread_create_internal (mono_get_root_domain (), (gpointer
)selector_thread
, NULL
, (MonoThreadCreateFlags
)(MONO_THREAD_CREATE_FLAGS_THREADPOOL
| MONO_THREAD_CREATE_FLAGS_SMALL_STACK
), error
))
585 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (error
));
587 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
593 // FIXME destroy everything
597 mono_threadpool_io_cleanup (void)
599 mono_lazy_cleanup (&io_status
, cleanup
);
603 ves_icall_System_IOSelector_Add (gpointer handle
, MonoIOSelectorJob
*job
)
606 ThreadPoolIOUpdate
*update
;
610 g_assert ((job
->operation
== EVENT_IN
) ^ (job
->operation
== EVENT_OUT
));
611 g_assert (job
->callback
);
613 if (mono_runtime_is_shutting_down ())
615 if (mono_domain_is_unloading (mono_object_domain (job
)))
618 mono_lazy_initialize (&io_status
, initialize
);
620 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
622 if (!io_selector_running
) {
623 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
627 int fd
= GPOINTER_TO_INT (handle
);
629 if (!threadpool_io
->backend
.can_register_fd (fd
)) {
630 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
631 mono_trace (G_LOG_LEVEL_WARNING
, MONO_TRACE_IO_SELECTOR
, "Could not register to wait for file descriptor %d", fd
);
632 mono_error_set_not_supported (error
, "Could not register to wait for file descriptor %d", fd
);
633 mono_error_set_pending_exception (error
);
637 update
= update_get_new ();
638 update
->type
= UPDATE_ADD
;
639 update
->data
.add
.fd
= fd
;
640 update
->data
.add
.job
= job
;
641 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
643 selector_thread_wakeup ();
645 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
649 ves_icall_System_IOSelector_Remove (gpointer handle
)
651 mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle
));
655 mono_threadpool_io_remove_socket (int fd
)
657 ThreadPoolIOUpdate
*update
;
659 if (!mono_lazy_is_initialized (&io_status
))
662 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
664 if (!io_selector_running
) {
665 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
669 update
= update_get_new ();
670 update
->type
= UPDATE_REMOVE_SOCKET
;
671 update
->data
.remove_socket
.fd
= fd
;
672 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
674 selector_thread_wakeup ();
676 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
678 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
682 mono_threadpool_io_remove_domain_jobs (MonoDomain
*domain
)
684 ThreadPoolIOUpdate
*update
;
686 if (!mono_lazy_is_initialized (&io_status
))
689 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
691 if (!io_selector_running
) {
692 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
696 update
= update_get_new ();
697 update
->type
= UPDATE_REMOVE_DOMAIN
;
698 update
->data
.remove_domain
.domain
= domain
;
699 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
701 selector_thread_wakeup ();
703 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
705 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
711 ves_icall_System_IOSelector_Add (gpointer handle
, MonoIOSelectorJob
*job
)
713 g_assert_not_reached ();
717 ves_icall_System_IOSelector_Remove (gpointer handle
)
719 g_assert_not_reached ();
723 mono_threadpool_io_cleanup (void)
725 g_assert_not_reached ();
729 mono_threadpool_io_remove_socket (int fd
)
731 g_assert_not_reached ();
735 mono_threadpool_io_remove_domain_jobs (MonoDomain
*domain
)
737 g_assert_not_reached ();