Contribute to IDE0059 (unnecessary assignment)
[mono-project.git] / netcore / System.Private.CoreLib / shared / System / Threading / ThreadPool.cs
blobfae558441156509cb445c8c2f9e1c974ab00b62b
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 /*=============================================================================
6 **
7 **
8 **
9 ** Purpose: Class for creating and managing a threadpool
12 =============================================================================*/
14 using System.Collections.Concurrent;
15 using System.Collections.Generic;
16 using System.Diagnostics;
17 using System.Diagnostics.CodeAnalysis;
18 using System.Diagnostics.Tracing;
19 using System.Runtime.CompilerServices;
20 using System.Runtime.InteropServices;
21 using System.Threading.Tasks;
22 using Internal.Runtime.CompilerServices;
24 namespace System.Threading
/// <summary>Process-wide state shared by the thread pool implementation in this file.</summary>
internal static class ThreadPoolGlobals
{
    /// <summary>Value of <see cref="Environment.ProcessorCount"/> captured when this class is initialized.</summary>
    public static readonly int processorCount = Environment.ProcessorCount;

    public static volatile bool threadPoolInitialized;
    public static bool enableWorkerTracking;

    /// <summary>The single work queue that <see cref="ThreadPoolWorkQueue.Dispatch"/> drains.</summary>
    public static readonly ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue();

    /// <summary>Shim used to invoke <see cref="IAsyncStateMachineBox.MoveNext"/> of the supplied <see cref="IAsyncStateMachineBox"/>.</summary>
    internal static readonly Action<object?> s_invokeAsyncStateMachineBox = state =>
    {
        if (state is IAsyncStateMachineBox box)
        {
            box.MoveNext();
            return;
        }

        // Anything other than a state machine box is rejected through the throw helper.
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state);
    };
}
48 [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
49 internal sealed class ThreadPoolWorkQueue
/// <summary>
/// Registry of every <see cref="WorkStealingQueue"/>. The backing array is never mutated in place:
/// each change builds a new array and publishes it with a compare-exchange, retrying on contention.
/// </summary>
internal static class WorkStealingQueueList
{
#pragma warning disable CA1825 // avoid the extra generic instantation for Array.Empty<T>(); this is the only place we'll ever create this array
    private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0];
#pragma warning restore CA1825

    /// <summary>The currently published array of queues.</summary>
    public static WorkStealingQueue[] Queues
    {
        get { return _queues; }
    }

    /// <summary>Publishes a new array consisting of the current queues followed by <paramref name="queue"/>.</summary>
    /// <param name="queue">The queue to register; asserted to be non-null and not already present.</param>
    public static void Add(WorkStealingQueue queue)
    {
        Debug.Assert(queue != null);

        WorkStealingQueue[] snapshot;
        WorkStealingQueue[] grown;
        do
        {
            snapshot = _queues;
            Debug.Assert(Array.IndexOf(snapshot, queue) == -1);

            grown = new WorkStealingQueue[snapshot.Length + 1];
            Array.Copy(snapshot, 0, grown, 0, snapshot.Length);
            grown[grown.Length - 1] = queue;
        }
        while (Interlocked.CompareExchange(ref _queues, grown, snapshot) != snapshot); // lost a race: rebuild from the new snapshot
    }

    /// <summary>Publishes a new array that omits <paramref name="queue"/>.</summary>
    /// <param name="queue">The queue to unregister. If it is not found, the method fails a debug assertion and returns.</param>
    public static void Remove(WorkStealingQueue queue)
    {
        Debug.Assert(queue != null);

        while (true)
        {
            WorkStealingQueue[] snapshot = _queues;
            if (snapshot.Length == 0)
            {
                return;
            }

            int index = Array.IndexOf(snapshot, queue);
            if (index == -1)
            {
                Debug.Fail("Should have found the queue");
                return;
            }

            var shrunk = new WorkStealingQueue[snapshot.Length - 1];

            // Copy the elements before the removed slot, if any.
            if (index > 0)
            {
                Array.Copy(snapshot, 0, shrunk, 0, index);
            }

            // Copy the elements after the removed slot, if any.
            if (index < shrunk.Length)
            {
                Array.Copy(snapshot, index + 1, shrunk, index, shrunk.Length - index);
            }

            if (Interlocked.CompareExchange(ref _queues, shrunk, snapshot) == snapshot)
            {
                return;
            }
        }
    }
}
118 internal sealed class WorkStealingQueue
120 private const int INITIAL_SIZE = 32;
121 internal volatile object?[] m_array = new object[INITIAL_SIZE]; // SOS's ThreadPool command depends on this name
122 private volatile int m_mask = INITIAL_SIZE - 1;
124 #if DEBUG
125 // in debug builds, start at the end so we exercise the index reset logic.
126 private const int START_INDEX = int.MaxValue;
127 #else
128 private const int START_INDEX = 0;
129 #endif
131 private volatile int m_headIndex = START_INDEX;
132 private volatile int m_tailIndex = START_INDEX;
134 private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking: false);
/// <summary>
/// Stores <paramref name="obj"/> at the tail of the queue. Takes a lock-free fast path while there is
/// spare room, and otherwise takes <c>m_foreignLock</c> and doubles the backing array when it is full.
/// </summary>
/// <param name="obj">The item to append.</param>
public void LocalPush(object obj)
{
    int tail = m_tailIndex;

    // The tail is about to be incremented; at int.MaxValue that would overflow, so rebase the indices.
    if (tail == int.MaxValue)
    {
        bool rebaseLockTaken = false;
        try
        {
            m_foreignLock.Enter(ref rebaseLockTaken);

            if (m_tailIndex == int.MaxValue)
            {
                // Masking (instead of resetting to zero) leaves every queued item at the slot it already
                // occupies. Because the tail has all bits set, the masked tail keeps every mask bit, so the
                // masked head can never end up greater than the masked tail.
                m_headIndex &= m_mask;
                tail = m_tailIndex & m_mask;
                m_tailIndex = tail;
                Debug.Assert(m_headIndex <= m_tailIndex);
            }
        }
        finally
        {
            if (rebaseLockTaken)
            {
                m_foreignLock.Exit(useMemoryBarrier: true);
            }
        }
    }

    // Fast path: at least two elements' worth of space remain, so no lock is needed.
    if (tail < m_headIndex + m_mask)
    {
        Volatile.Write(ref m_array[tail & m_mask], obj);
        m_tailIndex = tail + 1;
        return;
    }

    // Slow path: we have to contend with foreign pops, so take the lock.
    bool lockTaken = false;
    try
    {
        m_foreignLock.Enter(ref lockTaken);

        int head = m_headIndex;
        int count = m_tailIndex - m_headIndex;

        // Only grow when the queue is genuinely full; if one slot is still free we just fall through and add.
        if (count >= m_mask)
        {
            // Double the capacity, copying the items so that the logical head lands at slot 0.
            var expanded = new object?[m_array.Length << 1];
            for (int i = 0; i < m_array.Length; i++)
            {
                expanded[i] = m_array[(i + head) & m_mask];
            }

            // Reset the field values, including the mask.
            m_array = expanded;
            m_headIndex = 0;
            tail = count;
            m_tailIndex = tail;
            m_mask = (m_mask << 1) | 1;
        }

        Volatile.Write(ref m_array[tail & m_mask], obj);
        m_tailIndex = tail + 1;
    }
    finally
    {
        if (lockTaken)
        {
            m_foreignLock.Exit(useMemoryBarrier: false);
        }
    }
}
/// <summary>Searches the queue for <paramref name="obj"/>, starting at the tail, and removes it if found.</summary>
/// <param name="obj">The item to look for (compared by reference).</param>
/// <returns><c>true</c> if the item was found and removed; otherwise <c>false</c>.</returns>
[SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
public bool LocalFindAndPop(object obj)
{
    // Fast path: the item sits at the tail, so an ordinary pop removes it without any search.
    if (m_array[(m_tailIndex - 1) & m_mask] == obj)
    {
        object? popped = LocalPop();
        Debug.Assert(popped == null || popped == obj);
        return popped != null;
    }

    // Otherwise do an O(N) scan from the tail end towards the head. Recently queued work is the most
    // likely thing to be waited on, so it is usually found quickly; in the worst case the whole local
    // queue is traversed, which is acceptable because the caller is about to block anyway.
    for (int i = m_tailIndex - 2; i >= m_headIndex; i--)
    {
        if (m_array[i & m_mask] != obj)
        {
            continue;
        }

        // Found a candidate: block out steals so nobody interferes while we remove it.
        bool lockTaken = false;
        try
        {
            m_foreignLock.Enter(ref lockTaken);

            // The slot was emptied before we got the lock (race): give up.
            if (m_array[i & m_mask] == null)
            {
                return false;
            }

            // Otherwise, null out the element.
            Volatile.Write(ref m_array[i & m_mask], null);

            // Fix up the indexes if the hole is at an edge. If it is not, the null stays in the array
            // and is filtered out later (at the cost of possibly superfluous resizing).
            // NOTE(review): the scan starts at tail - 2, so the tail comparison looks unreachable here — confirm.
            if (i == m_tailIndex)
            {
                m_tailIndex -= 1;
            }
            else if (i == m_headIndex)
            {
                m_headIndex += 1;
            }

            return true;
        }
        finally
        {
            if (lockTaken)
            {
                m_foreignLock.Exit(useMemoryBarrier: false);
            }
        }
    }

    return false;
}
/// <summary>Pops the item at the tail of the queue.</summary>
/// <returns>The popped item, or <c>null</c> when the head index is not below the tail index or nothing could be taken.</returns>
public object? LocalPop()
{
    if (m_headIndex < m_tailIndex)
    {
        return LocalPopCore();
    }

    return null;
}
/// <summary>
/// Removes and returns the item at the tail, skipping over null slots. Falls back to
/// <c>m_foreignLock</c> when the head index has moved past the decremented tail.
/// </summary>
/// <returns>The popped item, or <c>null</c> if the queue is empty or the last item was stolen.</returns>
[SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
private object? LocalPopCore()
{
    while (true)
    {
        int tail = m_tailIndex;
        if (m_headIndex >= tail)
        {
            return null;
        }

        // Decrement the tail using a fence so the following read of the head cannot be moved before it.
        tail -= 1;
        Interlocked.Exchange(ref m_tailIndex, tail);

        // No interaction with a take: head down the fast path.
        if (m_headIndex <= tail)
        {
            int slot = tail & m_mask;
            object? item = Volatile.Read(ref m_array[slot]);

            // A null slot is a hole; go around and try the next one.
            if (item == null)
            {
                continue;
            }

            m_array[slot] = null;
            return item;
        }

        // Interaction with takes: 0 or 1 elements left, so decide under the lock.
        bool lockTaken = false;
        try
        {
            m_foreignLock.Enter(ref lockTaken);

            if (m_headIndex <= tail)
            {
                // Element still available. Take it.
                int slot = tail & m_mask;
                object? item = Volatile.Read(ref m_array[slot]);

                // A null slot is a hole; go around and try the next one.
                if (item == null)
                {
                    continue;
                }

                m_array[slot] = null;
                return item;
            }

            // We raced with a steal and the element is gone: restore the tail.
            m_tailIndex = tail + 1;
            return null;
        }
        finally
        {
            if (lockTaken)
            {
                m_foreignLock.Exit(useMemoryBarrier: false);
            }
        }
    }
}
/// <summary><c>true</c> when the head index is currently below the tail index.</summary>
public bool CanSteal
{
    get { return m_headIndex < m_tailIndex; }
}
/// <summary>Attempts to take the item at the head of the queue without waiting for the lock.</summary>
/// <param name="missedSteal">
/// Set to <c>true</c> when the queue looked stealable but no item was returned
/// (the lock was not acquired, or the head turned out not to be below the tail).
/// </param>
/// <returns>The stolen item, or <c>null</c> if nothing was taken.</returns>
public object? TrySteal(ref bool missedSteal)
{
    while (true)
    {
        if (CanSteal)
        {
            bool taken = false;
            try
            {
                m_foreignLock.TryEnter(ref taken);
                if (taken)
                {
                    // Increment the head with a fence so the read of the tail cannot move before it.
                    int head = m_headIndex;
                    Interlocked.Exchange(ref m_headIndex, head + 1);

                    if (head < m_tailIndex)
                    {
                        int slot = head & m_mask;
                        object? item = Volatile.Read(ref m_array[slot]);

                        // A null slot is a hole; loop around and look at the next head position.
                        if (item == null)
                        {
                            continue;
                        }

                        m_array[slot] = null;
                        return item;
                    }

                    // Failed, restore head.
                    m_headIndex = head;
                }
            }
            finally
            {
                if (taken)
                {
                    m_foreignLock.Exit(useMemoryBarrier: false);
                }
            }

            missedSteal = true;
        }

        return null;
    }
}
/// <summary>
/// The tail index minus the head index, read while holding <c>m_foreignLock</c> and clamped to be non-negative.
/// </summary>
public int Count
{
    get
    {
        bool lockTaken = false;
        try
        {
            m_foreignLock.Enter(ref lockTaken);

            int distance = m_tailIndex - m_headIndex;
            return Math.Max(0, distance);
        }
        finally
        {
            if (lockTaken)
            {
                m_foreignLock.Exit(useMemoryBarrier: false);
            }
        }
    }
}
407 internal bool loggingEnabled;
408 internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>(); // SOS's ThreadPool command depends on this name
410 private readonly Internal.PaddingFor32 pad1;
412 private volatile int numOutstandingThreadRequests = 0;
414 private readonly Internal.PaddingFor32 pad2;
/// <summary>Creates the queue and records whether verbose thread pool / thread transfer event logging is enabled.</summary>
public ThreadPoolWorkQueue()
{
    const EventKeywords keywords = FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer;
    loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, keywords);
}
/// <summary>Returns the calling thread's <see cref="ThreadPoolWorkQueueThreadLocals"/>, creating it on first use.</summary>
public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals()
{
    ThreadPoolWorkQueueThreadLocals? existing = ThreadPoolWorkQueueThreadLocals.threadLocals;
    if (existing != null)
    {
        return existing;
    }

    return CreateThreadLocals();
}
/// <summary>
/// Creates the thread locals for the calling thread and stores them in the thread-static slot.
/// Marked as non-inlined; the caller is expected to have found the slot empty.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
private ThreadPoolWorkQueueThreadLocals CreateThreadLocals()
{
    Debug.Assert(ThreadPoolWorkQueueThreadLocals.threadLocals == null);

    var created = new ThreadPoolWorkQueueThreadLocals(this);
    ThreadPoolWorkQueueThreadLocals.threadLocals = created;
    return created;
}
/// <summary>
/// Requests one more worker thread unless <see cref="ThreadPoolGlobals.processorCount"/> requests are already outstanding.
/// </summary>
internal void EnsureThreadRequested()
{
    // If we have not yet requested #procs threads, then request a new thread.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been incremented
    // by the VM by the time we reach this point.
    int observed = numOutstandingThreadRequests;
    while (observed < ThreadPoolGlobals.processorCount)
    {
        int previous = Interlocked.CompareExchange(ref numOutstandingThreadRequests, observed + 1, observed);
        if (previous == observed)
        {
            // We won the increment, so this call owns the actual request.
            ThreadPool.RequestWorkerThread();
            return;
        }

        observed = previous;
    }
}
/// <summary>Decrements the outstanding thread request count, never taking it below zero.</summary>
internal void MarkThreadRequestSatisfied()
{
    // One of our outstanding thread requests has been satisfied.
    // Decrement the count so that future calls to EnsureThreadRequested will succeed.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been decremented
    // by the VM by the time we reach this point.
    int observed = numOutstandingThreadRequests;
    while (observed > 0)
    {
        int previous = Interlocked.CompareExchange(ref numOutstandingThreadRequests, observed - 1, observed);
        if (previous == observed)
        {
            return;
        }

        observed = previous;
    }
}
/// <summary>Queues a work item and makes sure a worker thread has been requested.</summary>
/// <param name="callback">The work item; asserted to be exactly one of <see cref="IThreadPoolWorkItem"/> or <see cref="Task"/>.</param>
/// <param name="forceGlobal">
/// When <c>true</c>, the item always goes to the global queue. Otherwise it goes to the calling
/// thread's work-stealing queue if that thread has thread locals, and to the global queue if not.
/// </param>
public void Enqueue(object callback, bool forceGlobal)
{
    Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));

    if (loggingEnabled)
    {
        FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
    }

    ThreadPoolWorkQueueThreadLocals? tl = forceGlobal ? null : ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (tl == null)
    {
        workItems.Enqueue(callback);
    }
    else
    {
        tl.workStealingQueue.LocalPush(callback);
    }

    EnsureThreadRequested();
}
/// <summary>Tries to remove <paramref name="callback"/> from the calling thread's work-stealing queue.</summary>
/// <param name="callback">The work item to remove.</param>
/// <returns><c>true</c> if the calling thread has thread locals and the item was removed from its queue; otherwise <c>false</c>.</returns>
internal bool LocalFindAndPop(object callback)
{
    ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
    if (tl == null)
    {
        return false;
    }

    return tl.workStealingQueue.LocalFindAndPop(callback);
}
/// <summary>
/// Finds the next work item: first from the caller's local queue, then from the global queue, and
/// finally by trying to steal from the other registered work-stealing queues.
/// </summary>
/// <param name="tl">The calling thread's thread locals (local queue and random number generator).</param>
/// <param name="missedSteal">Passed through to <see cref="WorkStealingQueue.TrySteal"/>, which sets it when a steal attempt came up empty.</param>
/// <returns>The dequeued work item, or <c>null</c> if none was found.</returns>
public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
    WorkStealingQueue localWsq = tl.workStealingQueue;

    // First try the local queue.
    object? callback = localWsq.LocalPop();
    if (callback != null)
    {
        return callback;
    }

    // Then try the global queue.
    if (workItems.TryDequeue(out callback))
    {
        return callback;
    }

    // Finally try to steal from another thread's local queue, visiting each queue once
    // in round-robin order from a random starting point.
    WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
    int queueCount = queues.Length;
    Debug.Assert(queueCount > 0, "There must at least be a queue for this thread.");
    int lastIndex = queueCount - 1;
    int index = tl.random.Next(queueCount);

    for (int remaining = queueCount; remaining > 0; remaining--)
    {
        index = (index < lastIndex) ? index + 1 : 0;
        WorkStealingQueue candidate = queues[index];
        if (candidate != localWsq && candidate.CanSteal)
        {
            callback = candidate.TrySteal(ref missedSteal);
            if (callback != null)
            {
                return callback;
            }
        }
    }

    return callback;
}
/// <summary>Sum of <see cref="WorkStealingQueue.Count"/> over every queue currently registered in <see cref="WorkStealingQueueList"/>.</summary>
public long LocalCount
{
    get
    {
        WorkStealingQueue[] queues = WorkStealingQueueList.Queues;

        long total = 0;
        for (int i = 0; i < queues.Length; i++)
        {
            total += queues[i].Count;
        }

        return total;
    }
}
/// <summary>Number of items currently in the global <c>workItems</c> queue.</summary>
public long GlobalCount
{
    get { return workItems.Count; }
}
/// <summary>
/// Dispatches work items to this thread.
/// </summary>
/// <returns>
/// <c>true</c> if this thread did as much work as was available or its quantum expired.
/// <c>false</c> if this thread stopped working early.
/// </returns>
internal static bool Dispatch()
{
    ThreadPoolWorkQueue outerWorkQueue = ThreadPoolGlobals.workQueue;

    // Remember when we started so KeepDispatching can tell when the quantum is used up.
    int startTickCount = Environment.TickCount;

    // An outstanding request for a thread has now been fulfilled by this thread. From here on we are
    // responsible for requesting another thread if we stop working for any reason while there might
    // still be work in the queue.
    //
    // CoreCLR: Note that if this thread is aborted before we get a chance to request another one, the VM will
    // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
    outerWorkQueue.MarkThreadRequestSatisfied();

    // Has the desire for logging changed since the last time we entered?
    outerWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(
        EventLevel.Verbose,
        FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);

    // Assume another thread will be needed when this one returns to the VM. This is only cleared
    // when we are absolutely certain that the queue is empty.
    bool needAnotherThread = true;
    try
    {
        // Set up our thread-local data.
        // A copy of the queue reference local to the try block is used so it can be enregistered.
        ThreadPoolWorkQueue workQueue = outerWorkQueue;
        ThreadPoolWorkQueueThreadLocals tl = workQueue.GetOrCreateThreadLocals();
        Thread currentThread = tl.currentThread;

        // Start on clean ExecutionContext and SynchronizationContext
        currentThread._executionContext = null;
        currentThread._synchronizationContext = null;

        // Loop until our quantum expires or there is no work.
        while (ThreadPool.KeepDispatching(startTickCount))
        {
            bool missedSteal = false;

            // The work item is likewise held in a local of the try block so it can be enregistered.
            object? workItem = workQueue.Dequeue(tl, ref missedSteal);

            if (workItem == null)
            {
                // No work. If we missed a steal, though, there may be more work in the queue.
                // Instead of looping around and trying again, just request another thread; hopefully the
                // thread that owns the contended work-stealing queue picks up its own work items in the
                // meantime, which is more efficient than this thread doing it anyway.
                needAnotherThread = missedSteal;

                // Tell the VM we're returning normally, not because Hill Climbing asked us to return.
                return true;
            }

            if (workQueue.loggingEnabled)
            {
                FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem);
            }

            // If we found work, there may be more work. Ask for another thread so the other work can be
            // processed in parallel. This only ever asks for a max of #procs threads, so it is safe to
            // call for every dequeue.
            workQueue.EnsureThreadRequested();

            // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
            if (ThreadPoolGlobals.enableWorkerTracking)
            {
                bool reportedStatus = false;
                try
                {
                    ThreadPool.ReportThreadStatus(isWorking: true);
                    reportedStatus = true;
                    if (workItem is Task task)
                    {
                        task.ExecuteFromThreadPool(currentThread);
                    }
                    else
                    {
                        Debug.Assert(workItem is IThreadPoolWorkItem);
                        Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
                    }
                }
                finally
                {
                    // Only report "not working" if the "working" report actually went out.
                    if (reportedStatus)
                    {
                        ThreadPool.ReportThreadStatus(isWorking: false);
                    }
                }
            }
            else if (workItem is Task task)
            {
                // Check for Task first as it's currently faster to type check
                // for Task and then Unsafe.As for the interface, rather than
                // vice versa, in particular when the object implements a bunch
                // of interfaces.
                task.ExecuteFromThreadPool(currentThread);
            }
            else
            {
                Debug.Assert(workItem is IThreadPoolWorkItem);
                Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
            }

            currentThread.ResetThreadPoolThread();

            // Release refs
            workItem = null;

            // Return to clean ExecutionContext and SynchronizationContext
            ExecutionContext.ResetThreadPoolThread(currentThread);

            // Notify the VM that we executed this workitem. This is also our opportunity to ask whether
            // Hill Climbing wants us to return the thread to the pool or not.
            if (!ThreadPool.NotifyWorkItemComplete())
            {
                return false;
            }
        }

        // If we get here, it's because our quantum expired. Tell the VM we're returning normally.
        return true;
    }
    finally
    {
        // If we are exiting for any reason other than that the queue is definitely empty, ask for another
        // thread to pick up where we left off.
        if (needAnotherThread)
        {
            outerWorkQueue.EnsureThreadRequested();
        }
    }
}
// Simple random number generator. We don't need great randomness, we just need a little and for it to be fast.
/// <summary>Small xorshift pseudo-random number generator with four words of mutable state.</summary>
internal struct FastRandom // xorshift prng
{
    private uint _w, _x, _y, _z;

    /// <summary>Initializes the generator; <paramref name="seed"/> becomes one state word and the other three are fixed constants.</summary>
    /// <param name="seed">Seed value, reinterpreted as unsigned.</param>
    public FastRandom(int seed)
    {
        _w = 88675123;
        _x = (uint)seed;
        _y = 362436069;
        _z = 521288629;
    }

    /// <summary>Advances the generator and returns a value in the range [0, <paramref name="maxValue"/>).</summary>
    /// <param name="maxValue">Exclusive upper bound; asserted to be positive.</param>
    public int Next(int maxValue)
    {
        Debug.Assert(maxValue > 0);

        uint mixed = _x ^ (_x << 11);
        mixed ^= mixed >> 8;

        // Rotate the state words down by one position.
        _x = _y;
        _y = _z;
        _z = _w;

        _w = _w ^ (_w >> 19) ^ mixed;

        return (int)(_w % (uint)maxValue);
    }
}
// Holds a WorkStealingQueue, and removes it from the list when this object is no longer referenced.
/// <summary>Per-thread thread pool state: the owning work queue, this thread's work-stealing queue, and a random number generator.</summary>
internal sealed class ThreadPoolWorkQueueThreadLocals
{
    /// <summary>The instance belonging to the current thread, or <c>null</c> if none has been created yet.</summary>
    [ThreadStatic]
    public static ThreadPoolWorkQueueThreadLocals? threadLocals;

    public readonly ThreadPoolWorkQueue workQueue;
    public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
    public readonly Thread currentThread;
    public FastRandom random = new FastRandom(Thread.CurrentThread.ManagedThreadId); // mutable struct, do not copy or make readonly

    /// <summary>Creates a new work-stealing queue for the constructing thread and registers it in the global list.</summary>
    /// <param name="tpq">The work queue these thread locals belong to.</param>
    public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
    {
        workQueue = tpq;

        var localQueue = new ThreadPoolWorkQueue.WorkStealingQueue();
        workStealingQueue = localQueue;
        ThreadPoolWorkQueue.WorkStealingQueueList.Add(localQueue);

        currentThread = Thread.CurrentThread;
    }

    /// <summary>Moves any items still in the local queue to the global queue, then unregisters the local queue.</summary>
    ~ThreadPoolWorkQueueThreadLocals()
    {
        if (workStealingQueue == null)
        {
            return;
        }

        // Transfer any pending workitems into the global queue so that they will be executed by another thread
        if (workQueue != null)
        {
            object? pending = workStealingQueue.LocalPop();
            while (pending != null)
            {
                workQueue.Enqueue(pending, forceGlobal: true);
                pending = workStealingQueue.LocalPop();
            }
        }

        ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
    }
}
/// <summary>Callback signature accepted by <c>ThreadPool.QueueUserWorkItem</c>.</summary>
/// <param name="state">The state object that was supplied when the work item was queued; may be <c>null</c>.</param>
767 public delegate void WaitCallback(object? state);
/// <summary>Callback signature accepted by the <c>RegisterWaitForSingleObject</c> overloads.</summary>
/// <param name="state">The state object that was supplied at registration; may be <c>null</c>.</param>
/// <param name="timedOut"><c>true</c> when the callback runs because of a timeout, <c>false</c> when the wait was signaled.</param>
769 public delegate void WaitOrTimerCallback(object? state, bool timedOut); // signaled or timed out
/// <summary>
/// Base class for the queued-callback work items in this file. In DEBUG builds it asserts that each
/// instance is executed exactly once; in other builds <see cref="Execute"/> does nothing.
/// </summary>
internal abstract class QueueUserWorkItemCallbackBase : IThreadPoolWorkItem
{
#if DEBUG
    // 0 until Execute has run, 1 afterwards.
    private int executed;

    [SuppressMessage("Microsoft.Performance", "CA1821:RemoveEmptyFinalizers")]
    ~QueueUserWorkItemCallbackBase()
    {
        Interlocked.MemoryBarrier(); // ensure that an old cached value is not read below
        Debug.Assert(executed != 0, "A QueueUserWorkItemCallback was never called!");
    }
#endif

    /// <summary>Marks the item as executed (DEBUG builds only). Overrides call this before running their callback.</summary>
    public virtual void Execute()
    {
#if DEBUG
        // The finalizer only exists to catch items that never ran, so it is no longer needed.
        GC.SuppressFinalize(this);
        Debug.Assert(
            0 == Interlocked.Exchange(ref executed, 1),
            "A QueueUserWorkItemCallback was called twice!");
#endif
    }
}
/// <summary>Work item that runs a <see cref="WaitCallback"/> under a captured <see cref="ExecutionContext"/>.</summary>
internal sealed class QueueUserWorkItemCallback : QueueUserWorkItemCallbackBase
{
    private WaitCallback? _callback; // SOS's ThreadPool command depends on this name
    private readonly object? _state;
    private readonly ExecutionContext _context;

    // Invoked by ExecutionContext.RunForThreadPoolUnsafe: clears the stored delegate, then calls it with the stored state.
    private static readonly Action<QueueUserWorkItemCallback> s_executionContextShim = InvokeCallback;

    private static void InvokeCallback(QueueUserWorkItemCallback quwi)
    {
        Debug.Assert(quwi._callback != null);
        WaitCallback callback = quwi._callback;
        quwi._callback = null;

        callback(quwi._state);
    }

    /// <summary>Creates the work item.</summary>
    /// <param name="callback">The delegate to invoke.</param>
    /// <param name="state">The argument passed to <paramref name="callback"/>.</param>
    /// <param name="context">The execution context to run under; asserted to be non-null.</param>
    internal QueueUserWorkItemCallback(WaitCallback callback, object? state, ExecutionContext context)
    {
        Debug.Assert(context != null);

        _context = context;
        _state = state;
        _callback = callback;
    }

    /// <summary>Runs the callback under the stored execution context.</summary>
    public override void Execute()
    {
        base.Execute();

        ExecutionContext.RunForThreadPoolUnsafe(_context, s_executionContextShim, this);
    }
}
/// <summary>Work item that runs an <see cref="Action{TState}"/> under a captured <see cref="ExecutionContext"/>.</summary>
/// <typeparam name="TState">Type of the state passed to the callback.</typeparam>
internal sealed class QueueUserWorkItemCallback<TState> : QueueUserWorkItemCallbackBase
{
    private Action<TState>? _callback; // SOS's ThreadPool command depends on this name
    private readonly TState _state;
    private readonly ExecutionContext _context;

    /// <summary>Creates the work item.</summary>
    /// <param name="callback">The delegate to invoke; asserted to be non-null.</param>
    /// <param name="state">The argument passed to <paramref name="callback"/>.</param>
    /// <param name="context">The execution context to run under.</param>
    internal QueueUserWorkItemCallback(Action<TState> callback, TState state, ExecutionContext context)
    {
        Debug.Assert(callback != null);

        _context = context;
        _state = state;
        _callback = callback;
    }

    /// <summary>Clears the stored delegate, then runs it under the stored execution context.</summary>
    public override void Execute()
    {
        base.Execute();

        Debug.Assert(_callback != null);
        Action<TState> toInvoke = _callback;
        _callback = null;

        ExecutionContext.RunForThreadPoolUnsafe(_context, toInvoke, in _state);
    }
}
/// <summary>Work item that invokes a <see cref="WaitCallback"/> directly, without switching execution context.</summary>
internal sealed class QueueUserWorkItemCallbackDefaultContext : QueueUserWorkItemCallbackBase
{
    private WaitCallback? _callback; // SOS's ThreadPool command depends on this name
    private readonly object? _state;

    /// <summary>Creates the work item.</summary>
    /// <param name="callback">The delegate to invoke; asserted to be non-null.</param>
    /// <param name="state">The argument passed to <paramref name="callback"/>.</param>
    internal QueueUserWorkItemCallbackDefaultContext(WaitCallback callback, object? state)
    {
        Debug.Assert(callback != null);

        _state = state;
        _callback = callback;
    }

    /// <summary>Clears the stored delegate and invokes it with the stored state.</summary>
    public override void Execute()
    {
        ExecutionContext.CheckThreadPoolAndContextsAreDefault();
        base.Execute();

        Debug.Assert(_callback != null);
        WaitCallback toInvoke = _callback;
        _callback = null;

        toInvoke(_state);

        // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
    }
}
/// <summary>Work item that invokes an <see cref="Action{TState}"/> directly, without switching execution context.</summary>
/// <typeparam name="TState">Type of the state passed to the callback.</typeparam>
internal sealed class QueueUserWorkItemCallbackDefaultContext<TState> : QueueUserWorkItemCallbackBase
{
    private Action<TState>? _callback; // SOS's ThreadPool command depends on this name
    private readonly TState _state;

    /// <summary>Creates the work item.</summary>
    /// <param name="callback">The delegate to invoke; asserted to be non-null.</param>
    /// <param name="state">The argument passed to <paramref name="callback"/>.</param>
    internal QueueUserWorkItemCallbackDefaultContext(Action<TState> callback, TState state)
    {
        Debug.Assert(callback != null);

        _state = state;
        _callback = callback;
    }

    /// <summary>Clears the stored delegate and invokes it with the stored state.</summary>
    public override void Execute()
    {
        ExecutionContext.CheckThreadPoolAndContextsAreDefault();
        base.Execute();

        Debug.Assert(_callback != null);
        Action<TState> toInvoke = _callback;
        _callback = null;

        toInvoke(_state);

        // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
    }
}
/// <summary>
/// Bundles a <see cref="WaitOrTimerCallback"/> with its state and, optionally, a captured
/// <see cref="ExecutionContext"/> under which the callback should run.
/// </summary>
internal sealed class _ThreadPoolWaitOrTimerCallback
{
    private readonly WaitOrTimerCallback _waitOrTimerCallback;
    private readonly ExecutionContext? _executionContext;
    private readonly object? _state;

    // Cached delegates for the timed-out (t) and signaled (f) cases.
    private static readonly ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t);
    private static readonly ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f);

    /// <summary>Creates the helper.</summary>
    /// <param name="waitOrTimerCallback">The user callback.</param>
    /// <param name="state">The state handed to the user callback.</param>
    /// <param name="flowExecutionContext">When <c>true</c>, the current execution context is captured for later use.</param>
    internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, object? state, bool flowExecutionContext)
    {
        _waitOrTimerCallback = waitOrTimerCallback;
        _state = state;

        if (flowExecutionContext)
        {
            // Capture the execution context.
            _executionContext = ExecutionContext.Capture();
        }
    }

    private static void WaitOrTimerCallback_Context_t(object? state)
    {
        WaitOrTimerCallback_Context(state, timedOut: true);
    }

    private static void WaitOrTimerCallback_Context_f(object? state)
    {
        WaitOrTimerCallback_Context(state, timedOut: false);
    }

    // Unwraps the helper passed as context state and invokes the user callback.
    private static void WaitOrTimerCallback_Context(object? state, bool timedOut)
    {
        var helper = (_ThreadPoolWaitOrTimerCallback)state!;
        helper._waitOrTimerCallback(helper._state, timedOut);
    }

    /// <summary>Invokes the user callback, under the captured execution context when there is one.</summary>
    /// <param name="helper">The helper holding the callback; asserted to be non-null.</param>
    /// <param name="timedOut">Forwarded to the user callback.</param>
    internal static void PerformWaitOrTimerCallback(_ThreadPoolWaitOrTimerCallback helper, bool timedOut)
    {
        Debug.Assert(helper != null, "Null state passed to PerformWaitOrTimerCallback!");

        ExecutionContext? context = helper._executionContext;
        if (context != null)
        {
            ContextCallback shim = timedOut ? _ccbt : _ccbf;
            ExecutionContext.Run(context, shim, helper);
            return;
        }

        // No captured context (unsafe registration or EC flow suppressed): call directly.
        WaitOrTimerCallback callback = helper._waitOrTimerCallback;
        callback(helper._state, timedOut);
    }
}
961 public static partial class ThreadPool
/// <summary>Registers a wait on <paramref name="waitObject"/> with an unsigned millisecond timeout; the final argument of the forwarded call is <c>true</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must not exceed <see cref="int.MaxValue"/> unless it is <see cref="uint.MaxValue"/>.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The timeout is above <see cref="int.MaxValue"/> and is not <see cref="uint.MaxValue"/>.</exception>
[CLSCompliant(false)]
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    uint millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    bool isMaxValue = millisecondsTimeOutInterval == uint.MaxValue;
    if (!isMaxValue && millisecondsTimeOutInterval > (uint)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, true);
}
/// <summary>Registers a wait on <paramref name="waitObject"/> with an unsigned millisecond timeout; the final argument of the forwarded call is <c>false</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must not exceed <see cref="int.MaxValue"/> unless it is <see cref="uint.MaxValue"/>.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The timeout is above <see cref="int.MaxValue"/> and is not <see cref="uint.MaxValue"/>.</exception>
[CLSCompliant(false)]
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    uint millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    // The value is unsigned, so the only way to fail this check is to be too large. Report that with the
    // same "less than or equal to int.MaxValue" message the safe uint overload and the long overloads use,
    // rather than the "non-negative or -1" message, which cannot describe an unsigned argument.
    if (millisecondsTimeOutInterval > (uint)int.MaxValue && millisecondsTimeOutInterval != uint.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, false);
}
/// <summary>Registers a wait on <paramref name="waitObject"/> with a signed 32-bit millisecond timeout; the final argument of the forwarded call is <c>true</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must be -1 or greater.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The timeout is less than -1.</exception>
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    int millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    // The cast turns -1 into uint.MaxValue.
    uint timeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, timeout, executeOnlyOnce, true);
}
/// <summary>Registers a wait on <paramref name="waitObject"/> with a signed 32-bit millisecond timeout; the final argument of the forwarded call is <c>false</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must be -1 or greater.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The timeout is less than -1.</exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    int millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    // The cast turns -1 into uint.MaxValue.
    uint timeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, timeout, executeOnlyOnce, false);
}
/// <summary>Registers a wait on <paramref name="waitObject"/> with a 64-bit millisecond timeout; the final argument of the forwarded call is <c>true</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must be between -1 and <see cref="int.MaxValue"/> inclusive.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The timeout is less than -1 or greater than <see cref="int.MaxValue"/>.</exception>
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    long millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (millisecondsTimeOutInterval > (uint)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    // The cast turns -1 into uint.MaxValue.
    uint timeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, timeout, executeOnlyOnce, true);
}
/// <summary>Registers a wait on <paramref name="waitObject"/> with a 64-bit millisecond timeout; the final argument of the forwarded call is <c>false</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must be between -1 and <see cref="int.MaxValue"/> inclusive.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The timeout is less than -1 or greater than <see cref="int.MaxValue"/>.</exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    long millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (millisecondsTimeOutInterval > (uint)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    // The cast turns -1 into uint.MaxValue.
    uint timeout = (uint)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, timeout, executeOnlyOnce, false);
}
/// <summary>Registers a wait on <paramref name="waitObject"/> with a <see cref="TimeSpan"/> timeout; the final argument of the forwarded call is <c>true</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="timeout">Timeout; its total milliseconds, truncated to a whole number, must be between -1 and <see cref="int.MaxValue"/> inclusive.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The truncated millisecond value is less than -1 or greater than <see cref="int.MaxValue"/>.</exception>
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    TimeSpan timeout,
    bool executeOnlyOnce
    )
{
    long totalMilliseconds = (long)timeout.TotalMilliseconds;

    if (totalMilliseconds < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (totalMilliseconds > (long)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)totalMilliseconds, executeOnlyOnce, true);
}
/// <summary>Registers a wait on <paramref name="waitObject"/> with a <see cref="TimeSpan"/> timeout; the final argument of the forwarded call is <c>false</c>.</summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback to invoke.</param>
/// <param name="state">State passed to the callback.</param>
/// <param name="timeout">Timeout; its total milliseconds, truncated to a whole number, must be between -1 and <see cref="int.MaxValue"/> inclusive.</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration overload.</param>
/// <exception cref="ArgumentOutOfRangeException">The truncated millisecond value is less than -1 or greater than <see cref="int.MaxValue"/>.</exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    TimeSpan timeout,
    bool executeOnlyOnce
    )
{
    long totalMilliseconds = (long)timeout.TotalMilliseconds;

    if (totalMilliseconds < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (totalMilliseconds > (long)int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)totalMilliseconds, executeOnlyOnce, false);
}
/// <summary>
/// Queues <paramref name="callBack"/> with a <c>null</c> state object by delegating to
/// the two-argument <c>QueueUserWorkItem</c> overload.
/// </summary>
/// <param name="callBack">The callback to queue.</param>
/// <returns>The value returned by the two-argument overload.</returns>
public static bool QueueUserWorkItem(WaitCallback callBack)
{
    return QueueUserWorkItem(callBack, null);
}
/// <summary>
/// Wraps <paramref name="callBack"/> and <paramref name="state"/> in a work item and enqueues it
/// on the global thread pool queue.
/// </summary>
/// <param name="callBack">The callback to queue; a null value is reported through <c>ThrowHelper</c>.</param>
/// <param name="state">State object handed to the work item wrapper.</param>
/// <returns>Always <c>true</c> once the item has been enqueued.</returns>
public static bool QueueUserWorkItem(WaitCallback callBack, object? state)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ExecutionContext? capturedContext = ExecutionContext.Capture();

    // A missing or default context uses the wrapper that carries no ExecutionContext;
    // otherwise the captured context travels with the work item.
    object workItem;
    if (capturedContext == null || capturedContext.IsDefault)
    {
        workItem = new QueueUserWorkItemCallbackDefaultContext(callBack!, state);
    }
    else
    {
        workItem = new QueueUserWorkItemCallback(callBack!, state, capturedContext);
    }

    ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: true);

    return true;
}
/// <summary>
/// Wraps a strongly-typed <paramref name="callBack"/> and its <paramref name="state"/> in a work
/// item and enqueues it on the thread pool.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the callback.</typeparam>
/// <param name="callBack">The callback to queue; a null value is reported through <c>ThrowHelper</c>.</param>
/// <param name="state">State value handed to the work item wrapper.</param>
/// <param name="preferLocal">
/// When <c>true</c>, the item is enqueued with <c>forceGlobal: false</c>; when <c>false</c>, with
/// <c>forceGlobal: true</c>.
/// </param>
/// <returns>Always <c>true</c> once the item has been enqueued.</returns>
public static bool QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ExecutionContext? capturedContext = ExecutionContext.Capture();

    // A missing or default context uses the wrapper that carries no ExecutionContext;
    // otherwise the captured context travels with the work item.
    object workItem;
    if (capturedContext == null || capturedContext.IsDefault)
    {
        workItem = new QueueUserWorkItemCallbackDefaultContext<TState>(callBack!, state);
    }
    else
    {
        workItem = new QueueUserWorkItemCallback<TState>(callBack!, state, capturedContext);
    }

    bool forceGlobal = !preferLocal;
    ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: forceGlobal);

    return true;
}
/// <summary>
/// Enqueues a strongly-typed <paramref name="callBack"/> on the thread pool without capturing
/// an <see cref="ExecutionContext"/>.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the callback.</typeparam>
/// <param name="callBack">The callback to queue; a null value is reported through <c>ThrowHelper</c>.</param>
/// <param name="state">State value handed to the callback.</param>
/// <param name="preferLocal">
/// When <c>true</c>, the item is enqueued with <c>forceGlobal: false</c>; when <c>false</c>, with
/// <c>forceGlobal: true</c>.
/// </param>
/// <returns>Always <c>true</c> once the item has been enqueued.</returns>
public static bool UnsafeQueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    // Fast path: when the callback is the runtime's own shim for invoking an
    // IAsyncStateMachineBox, the state object itself is queued directly rather than
    // being wrapped in a QueueUserWorkItemCallback. User code reaches this path when it
    // queues its provided continuation to the ThreadPool; Tasks are queued internally
    // through UnsafeQueueUserWorkItemInternal.
    bool isStateMachineShim = ReferenceEquals(callBack, ThreadPoolGlobals.s_invokeAsyncStateMachineBox);
    if (isStateMachineShim)
    {
        bool stateIsBox = state is IAsyncStateMachineBox;
        if (!stateIsBox)
        {
            // The provided state must be the internal IAsyncStateMachineBox (Task) type
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state);
        }

        UnsafeQueueUserWorkItemInternal((object)state!, preferLocal);
        return true;
    }

    EnsureInitialized();

    object workItem = new QueueUserWorkItemCallbackDefaultContext<TState>(callBack!, state);
    ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: !preferLocal);

    return true;
}
/// <summary>
/// Enqueues <paramref name="callBack"/> on the global thread pool queue without capturing an
/// <see cref="ExecutionContext"/>.
/// </summary>
/// <param name="callBack">The callback to queue; a null value is reported through <c>ThrowHelper</c>.</param>
/// <param name="state">State object handed to the work item wrapper.</param>
/// <returns>Always <c>true</c> once the item has been enqueued.</returns>
public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object? state)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ThreadPoolGlobals.workQueue.Enqueue(
        new QueueUserWorkItemCallbackDefaultContext(callBack!, state),
        forceGlobal: true);

    return true;
}
/// <summary>
/// Enqueues a caller-provided <see cref="IThreadPoolWorkItem"/> on the thread pool after
/// validating it.
/// </summary>
/// <param name="callBack">
/// The work item to queue. A null value, or an instance that is also a <see cref="Task"/>,
/// is reported through <c>ThrowHelper</c>.
/// </param>
/// <param name="preferLocal">Forwarded unchanged to <c>UnsafeQueueUserWorkItemInternal</c>.</param>
/// <returns>Always <c>true</c> once the item has been handed off.</returns>
public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool preferLocal)
{
    if (callBack == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    bool isTask = callBack is Task;
    if (isTask)
    {
        // A Task-derived type that also implements the interface is rejected:
        // queueing it here would bypass Task.Start and its safety checks.
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.callBack);
    }

    UnsafeQueueUserWorkItemInternal(callBack!, preferLocal);
    return true;
}
/// <summary>
/// Enqueues an already-validated work item object on the thread pool work queue.
/// </summary>
/// <param name="callBack">
/// The work item; debug builds assert that it is exactly one of <see cref="IThreadPoolWorkItem"/>
/// or <see cref="Task"/>.
/// </param>
/// <param name="preferLocal">
/// When <c>true</c>, the item is enqueued with <c>forceGlobal: false</c>; when <c>false</c>, with
/// <c>forceGlobal: true</c>.
/// </param>
internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal)
{
    // Exactly one of the two must hold (inequality of two bools is XOR).
    Debug.Assert((callBack is IThreadPoolWorkItem) != (callBack is Task));

    EnsureInitialized();

    bool forceGlobal = !preferLocal;
    ThreadPoolGlobals.workQueue.Enqueue(callBack, forceGlobal: forceGlobal);
}
/// <summary>
/// Tries to take the target work item out of the current thread's queue.
/// </summary>
/// <param name="workItem">The work item to remove; must not be null.</param>
/// <returns>
/// <c>false</c> when the thread pool has not been initialized; otherwise the result of
/// <c>LocalFindAndPop</c> on the work queue.
/// </returns>
internal static bool TryPopCustomWorkItem(object workItem)
{
    Debug.Assert(workItem != null);

    // If the pool was never initialized, there is no way this work item was ever queued.
    if (!ThreadPoolGlobals.threadPoolInitialized)
    {
        return false;
    }

    return ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem);
}
/// <summary>
/// Lazily enumerates all queued work items: the global queue first, then every non-null slot
/// of each work-stealing queue. Called by TaskScheduler in its debugger hooks.
/// </summary>
/// <returns>A deferred sequence of the queued work items.</returns>
internal static IEnumerable<object> GetQueuedWorkItems()
{
    // Global queue.
    foreach (object globalItem in ThreadPoolGlobals.workQueue.workItems)
    {
        yield return globalItem;
    }

    // Per-thread local queues.
    foreach (ThreadPoolWorkQueue.WorkStealingQueue localQueue in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
    {
        if (localQueue == null || localQueue.m_array == null)
        {
            continue;
        }

        object?[] slots = localQueue.m_array;
        foreach (object? slot in slots)
        {
            if (slot != null)
            {
                yield return slot;
            }
        }
    }
}
/// <summary>
/// Lazily enumerates the non-null slots of the current thread's work-stealing queue.
/// </summary>
/// <returns>
/// A deferred sequence of the locally queued work items; empty when the current thread has no
/// thread-locals or no work-stealing queue array.
/// </returns>
internal static IEnumerable<object> GetLocallyQueuedWorkItems()
{
    ThreadPoolWorkQueue.WorkStealingQueue? localQueue = ThreadPoolWorkQueueThreadLocals.threadLocals?.workStealingQueue;
    if (localQueue == null || localQueue.m_array == null)
    {
        yield break;
    }

    object?[] slots = localQueue.m_array;
    foreach (object? slot in slots)
    {
        if (slot != null)
        {
            yield return slot;
        }
    }
}
/// <summary>
/// Exposes the global work queue's item collection as an enumerable sequence.
/// </summary>
/// <returns>The <c>workItems</c> collection of the global work queue.</returns>
internal static IEnumerable<object> GetGloballyQueuedWorkItems()
{
    return ThreadPoolGlobals.workQueue.workItems;
}
/// <summary>
/// Copies the supplied work items into a new array.
/// </summary>
/// <param name="workitems">The sequence of work items to snapshot.</param>
/// <returns>
/// An array holding every item produced by a single enumeration of <paramref name="workitems"/>,
/// in enumeration order.
/// </returns>
private static object[] ToObjectArray(IEnumerable<object> workitems)
{
    // Enumerate exactly once. The sequence may be backed by queues that are still in motion,
    // so a separate "count" pass followed by a "copy" pass can disagree: a shrinking source
    // leaves trailing null slots in the result, and a growing source silently drops items.
    // Buffering a single pass yields an array that matches what was actually observed.
    List<object> snapshot = new List<object>();
    foreach (object item in workitems)
    {
        snapshot.Add(item);
    }

    return snapshot.ToArray();
}
/// <summary>
/// Returns all queued work items (global and local) as an array.
/// </summary>
/// <remarks>
/// This is the method the debugger will actually call, if it ends up calling
/// into ThreadPool directly. Tests can use this to simulate a debugger, as well.
/// </remarks>
/// <returns>The items of <c>GetQueuedWorkItems</c> copied into an array.</returns>
internal static object[] GetQueuedWorkItemsForDebugger()
{
    IEnumerable<object> queued = GetQueuedWorkItems();
    return ToObjectArray(queued);
}
/// <summary>
/// Returns the work items in the global queue as an array.
/// </summary>
/// <returns>The items of <c>GetGloballyQueuedWorkItems</c> copied into an array.</returns>
internal static object[] GetGloballyQueuedWorkItemsForDebugger()
{
    IEnumerable<object> queued = GetGloballyQueuedWorkItems();
    return ToObjectArray(queued);
}
/// <summary>
/// Returns the work items in the current thread's local queue as an array.
/// </summary>
/// <returns>The items of <c>GetLocallyQueuedWorkItems</c> copied into an array.</returns>
internal static object[] GetLocallyQueuedWorkItemsForDebugger()
{
    IEnumerable<object> queued = GetLocallyQueuedWorkItems();
    return ToObjectArray(queued);
}
1281 /// <summary>
1282 /// Gets the number of work items that are currently queued to be processed.
1283 /// </summary>
1284 /// <remarks>
1285 /// For a thread pool implementation that may have different types of work items, the count includes all types that can
1286 /// be tracked, which may only be the user work items including tasks. Some implementations may also include queued
1287 /// timer and wait callbacks in the count. On Windows, the count is unlikely to include the number of pending IO
1288 /// completions, as they get posted directly to an IO completion port.
1289 /// </remarks>
1290 public static long PendingWorkItemCount
1294 ThreadPoolWorkQueue workQueue = ThreadPoolGlobals.workQueue;
1295 return workQueue.LocalCount + workQueue.GlobalCount + PendingUnmanagedWorkItemCount;