Nullable: ThreadPool (dotnet/coreclr#23600)
[mono-project.git] / netcore / System.Private.CoreLib / shared / System / Threading / ThreadPool.cs
blob5e3fdf99706365d1abceefb170a907f90cc32672
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 /*=============================================================================
6 **
7 **
8 **
9 ** Purpose: Class for creating and managing a threadpool
12 =============================================================================*/
14 #nullable enable
15 using System.Collections.Concurrent;
16 using System.Collections.Generic;
17 using System.Diagnostics;
18 using System.Diagnostics.CodeAnalysis;
19 using System.Diagnostics.Tracing;
20 using System.Runtime.CompilerServices;
21 using System.Runtime.InteropServices;
22 using System.Threading.Tasks;
23 using Internal.Runtime.CompilerServices;
25 namespace System.Threading
/// <summary>Static state used by the thread pool code in this file.</summary>
internal static class ThreadPoolGlobals
{
    // Read once from Environment.ProcessorCount when this type is initialized.
    public static readonly int processorCount = Environment.ProcessorCount;

    public static volatile bool threadPoolInitialized;
    public static bool enableWorkerTracking;

    public static readonly ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue();

    /// <summary>Shim used to invoke <see cref="IAsyncStateMachineBox.MoveNext"/> of the supplied <see cref="IAsyncStateMachineBox"/>.</summary>
    internal static readonly Action<object> s_invokeAsyncStateMachineBox = state =>
    {
        if (state is IAsyncStateMachineBox box)
        {
            box.MoveNext();
        }
        else
        {
            // Anything that is not an IAsyncStateMachineBox is rejected.
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state);
        }
    };
}
49 [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
50 internal sealed class ThreadPoolWorkQueue
/// <summary>
/// Registry of every <see cref="WorkStealingQueue"/>. The array is never mutated in place:
/// Add and Remove build a new array and swap it in with a compare-exchange, retrying if
/// another thread swapped first.
/// </summary>
internal static class WorkStealingQueueList
{
    private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0];

    /// <summary>The current array of registered queues.</summary>
    public static WorkStealingQueue[] Queues => _queues;

    /// <summary>Appends <paramref name="queue"/> to the registry.</summary>
    public static void Add(WorkStealingQueue queue)
    {
        Debug.Assert(queue != null);

        WorkStealingQueue[] snapshot;
        WorkStealingQueue[] grown;
        do
        {
            snapshot = _queues;
            Debug.Assert(Array.IndexOf(snapshot, queue) == -1);

            grown = new WorkStealingQueue[snapshot.Length + 1];
            Array.Copy(snapshot, 0, grown, 0, snapshot.Length);
            grown[grown.Length - 1] = queue;
        }
        while (Interlocked.CompareExchange(ref _queues, grown, snapshot) != snapshot);
    }

    /// <summary>Removes <paramref name="queue"/> from the registry, if it is present.</summary>
    public static void Remove(WorkStealingQueue queue)
    {
        Debug.Assert(queue != null);

        while (true)
        {
            WorkStealingQueue[] snapshot = _queues;
            if (snapshot.Length == 0)
            {
                return;
            }

            int index = Array.IndexOf(snapshot, queue);
            if (index == -1)
            {
                Debug.Fail("Should have found the queue");
                return;
            }

            // Copy the elements before the removed slot, then the elements after it.
            // Either copy may be empty when the slot is the first or the last one.
            var shrunk = new WorkStealingQueue[snapshot.Length - 1];
            Array.Copy(snapshot, 0, shrunk, 0, index);
            Array.Copy(snapshot, index + 1, shrunk, index, shrunk.Length - index);

            if (Interlocked.CompareExchange(ref _queues, shrunk, snapshot) == snapshot)
            {
                return;
            }
        }
    }
}
// A queue with two ends: LocalPush/LocalPop work at the tail index, TrySteal takes from the head index.
// Items live in m_array, and an index is mapped to a slot with `index & m_mask`.
117 internal sealed class WorkStealingQueue
119 private const int INITIAL_SIZE = 32;
120 internal volatile object?[] m_array = new object[INITIAL_SIZE]; // SOS's ThreadPool command depends on this name
// Starts at INITIAL_SIZE - 1; LocalPush widens it by one bit each time it doubles m_array.
121 private volatile int m_mask = INITIAL_SIZE - 1;
123 #if DEBUG
124 // in debug builds, start at the end so we exercise the index reset logic.
125 private const int START_INDEX = int.MaxValue;
126 #else
127 private const int START_INDEX = 0;
128 #endif
// Head and tail start equal, i.e. the queue starts empty (CanSteal is head < tail).
130 private volatile int m_headIndex = START_INDEX;
131 private volatile int m_tailIndex = START_INDEX;
// Taken by TrySteal (via TryEnter) and by the local operations on their slow paths.
133 private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking: false);
/// <summary>
/// Adds <paramref name="obj"/> at the tail end of this queue.
/// </summary>
/// <remarks>
/// Three paths are visible below: an index reset when the tail has reached int.MaxValue,
/// a fast path that takes no lock while the array has room to spare, and a locked slow path
/// that doubles the array when it is full. The one caller visible in this file, Enqueue,
/// reaches this method through the calling thread's thread-static locals.
/// </remarks>
135 public void LocalPush(object obj)
137 int tail = m_tailIndex;
139 // We're going to increment the tail; if we'll overflow, then we need to reset our counts
140 if (tail == int.MaxValue)
142 bool lockTaken = false;
145 m_foreignLock.Enter(ref lockTaken);
// Re-check under the lock before rewriting both indices.
147 if (m_tailIndex == int.MaxValue)
150 // Rather than resetting to zero, we'll just mask off the bits we don't care about.
151 // This way we don't need to rearrange the items already in the queue; they'll be found
152 // correctly exactly where they are. One subtlety here is that we need to make sure that
153 // if head is currently < tail, it remains that way. This happens to just fall out from
154 // the bit-masking, because we only do this if tail == int.MaxValue, meaning that all
155 // bits are set, so all of the bits we're keeping will also be set. Thus it's impossible
156 // for the head to end up > than the tail, since you can't set any more bits than all of
157 // them.
159 m_headIndex = m_headIndex & m_mask;
160 m_tailIndex = tail = m_tailIndex & m_mask;
161 Debug.Assert(m_headIndex <= m_tailIndex);
164 finally
166 if (lockTaken)
167 m_foreignLock.Exit(useMemoryBarrier: true);
171 // When there are at least 2 elements' worth of space, we can take the fast path.
172 if (tail < m_headIndex + m_mask)
// Store the item first, then make it visible by advancing the volatile tail index.
174 Volatile.Write(ref m_array[tail & m_mask], obj);
175 m_tailIndex = tail + 1;
177 else
179 // We need to contend with foreign pops, so we lock.
180 bool lockTaken = false;
183 m_foreignLock.Enter(ref lockTaken);
185 int head = m_headIndex;
186 int count = m_tailIndex - m_headIndex;
188 // If there is still space (one left), just add the element.
// Otherwise (count >= m_mask) the array is treated as full and is doubled first.
189 if (count >= m_mask)
191 // We're full; expand the queue by doubling its size.
192 var newArray = new object?[m_array.Length << 1];
// Copy starting from the old head so that the oldest item lands in slot 0 of the new array.
193 for (int i = 0; i < m_array.Length; i++)
194 newArray[i] = m_array[(i + head) & m_mask];
196 // Reset the field values, incl. the mask.
197 m_array = newArray;
198 m_headIndex = 0;
199 m_tailIndex = tail = count;
200 m_mask = (m_mask << 1) | 1;
203 Volatile.Write(ref m_array[tail & m_mask], obj);
204 m_tailIndex = tail + 1;
206 finally
208 if (lockTaken)
209 m_foreignLock.Exit(useMemoryBarrier: false);
/// <summary>
/// Looks for <paramref name="obj"/> in this queue and removes it if it is found.
/// </summary>
/// <returns><c>true</c> if the item was removed by this call; otherwise <c>false</c>.</returns>
214 [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
215 public bool LocalFindAndPop(object obj)
217 // Fast path: check the tail. If equal, we can skip the lock.
218 if (m_array[(m_tailIndex - 1) & m_mask] == obj)
// LocalPop may still come back empty, in which case this reports "not found".
220 object? unused = LocalPop();
221 Debug.Assert(unused == null || unused == obj);
222 return unused != null;
225 // Else, do an O(N) search for the work item. The theory of work stealing and our
226 // inlining logic is that most waits will happen on recently queued work. And
227 // since recently queued work will be close to the tail end (which is where we
228 // begin our search), we will likely find it quickly. In the worst case, we
229 // will traverse the whole local queue; this is typically not going to be a
230 // problem (although degenerate cases are clearly an issue) because local work
231 // queues tend to be somewhat shallow in length, and because if we fail to find
232 // the work item, we are about to block anyway (which is very expensive).
// The slot at tail - 1 was already checked above, so the scan starts at tail - 2.
233 for (int i = m_tailIndex - 2; i >= m_headIndex; i--)
235 if (m_array[i & m_mask] == obj)
237 // If we found the element, block out steals to avoid interference.
238 bool lockTaken = false;
241 m_foreignLock.Enter(ref lockTaken);
243 // If we encountered a race condition, bail.
// (The slot was emptied between the unlocked check above and taking the lock.)
244 if (m_array[i & m_mask] == null)
245 return false;
247 // Otherwise, null out the element.
248 Volatile.Write(ref m_array[i & m_mask], null);
250 // And then check to see if we can fix up the indexes (if we're at
251 // the edge). If we can't, we just leave nulls in the array and they'll
252 // get filtered out eventually (but may lead to superfluous resizing).
// NOTE(review): i starts at m_tailIndex - 2 and only decreases, so `i == m_tailIndex` looks
// unreachable unless the tail moves during the scan — confirm whether `m_tailIndex - 1` was meant.
253 if (i == m_tailIndex)
254 m_tailIndex -= 1;
255 else if (i == m_headIndex)
256 m_headIndex += 1;
258 return true;
260 finally
262 if (lockTaken)
263 m_foreignLock.Exit(useMemoryBarrier: false);
268 return false;
/// <summary>
/// Pops an item from the tail of this queue, or returns <c>null</c> when the head index
/// is not below the tail index.
/// </summary>
public object? LocalPop()
{
    if (m_headIndex < m_tailIndex)
    {
        return LocalPopCore();
    }

    return null;
}
/// <summary>
/// Removes and returns the item at the tail of this queue, skipping over null slots.
/// </summary>
/// <returns>The popped item, or <c>null</c> if no item could be taken.</returns>
273 [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
274 private object? LocalPopCore()
276 while (true)
278 int tail = m_tailIndex;
279 if (m_headIndex >= tail)
281 return null;
284 // Decrement the tail using a fence to ensure subsequent read doesn't come before.
285 tail -= 1;
286 Interlocked.Exchange(ref m_tailIndex, tail);
288 // If there is no interaction with a take, we can head down the fast path.
289 if (m_headIndex <= tail)
291 int idx = tail & m_mask;
292 object? obj = Volatile.Read(ref m_array[idx]);
294 // Check for nulls in the array.
// (LocalFindAndPop leaves null slots behind; loop around and try the next slot down.)
295 if (obj == null) continue;
297 m_array[idx] = null;
298 return obj;
300 else
302 // Interaction with takes: 0 or 1 elements left.
303 bool lockTaken = false;
306 m_foreignLock.Enter(ref lockTaken);
// Re-check under the lock, which TrySteal also holds while it moves the head.
308 if (m_headIndex <= tail)
310 // Element still available. Take it.
311 int idx = tail & m_mask;
312 object? obj = Volatile.Read(ref m_array[idx]);
314 // Check for nulls in the array.
315 if (obj == null) continue;
317 m_array[idx] = null;
318 return obj;
320 else
322 // If we encountered a race condition and element was stolen, restore the tail.
323 m_tailIndex = tail + 1;
324 return null;
327 finally
329 if (lockTaken)
330 m_foreignLock.Exit(useMemoryBarrier: false);
/// <summary>Whether the head index is currently below the tail index.</summary>
public bool CanSteal
{
    get { return m_headIndex < m_tailIndex; }
}
/// <summary>
/// Attempts to take the item at the head of this queue.
/// </summary>
/// <param name="missedSteal">Set to <c>true</c> when the queue looked non-empty but this call did not return an item.</param>
/// <returns>The stolen item, or <c>null</c> if nothing was taken.</returns>
338 public object? TrySteal(ref bool missedSteal)
340 while (true)
342 if (CanSteal)
344 bool taken = false;
// TryEnter does not wait for the lock; `taken` stays false if it could not be acquired.
347 m_foreignLock.TryEnter(ref taken);
348 if (taken)
350 // Increment head, and ensure read of tail doesn't move before it (fence).
351 int head = m_headIndex;
352 Interlocked.Exchange(ref m_headIndex, head + 1);
354 if (head < m_tailIndex)
356 int idx = head & m_mask;
357 object? obj = Volatile.Read(ref m_array[idx]);
359 // Check for nulls in the array.
// (The head has already moved past the empty slot; loop around and try the next one.)
360 if (obj == null) continue;
362 m_array[idx] = null;
363 return obj;
365 else
367 // Failed, restore head.
368 m_headIndex = head;
372 finally
374 if (taken)
375 m_foreignLock.Exit(useMemoryBarrier: false);
// Reached only when the attempt above did not return an item.
378 missedSteal = true;
381 return null;
// Refreshed by Dispatch each time it runs; gates the FrameworkEventSource enqueue/dequeue events.
386 internal bool loggingEnabled;
// The global queue: Enqueue adds here when no thread-local queue is used, and Dequeue reads it after the local queue.
387 internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>(); // SOS's ThreadPool command depends on this name
// pad1 and pad2 sit on either side of numOutstandingThreadRequests; the layout attribute on this
// class is commented as enforcing layout so that padding reduces false sharing.
389 private Internal.PaddingFor32 pad1;
// Incremented by EnsureThreadRequested and decremented by MarkThreadRequestSatisfied.
391 private volatile int numOutstandingThreadRequests = 0;
393 private Internal.PaddingFor32 pad2;
// Takes the initial logging state from FrameworkEventSource.
395 public ThreadPoolWorkQueue()
397 loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
/// <summary>
/// Returns the calling thread's <see cref="ThreadPoolWorkQueueThreadLocals"/>, creating it on first use.
/// </summary>
public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals()
{
    ThreadPoolWorkQueueThreadLocals existing = ThreadPoolWorkQueueThreadLocals.threadLocals;
    return existing ?? CreateThreadLocals();
}
/// <summary>
/// Creates the calling thread's locals and stores them in the thread-static slot.
/// Marked NoInlining, so it stays out of line from GetOrCreateThreadLocals.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
private ThreadPoolWorkQueueThreadLocals CreateThreadLocals()
{
    Debug.Assert(ThreadPoolWorkQueueThreadLocals.threadLocals == null);

    var created = new ThreadPoolWorkQueueThreadLocals(this);
    ThreadPoolWorkQueueThreadLocals.threadLocals = created;
    return created;
}
/// <summary>
/// Requests a worker thread unless <c>processorCount</c> requests are already outstanding.
/// </summary>
internal void EnsureThreadRequested()
{
    // If we have not yet requested #procs threads, then request a new thread.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been incremented
    // by the VM by the time we reach this point.
    int observed = numOutstandingThreadRequests;
    while (observed < ThreadPoolGlobals.processorCount)
    {
        int witnessed = Interlocked.CompareExchange(ref numOutstandingThreadRequests, observed + 1, observed);
        if (witnessed != observed)
        {
            // Another thread changed the count first; retry against the value it left.
            observed = witnessed;
            continue;
        }

        ThreadPool.RequestWorkerThread();
        return;
    }
}
/// <summary>
/// Decrements the outstanding-request count, never taking it below zero.
/// </summary>
internal void MarkThreadRequestSatisfied()
{
    // One of our outstanding thread requests has been satisfied.
    // Decrement the count so that future calls to EnsureThreadRequested will succeed.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been decremented
    // by the VM by the time we reach this point.
    int observed = numOutstandingThreadRequests;
    while (observed > 0)
    {
        int witnessed = Interlocked.CompareExchange(ref numOutstandingThreadRequests, observed - 1, observed);
        if (witnessed == observed)
        {
            return;
        }

        // Lost the race; retry against the value another thread left behind.
        observed = witnessed;
    }
}
/// <summary>
/// Queues <paramref name="callback"/> on the calling thread's local queue when one exists and
/// <paramref name="forceGlobal"/> is <c>false</c>, otherwise on the global queue, and then
/// makes sure a worker thread has been requested.
/// </summary>
public void Enqueue(object callback, bool forceGlobal)
{
    Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));

    if (loggingEnabled)
    {
        FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
    }

    // The thread-static locals are only consulted when the caller did not force the global queue.
    ThreadPoolWorkQueueThreadLocals? locals = forceGlobal ? null : ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (locals == null)
    {
        workItems.Enqueue(callback);
    }
    else
    {
        locals.workStealingQueue.LocalPush(callback);
    }

    EnsureThreadRequested();
}
/// <summary>
/// Tries to remove <paramref name="callback"/> from the calling thread's local queue.
/// </summary>
/// <returns>
/// <c>true</c> if the calling thread has thread-pool locals and the item was removed from
/// their queue; otherwise <c>false</c>.
/// </returns>
internal bool LocalFindAndPop(object callback)
{
    // The thread-static slot is null until this thread creates its locals, so the local is
    // declared nullable (as Enqueue does for the same read) and checked before use.
    ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
    return tl != null && tl.workStealingQueue.LocalFindAndPop(callback);
}
/// <summary>
/// Finds the next work item for the calling thread: its own queue first, then the global
/// queue, then the other threads' queues.
/// </summary>
/// <param name="tl">The calling thread's locals.</param>
/// <param name="missedSteal">Passed through to <c>TrySteal</c>, which sets it when a steal attempt comes back empty.</param>
/// <returns>The work item, or <c>null</c> if none was found.</returns>
public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
    WorkStealingQueue ownQueue = tl.workStealingQueue;

    // first try the local queue
    object? item = ownQueue.LocalPop();
    if (item != null)
    {
        return item;
    }

    // then try the global queue
    if (workItems.TryDequeue(out item))
    {
        return item;
    }

    // finally try to steal from another thread's local queue
    WorkStealingQueue[] candidates = WorkStealingQueueList.Queues;
    int count = candidates.Length;
    Debug.Assert(count > 0, "There must at least be a queue for this thread.");
    int lastIndex = count - 1;
    int index = tl.random.Next(count);

    // Visit every registered queue once, starting just after a random position and wrapping around.
    for (int remaining = count; remaining > 0; remaining--)
    {
        index = (index < lastIndex) ? index + 1 : 0;
        WorkStealingQueue victim = candidates[index];
        if (victim == ownQueue || !victim.CanSteal)
        {
            continue;
        }

        object? stolen = victim.TrySteal(ref missedSteal);
        if (stolen != null)
        {
            return stolen;
        }
    }

    return null;
}
515 /// <summary>
516 /// Dispatches work items to this thread.
517 /// </summary>
518 /// <returns>
519 /// <c>true</c> if this thread did as much work as was available or its quantum expired.
520 /// <c>false</c> if this thread stopped working early.
521 /// </returns>
522 internal static bool Dispatch()
524 ThreadPoolWorkQueue outerWorkQueue = ThreadPoolGlobals.workQueue;
527 // Save the start time
529 int startTickCount = Environment.TickCount;
532 // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
533 // From this point on, we are responsible for requesting another thread if we stop working for any
534 // reason, and we believe there might still be work in the queue.
536 // CoreCLR: Note that if this thread is aborted before we get a chance to request another one, the VM will
537 // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
539 outerWorkQueue.MarkThreadRequestSatisfied();
541 // Has the desire for logging changed since the last time we entered?
542 outerWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
545 // Assume that we're going to need another thread if this one returns to the VM. We'll set this to
546 // false later, but only if we're absolutely certain that the queue is empty.
548 bool needAnotherThread = true;
549 object? outerWorkItem = null;
553 // Set up our thread-local data
555 // Operate on a workQueue local inside the try block so it can be enregistered
556 ThreadPoolWorkQueue workQueue = outerWorkQueue;
557 ThreadPoolWorkQueueThreadLocals tl = workQueue.GetOrCreateThreadLocals();
558 Thread currentThread = tl.currentThread;
560 // Start on clean ExecutionContext and SynchronizationContext
561 currentThread._executionContext = null;
562 currentThread._synchronizationContext = null;
565 // Loop until our quantum expires or there is no work.
567 while (ThreadPool.KeepDispatching(startTickCount))
569 bool missedSteal = false;
570 // Operate on a workItem local inside the try block so it can be enregistered
571 object? workItem = outerWorkItem = workQueue.Dequeue(tl, ref missedSteal);
573 if (workItem == null)
576 // No work.
577 // If we missed a steal, though, there may be more work in the queue.
578 // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread
579 // that owns the contended work-stealing queue will pick up its own workitems in the meantime,
580 // which will be more efficient than this thread doing it anyway.
582 needAnotherThread = missedSteal;
584 // Tell the VM we're returning normally, not because Hill Climbing asked us to return.
585 return true;
588 if (workQueue.loggingEnabled)
589 System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem);
592 // If we found work, there may be more work. Ask for another thread so that the other work can be processed
593 // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
595 workQueue.EnsureThreadRequested();
598 // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
600 if (ThreadPoolGlobals.enableWorkerTracking)
// reportedStatus records whether the "working" report was made, so the finally below only
// reports "not working" after a matching "working" report.
602 bool reportedStatus = false;
605 ThreadPool.ReportThreadStatus(isWorking: true);
606 reportedStatus = true;
607 if (workItem is Task task)
609 task.ExecuteFromThreadPool(currentThread);
611 else
613 Debug.Assert(workItem is IThreadPoolWorkItem);
614 Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
617 finally
619 if (reportedStatus)
620 ThreadPool.ReportThreadStatus(isWorking: false);
623 else if (workItem is Task task)
625 // Check for Task first as it's currently faster to type check
626 // for Task and then Unsafe.As for the interface, rather than
627 // vice versa, in particular when the object implements a bunch
628 // of interfaces.
629 task.ExecuteFromThreadPool(currentThread);
631 else
633 Debug.Assert(workItem is IThreadPoolWorkItem);
634 Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
637 currentThread.ResetThreadPoolThread();
639 // Release refs
640 outerWorkItem = workItem = null;
642 // Return to clean ExecutionContext and SynchronizationContext
643 ExecutionContext.ResetThreadPoolThread(currentThread);
646 // Notify the VM that we executed this workitem. This is also our opportunity to ask whether Hill Climbing wants
647 // us to return the thread to the pool or not.
// needAnotherThread is still true on this path, so the finally below requests another thread.
649 if (!ThreadPool.NotifyWorkItemComplete())
650 return false;
653 // If we get here, it's because our quantum expired. Tell the VM we're returning normally.
654 return true;
656 finally
659 // If we are exiting for any reason other than that the queue is definitely empty, ask for another
660 // thread to pick up where we left off.
662 if (needAnotherThread)
663 outerWorkQueue.EnsureThreadRequested();
// Simple random number generator. We don't need great randomness, we just need a little and for it to be fast.
internal struct FastRandom // xorshift prng
{
    private uint _w, _x, _y, _z;

    /// <summary>Seeds the generator; only the first state word depends on <paramref name="seed"/>.</summary>
    public FastRandom(int seed)
    {
        _w = 88675123;
        _x = (uint)seed;
        _y = 362436069;
        _z = 521288629;
    }

    /// <summary>Advances the generator and returns a value in the range [0, <paramref name="maxValue"/>).</summary>
    /// <param name="maxValue">Exclusive upper bound; must be greater than zero.</param>
    public int Next(int maxValue)
    {
        Debug.Assert(maxValue > 0);

        // Mix the oldest state word before it is rotated out.
        uint mixed = _x ^ (_x << 11);
        mixed ^= mixed >> 8;

        // Rotate the state words down by one position.
        uint previous = _w;
        _x = _y;
        _y = _z;
        _z = previous;

        _w = previous ^ (previous >> 19) ^ mixed;

        return (int)(_w % (uint)maxValue);
    }
}
// Holds a WorkStealingQueue, and removes it from the list when this object is no longer referenced.
internal sealed class ThreadPoolWorkQueueThreadLocals
{
    [ThreadStatic]
    public static ThreadPoolWorkQueueThreadLocals threadLocals;

    public readonly ThreadPoolWorkQueue workQueue;
    public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
    public readonly Thread currentThread;
    public FastRandom random = new FastRandom(Thread.CurrentThread.ManagedThreadId); // mutable struct, do not copy or make readonly

    /// <summary>
    /// Creates the locals for the current thread and registers a new work-stealing queue
    /// in the shared queue list.
    /// </summary>
    public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
    {
        var localQueue = new ThreadPoolWorkQueue.WorkStealingQueue();

        workQueue = tpq;
        workStealingQueue = localQueue;
        ThreadPoolWorkQueue.WorkStealingQueueList.Add(localQueue);
        currentThread = Thread.CurrentThread;
    }

    /// <summary>
    /// Moves anything still in the local queue to the global queue, then removes the
    /// local queue from the shared queue list.
    /// </summary>
    private void CleanUp()
    {
        if (workStealingQueue == null)
        {
            return;
        }

        if (workQueue != null)
        {
            for (object? pending = workStealingQueue.LocalPop(); pending != null; pending = workStealingQueue.LocalPop())
            {
                Debug.Assert(null != pending);
                workQueue.Enqueue(pending, forceGlobal: true);
            }
        }

        ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
    }

    ~ThreadPoolWorkQueueThreadLocals()
    {
        // Since the purpose of calling CleanUp is to transfer any pending workitems into the global
        // queue so that they will be executed by another thread, there's no point in doing this cleanup
        // if we're in the process of shutting down or unloading the AD. In those cases, the work won't
        // execute anyway. And there are subtle race conditions involved there that would lead us to do the wrong
        // thing anyway. So we'll only clean up if this is a "normal" finalization.
        if (Environment.HasShutdownStarted)
        {
            return;
        }

        CleanUp();
    }
}
/// <summary>Callback signature accepted by <c>ThreadPool.QueueUserWorkItem</c>; <paramref name="state"/> is the value supplied when the work was queued.</summary>
742 public delegate void WaitCallback(object? state);
/// <summary>Callback signature accepted by the <c>RegisterWaitForSingleObject</c> overloads; <paramref name="timedOut"/> tells the callback whether it runs because of a timeout.</summary>
744 public delegate void WaitOrTimerCallback(object? state, bool timedOut); // signaled or timed out
/// <summary>
/// Base class for the queued user work items in this file. In DEBUG builds it also checks
/// that each instance is executed exactly once.
/// </summary>
internal abstract class QueueUserWorkItemCallbackBase : IThreadPoolWorkItem
{
#if DEBUG
    private volatile int executed;

    [SuppressMessage("Microsoft.Performance", "CA1821:RemoveEmptyFinalizers")]
    ~QueueUserWorkItemCallbackBase()
    {
        bool ranOrShuttingDown = executed != 0 || Environment.HasShutdownStarted;
        Debug.Assert(ranOrShuttingDown, "A QueueUserWorkItemCallback was never called!");
    }
#endif

    /// <summary>Runs the work item; derived classes call this first and then invoke their callback.</summary>
    public virtual void Execute()
    {
#if DEBUG
        // Once executed, the "never called" finalizer check is no longer needed.
        GC.SuppressFinalize(this);
        int alreadyExecuted = Interlocked.Exchange(ref executed, 1);
        Debug.Assert(alreadyExecuted == 0, "A QueueUserWorkItemCallback was called twice!");
#endif
    }
}
/// <summary>
/// Work item that runs a <see cref="WaitCallback"/> under a captured <see cref="ExecutionContext"/>.
/// </summary>
internal sealed class QueueUserWorkItemCallback : QueueUserWorkItemCallbackBase
{
    private WaitCallback? _callback; // SOS's ThreadPool command depends on this name
    private readonly object? _state;
    private readonly ExecutionContext _context;

    // Shared delegate handed to RunForThreadPoolUnsafe; it clears the stored callback before invoking it.
    private static readonly Action<QueueUserWorkItemCallback> s_executionContextShim = workItem =>
    {
        Debug.Assert(workItem._callback != null);
        WaitCallback toInvoke = workItem._callback;
        workItem._callback = null;

        toInvoke.Invoke(workItem._state);
    };

    internal QueueUserWorkItemCallback(WaitCallback callback, object? state, ExecutionContext context)
    {
        Debug.Assert(context != null);

        _context = context;
        _state = state;
        _callback = callback;
    }

    /// <summary>Runs the callback under the captured context.</summary>
    public override void Execute()
    {
        base.Execute();

        ExecutionContext.RunForThreadPoolUnsafe(_context, s_executionContextShim, this);
    }
}
/// <summary>
/// Work item that runs an <see cref="Action{TState}"/> with its state under a captured
/// <see cref="ExecutionContext"/>.
/// </summary>
internal sealed class QueueUserWorkItemCallback<TState> : QueueUserWorkItemCallbackBase
{
    private Action<TState>? _callback; // SOS's ThreadPool command depends on this name
    private readonly TState _state;
    private readonly ExecutionContext _context;

    internal QueueUserWorkItemCallback(Action<TState> callback, TState state, ExecutionContext context)
    {
        Debug.Assert(callback != null);

        _context = context;
        _state = state;
        _callback = callback;
    }

    /// <summary>Clears the stored callback, then runs it under the captured context.</summary>
    public override void Execute()
    {
        base.Execute();

        Debug.Assert(_callback != null);
        Action<TState> toInvoke = _callback;
        _callback = null;

        ExecutionContext.RunForThreadPoolUnsafe(_context, toInvoke, in _state);
    }
}
/// <summary>
/// Work item that invokes a <see cref="WaitCallback"/> directly, without a captured
/// <see cref="ExecutionContext"/>.
/// </summary>
internal sealed class QueueUserWorkItemCallbackDefaultContext : QueueUserWorkItemCallbackBase
{
    private WaitCallback? _callback; // SOS's ThreadPool command depends on this name
    private readonly object? _state;

    internal QueueUserWorkItemCallbackDefaultContext(WaitCallback callback, object? state)
    {
        Debug.Assert(callback != null);

        _state = state;
        _callback = callback;
    }

    /// <summary>Clears the stored callback, then invokes it with the stored state.</summary>
    public override void Execute()
    {
        ExecutionContext.CheckThreadPoolAndContextsAreDefault();
        base.Execute();

        Debug.Assert(_callback != null);
        WaitCallback toInvoke = _callback;
        _callback = null;

        toInvoke.Invoke(_state);

        // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
    }
}
/// <summary>
/// Work item that invokes an <see cref="Action{TState}"/> directly, without a captured
/// <see cref="ExecutionContext"/>.
/// </summary>
internal sealed class QueueUserWorkItemCallbackDefaultContext<TState> : QueueUserWorkItemCallbackBase
{
    private Action<TState>? _callback; // SOS's ThreadPool command depends on this name
    private readonly TState _state;

    internal QueueUserWorkItemCallbackDefaultContext(Action<TState> callback, TState state)
    {
        Debug.Assert(callback != null);

        _state = state;
        _callback = callback;
    }

    /// <summary>Clears the stored callback, then invokes it with the stored state.</summary>
    public override void Execute()
    {
        ExecutionContext.CheckThreadPoolAndContextsAreDefault();
        base.Execute();

        Debug.Assert(_callback != null);
        Action<TState> toInvoke = _callback;
        _callback = null;

        toInvoke.Invoke(_state);

        // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
    }
}
/// <summary>
/// Bundles a <see cref="WaitOrTimerCallback"/> with its state and, optionally, a captured
/// <see cref="ExecutionContext"/> to run it under.
/// </summary>
internal sealed class _ThreadPoolWaitOrTimerCallback
{
    private WaitOrTimerCallback _waitOrTimerCallback;
    private ExecutionContext? _executionContext;
    private object? _state;

    // One cached delegate per timedOut value, because ContextCallback has no timedOut parameter.
    private static readonly ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t);
    private static readonly ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f);

    internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, object? state, bool flowExecutionContext)
    {
        _waitOrTimerCallback = waitOrTimerCallback;
        _state = state;

        // capture the execution context only when it should flow
        _executionContext = flowExecutionContext ? ExecutionContext.Capture() : null;
    }

    private static void WaitOrTimerCallback_Context_t(object state)
    {
        WaitOrTimerCallback_Context(state, timedOut: true);
    }

    private static void WaitOrTimerCallback_Context_f(object state)
    {
        WaitOrTimerCallback_Context(state, timedOut: false);
    }

    private static void WaitOrTimerCallback_Context(object state, bool timedOut)
    {
        var owner = (_ThreadPoolWaitOrTimerCallback)state;
        owner._waitOrTimerCallback(owner._state, timedOut);
    }

    // call back helper
    internal static void PerformWaitOrTimerCallback(_ThreadPoolWaitOrTimerCallback helper, bool timedOut)
    {
        Debug.Assert(helper != null, "Null state passed to PerformWaitOrTimerCallback!");

        ExecutionContext? context = helper._executionContext;
        if (context != null)
        {
            ExecutionContext.Run(context, timedOut ? _ccbt : _ccbf, helper);
            return;
        }

        // call directly if it is an unsafe call OR EC flow is suppressed
        WaitOrTimerCallback direct = helper._waitOrTimerCallback;
        direct(helper._state, timedOut);
    }
}
936 public static partial class ThreadPool
/// <summary>
/// Validates the unsigned timeout and forwards to the six-argument overload with a final
/// argument of <c>true</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// The timeout is greater than <see cref="int.MaxValue"/> and is not <see cref="uint.MaxValue"/>.
/// </exception>
[CLSCompliant(false)]
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    uint millisecondsTimeOutInterval,
    bool executeOnlyOnce)    // NOTE: we do not allow other options that allow the callback to be queued as an APC
{
    // uint.MaxValue (what -1 becomes when cast to uint) is exempt from the range check.
    if (millisecondsTimeOutInterval != uint.MaxValue && millisecondsTimeOutInterval > (uint)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, true);
}
/// <summary>
/// Validates the unsigned timeout and forwards to the six-argument overload with a final
/// argument of <c>false</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// The timeout is greater than <see cref="int.MaxValue"/> and is not <see cref="uint.MaxValue"/>.
/// </exception>
[CLSCompliant(false)]
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    uint millisecondsTimeOutInterval,
    bool executeOnlyOnce)    // NOTE: we do not allow other options that allow the callback to be queued as an APC
{
    // uint.MaxValue (what -1 becomes when cast to uint) is exempt from the range check.
    if (millisecondsTimeOutInterval != uint.MaxValue && millisecondsTimeOutInterval > (uint)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, false);
}
/// <summary>
/// Validates the signed timeout and forwards to the six-argument overload with a final
/// argument of <c>true</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">The timeout is less than -1.</exception>
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    int millisecondsTimeOutInterval,
    bool executeOnlyOnce)    // NOTE: we do not allow other options that allow the callback to be queued as an APC
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    // -1 becomes uint.MaxValue after the cast.
    uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, true);
}
/// <summary>
/// Validates the signed timeout and forwards to the six-argument overload with a final
/// argument of <c>false</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">The timeout is less than -1.</exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    int millisecondsTimeOutInterval,
    bool executeOnlyOnce)    // NOTE: we do not allow other options that allow the callback to be queued as an APC
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    // -1 becomes uint.MaxValue after the cast.
    uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, false);
}
/// <summary>
/// Validates the 64-bit timeout and forwards to the six-argument overload with a final
/// argument of <c>true</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// The timeout is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    long millisecondsTimeOutInterval,
    bool executeOnlyOnce)    // NOTE: we do not allow other options that allow the callback to be queued as an APC
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (millisecondsTimeOutInterval > (uint)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    // -1 becomes uint.MaxValue after the cast.
    uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, true);
}
/// <summary>
/// Validates the 64-bit timeout and forwards to the six-argument overload with a final
/// argument of <c>false</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// The timeout is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    long millisecondsTimeOutInterval,
    bool executeOnlyOnce)    // NOTE: we do not allow other options that allow the callback to be queued as an APC
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (millisecondsTimeOutInterval > (uint)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    // -1 becomes uint.MaxValue after the cast.
    uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, false);
}
/// <summary>
/// Converts <paramref name="timeout"/> to whole milliseconds, validates it and forwards to
/// the six-argument overload with a final argument of <c>true</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// The timeout in milliseconds is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    TimeSpan timeout,
    bool executeOnlyOnce)
{
    long totalMilliseconds = (long)timeout.TotalMilliseconds;

    if (totalMilliseconds < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (totalMilliseconds > (long)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)totalMilliseconds, executeOnlyOnce, true);
}
/// <summary>
/// Converts <paramref name="timeout"/> to whole milliseconds, validates it and forwards to
/// the six-argument overload with a final argument of <c>false</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// The timeout in milliseconds is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    TimeSpan timeout,
    bool executeOnlyOnce)
{
    long totalMilliseconds = (long)timeout.TotalMilliseconds;

    if (totalMilliseconds < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (totalMilliseconds > (long)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)totalMilliseconds, executeOnlyOnce, false);
}
/// <summary>Queues <paramref name="callBack"/> with a <c>null</c> state object.</summary>
public static bool QueueUserWorkItem(WaitCallback callBack)
{
    return QueueUserWorkItem(callBack, null);
}
/// <summary>
/// Queues <paramref name="callBack"/> with <paramref name="state"/> on the global work queue,
/// wrapping it together with the captured <see cref="ExecutionContext"/> when that context is
/// neither null nor the default one.
/// </summary>
/// <param name="callBack">The callback to queue; a null value is reported through ThrowHelper.</param>
/// <param name="state">Caller-supplied state handed to the work-item wrapper.</param>
/// <returns>Always <c>true</c>.</returns>
public static bool QueueUserWorkItem(WaitCallback callBack, object? state)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ExecutionContext context = ExecutionContext.Capture();

    // Only carry the context along when there is a non-default one to carry.
    object workItem;
    if (context != null && !context.IsDefault)
    {
        workItem = new QueueUserWorkItemCallback(callBack!, state, context);
    }
    else
    {
        workItem = new QueueUserWorkItemCallbackDefaultContext(callBack!, state);
    }

    ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: true);
    return true;
}
/// <summary>
/// Queues <paramref name="callBack"/> with strongly typed <paramref name="state"/>, wrapping
/// it together with the captured <see cref="ExecutionContext"/> when that context is neither
/// null nor the default one.
/// </summary>
/// <param name="callBack">The callback to queue; a null value is reported through ThrowHelper.</param>
/// <param name="state">Caller-supplied state handed to the work-item wrapper.</param>
/// <param name="preferLocal">When <c>false</c>, the item is enqueued with <c>forceGlobal: true</c>.</param>
/// <returns>Always <c>true</c>.</returns>
public static bool QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ExecutionContext context = ExecutionContext.Capture();

    // Only carry the context along when there is a non-default one to carry.
    object workItem;
    if (context != null && !context.IsDefault)
    {
        workItem = new QueueUserWorkItemCallback<TState>(callBack!, state, context);
    }
    else
    {
        workItem = new QueueUserWorkItemCallbackDefaultContext<TState>(callBack!, state);
    }

    ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: !preferLocal);
    return true;
}
/// <summary>
/// Queues <paramref name="callBack"/> with strongly typed <paramref name="state"/>. This
/// overload does not call <see cref="ExecutionContext.Capture"/>.
/// </summary>
/// <param name="callBack">The callback to queue; a null value is reported through ThrowHelper.</param>
/// <param name="state">Caller-supplied state handed to the callback.</param>
/// <param name="preferLocal">When <c>false</c>, the item is enqueued with <c>forceGlobal: true</c>.</param>
/// <returns>Always <c>true</c>.</returns>
public static bool UnsafeQueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    bool isStateMachineBoxShim = ReferenceEquals(callBack, ThreadPoolGlobals.s_invokeAsyncStateMachineBox);
    if (!isStateMachineBoxShim)
    {
        // Ordinary callback: wrap it in a default-context work item and enqueue that.
        EnsureInitialized();

        var workItem = new QueueUserWorkItemCallbackDefaultContext<TState>(callBack!, state);
        ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: !preferLocal);
        return true;
    }

    // The callback is the runtime-provided invocation of an IAsyncStateMachineBox, so the
    // Task state can be queued directly instead of being wrapped in a QueueUserWorkItemCallback.
    //
    // This happens when user code queues its provided continuation to the ThreadPool;
    // internally UnsafeQueueUserWorkItemInternal is called directly for Tasks.
    bool stateIsBox = state is IAsyncStateMachineBox;
    if (!stateIsBox)
    {
        // The provided state must be the internal IAsyncStateMachineBox (Task) type
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state);
    }

    UnsafeQueueUserWorkItemInternal((object)state!, preferLocal);
    return true;
}
/// <summary>
/// Queues <paramref name="callBack"/> with <paramref name="state"/> on the global work queue
/// using the default-context work-item wrapper. This overload does not call
/// <see cref="ExecutionContext.Capture"/>.
/// </summary>
/// <param name="callBack">The callback to queue; a null value is reported through ThrowHelper.</param>
/// <param name="state">Caller-supplied state handed to the work-item wrapper.</param>
/// <returns>Always <c>true</c>.</returns>
public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object? state)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ThreadPoolGlobals.workQueue.Enqueue(
        new QueueUserWorkItemCallbackDefaultContext(callBack!, state),
        forceGlobal: true);

    return true;
}
/// <summary>
/// Queues an <see cref="IThreadPoolWorkItem"/> directly, without wrapping it.
/// </summary>
/// <param name="callBack">
/// The work item to queue. A null value, or an instance that is also a <see cref="Task"/>,
/// is reported through ThrowHelper.
/// </param>
/// <param name="preferLocal">Passed through to <c>UnsafeQueueUserWorkItemInternal</c>.</param>
/// <returns>Always <c>true</c>.</returns>
public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool preferLocal)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    // A derived Task that also implements the interface must not be queued this way:
    // doing so would bypass Task.Start and its safety checks.
    bool isTask = callBack is Task;
    if (isTask)
    {
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.callBack);
    }

    UnsafeQueueUserWorkItemInternal(callBack!, preferLocal);
    return true;
}
/// <summary>
/// Enqueues <paramref name="callBack"/> as-is after making sure the pool is initialized.
/// </summary>
/// <param name="callBack">
/// The object to enqueue; asserted (debug builds only) to be exactly one of
/// <see cref="IThreadPoolWorkItem"/> or <see cref="Task"/>.
/// </param>
/// <param name="preferLocal">When <c>false</c>, the item is enqueued with <c>forceGlobal: true</c>.</param>
internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal)
{
    // Exactly one of the two type tests may hold.
    Debug.Assert((callBack is IThreadPoolWorkItem) != (callBack is Task));

    EnsureInitialized();

    bool useGlobalQueue = !preferLocal;
    ThreadPoolGlobals.workQueue.Enqueue(callBack, forceGlobal: useGlobalQueue);
}
/// <summary>
/// Tries to take the target callback out of the current thread's queue.
/// </summary>
/// <param name="workItem">The work item to look for; asserted non-null in debug builds.</param>
/// <returns>
/// <c>false</c> when the pool has not been initialized; otherwise the result of
/// <c>LocalFindAndPop</c> on the work queue.
/// </returns>
internal static bool TryPopCustomWorkItem(object workItem)
{
    Debug.Assert(workItem != null);

    // If the pool was never initialized, this work item cannot have been queued.
    if (!ThreadPoolGlobals.threadPoolInitialized)
    {
        return false;
    }

    return ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem);
}
/// <summary>
/// Gets all work items: first those in the global queue, then the non-null entries of every
/// work-stealing queue in <c>WorkStealingQueueList.Queues</c>.
/// Called by TaskScheduler in its debugger hooks.
/// </summary>
/// <returns>A lazily evaluated sequence of the queued work items.</returns>
internal static IEnumerable<object> GetQueuedWorkItems()
{
    // Global queue first.
    foreach (object globalItem in ThreadPoolGlobals.workQueue.workItems)
    {
        yield return globalItem;
    }

    // Then every local queue.
    foreach (ThreadPoolWorkQueue.WorkStealingQueue localQueue in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
    {
        if (localQueue == null || localQueue.m_array == null)
        {
            continue;
        }

        // Walk the array reference captured here; empty slots are skipped.
        object?[] slots = localQueue.m_array;
        foreach (object? slot in slots)
        {
            if (slot != null)
            {
                yield return slot;
            }
        }
    }
}
/// <summary>
/// Enumerates the non-null entries of the work-stealing queue reached through
/// <c>ThreadPoolWorkQueueThreadLocals.threadLocals</c>.
/// </summary>
/// <remarks>
/// <c>threadLocals</c> is declared outside this method.
/// NOTE(review): it is presumably per-thread state that is still null on a thread which has
/// never set up its thread-pool locals - confirm against the declaration. The null-conditional
/// access below makes that case produce an empty sequence instead of a
/// <see cref="NullReferenceException"/> during enumeration.
/// </remarks>
/// <returns>A lazily evaluated sequence of the locally queued work items (possibly empty).</returns>
internal static IEnumerable<object> GetLocallyQueuedWorkItems()
{
    ThreadPoolWorkQueue.WorkStealingQueue? wsq = ThreadPoolWorkQueueThreadLocals.threadLocals?.workStealingQueue;
    if (wsq != null && wsq.m_array != null)
    {
        // Walk the array reference captured here; empty slots are skipped.
        object?[] items = wsq.m_array;
        for (int i = 0; i < items.Length; i++)
        {
            object? item = items[i];
            if (item != null)
            {
                yield return item;
            }
        }
    }
}
/// <summary>
/// Exposes the global work queue's <c>workItems</c> collection as a sequence.
/// </summary>
/// <returns>The <c>workItems</c> collection itself; no copy is made here.</returns>
internal static IEnumerable<object> GetGloballyQueuedWorkItems()
{
    return ThreadPoolGlobals.workQueue.workItems;
}
/// <summary>
/// Copies every item produced by <paramref name="workitems"/> into a new array.
/// </summary>
/// <remarks>
/// The sequence is enumerated exactly once. Counting in one pass and copying in a second pass
/// can disagree when the source changes in between (for example while the queues are in
/// motion), which would either leave null slots at the end of the result or drop items.
/// </remarks>
/// <param name="workitems">The sequence to snapshot.</param>
/// <returns>An array holding the items in enumeration order; empty when the sequence is empty.</returns>
private static object[] ToObjectArray(IEnumerable<object> workitems)
{
    var snapshot = new List<object>();
    foreach (object item in workitems)
    {
        snapshot.Add(item);
    }

    return snapshot.ToArray();
}
/// <summary>
/// Returns the result of <c>GetQueuedWorkItems</c> as an array.
/// </summary>
/// <remarks>
/// This is the method the debugger will actually call, if it ends up calling
/// into ThreadPool directly. Tests can use this to simulate a debugger, as well.
/// </remarks>
internal static object[] GetQueuedWorkItemsForDebugger()
{
    IEnumerable<object> queued = GetQueuedWorkItems();
    return ToObjectArray(queued);
}
/// <summary>
/// Returns the result of <c>GetGloballyQueuedWorkItems</c> as an array.
/// </summary>
internal static object[] GetGloballyQueuedWorkItemsForDebugger()
{
    IEnumerable<object> queued = GetGloballyQueuedWorkItems();
    return ToObjectArray(queued);
}
/// <summary>
/// Returns the result of <c>GetLocallyQueuedWorkItems</c> as an array.
/// </summary>
internal static object[] GetLocallyQueuedWorkItemsForDebugger()
{
    IEnumerable<object> queued = GetLocallyQueuedWorkItems();
    return ToObjectArray(queued);
}