2 * threadpool-io.c: Microsoft IO threadpool runtime support
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
13 #ifndef DISABLE_SOCKETS
17 #if defined(HOST_WIN32)
24 #include <mono/metadata/gc-internals.h>
25 #include <mono/metadata/mono-mlist.h>
26 #include <mono/metadata/threadpool.h>
27 #include <mono/metadata/threadpool-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
31 #include <mono/utils/mono-logger-internals.h>
32 #include <mono/utils/w32api.h>
35 gboolean (*init
) (gint wakeup_pipe_fd
);
36 void (*register_fd
) (gint fd
, gint events
, gboolean is_new
);
37 void (*remove_fd
) (gint fd
);
38 gint (*event_wait
) (void (*callback
) (gint fd
, gint events
, gpointer user_data
), gpointer user_data
);
39 } ThreadPoolIOBackend
;
41 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
42 enum MonoIOOperation
{
45 EVENT_ERR
= 1 << 2, /* not in managed */
48 #include "threadpool-io-epoll.c"
49 #include "threadpool-io-kqueue.c"
50 #include "threadpool-io-poll.c"
52 #define UPDATES_CAPACITY 128
54 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
55 struct _MonoIOSelectorJob
{
67 } ThreadPoolIOUpdateType
;
71 MonoIOSelectorJob
*job
;
72 } ThreadPoolIOUpdate_Add
;
76 } ThreadPoolIOUpdate_RemoveSocket
;
80 } ThreadPoolIOUpdate_RemoveDomain
;
83 ThreadPoolIOUpdateType type
;
85 ThreadPoolIOUpdate_Add add
;
86 ThreadPoolIOUpdate_RemoveSocket remove_socket
;
87 ThreadPoolIOUpdate_RemoveDomain remove_domain
;
92 ThreadPoolIOBackend backend
;
94 ThreadPoolIOUpdate updates
[UPDATES_CAPACITY
];
96 MonoCoopMutex updates_lock
;
97 MonoCoopCond updates_cond
;
99 #if !defined(HOST_WIN32)
100 gint wakeup_pipes
[2];
102 SOCKET wakeup_pipes
[2];
106 static mono_lazy_init_t io_status
= MONO_LAZY_INIT_STATUS_NOT_INITIALIZED
;
108 static gboolean io_selector_running
= FALSE
;
110 static ThreadPoolIO
* threadpool_io
;
112 static MonoIOSelectorJob
*
113 get_job_for_event (MonoMList
**list
, gint32 event
)
119 for (current
= *list
; current
; current
= mono_mlist_next (current
)) {
120 MonoIOSelectorJob
*job
= (MonoIOSelectorJob
*) mono_mlist_get_data (current
);
121 if (job
->operation
== event
) {
122 *list
= mono_mlist_remove_item (*list
, current
);
131 get_operations_for_jobs (MonoMList
*list
)
136 for (current
= list
; current
; current
= mono_mlist_next (current
))
137 operations
|= ((MonoIOSelectorJob
*) mono_mlist_get_data (current
))->operation
;
143 selector_thread_wakeup (void)
149 #if !defined(HOST_WIN32)
150 written
= write (threadpool_io
->wakeup_pipes
[1], &msg
, 1);
154 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno
, g_strerror (errno
));
158 written
= send (threadpool_io
->wakeup_pipes
[1], &msg
, 1, 0);
161 if (written
== SOCKET_ERROR
) {
162 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
170 selector_thread_wakeup_drain_pipes (void)
176 #if !defined(HOST_WIN32)
177 received
= read (threadpool_io
->wakeup_pipes
[0], buffer
, sizeof (buffer
));
180 if (received
== -1) {
181 if (errno
!= EINTR
&& errno
!= EAGAIN
)
182 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno
, g_strerror (errno
));
186 received
= recv (threadpool_io
->wakeup_pipes
[0], buffer
, sizeof (buffer
), 0);
189 if (received
== SOCKET_ERROR
) {
190 if (WSAGetLastError () != WSAEINTR
&& WSAGetLastError () != WSAEWOULDBLOCK
)
191 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
200 MonoGHashTable
*states
;
201 } FilterSockaresForDomainData
;
204 filter_jobs_for_domain (gpointer key
, gpointer value
, gpointer user_data
)
206 FilterSockaresForDomainData
*data
;
207 MonoMList
*list
= (MonoMList
*)value
, *element
;
209 MonoGHashTable
*states
;
211 g_assert (user_data
);
212 data
= (FilterSockaresForDomainData
*)user_data
;
213 domain
= data
->domain
;
214 states
= data
->states
;
216 for (element
= list
; element
; element
= mono_mlist_next (element
)) {
217 MonoIOSelectorJob
*job
= (MonoIOSelectorJob
*) mono_mlist_get_data (element
);
218 if (mono_object_domain (job
) == domain
)
219 mono_mlist_set_data (element
, NULL
);
222 /* we skip all the first elements which are NULL */
223 for (; list
; list
= mono_mlist_next (list
)) {
224 if (mono_mlist_get_data (list
))
229 g_assert (mono_mlist_get_data (list
));
231 /* we delete all the NULL elements after the first one */
232 for (element
= list
; element
;) {
234 if (!(next
= mono_mlist_next (element
)))
236 if (mono_mlist_get_data (next
))
239 mono_mlist_set_next (element
, mono_mlist_next (next
));
243 mono_g_hash_table_replace (states
, key
, list
);
247 wait_callback (gint fd
, gint events
, gpointer user_data
)
251 if (mono_runtime_is_shutting_down ())
254 if (fd
== threadpool_io
->wakeup_pipes
[0]) {
255 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_THREADPOOL
, "io threadpool: wke");
256 selector_thread_wakeup_drain_pipes ();
258 MonoGHashTable
*states
;
259 MonoMList
*list
= NULL
;
261 gboolean remove_fd
= FALSE
;
264 g_assert (user_data
);
265 states
= (MonoGHashTable
*)user_data
;
267 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_THREADPOOL
, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
268 fd
, (events
& EVENT_IN
) ? "RD" : "..", (events
& EVENT_OUT
) ? "WR" : "..", (events
& EVENT_ERR
) ? "ERR" : "...");
270 if (!mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
))
271 g_error ("wait_callback: fd %d not found in states table", fd
);
273 if (list
&& (events
& EVENT_IN
) != 0) {
274 MonoIOSelectorJob
*job
= get_job_for_event (&list
, EVENT_IN
);
276 mono_threadpool_enqueue_work_item (((MonoObject
*) job
)->vtable
->domain
, (MonoObject
*) job
, &error
);
277 mono_error_assert_ok (&error
);
281 if (list
&& (events
& EVENT_OUT
) != 0) {
282 MonoIOSelectorJob
*job
= get_job_for_event (&list
, EVENT_OUT
);
284 mono_threadpool_enqueue_work_item (((MonoObject
*) job
)->vtable
->domain
, (MonoObject
*) job
, &error
);
285 mono_error_assert_ok (&error
);
289 remove_fd
= (events
& EVENT_ERR
) == EVENT_ERR
;
291 mono_g_hash_table_replace (states
, GINT_TO_POINTER (fd
), list
);
293 operations
= get_operations_for_jobs (list
);
295 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_THREADPOOL
, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
296 fd
, (operations
& EVENT_IN
) ? "RD" : "..", (operations
& EVENT_OUT
) ? "WR" : "..", (operations
& EVENT_ERR
) ? "ERR" : "...");
298 threadpool_io
->backend
.register_fd (fd
, operations
, FALSE
);
300 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_THREADPOOL
, "io threadpool: err fd %d", fd
);
302 mono_g_hash_table_remove (states
, GINT_TO_POINTER (fd
));
304 threadpool_io
->backend
.remove_fd (fd
);
310 selector_thread_interrupt (gpointer unused
)
312 selector_thread_wakeup ();
316 selector_thread (gpointer data
)
319 MonoGHashTable
*states
;
321 io_selector_running
= TRUE
;
323 if (mono_runtime_is_shutting_down ()) {
324 io_selector_running
= FALSE
;
328 states
= mono_g_hash_table_new_type (g_direct_hash
, g_direct_equal
, MONO_HASH_VALUE_GC
, MONO_ROOT_SOURCE_THREAD_POOL
, "i/o thread pool states table");
330 while (!mono_runtime_is_shutting_down ()) {
333 gboolean interrupted
= FALSE
;
335 if (mono_thread_interruption_checkpoint ())
338 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
340 for (i
= 0; i
< threadpool_io
->updates_size
; ++i
) {
341 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[i
];
343 switch (update
->type
) {
351 MonoMList
*list
= NULL
;
352 MonoIOSelectorJob
*job
;
354 fd
= update
->data
.add
.fd
;
357 job
= update
->data
.add
.job
;
360 exists
= mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
);
361 list
= mono_mlist_append_checked (list
, (MonoObject
*) job
, &error
);
362 mono_error_assert_ok (&error
);
363 mono_g_hash_table_replace (states
, GINT_TO_POINTER (fd
), list
);
365 operations
= get_operations_for_jobs (list
);
367 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_THREADPOOL
, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
368 exists
? "mod" : "add", fd
, (operations
& EVENT_IN
) ? "RD" : "..", (operations
& EVENT_OUT
) ? "WR" : "..", (operations
& EVENT_ERR
) ? "ERR" : "...");
370 threadpool_io
->backend
.register_fd (fd
, operations
, !exists
);
374 case UPDATE_REMOVE_SOCKET
: {
377 MonoMList
*list
= NULL
;
379 fd
= update
->data
.remove_socket
.fd
;
382 if (mono_g_hash_table_lookup_extended (states
, GINT_TO_POINTER (fd
), &k
, (gpointer
*) &list
)) {
383 mono_g_hash_table_remove (states
, GINT_TO_POINTER (fd
));
385 for (j
= i
+ 1; j
< threadpool_io
->updates_size
; ++j
) {
386 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[j
];
387 if (update
->type
== UPDATE_ADD
&& update
->data
.add
.fd
== fd
)
388 memset (update
, 0, sizeof (ThreadPoolIOUpdate
));
391 for (; list
; list
= mono_mlist_remove_item (list
, list
)) {
392 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list
)), mono_mlist_get_data (list
), &error
);
393 mono_error_assert_ok (&error
);
396 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_THREADPOOL
, "io threadpool: del fd %3d", fd
);
397 threadpool_io
->backend
.remove_fd (fd
);
402 case UPDATE_REMOVE_DOMAIN
: {
405 domain
= update
->data
.remove_domain
.domain
;
408 FilterSockaresForDomainData user_data
= { .domain
= domain
, .states
= states
};
409 mono_g_hash_table_foreach (states
, filter_jobs_for_domain
, &user_data
);
411 for (j
= i
+ 1; j
< threadpool_io
->updates_size
; ++j
) {
412 ThreadPoolIOUpdate
*update
= &threadpool_io
->updates
[j
];
413 if (update
->type
== UPDATE_ADD
&& mono_object_domain (update
->data
.add
.job
) == domain
)
414 memset (update
, 0, sizeof (ThreadPoolIOUpdate
));
420 g_assert_not_reached ();
424 mono_coop_cond_broadcast (&threadpool_io
->updates_cond
);
426 if (threadpool_io
->updates_size
> 0) {
427 threadpool_io
->updates_size
= 0;
428 memset (&threadpool_io
->updates
, 0, UPDATES_CAPACITY
* sizeof (ThreadPoolIOUpdate
));
431 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
433 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_IO_THREADPOOL
, "io threadpool: wai");
435 mono_thread_info_install_interrupt (selector_thread_interrupt
, NULL
, &interrupted
);
439 res
= threadpool_io
->backend
.event_wait (wait_callback
, states
);
443 mono_thread_info_uninstall_interrupt (&interrupted
);
446 mono_g_hash_table_destroy (states
);
448 io_selector_running
= FALSE
;
453 /* Locking: threadpool_io->updates_lock must be held */
454 static ThreadPoolIOUpdate
*
455 update_get_new (void)
457 ThreadPoolIOUpdate
*update
= NULL
;
458 g_assert (threadpool_io
->updates_size
<= UPDATES_CAPACITY
);
460 while (threadpool_io
->updates_size
== UPDATES_CAPACITY
) {
461 /* we wait for updates to be applied in the selector_thread and we loop
462 * as long as none are available. if it happends too much, then we need
463 * to increase UPDATES_CAPACITY */
464 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
467 g_assert (threadpool_io
->updates_size
< UPDATES_CAPACITY
);
469 update
= &threadpool_io
->updates
[threadpool_io
->updates_size
++];
475 wakeup_pipes_init (void)
477 #if !defined(HOST_WIN32)
478 if (pipe (threadpool_io
->wakeup_pipes
) == -1)
479 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno
, g_strerror (errno
));
480 if (fcntl (threadpool_io
->wakeup_pipes
[0], F_SETFL
, O_NONBLOCK
) == -1)
481 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno
, g_strerror (errno
));
483 struct sockaddr_in client
;
484 struct sockaddr_in server
;
489 server_sock
= socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
490 g_assert (server_sock
!= INVALID_SOCKET
);
491 threadpool_io
->wakeup_pipes
[1] = socket (AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
492 g_assert (threadpool_io
->wakeup_pipes
[1] != INVALID_SOCKET
);
494 server
.sin_family
= AF_INET
;
495 server
.sin_addr
.s_addr
= inet_addr ("127.0.0.1");
497 if (bind (server_sock
, (SOCKADDR
*) &server
, sizeof (server
)) == SOCKET_ERROR
) {
498 closesocket (server_sock
);
499 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
502 size
= sizeof (server
);
503 if (getsockname (server_sock
, (SOCKADDR
*) &server
, &size
) == SOCKET_ERROR
) {
504 closesocket (server_sock
);
505 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
507 if (listen (server_sock
, 1024) == SOCKET_ERROR
) {
508 closesocket (server_sock
);
509 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
511 if (connect ((SOCKET
) threadpool_io
->wakeup_pipes
[1], (SOCKADDR
*) &server
, sizeof (server
)) == SOCKET_ERROR
) {
512 closesocket (server_sock
);
513 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
516 size
= sizeof (client
);
517 threadpool_io
->wakeup_pipes
[0] = accept (server_sock
, (SOCKADDR
*) &client
, &size
);
518 g_assert (threadpool_io
->wakeup_pipes
[0] != INVALID_SOCKET
);
521 if (ioctlsocket (threadpool_io
->wakeup_pipes
[0], FIONBIO
, &arg
) == SOCKET_ERROR
) {
522 closesocket (threadpool_io
->wakeup_pipes
[0]);
523 closesocket (server_sock
);
524 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
527 closesocket (server_sock
);
534 g_assert (!threadpool_io
);
535 threadpool_io
= g_new0 (ThreadPoolIO
, 1);
536 g_assert (threadpool_io
);
538 mono_coop_mutex_init (&threadpool_io
->updates_lock
);
539 mono_coop_cond_init (&threadpool_io
->updates_cond
);
540 mono_gc_register_root ((char *)&threadpool_io
->updates
[0], sizeof (threadpool_io
->updates
), MONO_GC_DESCRIPTOR_NULL
, MONO_ROOT_SOURCE_THREAD_POOL
, "i/o thread pool updates list");
542 threadpool_io
->updates_size
= 0;
544 threadpool_io
->backend
= backend_poll
;
545 if (g_getenv ("MONO_ENABLE_AIO") != NULL
) {
546 #if defined(HAVE_EPOLL)
547 threadpool_io
->backend
= backend_epoll
;
548 #elif defined(HAVE_KQUEUE)
549 threadpool_io
->backend
= backend_kqueue
;
553 wakeup_pipes_init ();
555 if (!threadpool_io
->backend
.init (threadpool_io
->wakeup_pipes
[0]))
556 g_error ("initialize: backend->init () failed");
559 if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread
, NULL
, TRUE
, SMALL_STACK
, &error
))
560 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error
));
566 // FIXME destroy everything
570 mono_threadpool_io_cleanup (void)
572 mono_lazy_cleanup (&io_status
, cleanup
);
576 ves_icall_System_IOSelector_Add (gpointer handle
, MonoIOSelectorJob
*job
)
578 ThreadPoolIOUpdate
*update
;
582 g_assert ((job
->operation
== EVENT_IN
) ^ (job
->operation
== EVENT_OUT
));
583 g_assert (job
->callback
);
585 if (mono_runtime_is_shutting_down ())
587 if (mono_domain_is_unloading (mono_object_domain (job
)))
590 mono_lazy_initialize (&io_status
, initialize
);
592 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
594 update
= update_get_new ();
595 update
->type
= UPDATE_ADD
;
596 update
->data
.add
.fd
= GPOINTER_TO_INT (handle
);
597 update
->data
.add
.job
= job
;
598 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
600 selector_thread_wakeup ();
602 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
606 ves_icall_System_IOSelector_Remove (gpointer handle
)
608 mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle
));
612 mono_threadpool_io_remove_socket (int fd
)
614 ThreadPoolIOUpdate
*update
;
616 if (!mono_lazy_is_initialized (&io_status
))
619 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
621 update
= update_get_new ();
622 update
->type
= UPDATE_REMOVE_SOCKET
;
623 update
->data
.add
.fd
= fd
;
624 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
626 selector_thread_wakeup ();
628 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
630 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
634 mono_threadpool_io_remove_domain_jobs (MonoDomain
*domain
)
636 ThreadPoolIOUpdate
*update
;
638 if (!mono_lazy_is_initialized (&io_status
))
641 mono_coop_mutex_lock (&threadpool_io
->updates_lock
);
643 update
= update_get_new ();
644 update
->type
= UPDATE_REMOVE_DOMAIN
;
645 update
->data
.remove_domain
.domain
= domain
;
646 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
648 selector_thread_wakeup ();
650 mono_coop_cond_wait (&threadpool_io
->updates_cond
, &threadpool_io
->updates_lock
);
652 mono_coop_mutex_unlock (&threadpool_io
->updates_lock
);
658 ves_icall_System_IOSelector_Add (gpointer handle
, MonoIOSelectorJob
*job
)
660 g_assert_not_reached ();
664 ves_icall_System_IOSelector_Remove (gpointer handle
)
666 g_assert_not_reached ();
670 mono_threadpool_io_cleanup (void)
672 g_assert_not_reached ();
676 mono_threadpool_io_remove_socket (int fd
)
678 g_assert_not_reached ();
682 mono_threadpool_io_remove_domain_jobs (MonoDomain
*domain
)
684 g_assert_not_reached ();