2 * threadpool-worker.c: native threadpool worker
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
11 #define _USE_MATH_DEFINES // needed by MSVC to define math constants
16 #include <mono/metadata/class-internals.h>
17 #include <mono/metadata/exception.h>
18 #include <mono/metadata/gc-internals.h>
19 #include <mono/metadata/object.h>
20 #include <mono/metadata/object-internals.h>
21 #include <mono/metadata/threadpool.h>
22 #include <mono/metadata/threadpool-worker.h>
23 #include <mono/metadata/threadpool-io.h>
24 #include <mono/metadata/w32event.h>
25 #include <mono/utils/atomic.h>
26 #include <mono/utils/mono-compiler.h>
27 #include <mono/utils/mono-complex.h>
28 #include <mono/utils/mono-lazy-init.h>
29 #include <mono/utils/mono-logger.h>
30 #include <mono/utils/mono-logger-internals.h>
31 #include <mono/utils/mono-proclib.h>
32 #include <mono/utils/mono-threads.h>
33 #include <mono/utils/mono-time.h>
34 #include <mono/utils/mono-rand.h>
35 #include <mono/utils/refcount.h>
36 #include <mono/utils/w32api.h>
/* CPU-usage thresholds (percent) consulted by the monitor thread when
 * deciding how aggressively to grow the pool. */
38 #define CPU_USAGE_LOW 80
39 #define CPU_USAGE_HIGH 95
/* Monitor thread cadence, and how long (ms) it lingers after its last
 * request before allowing itself to exit. */
41 #define MONITOR_INTERVAL 500 // ms
42 #define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
/* Hard cap on worker-thread creations per wall-clock second. */
44 #define WORKER_CREATION_MAX_PER_SEC 10
46 /* The exponent to apply to the gain. 1.0 means to use linear gain,
47 * higher values will enhance large moves and damp small ones.
49 #define HILL_CLIMBING_GAIN_EXPONENT 2.0
51 /* The 'cost' of a thread. 0 means drive for increased throughput regardless
52 * of thread count, higher values bias more against higher thread counts.
54 #define HILL_CLIMBING_BIAS 0.15
/* Remaining hill-climbing tuning constants: wave period/magnitude, history
 * depth, change-rate clamps, sampling interval bounds and error tolerances.
 * They seed the ThreadPoolHillClimbing struct in mono_threadpool_worker_init. */
56 #define HILL_CLIMBING_WAVE_PERIOD 4
57 #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
58 #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
59 #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
60 #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
61 #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
62 #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
63 #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
64 #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
65 #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
66 #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
/* Reasons the hill-climbing heuristic changed (or kept) the max-working
 * thread count; passed to hill_climbing_change_thread_count for tracing.
 * NOTE(review): the enum opener (and any earlier enumerators, e.g. a warmup
 * state referenced later as TRANSITION_WARMUP) is not visible in this chunk. */
70 TRANSITION_INITIALIZING
,
71 TRANSITION_RANDOM_MOVE
,
72 TRANSITION_CLIMBING_MOVE
,
73 TRANSITION_CHANGE_POINT
,
74 TRANSITION_STABILIZING
,
75 TRANSITION_STARVATION
,
76 TRANSITION_THREAD_TIMED_OUT
,
78 } ThreadPoolHeuristicStateTransition
;
/* State of the hill-climbing controller that tunes the number of working
 * threads from observed throughput (completions per unit time).
 * Tuning knobs below are copied from the HILL_CLIMBING_* constants at init. */
82 gint32 samples_to_measure
;
83 gdouble target_throughput_ratio
;
84 gdouble target_signal_to_noise_ratio
;
85 gdouble max_change_per_second
;
86 gdouble max_change_per_sample
;
87 gint32 max_thread_wave_magnitude
;
88 gint32 sample_interval_low
;
89 gdouble thread_magnitude_multiplier
;
90 gint32 sample_interval_high
;
91 gdouble throughput_error_smoothing_factor
;
92 gdouble gain_exponent
;
93 gdouble max_sample_error
;
/* Controller output and bookkeeping since the last thread-count change. */
95 gdouble current_control_setting
;
97 gint16 last_thread_count
;
98 gdouble elapsed_since_last_change
;
99 gdouble completions_since_last_change
;
101 gdouble average_throughput_noise
;
/* Circular history buffers of length samples_to_measure (the matching
 * throughput `samples` array is declared on a line not visible here). */
104 gdouble
*thread_counts
;
/* Current randomized sampling interval and the PRNG handle used to pick it. */
106 guint32 current_sample_interval
;
107 gpointer random_interval_generator
;
/* Data carried over when a sample was too noisy to use on its own. */
109 gint32 accumulated_completion_count
;
110 gdouble accumulated_sample_duration
;
111 } ThreadPoolHillClimbing
;
/* A queued unit of work: callback plus its user data (the `data` field's
 * declaration line is not visible in this chunk, but it is set/read below). */
114 MonoThreadPoolWorkerCallback callback
;
116 } ThreadPoolWorkItem
;
/* Packed thread counters, updated as one 64-bit word via CAS (see
 * COUNTER_ATOMIC); the union wrapper with `as_gint64` is not visible here. */
120 gint16 max_working
; /* determined by heuristic */
121 gint16 starting
; /* starting, but not yet in worker_thread */
122 gint16 working
; /* executing worker_thread */
123 gint16 parked
; /* parked */
126 } ThreadPoolWorkerCounter
;
128 typedef MonoInternalThread ThreadPoolWorkerThread
;
/* Refcounted state of one native threadpool worker pool: thread registry,
 * LIFO work-item queue, creation rate-limiter, hill-climbing heuristic
 * state, worker limits and monitor-thread status. */
130 struct MonoThreadPoolWorker
{
133 ThreadPoolWorkerCounter counters
;
135 GPtrArray
*threads
; // ThreadPoolWorkerThread* []
136 MonoCoopMutex threads_lock
; /* protect access to working_threads and parked_threads */
137 gint32 parked_threads_count
;
138 MonoCoopCond parked_threads_cond
;
139 MonoCoopCond threads_exit_cond
;
/* Growable array used as a stack by work_item_push/work_item_try_pop,
 * guarded by work_items_lock. */
141 ThreadPoolWorkItem
*work_items
; // ThreadPoolWorkItem []
142 gint32 work_items_count
;
143 gint32 work_items_size
;
144 MonoCoopMutex work_items_lock
;
/* Per-second creation rate limiting (see worker_try_create). */
146 guint32 worker_creation_current_second
;
147 guint32 worker_creation_current_count
;
148 MonoCoopMutex worker_creation_lock
;
/* Heuristic sampling state; times are in the units noted per field. */
150 gint32 heuristic_completions
;
151 gint64 heuristic_sample_start
;
152 gint64 heuristic_last_dequeue
; // ms
153 gint64 heuristic_last_adjustment
; // ms
154 gint64 heuristic_adjustment_interval
; // ms
155 ThreadPoolHillClimbing heuristic_hill_climbing
;
156 MonoCoopMutex heuristic_lock
;
158 gint32 limit_worker_min
;
159 gint32 limit_worker_max
;
161 MonoCpuUsageState
*cpu_usage_state
;
164 /* suspended by the debugger */
/* One of the MONITOR_STATUS_* values below, driven with interlocked ops. */
167 gint32 monitor_status
;
/* Monitor thread lifecycle states (enum opener not visible here). */
171 MONITOR_STATUS_REQUESTED
,
172 MONITOR_STATUS_WAITING_FOR_REQUEST
,
173 MONITOR_STATUS_NOT_RUNNING
,
/* Sanity-check a counter snapshot: the heuristic must always allow at
 * least one worker, and per-state counts can never go negative. */
176 #define COUNTER_CHECK(counter) \
178 g_assert (counter._.max_working > 0); \
179 g_assert (counter._.starting >= 0); \
180 g_assert (counter._.working >= 0); \
/* Classic CAS retry loop: snapshot the packed counters, let `block` edit
 * the copy in `var`, then publish with a 64-bit compare-exchange; repeat
 * until no concurrent writer raced us.
 * NOTE(review): `__old` is a reserved identifier (leading double underscore). */
183 #define COUNTER_ATOMIC(worker,var,block) \
185 ThreadPoolWorkerCounter __old; \
188 __old = COUNTER_READ (worker); \
191 COUNTER_CHECK (var); \
192 } while (InterlockedCompareExchange64 (&worker->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
/* Atomically snapshot the packed worker counters as one 64-bit read, so
 * the four gint16 fields are always mutually consistent. */
195 static inline ThreadPoolWorkerCounter
196 COUNTER_READ (MonoThreadPoolWorker
*worker
)
198 ThreadPoolWorkerCounter counter
;
199 counter
.as_gint64
= InterlockedRead64 (&worker
->counters
.as_gint64
);
/* Create a PRNG handle; NULL/0 presumably lets mono_rand self-seed from
 * system entropy — TODO confirm against mono_rand_init's contract. */
207 return mono_rand_init (NULL
, 0);
/* Draw a pseudo-random integer from *handle in the requested [min, max]
 * range (inclusivity per mono_rand_try_get_uint32 — TODO confirm).
 * Failure is only asserted, not propagated (see FIXME below). */
211 rand_next (gpointer
*handle
, guint32 min
, guint32 max
)
215 mono_rand_try_get_uint32 (handle
, &val
, min
, max
, &error
);
216 // FIXME handle error
217 mono_error_assert_ok (&error
);
/* Refcount destructor registered in mono_refcount_init: runs when the
 * last reference to the MonoThreadPoolWorker is dropped. Actual teardown
 * of the contained resources is still missing (see FIXME). */
222 destroy (gpointer data
)
224 MonoThreadPoolWorker
*worker
;
226 worker
= (MonoThreadPoolWorker
*) data
;
229 // FIXME destroy everything
/* Allocate and initialize *worker: refcount + destructor, locks and
 * condition variables, hill-climbing defaults (from the HILL_CLIMBING_*
 * constants), and min/max worker limits derived from the CPU count and
 * the MONO_THREADS_PER_CPU environment variable. */
235 mono_threadpool_worker_init (MonoThreadPoolWorker
**worker
)
237 MonoThreadPoolWorker
*wk
;
238 ThreadPoolHillClimbing
*hc
;
239 const char *threads_per_cpu_env
;
240 gint threads_per_cpu
;
245 wk
= *worker
= g_new0 (MonoThreadPoolWorker
, 1);
247 mono_refcount_init (wk
, destroy
);
249 wk
->threads
= g_ptr_array_new ();
250 mono_coop_mutex_init (&wk
->threads_lock
);
251 wk
->parked_threads_count
= 0;
252 mono_coop_cond_init (&wk
->parked_threads_cond
);
253 mono_coop_cond_init (&wk
->threads_exit_cond
);
255 /* wk->work_items_size is inited to 0 */
256 mono_coop_mutex_init (&wk
->work_items_lock
);
/* -1 in a guint32 is a sentinel that cannot equal a real "current second",
 * forcing worker_try_create's rate-limit window to reset on first use. */
258 wk
->worker_creation_current_second
= -1;
259 mono_coop_mutex_init (&wk
->worker_creation_lock
);
261 wk
->heuristic_adjustment_interval
= 10;
262 mono_coop_mutex_init (&wk
->heuristic_lock
);
/* Seed the hill-climbing controller from the compile-time constants. */
266 hc
= &wk
->heuristic_hill_climbing
;
268 hc
->wave_period
= HILL_CLIMBING_WAVE_PERIOD
;
269 hc
->max_thread_wave_magnitude
= HILL_CLIMBING_MAX_WAVE_MAGNITUDE
;
270 hc
->thread_magnitude_multiplier
= (gdouble
) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER
;
271 hc
->samples_to_measure
= hc
->wave_period
* HILL_CLIMBING_WAVE_HISTORY_SIZE
;
272 hc
->target_throughput_ratio
= (gdouble
) HILL_CLIMBING_BIAS
;
273 hc
->target_signal_to_noise_ratio
= (gdouble
) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO
;
274 hc
->max_change_per_second
= (gdouble
) HILL_CLIMBING_MAX_CHANGE_PER_SECOND
;
275 hc
->max_change_per_sample
= (gdouble
) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE
;
276 hc
->sample_interval_low
= HILL_CLIMBING_SAMPLE_INTERVAL_LOW
;
277 hc
->sample_interval_high
= HILL_CLIMBING_SAMPLE_INTERVAL_HIGH
;
278 hc
->throughput_error_smoothing_factor
= (gdouble
) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR
;
279 hc
->gain_exponent
= (gdouble
) HILL_CLIMBING_GAIN_EXPONENT
;
280 hc
->max_sample_error
= (gdouble
) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT
;
281 hc
->current_control_setting
= 0;
282 hc
->total_samples
= 0;
283 hc
->last_thread_count
= 0;
284 hc
->average_throughput_noise
= 0;
285 hc
->elapsed_since_last_change
= 0;
286 hc
->accumulated_completion_count
= 0;
287 hc
->accumulated_sample_duration
= 0;
288 hc
->samples
= g_new0 (gdouble
, hc
->samples_to_measure
);
289 hc
->thread_counts
= g_new0 (gdouble
, hc
->samples_to_measure
);
290 hc
->random_interval_generator
= rand_create ();
291 hc
->current_sample_interval
= rand_next (&hc
->random_interval_generator
, hc
->sample_interval_low
, hc
->sample_interval_high
);
/* Worker limits: env override is clamped to [1, 50] threads per CPU. */
293 if (!(threads_per_cpu_env
= g_getenv ("MONO_THREADS_PER_CPU")))
296 threads_per_cpu
= CLAMP (atoi (threads_per_cpu_env
), 1, 50);
298 threads_count
= mono_cpu_count () * threads_per_cpu
;
300 wk
->limit_worker_min
= threads_count
;
/* Mobile targets cap the max much more tightly (around 200 threads). */
302 #if defined (PLATFORM_ANDROID) || defined (HOST_IOS)
303 wk
->limit_worker_max
= CLAMP (threads_count
* 100, MIN (threads_count
, 200), MAX (threads_count
, 200));
305 wk
->limit_worker_max
= threads_count
* 100;
/* Start the heuristic at the minimum: max_working == limit_worker_min. */
308 wk
->counters
._
.max_working
= wk
->limit_worker_min
;
310 wk
->cpu_usage_state
= g_new0 (MonoCpuUsageState
, 1);
312 wk
->suspended
= FALSE
;
314 wk
->monitor_status
= MONITOR_STATUS_NOT_RUNNING
;
/* Tear down the pool at runtime shutdown: wait for the monitor thread to
 * stop, wake every parked worker, then wait on threads_exit_cond until
 * all workers are gone (special-casing the situation where the only
 * remaining registered thread is the caller itself). Finally drop the
 * refcount taken at init. */
318 mono_threadpool_worker_cleanup (MonoThreadPoolWorker
*worker
)
320 MonoInternalThread
*current
;
322 /* we make the assumption along the code that we are
323 * cleaning up only if the runtime is shutting down */
324 g_assert (mono_runtime_is_shutting_down ());
326 current
= mono_thread_internal_current ();
/* Busy-wait (1 ms sleeps) for the monitor thread to notice shutdown. */
328 while (worker
->monitor_status
!= MONITOR_STATUS_NOT_RUNNING
)
329 mono_thread_info_sleep (1, NULL
);
331 mono_coop_mutex_lock (&worker
->threads_lock
);
333 /* unpark all worker->parked_threads */
334 mono_coop_cond_broadcast (&worker
->parked_threads_cond
);
/* Loop until no worker remains in starting/working/parked state
 * (loop construct itself not visible in this extraction). */
338 ThreadPoolWorkerCounter counter
;
340 counter
= COUNTER_READ (worker
);
341 if (counter
._
.starting
+ counter
._
.working
+ counter
._
.parked
== 0)
344 if (counter
._
.starting
+ counter
._
.working
+ counter
._
.parked
== 1) {
345 if (worker
->threads
->len
== 1 && g_ptr_array_index (worker
->threads
, 0) == current
) {
346 /* We are waiting on ourselves */
/* Otherwise block until some worker exits and signals us. */
351 mono_coop_cond_wait (&worker
->threads_exit_cond
, &worker
->threads_lock
);
355 mono_coop_mutex_unlock (&worker
->threads_lock
);
357 mono_refcount_dec (worker
);
/* Acquire the lock protecting the work-item queue. */
361 work_item_lock (MonoThreadPoolWorker
*worker
)
363 mono_coop_mutex_lock (&worker
->work_items_lock
);
/* Release the work-item queue lock. */
367 work_item_unlock (MonoThreadPoolWorker
*worker
)
369 mono_coop_mutex_unlock (&worker
->work_items_lock
);
/* Append a (callback, data) work item to the queue, growing the backing
 * array in 64-slot increments when full. The queue behaves as a stack:
 * work_item_try_pop removes from the same end. */
373 work_item_push (MonoThreadPoolWorker
*worker
, MonoThreadPoolWorkerCallback callback
, gpointer data
)
375 ThreadPoolWorkItem work_item
;
380 work_item
.callback
= callback
;
381 work_item
.data
= data
;
383 work_item_lock (worker
);
385 g_assert (worker
->work_items_count
<= worker
->work_items_size
);
/* Grow by a fixed 64-item step when the array is exactly full. */
387 if (G_UNLIKELY (worker
->work_items_count
== worker
->work_items_size
)) {
388 worker
->work_items_size
+= 64;
389 worker
->work_items
= g_renew (ThreadPoolWorkItem
, worker
->work_items
, worker
->work_items_size
);
392 g_assert (worker
->work_items
);
394 worker
->work_items
[worker
->work_items_count
++] = work_item
;
396 // printf ("[push] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n",
397 // worker->work_items, worker->work_items_count, worker->work_items_size);
399 work_item_unlock (worker
);
/* Pop the most recently pushed work item (LIFO) into *work_item. When the
 * queue is empty it unlocks and bails out (the FALSE return sits on a line
 * not visible in this extraction). Shrinks the backing array by one
 * 64-slot step once usage drops below half of a >=192-slot array. */
403 work_item_try_pop (MonoThreadPoolWorker
*worker
, ThreadPoolWorkItem
*work_item
)
406 g_assert (work_item
);
408 work_item_lock (worker
);
410 // printf ("[pop] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n",
411 // worker->work_items, worker->work_items_count, worker->work_items_size);
413 if (worker
->work_items_count
== 0) {
414 work_item_unlock (worker
);
/* Take from the top of the stack. */
418 *work_item
= worker
->work_items
[-- worker
->work_items_count
];
420 if (G_UNLIKELY (worker
->work_items_count
>= 64 * 3 && worker
->work_items_count
< worker
->work_items_size
/ 2)) {
421 worker
->work_items_size
-= 64;
422 worker
->work_items
= g_renew (ThreadPoolWorkItem
, worker
->work_items
, worker
->work_items_size
);
425 work_item_unlock (worker
);
/* Return the current number of queued work items (read under the lock). */
431 work_item_count (MonoThreadPoolWorker
*worker
)
435 work_item_lock (worker
);
436 count
= worker
->work_items_count
;
437 work_item_unlock (worker
);
442 static void worker_request (MonoThreadPoolWorker
*worker
);
/* Public entry point: queue a work item, then ask for a worker to run it
 * (unpark an existing one or create a new one — see worker_request). */
445 mono_threadpool_worker_enqueue (MonoThreadPoolWorker
*worker
, MonoThreadPoolWorkerCallback callback
, gpointer data
)
447 work_item_push (worker
, callback
, data
);
449 worker_request (worker
);
/* Interrupt callback installed around the park wait (see worker_park):
 * wakes one parked worker by signalling parked_threads_cond, then drops
 * the reference that was taken when the interrupt was installed. */
453 worker_wait_interrupt (gpointer data
)
455 MonoThreadPoolWorker
*worker
;
457 worker
= (MonoThreadPoolWorker
*) data
;
460 mono_coop_mutex_lock (&worker
->threads_lock
);
461 mono_coop_cond_signal (&worker
->parked_threads_cond
);
462 mono_coop_mutex_unlock (&worker
->threads_lock
);
464 mono_refcount_dec (worker
);
/* Park the calling worker: move it from `working` to parked accounting,
 * then wait on parked_threads_cond with a randomized 5s-60s timeout
 * (randomized so a herd of idle workers does not time out in lockstep). */
467 /* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */
469 worker_park (MonoThreadPoolWorker
*worker
)
471 gboolean timeout
= FALSE
;
473 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] worker parking", mono_native_thread_id_get ());
475 mono_coop_mutex_lock (&worker
->threads_lock
);
477 if (!mono_runtime_is_shutting_down ()) {
/* Function-level static PRNG shared by all workers; its lazy init below
 * is serialized because threads_lock is already held at this point. */
478 static gpointer rand_handle
= NULL
;
479 MonoInternalThread
*thread
;
480 gboolean interrupted
= FALSE
;
481 ThreadPoolWorkerCounter counter
;
484 rand_handle
= rand_create ();
485 g_assert (rand_handle
);
487 thread
= mono_thread_internal_current ();
/* working-- (and presumably parked++ on a line not visible here). */
490 COUNTER_ATOMIC (worker
, counter
, {
491 counter
._
.working
--;
495 worker
->parked_threads_count
+= 1;
/* Install an interrupt handler so thread abort/shutdown can break the
 * wait; it holds its own reference to the worker. */
497 mono_thread_info_install_interrupt (worker_wait_interrupt
, mono_refcount_inc (worker
), &interrupted
);
499 mono_refcount_dec (worker
);
/* Non-zero return from the timed wait means the timeout elapsed. */
503 if (mono_coop_cond_timedwait (&worker
->parked_threads_cond
, &worker
->threads_lock
, rand_next (&rand_handle
, 5 * 1000, 60 * 1000)) != 0)
506 mono_thread_info_uninstall_interrupt (&interrupted
);
508 mono_refcount_dec (worker
);
511 worker
->parked_threads_count
-= 1;
513 COUNTER_ATOMIC (worker
, counter
, {
514 counter
._
.working
++;
519 mono_coop_mutex_unlock (&worker
->threads_lock
);
521 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] worker unparking, timeout? %s", mono_native_thread_id_get (), timeout
? "yes" : "no");
/* Wake one parked worker if any exists; returns whether a signal was sent
 * (the `res = TRUE` assignment sits on a line not visible here). */
527 worker_try_unpark (MonoThreadPoolWorker
*worker
)
529 gboolean res
= FALSE
;
531 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] try unpark worker", mono_native_thread_id_get ());
533 mono_coop_mutex_lock (&worker
->threads_lock
);
534 if (worker
->parked_threads_count
> 0) {
535 mono_coop_cond_signal (&worker
->parked_threads_cond
);
538 mono_coop_mutex_unlock (&worker
->threads_lock
);
540 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res
? "yes" : "no");
/* Body of each pool thread: register in worker->threads, then loop
 * popping work items and executing their callbacks; park when the queue
 * is empty. On exit (shutdown, interruption, or park timeout) it
 * deregisters, signals threads_exit_cond for cleanup, and drops the
 * reference taken by worker_try_create. */
546 worker_thread (gpointer data
)
548 MonoThreadPoolWorker
*worker
;
550 MonoInternalThread
*thread
;
551 ThreadPoolWorkerCounter counter
;
553 mono_trace (G_LOG_LEVEL_INFO
, MONO_TRACE_THREADPOOL
, "[%p] worker starting", mono_native_thread_id_get ());
555 worker
= (MonoThreadPoolWorker
*) data
;
/* Transition this thread from `starting` to `working`. */
558 COUNTER_ATOMIC (worker
, counter
, {
559 counter
._
.starting
--;
560 counter
._
.working
++;
563 thread
= mono_thread_internal_current ();
566 mono_coop_mutex_lock (&worker
->threads_lock
);
567 g_ptr_array_add (worker
->threads
, thread
);
568 mono_coop_mutex_unlock (&worker
->threads_lock
);
570 mono_thread_set_name_internal (thread
, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE
, &error
);
571 mono_error_assert_ok (&error
);
573 while (!mono_runtime_is_shutting_down ()) {
574 ThreadPoolWorkItem work_item
;
576 if (mono_thread_interruption_checkpoint ())
/* Empty queue: park; a park timeout presumably ends this thread
 * (the branch body is on lines not visible here). */
579 if (!work_item_try_pop (worker
, &work_item
)) {
582 timeout
= worker_park (worker
);
589 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] worker executing %p (%p)",
590 mono_native_thread_id_get (), work_item
.callback
, work_item
.data
);
592 work_item
.callback (work_item
.data
);
/* Exit path: deregister and notify mono_threadpool_worker_cleanup. */
595 mono_coop_mutex_lock (&worker
->threads_lock
);
597 COUNTER_ATOMIC (worker
, counter
, {
598 counter
._
.working
--;
601 g_ptr_array_remove (worker
->threads
, thread
);
603 mono_coop_cond_signal (&worker
->threads_exit_cond
);
605 mono_coop_mutex_unlock (&worker
->threads_lock
);
607 mono_trace (G_LOG_LEVEL_INFO
, MONO_TRACE_THREADPOOL
, "[%p] worker finishing", mono_native_thread_id_get ());
609 mono_refcount_dec (worker
);
/* Try to spawn a new worker thread. Enforces, under worker_creation_lock:
 * (1) at most WORKER_CREATION_MAX_PER_SEC creations per wall-clock second
 * and (2) working < max_working per the heuristic. Returns success/failure
 * (the return statements sit on lines not visible in this extraction). */
615 worker_try_create (MonoThreadPoolWorker
*worker
)
618 MonoInternalThread
*thread
;
619 gint64 current_ticks
;
621 ThreadPoolWorkerCounter counter
;
623 if (mono_runtime_is_shutting_down ())
626 mono_coop_mutex_lock (&worker
->worker_creation_lock
);
628 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] try create worker", mono_native_thread_id_get ());
630 current_ticks
= mono_100ns_ticks ();
631 if (0 == current_ticks
) {
632 g_warning ("failed to get 100ns ticks");
/* 100ns ticks -> seconds; a new second resets the creation counter. */
634 now
= current_ticks
/ (10 * 1000 * 1000);
635 if (worker
->worker_creation_current_second
!= now
) {
636 worker
->worker_creation_current_second
= now
;
637 worker
->worker_creation_current_count
= 0;
639 g_assert (worker
->worker_creation_current_count
<= WORKER_CREATION_MAX_PER_SEC
);
640 if (worker
->worker_creation_current_count
== WORKER_CREATION_MAX_PER_SEC
) {
641 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d",
642 mono_native_thread_id_get (), worker
->worker_creation_current_count
);
643 mono_coop_mutex_unlock (&worker
->worker_creation_lock
);
/* Reserve a `starting` slot, bailing out (from inside the CAS block)
 * when the heuristic's max_working is already saturated. */
649 COUNTER_ATOMIC (worker
, counter
, {
650 if (counter
._
.working
>= counter
._
.max_working
) {
651 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] try create worker, failed: maximum number of working threads reached",
652 mono_native_thread_id_get ());
653 mono_coop_mutex_unlock (&worker
->worker_creation_lock
);
656 counter
._
.starting
++;
/* The new thread owns a reference to `worker` (released at thread exit). */
659 thread
= mono_thread_create_internal (mono_get_root_domain (), worker_thread
, mono_refcount_inc (worker
), TRUE
, 0, &error
);
/* Creation failed: log, roll back the `starting` reservation, and drop
 * the reference that was meant for the new thread. */
661 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error
));
662 mono_error_cleanup (&error
);
664 COUNTER_ATOMIC (worker
, counter
, {
665 counter
._
.starting
--;
668 mono_coop_mutex_unlock (&worker
->worker_creation_lock
);
670 mono_refcount_dec (worker
);
675 worker
->worker_creation_current_count
+= 1;
677 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] try create worker, created %p, now = %d count = %d",
678 mono_native_thread_id_get (), (gpointer
) thread
->tid
, now
, worker
->worker_creation_current_count
);
680 mono_coop_mutex_unlock (&worker
->worker_creation_lock
);
684 static void monitor_ensure_running (MonoThreadPoolWorker
*worker
);
/* Ask for a worker to service the queue: no-op while suspended by the
 * debugger; otherwise ensure the monitor thread is running, then prefer
 * unparking an idle worker over creating a fresh one. */
687 worker_request (MonoThreadPoolWorker
*worker
)
691 if (worker
->suspended
)
694 monitor_ensure_running (worker
);
696 if (worker_try_unpark (worker
)) {
697 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] request worker, unparked", mono_native_thread_id_get ());
701 if (worker_try_create (worker
)) {
702 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] request worker, created", mono_native_thread_id_get ());
706 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] request worker, failed", mono_native_thread_id_get ());
/* Decide whether the monitor thread should run another iteration.
 * Atomically consumes a pending request (REQUESTED -> WAITING_FOR_REQUEST);
 * if none was pending, the monitor is kept alive anyway for
 * MONITOR_MINIMAL_LIFETIME after its last useful moment, then it tries to
 * CAS itself to NOT_RUNNING and stop. */
710 monitor_should_keep_running (MonoThreadPoolWorker
*worker
)
/* Function-static: remembers (in 100ns ticks) when the monitor last had a
 * reason to keep running; -1 means "not tracked yet". */
712 static gint64 last_should_keep_running
= -1;
714 g_assert (worker
->monitor_status
== MONITOR_STATUS_WAITING_FOR_REQUEST
|| worker
->monitor_status
== MONITOR_STATUS_REQUESTED
);
/* Exchange returns the previous status: WAITING_FOR_REQUEST means no new
 * request arrived since the last check. */
716 if (InterlockedExchange (&worker
->monitor_status
, MONITOR_STATUS_WAITING_FOR_REQUEST
) == MONITOR_STATUS_WAITING_FOR_REQUEST
) {
717 gboolean should_keep_running
= TRUE
, force_should_keep_running
= FALSE
;
719 if (mono_runtime_is_shutting_down ()) {
720 should_keep_running
= FALSE
;
722 if (work_item_count (worker
) == 0)
723 should_keep_running
= FALSE
;
/* Linger: MONITOR_MINIMAL_LIFETIME is in ms, ticks are 100ns, hence
 * the * 1000 * 10 conversion. */
725 if (!should_keep_running
) {
726 if (last_should_keep_running
== -1 || mono_100ns_ticks () - last_should_keep_running
< MONITOR_MINIMAL_LIFETIME
* 1000 * 10) {
727 should_keep_running
= force_should_keep_running
= TRUE
;
732 if (should_keep_running
) {
733 if (last_should_keep_running
== -1 || !force_should_keep_running
)
734 last_should_keep_running
= mono_100ns_ticks ();
736 last_should_keep_running
= -1;
/* Stop only if nobody requested the monitor while we were deciding. */
737 if (InterlockedCompareExchange (&worker
->monitor_status
, MONITOR_STATUS_NOT_RUNNING
, MONITOR_STATUS_WAITING_FOR_REQUEST
) == MONITOR_STATUS_WAITING_FOR_REQUEST
)
742 g_assert (worker
->monitor_status
== MONITOR_STATUS_WAITING_FOR_REQUEST
|| worker
->monitor_status
== MONITOR_STATUS_REQUESTED
);
/* TRUE when enough time has passed since a work item was last dequeued to
 * suspect starvation. On a lightly loaded CPU one monitor interval is
 * enough; otherwise the threshold scales with max_working. */
748 monitor_sufficient_delay_since_last_dequeue (MonoThreadPoolWorker
*worker
)
754 if (worker
->cpu_usage
< CPU_USAGE_LOW
) {
755 threshold
= MONITOR_INTERVAL
;
757 ThreadPoolWorkerCounter counter
;
758 counter
= COUNTER_READ (worker
);
759 threshold
= counter
._
.max_working
* MONITOR_INTERVAL
* 2;
762 return mono_msec_ticks () >= worker
->heuristic_last_dequeue
+ threshold
;
765 static void hill_climbing_force_change (MonoThreadPoolWorker
*worker
, gint16 new_thread_count
, ThreadPoolHeuristicStateTransition transition
);
/* Monitor thread body: once per MONITOR_INTERVAL (tolerating up to 10
 * interrupted sleeps), check for starvation — queued work but no recent
 * dequeue — and respond by raising max_working (flagging the heuristic
 * with TRANSITION_STARVATION) and unparking/creating workers. Runs while
 * monitor_should_keep_running says so. */
768 monitor_thread (gpointer data
)
770 MonoThreadPoolWorker
*worker
;
771 MonoInternalThread
*internal
;
774 worker
= (MonoThreadPoolWorker
*) data
;
777 internal
= mono_thread_internal_current ();
/* Prime the CPU usage baseline before the first measurement below. */
780 mono_cpu_usage (worker
->cpu_usage_state
);
782 // printf ("monitor_thread: start\n");
784 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] monitor thread, started", mono_native_thread_id_get ());
787 ThreadPoolWorkerCounter counter
;
788 gboolean limit_worker_max_reached
;
789 gint32 interval_left
= MONITOR_INTERVAL
;
790 gint32 awake
= 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */
792 g_assert (worker
->monitor_status
!= MONITOR_STATUS_NOT_RUNNING
);
794 // counter = COUNTER_READ (worker);
795 // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n",
796 // counter._.starting, counter._.working, counter._.parked, counter._.max_working);
/* Sleep loop: keep sleeping the remaining interval after alerts. */
800 gboolean alerted
= FALSE
;
802 if (mono_runtime_is_shutting_down ())
805 ts
= mono_msec_ticks ();
806 if (mono_thread_info_sleep (interval_left
, &alerted
) == 0)
808 interval_left
-= mono_msec_ticks () - ts
;
810 g_assert (!(internal
->state
& ThreadState_StopRequested
));
811 mono_thread_interruption_checkpoint ();
812 } while (interval_left
> 0 && ++awake
< 10);
/* Skip this round when shutting down, suspended, queue empty, or the
 * last dequeue was too recent to indicate starvation. */
814 if (mono_runtime_is_shutting_down ())
817 if (worker
->suspended
)
820 if (work_item_count (worker
) == 0)
823 worker
->cpu_usage
= mono_cpu_usage (worker
->cpu_usage_state
);
825 if (!monitor_sufficient_delay_since_last_dequeue (worker
))
828 limit_worker_max_reached
= FALSE
;
/* Starvation response: bump max_working unless already at the limit. */
830 COUNTER_ATOMIC (worker
, counter
, {
831 if (counter
._
.max_working
>= worker
->limit_worker_max
) {
832 limit_worker_max_reached
= TRUE
;
835 counter
._
.max_working
++;
838 if (limit_worker_max_reached
)
841 hill_climbing_force_change (worker
, counter
._
.max_working
, TRANSITION_STARVATION
);
/* Try up to 5 times to get a worker going (unpark first, then create). */
843 for (i
= 0; i
< 5; ++i
) {
844 if (mono_runtime_is_shutting_down ())
847 if (worker_try_unpark (worker
)) {
848 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] monitor thread, unparked", mono_native_thread_id_get ());
852 if (worker_try_create (worker
)) {
853 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] monitor thread, created", mono_native_thread_id_get ());
857 } while (monitor_should_keep_running (worker
));
859 // printf ("monitor_thread: stop\n");
861 mono_trace (G_LOG_LEVEL_DEBUG
, MONO_TRACE_THREADPOOL
, "[%p] monitor thread, finished", mono_native_thread_id_get ());
/* Make sure a monitor thread exists and has a pending request. Drives the
 * monitor_status state machine with CAS transitions (the enclosing retry
 * loop and `break`/`return` statements sit on lines not visible here):
 * REQUESTED -> done; WAITING_FOR_REQUEST -> flag a request;
 * NOT_RUNNING -> claim the slot and spawn the monitor thread. */
867 monitor_ensure_running (MonoThreadPoolWorker
*worker
)
871 switch (worker
->monitor_status
) {
872 case MONITOR_STATUS_REQUESTED
:
873 // printf ("monitor_thread: requested\n");
875 case MONITOR_STATUS_WAITING_FOR_REQUEST
:
876 // printf ("monitor_thread: waiting for request\n");
877 InterlockedCompareExchange (&worker
->monitor_status
, MONITOR_STATUS_REQUESTED
, MONITOR_STATUS_WAITING_FOR_REQUEST
);
879 case MONITOR_STATUS_NOT_RUNNING
:
880 // printf ("monitor_thread: not running\n");
881 if (mono_runtime_is_shutting_down ())
883 if (InterlockedCompareExchange (&worker
->monitor_status
, MONITOR_STATUS_REQUESTED
, MONITOR_STATUS_NOT_RUNNING
) == MONITOR_STATUS_NOT_RUNNING
) {
884 // printf ("monitor_thread: creating\n");
/* Spawning failed: put the status back so a later call retries. */
885 if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread
, worker
, TRUE
, SMALL_STACK
, &error
)) {
886 // printf ("monitor_thread: creating failed\n");
887 worker
->monitor_status
= MONITOR_STATUS_NOT_RUNNING
;
888 mono_error_cleanup (&error
);
893 default: g_assert_not_reached ();
/* Record that the heuristic's target thread count changed: remember the
 * new count, pick a fresh randomized sample interval, and reset the
 * per-setting elapsed/completions accumulators. `transition` describes
 * why the change happened (tracing/telemetry). */
899 hill_climbing_change_thread_count (MonoThreadPoolWorker
*worker
, gint16 new_thread_count
, ThreadPoolHeuristicStateTransition transition
)
901 ThreadPoolHillClimbing
*hc
;
905 hc
= &worker
->heuristic_hill_climbing
;
907 mono_trace (G_LOG_LEVEL_INFO
, MONO_TRACE_THREADPOOL
, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count
);
909 hc
->last_thread_count
= new_thread_count
;
910 hc
->current_sample_interval
= rand_next (&hc
->random_interval_generator
, hc
->sample_interval_low
, hc
->sample_interval_high
);
911 hc
->elapsed_since_last_change
= 0;
912 hc
->completions_since_last_change
= 0;
/* Externally force the heuristic to a given thread count (e.g. starvation
 * response from the monitor thread): shift the control setting by the
 * delta and record the change, but only if the count actually differs. */
916 hill_climbing_force_change (MonoThreadPoolWorker
*worker
, gint16 new_thread_count
, ThreadPoolHeuristicStateTransition transition
)
918 ThreadPoolHillClimbing
*hc
;
922 hc
= &worker
->heuristic_hill_climbing
;
924 if (new_thread_count
!= hc
->last_thread_count
) {
925 hc
->current_control_setting
+= new_thread_count
- hc
->last_thread_count
;
926 hill_climbing_change_thread_count (worker
, new_thread_count
, transition
);
/* Measure the magnitude/phase of one frequency component of the most
 * recent `sample_count` entries of `samples` (a circular buffer indexed
 * via hc->total_samples modulo hc->samples_to_measure). The recurrence
 * q0 = coeff*q1 - q2 + x is the Goertzel single-bin DFT with
 * coeff = 2*cos(2*pi/period); the result is normalized by sample_count.
 * NOTE(review): the sin/cos computations and the q0/q1/q2 shifting inside
 * the loop sit on lines not visible in this extraction. */
930 static double_complex
931 hill_climbing_get_wave_component (MonoThreadPoolWorker
*worker
, gdouble
*samples
, guint sample_count
, gdouble period
)
933 ThreadPoolHillClimbing
*hc
;
934 gdouble w
, cosine
, sine
, coeff
, q0
, q1
, q2
;
938 g_assert (sample_count
>= period
);
939 g_assert (period
>= 2);
941 hc
= &worker
->heuristic_hill_climbing
;
943 w
= 2.0 * M_PI
/ period
;
946 coeff
= 2.0 * cosine
;
949 for (i
= 0; i
< sample_count
; ++i
) {
950 q0
= coeff
* q1
- q2
+ samples
[(hc
->total_samples
- sample_count
+ i
) % hc
->samples_to_measure
];
955 return mono_double_complex_scalar_div (mono_double_complex_make (q1
- q2
* cosine
, (q2
* sine
)), ((gdouble
)sample_count
));
959 hill_climbing_update (MonoThreadPoolWorker
*worker
, gint16 current_thread_count
, guint32 sample_duration
, gint32 completions
, gint64
*adjustment_interval
)
961 ThreadPoolHillClimbing
*hc
;
962 ThreadPoolHeuristicStateTransition transition
;
964 gdouble throughput_error_estimate
;
970 gint new_thread_wave_magnitude
;
971 gint new_thread_count
;
972 double_complex thread_wave_component
;
973 double_complex throughput_wave_component
;
974 double_complex ratio
;
977 g_assert (adjustment_interval
);
979 hc
= &worker
->heuristic_hill_climbing
;
981 /* If someone changed the thread count without telling us, update our records accordingly. */
982 if (current_thread_count
!= hc
->last_thread_count
)
983 hill_climbing_force_change (worker
, current_thread_count
, TRANSITION_INITIALIZING
);
985 /* Update the cumulative stats for this thread count */
986 hc
->elapsed_since_last_change
+= sample_duration
;
987 hc
->completions_since_last_change
+= completions
;
989 /* Add in any data we've already collected about this sample */
990 sample_duration
+= hc
->accumulated_sample_duration
;
991 completions
+= hc
->accumulated_completion_count
;
993 /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
994 * of each work item, we are goinng to be missing some data about what really happened during the
995 * sample interval. The count produced by each thread includes an initial work item that may have
996 * started well before the start of the interval, and each thread may have been running some new
997 * work item for some time before the end of the interval, which did not yet get counted. So
998 * our count is going to be off by +/- threadCount workitems.
1000 * The exception is that the thread that reported to us last time definitely wasn't running any work
1001 * at that time, and the thread that's reporting now definitely isn't running a work item now. So
1002 * we really only need to consider threadCount-1 threads.
1004 * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
1006 * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
1007 * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
1008 * then the next one likely will be too. The one after that will include the sum of the completions
1009 * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
1010 * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
1011 * range we're targeting, which will not be filtered by the frequency-domain translation. */
1012 if (hc
->total_samples
> 0 && ((current_thread_count
- 1.0) / completions
) >= hc
->max_sample_error
) {
1013 /* Not accurate enough yet. Let's accumulate the data so
1014 * far, and tell the MonoThreadPoolWorker to collect a little more. */
1015 hc
->accumulated_sample_duration
= sample_duration
;
1016 hc
->accumulated_completion_count
= completions
;
1017 *adjustment_interval
= 10;
1018 return current_thread_count
;
1021 /* We've got enouugh data for our sample; reset our accumulators for next time. */
1022 hc
->accumulated_sample_duration
= 0;
1023 hc
->accumulated_completion_count
= 0;
1025 /* Add the current thread count and throughput sample to our history. */
1026 throughput
= ((gdouble
) completions
) / sample_duration
;
1028 sample_index
= hc
->total_samples
% hc
->samples_to_measure
;
1029 hc
->samples
[sample_index
] = throughput
;
1030 hc
->thread_counts
[sample_index
] = current_thread_count
;
1031 hc
->total_samples
++;
1033 /* Set up defaults for our metrics. */
1034 thread_wave_component
= mono_double_complex_make(0, 0);
1035 throughput_wave_component
= mono_double_complex_make(0, 0);
1036 throughput_error_estimate
= 0;
1037 ratio
= mono_double_complex_make(0, 0);
1040 transition
= TRANSITION_WARMUP
;
1042 /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
1043 * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
1044 * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
1045 sample_count
= ((gint
) MIN (hc
->total_samples
- 1, hc
->samples_to_measure
) / hc
->wave_period
) * hc
->wave_period
;
1047 if (sample_count
> hc
->wave_period
) {
1049 gdouble average_throughput
;
1050 gdouble average_thread_count
;
1051 gdouble sample_sum
= 0;
1052 gdouble thread_sum
= 0;
1054 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
1055 for (i
= 0; i
< sample_count
; ++i
) {
1056 guint j
= (hc
->total_samples
- sample_count
+ i
) % hc
->samples_to_measure
;
1057 sample_sum
+= hc
->samples
[j
];
1058 thread_sum
+= hc
->thread_counts
[j
];
1061 average_throughput
= sample_sum
/ sample_count
;
1062 average_thread_count
= thread_sum
/ sample_count
;
1064 if (average_throughput
> 0 && average_thread_count
> 0) {
1065 gdouble noise_for_confidence
, adjacent_period_1
, adjacent_period_2
;
1067 /* Calculate the periods of the adjacent frequency bands we'll be using to
1068 * measure noise levels. We want the two adjacent Fourier frequency bands. */
1069 adjacent_period_1
= sample_count
/ (((gdouble
) sample_count
) / ((gdouble
) hc
->wave_period
) + 1);
1070 adjacent_period_2
= sample_count
/ (((gdouble
) sample_count
) / ((gdouble
) hc
->wave_period
) - 1);
1072 /* Get the the three different frequency components of the throughput (scaled by average
1073 * throughput). Our "error" estimate (the amount of noise that might be present in the
1074 * frequency band we're really interested in) is the average of the adjacent bands. */
1075 throughput_wave_component
= mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker
, hc
->samples
, sample_count
, hc
->wave_period
), average_throughput
);
1076 throughput_error_estimate
= cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker
, hc
->samples
, sample_count
, adjacent_period_1
), average_throughput
));
1078 if (adjacent_period_2
<= sample_count
) {
1079 throughput_error_estimate
= MAX (throughput_error_estimate
, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
1080 worker
, hc
->samples
, sample_count
, adjacent_period_2
), average_throughput
)));
1083 /* Do the same for the thread counts, so we have something to compare to. We don't
1084 * measure thread count noise, because there is none; these are exact measurements. */
1085 thread_wave_component
= mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker
, hc
->thread_counts
, sample_count
, hc
->wave_period
), average_thread_count
);
1087 /* Update our moving average of the throughput noise. We'll use this
1088 * later as feedback to determine the new size of the thread wave. */
1089 if (hc
->average_throughput_noise
== 0) {
1090 hc
->average_throughput_noise
= throughput_error_estimate
;
1092 hc
->average_throughput_noise
= (hc
->throughput_error_smoothing_factor
* throughput_error_estimate
)
1093 + ((1.0 + hc
->throughput_error_smoothing_factor
) * hc
->average_throughput_noise
);
1096 if (cabs (thread_wave_component
) > 0) {
1097 /* Adjust the throughput wave so it's centered around the target wave,
1098 * and then calculate the adjusted throughput/thread ratio. */
1099 ratio
= mono_double_complex_div (mono_double_complex_sub (throughput_wave_component
, mono_double_complex_scalar_mul(thread_wave_component
, hc
->target_throughput_ratio
)), thread_wave_component
);
1100 transition
= TRANSITION_CLIMBING_MOVE
;
1102 ratio
= mono_double_complex_make (0, 0);
1103 transition
= TRANSITION_STABILIZING
;
1106 noise_for_confidence
= MAX (hc
->average_throughput_noise
, throughput_error_estimate
);
1107 if (noise_for_confidence
> 0) {
1108 confidence
= cabs (thread_wave_component
) / noise_for_confidence
/ hc
->target_signal_to_noise_ratio
;
1110 /* there is no noise! */
1116 /* We use just the real part of the complex ratio we just calculated. If the throughput signal
1117 * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1118 * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1119 * backward (because this indicates that our changes are having the opposite of the intended effect).
1120 * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
1121 * having a negative or positive effect on throughput. */
1122 move
= creal (ratio
);
1123 move
= CLAMP (move
, -1.0, 1.0);
1125 /* Apply our confidence multiplier. */
1126 move
*= CLAMP (confidence
, -1.0, 1.0);
1128 /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1129 * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1130 * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
1131 gain
= hc
->max_change_per_second
* sample_duration
;
1132 move
= pow (fabs (move
), hc
->gain_exponent
) * (move
>= 0.0 ? 1 : -1) * gain
;
1133 move
= MIN (move
, hc
->max_change_per_sample
);
1135 /* If the result was positive, and CPU is > 95%, refuse the move. */
1136 if (move
> 0.0 && worker
->cpu_usage
> CPU_USAGE_HIGH
)
1139 /* Apply the move to our control setting. */
1140 hc
->current_control_setting
+= move
;
1142 /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1143 * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */
1144 new_thread_wave_magnitude
= (gint
)(0.5 + (hc
->current_control_setting
* hc
->average_throughput_noise
1145 * hc
->target_signal_to_noise_ratio
* hc
->thread_magnitude_multiplier
* 2.0));
1146 new_thread_wave_magnitude
= CLAMP (new_thread_wave_magnitude
, 1, hc
->max_thread_wave_magnitude
);
1148 /* Make sure our control setting is within the MonoThreadPoolWorker's limits. */
1149 hc
->current_control_setting
= CLAMP (hc
->current_control_setting
, worker
->limit_worker_min
, worker
->limit_worker_max
- new_thread_wave_magnitude
);
1151 /* Calculate the new thread count (control setting + square wave). */
1152 new_thread_count
= (gint
)(hc
->current_control_setting
+ new_thread_wave_magnitude
* ((hc
->total_samples
/ (hc
->wave_period
/ 2)) % 2));
1154 /* Make sure the new thread count doesn't exceed the MonoThreadPoolWorker's limits. */
1155 new_thread_count
= CLAMP (new_thread_count
, worker
->limit_worker_min
, worker
->limit_worker_max
);
1157 if (new_thread_count
!= current_thread_count
)
1158 hill_climbing_change_thread_count (worker
, new_thread_count
, transition
);
1160 if (creal (ratio
) < 0.0 && new_thread_count
== worker
->limit_worker_min
)
1161 *adjustment_interval
= (gint
)(0.5 + hc
->current_sample_interval
* (10.0 * MAX (-1.0 * creal (ratio
), 1.0)));
1163 *adjustment_interval
= hc
->current_sample_interval
;
1165 return new_thread_count
;
1169 heuristic_should_adjust (MonoThreadPoolWorker
*worker
)
1171 if (worker
->heuristic_last_dequeue
> worker
->heuristic_last_adjustment
+ worker
->heuristic_adjustment_interval
) {
1172 ThreadPoolWorkerCounter counter
;
1173 counter
= COUNTER_READ (worker
);
1174 if (counter
._
.working
<= counter
._
.max_working
)
/* Run one hill-climbing sampling step.  Uses a trylock on the heuristic
 * lock so at most one thread pays the cost of an adjustment at a time;
 * contending threads simply skip it and retry on a later completion. */
static void
heuristic_adjust (MonoThreadPoolWorker *worker)
{
	if (mono_coop_mutex_trylock (&worker->heuristic_lock) == 0) {
		/* Atomically drain the completions accumulated since the last sample;
		 * they are re-counted from zero for the next one. */
		gint32 completions = InterlockedExchange (&worker->heuristic_completions, 0);
		gint64 sample_end = mono_msec_ticks ();
		gint64 sample_duration = sample_end - worker->heuristic_sample_start;

		/* Only feed the hill-climbing algorithm samples of reasonable length;
		 * anything shorter than half the adjustment interval is discarded
		 * (the completion count is still drained above). */
		if (sample_duration >= worker->heuristic_adjustment_interval / 2) {
			ThreadPoolWorkerCounter counter;
			gint16 new_thread_count;

			counter = COUNTER_READ (worker);
			/* hill_climbing_update also writes back the next adjustment
			 * interval through the last out-parameter. */
			new_thread_count = hill_climbing_update (worker, counter._.max_working, sample_duration, completions, &worker->heuristic_adjustment_interval);

			/* Publish the new target thread count. */
			COUNTER_ATOMIC (worker, counter, {
				counter._.max_working = new_thread_count;
			});

			/* NOTE(review): COUNTER_ATOMIC appears to update the local
			 * `counter` as part of the exchange, in which case
			 * counter._.max_working already equals new_thread_count here and
			 * this test can never be true; presumably the intent was to
			 * compare against the pre-update max_working — confirm against
			 * the COUNTER_ATOMIC definition. */
			if (new_thread_count > counter._.max_working)
				worker_request (worker);

			/* Start the next sampling window. */
			worker->heuristic_sample_start = sample_end;
			worker->heuristic_last_adjustment = mono_msec_ticks ();
		}

		mono_coop_mutex_unlock (&worker->heuristic_lock);
	}
}
/* Record one completed work item and, if the heuristic says it is time
 * (see heuristic_should_adjust), run the hill-climbing adjustment. */
static void
heuristic_notify_work_completed (MonoThreadPoolWorker *worker)
{
	g_assert (worker);

	/* The completion counter is drained atomically by heuristic_adjust. */
	InterlockedIncrement (&worker->heuristic_completions);
	worker->heuristic_last_dequeue = mono_msec_ticks ();

	if (heuristic_should_adjust (worker))
		heuristic_adjust (worker);
}
1224 mono_threadpool_worker_notify_completed (MonoThreadPoolWorker
*worker
)
1226 ThreadPoolWorkerCounter counter
;
1228 heuristic_notify_work_completed (worker
);
1230 counter
= COUNTER_READ (worker
);
1231 return counter
._
.working
<= counter
._
.max_working
;
1235 mono_threadpool_worker_get_min (MonoThreadPoolWorker
*worker
)
1237 return worker
->limit_worker_min
;
1241 mono_threadpool_worker_set_min (MonoThreadPoolWorker
*worker
, gint32 value
)
1243 if (value
<= 0 || value
> worker
->limit_worker_max
)
1246 worker
->limit_worker_min
= value
;
1251 mono_threadpool_worker_get_max (MonoThreadPoolWorker
*worker
)
1253 return worker
->limit_worker_max
;
1257 mono_threadpool_worker_set_max (MonoThreadPoolWorker
*worker
, gint32 value
)
1259 gint32 cpu_count
= mono_cpu_count ();
1261 if (value
< worker
->limit_worker_min
|| value
< cpu_count
)
1264 worker
->limit_worker_max
= value
;
1269 mono_threadpool_worker_set_suspended (MonoThreadPoolWorker
*worker
, gboolean suspended
)
1271 worker
->suspended
= suspended
;
1273 worker_request (worker
);