Revert some changes which don't have proper dependencies.
[mono-project.git] / mono / utils / mono-utility-thread.c
blobce248018d117197ed24857077c9087f5893402e9
1 /**
2 * \file
3 * A lightweight worker thread with lockless messaging
5 * Author:
6 * Alexander Kyte (alkyte@microsoft.com)
8 * (C) 2018 Microsoft, Inc.
12 #include <mono/utils/mono-utility-thread.h>
14 typedef struct {
15 MonoLockFreeQueueNode node;
17 // For cleanup metadata
18 MonoUtilityThread *thread;
20 // For synch calls
21 gboolean *finished;
22 MonoSemType *response_sem;
24 // Variably-sized, size is thread->payload_size
25 gpointer payload [MONO_ZERO_LEN_ARRAY];
26 } UtilityThreadQueueEntry;
28 static void
29 free_queue_entry (gpointer p)
31 UtilityThreadQueueEntry *util = (UtilityThreadQueueEntry *) p;
32 mono_lock_free_free (p, util->thread->message_block_size);
35 static gboolean
36 utility_thread_handle_inbox (MonoUtilityThread *thread, gboolean at_shutdown)
38 UtilityThreadQueueEntry *entry = (UtilityThreadQueueEntry *) mono_lock_free_queue_dequeue (&thread->work_queue);
39 if (!entry)
40 return FALSE;
42 thread->callbacks.command (thread->state_ptr, &entry->payload, at_shutdown);
43 if (entry->response_sem) {
44 *entry->finished = TRUE;
45 mono_os_sem_post (entry->response_sem);
48 mono_thread_hazardous_try_free (entry, free_queue_entry);
50 return TRUE;
53 static void *
54 utility_thread (void *arg)
56 MonoUtilityThread *thread = (MonoUtilityThread *) arg;
57 if (thread->callbacks.early_init)
58 thread->callbacks.early_init (&thread->state_ptr);
60 mono_thread_info_wait_inited ();
61 mono_thread_info_attach ();
63 thread->callbacks.init (&thread->state_ptr);
65 while (mono_atomic_load_i32 (&thread->run_thread)) {
66 MONO_ENTER_GC_SAFE;
67 mono_os_sem_timedwait (&thread->work_queue_sem, 1000, MONO_SEM_FLAGS_NONE);
68 MONO_EXIT_GC_SAFE;
69 utility_thread_handle_inbox (thread, FALSE);
72 /* Drain any remaining entries on shutdown. */
73 while (utility_thread_handle_inbox (thread, TRUE));
75 mono_os_sem_destroy (&thread->work_queue_sem);
77 thread->callbacks.cleanup (thread->state_ptr);
79 return NULL;
82 MonoUtilityThread *
83 mono_utility_thread_launch (size_t payload_size, MonoUtilityThreadCallbacks *callbacks, MonoMemAccountType accountType)
85 MonoUtilityThread *thread = (MonoUtilityThread*)g_malloc0 (sizeof (MonoUtilityThread));
86 size_t entry_size = offsetof (UtilityThreadQueueEntry, payload) + payload_size;
88 thread->message_block_size = mono_pagesize ();
89 thread->payload_size = payload_size;
90 thread->callbacks = *callbacks;
92 mono_lock_free_queue_init (&thread->work_queue);
93 mono_lock_free_allocator_init_size_class (&thread->message_size_class, entry_size, thread->message_block_size);
94 mono_lock_free_allocator_init_allocator (&thread->message_allocator, &thread->message_size_class, accountType);
95 mono_os_sem_init (&thread->work_queue_sem, 0);
96 mono_atomic_store_i32 (&thread->run_thread, 1);
98 if (!mono_native_thread_create (&thread->thread_id, utility_thread, thread))
99 g_error ("Could not create utility thread");
101 return thread;
104 static void
105 mono_utility_thread_send_internal (MonoUtilityThread *thread, UtilityThreadQueueEntry *entry)
107 mono_lock_free_queue_node_init (&entry->node, FALSE);
108 mono_lock_free_queue_enqueue (&thread->work_queue, &entry->node);
109 mono_os_sem_post (&thread->work_queue_sem);
112 void
113 mono_utility_thread_send (MonoUtilityThread *thread, gpointer message)
115 int small_id = mono_thread_info_get_small_id ();
116 if (small_id < 0) {
117 #if MONO_PRINT_DROPPED_MESSAGES
118 fprintf (stderr, "Dropping message send because thread not attached yet\n");
119 #endif
120 return;
121 } else if (!thread->run_thread) {
122 #if MONO_PRINT_DROPPED_MESSAGES
123 fprintf (stderr, "Dropping message send because thread killed\n");
124 #endif
125 return;
128 UtilityThreadQueueEntry *entry = (UtilityThreadQueueEntry*)mono_lock_free_alloc (&thread->message_allocator);
129 entry->response_sem = NULL;
130 entry->thread = thread;
131 memcpy (entry->payload, message, thread->payload_size);
132 mono_utility_thread_send_internal (thread, entry);
135 gboolean
136 mono_utility_thread_send_sync (MonoUtilityThread *thread, gpointer message)
138 int small_id = mono_thread_info_get_small_id ();
139 if (small_id < 0) {
140 #if MONO_PRINT_DROPPED_MESSAGES
141 fprintf (stderr, "Dropping message send because thread not attached yet\n");
142 #endif
143 return FALSE;
144 } else if (!thread->run_thread) {
145 #if MONO_PRINT_DROPPED_MESSAGES
146 fprintf (stderr, "Dropping message send because thread killed\n");
147 #endif
148 return FALSE;
151 MonoSemType sem;
152 mono_os_sem_init (&sem, 0);
154 UtilityThreadQueueEntry *entry = (UtilityThreadQueueEntry*)mono_lock_free_alloc (&thread->message_allocator);
155 gboolean done;
157 entry->finished = &done;
158 entry->response_sem = &sem;
159 entry->thread = thread;
160 memcpy (entry->payload, message, thread->payload_size);
161 mono_utility_thread_send_internal (thread, entry);
163 while (thread->run_thread && !done) {
164 // After returns, the entry is filled out with results
165 gboolean timedout = mono_os_sem_timedwait (&sem, 1000, MONO_SEM_FLAGS_NONE) == MONO_SEM_TIMEDWAIT_RET_TIMEDOUT;
166 if (!timedout)
167 break;
168 mono_os_sem_post (&thread->work_queue_sem);
171 mono_os_sem_destroy (&sem);
173 // Return whether we ended successfully
174 return done;
177 void
178 mono_utility_thread_stop (MonoUtilityThread *thread)
180 int small_id = mono_thread_info_get_small_id ();
181 if (small_id < 0) {
182 #if MONO_PRINT_DROPPED_MESSAGES
183 fprintf (stderr, "Dropping attempt to stop thread, calling thread not attached yet\n");
184 #endif
185 return;
186 } else if (!thread->run_thread) {
187 return;
190 mono_atomic_store_i32 (&thread->run_thread, 0);
191 mono_os_sem_post (&thread->work_queue_sem);