[System] Tweak socket test
[mono-project.git] / mono / metadata / threadpool-io.c
blob618d995dceb7db96eccc7cccbdc98e1852f4cd56
/*
 * threadpool-io.c: Microsoft IO threadpool runtime support
 *
 * Author:
 *	Ludovic Henry (ludovic.henry@xamarin.com)
 *
 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */
11 #include <config.h>
13 #ifndef DISABLE_SOCKETS
15 #include <glib.h>
17 #if defined(HOST_WIN32)
18 #include <windows.h>
19 #else
20 #include <errno.h>
21 #include <fcntl.h>
22 #endif
24 #include <mono/metadata/gc-internals.h>
25 #include <mono/metadata/mono-mlist.h>
26 #include <mono/metadata/threadpool.h>
27 #include <mono/metadata/threadpool-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
31 #include <mono/utils/mono-logger-internals.h>
32 #include <mono/utils/w32api.h>
34 typedef struct {
35 gboolean (*init) (gint wakeup_pipe_fd);
36 void (*register_fd) (gint fd, gint events, gboolean is_new);
37 void (*remove_fd) (gint fd);
38 gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
39 } ThreadPoolIOBackend;
/*
 * Bit flags describing which I/O conditions a job waits for or a backend reports.
 *
 * Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs
 */
enum MonoIOOperation {
	EVENT_IN    = 1 << 0,
	EVENT_OUT   = 1 << 1,
	EVENT_ERR   = 1 << 2, /* not in managed */
};
48 #include "threadpool-io-epoll.c"
49 #include "threadpool-io-kqueue.c"
50 #include "threadpool-io-poll.c"
/* Number of slots in ThreadPoolIO.updates; update_get_new () waits while all of them are in use. */
#define UPDATES_CAPACITY 128
54 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
55 struct _MonoIOSelectorJob {
56 MonoObject object;
57 gint32 operation;
58 MonoObject *callback;
59 MonoObject *state;
/*
 * Discriminator for ThreadPoolIOUpdate. UPDATE_EMPTY is 0, so a slot
 * cleared with memset () is skipped by the selector thread.
 */
typedef enum {
	UPDATE_EMPTY = 0,
	UPDATE_ADD,
	UPDATE_REMOVE_SOCKET,
	UPDATE_REMOVE_DOMAIN,
} ThreadPoolIOUpdateType;
69 typedef struct {
70 gint fd;
71 MonoIOSelectorJob *job;
72 } ThreadPoolIOUpdate_Add;
74 typedef struct {
75 gint fd;
76 } ThreadPoolIOUpdate_RemoveSocket;
78 typedef struct {
79 MonoDomain *domain;
80 } ThreadPoolIOUpdate_RemoveDomain;
82 typedef struct {
83 ThreadPoolIOUpdateType type;
84 union {
85 ThreadPoolIOUpdate_Add add;
86 ThreadPoolIOUpdate_RemoveSocket remove_socket;
87 ThreadPoolIOUpdate_RemoveDomain remove_domain;
88 } data;
89 } ThreadPoolIOUpdate;
91 typedef struct {
92 ThreadPoolIOBackend backend;
94 ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
95 gint updates_size;
96 MonoCoopMutex updates_lock;
97 MonoCoopCond updates_cond;
99 #if !defined(HOST_WIN32)
100 gint wakeup_pipes [2];
101 #else
102 SOCKET wakeup_pipes [2];
103 #endif
104 } ThreadPoolIO;
106 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
108 static gboolean io_selector_running = FALSE;
110 static ThreadPoolIO* threadpool_io;
112 static MonoIOSelectorJob*
113 get_job_for_event (MonoMList **list, gint32 event)
115 MonoMList *current;
117 g_assert (list);
119 for (current = *list; current; current = mono_mlist_next (current)) {
120 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
121 if (job->operation == event) {
122 *list = mono_mlist_remove_item (*list, current);
123 return job;
127 return NULL;
130 static gint
131 get_operations_for_jobs (MonoMList *list)
133 MonoMList *current;
134 gint operations = 0;
136 for (current = list; current; current = mono_mlist_next (current))
137 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
139 return operations;
142 static void
143 selector_thread_wakeup (void)
145 gchar msg = 'c';
146 gint written;
148 for (;;) {
149 #if !defined(HOST_WIN32)
150 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
151 if (written == 1)
152 break;
153 if (written == -1) {
154 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
155 break;
157 #else
158 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
159 if (written == 1)
160 break;
161 if (written == SOCKET_ERROR) {
162 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
163 break;
165 #endif
169 static void
170 selector_thread_wakeup_drain_pipes (void)
172 gchar buffer [128];
173 gint received;
175 for (;;) {
176 #if !defined(HOST_WIN32)
177 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
178 if (received == 0)
179 break;
180 if (received == -1) {
181 if (errno != EINTR && errno != EAGAIN)
182 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
183 break;
185 #else
186 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
187 if (received == 0)
188 break;
189 if (received == SOCKET_ERROR) {
190 if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
191 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
192 break;
194 #endif
198 typedef struct {
199 MonoDomain *domain;
200 MonoGHashTable *states;
201 } FilterSockaresForDomainData;
203 static void
204 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
206 FilterSockaresForDomainData *data;
207 MonoMList *list = (MonoMList *)value, *element;
208 MonoDomain *domain;
209 MonoGHashTable *states;
211 g_assert (user_data);
212 data = (FilterSockaresForDomainData *)user_data;
213 domain = data->domain;
214 states = data->states;
216 for (element = list; element; element = mono_mlist_next (element)) {
217 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
218 if (mono_object_domain (job) == domain)
219 mono_mlist_set_data (element, NULL);
222 /* we skip all the first elements which are NULL */
223 for (; list; list = mono_mlist_next (list)) {
224 if (mono_mlist_get_data (list))
225 break;
228 if (list) {
229 g_assert (mono_mlist_get_data (list));
231 /* we delete all the NULL elements after the first one */
232 for (element = list; element;) {
233 MonoMList *next;
234 if (!(next = mono_mlist_next (element)))
235 break;
236 if (mono_mlist_get_data (next))
237 element = next;
238 else
239 mono_mlist_set_next (element, mono_mlist_next (next));
243 mono_g_hash_table_replace (states, key, list);
246 static void
247 wait_callback (gint fd, gint events, gpointer user_data)
249 MonoError error;
251 if (mono_runtime_is_shutting_down ())
252 return;
254 if (fd == threadpool_io->wakeup_pipes [0]) {
255 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
256 selector_thread_wakeup_drain_pipes ();
257 } else {
258 MonoGHashTable *states;
259 MonoMList *list = NULL;
260 gpointer k;
261 gboolean remove_fd = FALSE;
262 gint operations;
264 g_assert (user_data);
265 states = (MonoGHashTable *)user_data;
267 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
268 fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
270 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
271 g_error ("wait_callback: fd %d not found in states table", fd);
273 if (list && (events & EVENT_IN) != 0) {
274 MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
275 if (job) {
276 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
277 mono_error_assert_ok (&error);
281 if (list && (events & EVENT_OUT) != 0) {
282 MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
283 if (job) {
284 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
285 mono_error_assert_ok (&error);
289 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
290 if (!remove_fd) {
291 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
293 operations = get_operations_for_jobs (list);
295 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
296 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
298 threadpool_io->backend.register_fd (fd, operations, FALSE);
299 } else {
300 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
302 mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
304 threadpool_io->backend.remove_fd (fd);
309 static void
310 selector_thread_interrupt (gpointer unused)
312 selector_thread_wakeup ();
315 static gsize WINAPI
316 selector_thread (gpointer data)
318 MonoError error;
319 MonoGHashTable *states;
321 io_selector_running = TRUE;
323 if (mono_runtime_is_shutting_down ()) {
324 io_selector_running = FALSE;
325 return 0;
328 states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
330 while (!mono_runtime_is_shutting_down ()) {
331 gint i, j;
332 gint res;
333 gboolean interrupted = FALSE;
335 if (mono_thread_interruption_checkpoint ())
336 continue;
338 mono_coop_mutex_lock (&threadpool_io->updates_lock);
340 for (i = 0; i < threadpool_io->updates_size; ++i) {
341 ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
343 switch (update->type) {
344 case UPDATE_EMPTY:
345 break;
346 case UPDATE_ADD: {
347 gint fd;
348 gint operations;
349 gpointer k;
350 gboolean exists;
351 MonoMList *list = NULL;
352 MonoIOSelectorJob *job;
354 fd = update->data.add.fd;
355 g_assert (fd >= 0);
357 job = update->data.add.job;
358 g_assert (job);
360 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
361 list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
362 mono_error_assert_ok (&error);
363 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
365 operations = get_operations_for_jobs (list);
367 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
368 exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
370 threadpool_io->backend.register_fd (fd, operations, !exists);
372 break;
374 case UPDATE_REMOVE_SOCKET: {
375 gint fd;
376 gpointer k;
377 MonoMList *list = NULL;
379 fd = update->data.remove_socket.fd;
380 g_assert (fd >= 0);
382 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
383 mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
385 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
386 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
387 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
388 memset (update, 0, sizeof (ThreadPoolIOUpdate));
391 for (; list; list = mono_mlist_remove_item (list, list)) {
392 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
393 mono_error_assert_ok (&error);
396 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
397 threadpool_io->backend.remove_fd (fd);
400 break;
402 case UPDATE_REMOVE_DOMAIN: {
403 MonoDomain *domain;
405 domain = update->data.remove_domain.domain;
406 g_assert (domain);
408 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
409 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
411 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
412 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
413 if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
414 memset (update, 0, sizeof (ThreadPoolIOUpdate));
417 break;
419 default:
420 g_assert_not_reached ();
424 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
426 if (threadpool_io->updates_size > 0) {
427 threadpool_io->updates_size = 0;
428 memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
431 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
433 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
435 mono_thread_info_install_interrupt (selector_thread_interrupt, NULL, &interrupted);
436 if (interrupted)
437 continue;
439 res = threadpool_io->backend.event_wait (wait_callback, states);
440 if (res == -1)
441 break;
443 mono_thread_info_uninstall_interrupt (&interrupted);
446 mono_g_hash_table_destroy (states);
448 io_selector_running = FALSE;
450 return 0;
453 /* Locking: threadpool_io->updates_lock must be held */
454 static ThreadPoolIOUpdate*
455 update_get_new (void)
457 ThreadPoolIOUpdate *update = NULL;
458 g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
460 while (threadpool_io->updates_size == UPDATES_CAPACITY) {
461 /* we wait for updates to be applied in the selector_thread and we loop
462 * as long as none are available. if it happends too much, then we need
463 * to increase UPDATES_CAPACITY */
464 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
467 g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
469 update = &threadpool_io->updates [threadpool_io->updates_size ++];
471 return update;
474 static void
475 wakeup_pipes_init (void)
477 #if !defined(HOST_WIN32)
478 if (pipe (threadpool_io->wakeup_pipes) == -1)
479 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
480 if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
481 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
482 #else
483 struct sockaddr_in client;
484 struct sockaddr_in server;
485 SOCKET server_sock;
486 gulong arg;
487 gint size;
489 server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
490 g_assert (server_sock != INVALID_SOCKET);
491 threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
492 g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
494 server.sin_family = AF_INET;
495 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
496 server.sin_port = 0;
497 if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
498 closesocket (server_sock);
499 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
502 size = sizeof (server);
503 if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
504 closesocket (server_sock);
505 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
507 if (listen (server_sock, 1024) == SOCKET_ERROR) {
508 closesocket (server_sock);
509 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
511 if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
512 closesocket (server_sock);
513 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
516 size = sizeof (client);
517 threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
518 g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
520 arg = 1;
521 if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
522 closesocket (threadpool_io->wakeup_pipes [0]);
523 closesocket (server_sock);
524 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
527 closesocket (server_sock);
528 #endif
531 static void
532 initialize (void)
534 g_assert (!threadpool_io);
535 threadpool_io = g_new0 (ThreadPoolIO, 1);
536 g_assert (threadpool_io);
538 mono_coop_mutex_init (&threadpool_io->updates_lock);
539 mono_coop_cond_init (&threadpool_io->updates_cond);
540 mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
542 threadpool_io->updates_size = 0;
544 threadpool_io->backend = backend_poll;
545 if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
546 #if defined(HAVE_EPOLL)
547 threadpool_io->backend = backend_epoll;
548 #elif defined(HAVE_KQUEUE)
549 threadpool_io->backend = backend_kqueue;
550 #endif
553 wakeup_pipes_init ();
555 if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
556 g_error ("initialize: backend->init () failed");
558 MonoError error;
559 if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK, &error))
560 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
/* Counterpart of initialize (), run through mono_lazy_cleanup (). Currently releases nothing. */
static void
cleanup (void)
{
	// FIXME destroy everything
}
569 void
570 mono_threadpool_io_cleanup (void)
572 mono_lazy_cleanup (&io_status, cleanup);
575 void
576 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
578 ThreadPoolIOUpdate *update;
580 g_assert (handle);
582 g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
583 g_assert (job->callback);
585 if (mono_runtime_is_shutting_down ())
586 return;
587 if (mono_domain_is_unloading (mono_object_domain (job)))
588 return;
590 mono_lazy_initialize (&io_status, initialize);
592 mono_coop_mutex_lock (&threadpool_io->updates_lock);
594 update = update_get_new ();
595 update->type = UPDATE_ADD;
596 update->data.add.fd = GPOINTER_TO_INT (handle);
597 update->data.add.job = job;
598 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
600 selector_thread_wakeup ();
602 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
605 void
606 ves_icall_System_IOSelector_Remove (gpointer handle)
608 mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle));
611 void
612 mono_threadpool_io_remove_socket (int fd)
614 ThreadPoolIOUpdate *update;
616 if (!mono_lazy_is_initialized (&io_status))
617 return;
619 mono_coop_mutex_lock (&threadpool_io->updates_lock);
621 update = update_get_new ();
622 update->type = UPDATE_REMOVE_SOCKET;
623 update->data.add.fd = fd;
624 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
626 selector_thread_wakeup ();
628 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
630 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
633 void
634 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
636 ThreadPoolIOUpdate *update;
638 if (!mono_lazy_is_initialized (&io_status))
639 return;
641 mono_coop_mutex_lock (&threadpool_io->updates_lock);
643 update = update_get_new ();
644 update->type = UPDATE_REMOVE_DOMAIN;
645 update->data.remove_domain.domain = domain;
646 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
648 selector_thread_wakeup ();
650 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
652 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
655 #else
657 void
658 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
660 g_assert_not_reached ();
663 void
664 ves_icall_System_IOSelector_Remove (gpointer handle)
666 g_assert_not_reached ();
/* DISABLE_SOCKETS build: asserts if reached. */
void
mono_threadpool_io_cleanup (void)
{
	g_assert_not_reached ();
}
/* DISABLE_SOCKETS build: asserts if reached. */
void
mono_threadpool_io_remove_socket (int fd)
{
	g_assert_not_reached ();
}
681 void
682 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
684 g_assert_not_reached ();
687 #endif