3 * A lightweight worker thread with lockless messaging
6 * Alexander Kyte (alkyte@microsoft.com)
8 * (C) 2018 Microsoft, Inc.
12 #include <mono/utils/mono-utility-thread.h>
15 MonoLockFreeQueueNode node
;
17 // For cleanup metadata
18 MonoUtilityThread
*thread
;
22 MonoSemType
*response_sem
;
24 // Variably-sized, size is thread->payload_size
25 gpointer payload
[MONO_ZERO_LEN_ARRAY
];
26 } UtilityThreadQueueEntry
;
29 free_queue_entry (gpointer p
)
31 UtilityThreadQueueEntry
*util
= (UtilityThreadQueueEntry
*) p
;
32 mono_lock_free_free (p
, util
->thread
->message_block_size
);
36 utility_thread_handle_inbox (MonoUtilityThread
*thread
, gboolean at_shutdown
)
38 UtilityThreadQueueEntry
*entry
= (UtilityThreadQueueEntry
*) mono_lock_free_queue_dequeue (&thread
->work_queue
);
42 thread
->callbacks
.command (thread
->state_ptr
, &entry
->payload
, at_shutdown
);
43 if (entry
->response_sem
) {
44 *entry
->finished
= TRUE
;
45 mono_os_sem_post (entry
->response_sem
);
48 mono_thread_hazardous_try_free (entry
, free_queue_entry
);
54 utility_thread (void *arg
)
56 MonoUtilityThread
*thread
= (MonoUtilityThread
*) arg
;
57 if (thread
->callbacks
.early_init
)
58 thread
->callbacks
.early_init (&thread
->state_ptr
);
60 mono_thread_info_wait_inited ();
61 mono_thread_info_attach ();
63 thread
->callbacks
.init (&thread
->state_ptr
);
65 while (mono_atomic_load_i32 (&thread
->run_thread
)) {
67 mono_os_sem_timedwait (&thread
->work_queue_sem
, 1000, MONO_SEM_FLAGS_NONE
);
69 utility_thread_handle_inbox (thread
, FALSE
);
72 /* Drain any remaining entries on shutdown. */
73 while (utility_thread_handle_inbox (thread
, TRUE
));
75 mono_os_sem_destroy (&thread
->work_queue_sem
);
77 thread
->callbacks
.cleanup (thread
->state_ptr
);
83 mono_utility_thread_launch (size_t payload_size
, MonoUtilityThreadCallbacks
*callbacks
, MonoMemAccountType accountType
)
85 MonoUtilityThread
*thread
= (MonoUtilityThread
*)g_malloc0 (sizeof (MonoUtilityThread
));
86 size_t entry_size
= offsetof (UtilityThreadQueueEntry
, payload
) + payload_size
;
88 thread
->message_block_size
= mono_pagesize ();
89 thread
->payload_size
= payload_size
;
90 thread
->callbacks
= *callbacks
;
92 mono_lock_free_queue_init (&thread
->work_queue
);
93 mono_lock_free_allocator_init_size_class (&thread
->message_size_class
, entry_size
, thread
->message_block_size
);
94 mono_lock_free_allocator_init_allocator (&thread
->message_allocator
, &thread
->message_size_class
, accountType
);
95 mono_os_sem_init (&thread
->work_queue_sem
, 0);
96 mono_atomic_store_i32 (&thread
->run_thread
, 1);
98 if (!mono_native_thread_create (&thread
->thread_id
, utility_thread
, thread
))
99 g_error ("Could not create utility thread");
105 mono_utility_thread_send_internal (MonoUtilityThread
*thread
, UtilityThreadQueueEntry
*entry
)
107 mono_lock_free_queue_node_init (&entry
->node
, FALSE
);
108 mono_lock_free_queue_enqueue (&thread
->work_queue
, &entry
->node
);
109 mono_os_sem_post (&thread
->work_queue_sem
);
113 mono_utility_thread_send (MonoUtilityThread
*thread
, gpointer message
)
115 int small_id
= mono_thread_info_get_small_id ();
117 #if MONO_PRINT_DROPPED_MESSAGES
118 fprintf (stderr
, "Dropping message send because thread not attached yet\n");
121 } else if (!thread
->run_thread
) {
122 #if MONO_PRINT_DROPPED_MESSAGES
123 fprintf (stderr
, "Dropping message send because thread killed\n");
128 UtilityThreadQueueEntry
*entry
= (UtilityThreadQueueEntry
*)mono_lock_free_alloc (&thread
->message_allocator
);
129 entry
->response_sem
= NULL
;
130 entry
->thread
= thread
;
131 memcpy (entry
->payload
, message
, thread
->payload_size
);
132 mono_utility_thread_send_internal (thread
, entry
);
136 mono_utility_thread_send_sync (MonoUtilityThread
*thread
, gpointer message
)
138 int small_id
= mono_thread_info_get_small_id ();
140 #if MONO_PRINT_DROPPED_MESSAGES
141 fprintf (stderr
, "Dropping message send because thread not attached yet\n");
144 } else if (!thread
->run_thread
) {
145 #if MONO_PRINT_DROPPED_MESSAGES
146 fprintf (stderr
, "Dropping message send because thread killed\n");
152 mono_os_sem_init (&sem
, 0);
154 UtilityThreadQueueEntry
*entry
= (UtilityThreadQueueEntry
*)mono_lock_free_alloc (&thread
->message_allocator
);
157 entry
->finished
= &done
;
158 entry
->response_sem
= &sem
;
159 entry
->thread
= thread
;
160 memcpy (entry
->payload
, message
, thread
->payload_size
);
161 mono_utility_thread_send_internal (thread
, entry
);
163 while (thread
->run_thread
&& !done
) {
164 // After returns, the entry is filled out with results
165 gboolean timedout
= mono_os_sem_timedwait (&sem
, 1000, MONO_SEM_FLAGS_NONE
) == MONO_SEM_TIMEDWAIT_RET_TIMEDOUT
;
168 mono_os_sem_post (&thread
->work_queue_sem
);
171 mono_os_sem_destroy (&sem
);
173 // Return whether we ended successfully
178 mono_utility_thread_stop (MonoUtilityThread
*thread
)
180 int small_id
= mono_thread_info_get_small_id ();
182 #if MONO_PRINT_DROPPED_MESSAGES
183 fprintf (stderr
, "Dropping attempt to stop thread, calling thread not attached yet\n");
186 } else if (!thread
->run_thread
) {
190 mono_atomic_store_i32 (&thread
->run_thread
, 0);
191 mono_os_sem_post (&thread
->work_queue_sem
);