1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 /*=============================================================================
9 ** Purpose: Class for creating and managing a threadpool
12 =============================================================================*/
14 using System
.Collections
.Concurrent
;
15 using System
.Collections
.Generic
;
16 using System
.Diagnostics
;
17 using System
.Diagnostics
.CodeAnalysis
;
18 using System
.Diagnostics
.Tracing
;
19 using System
.Runtime
.CompilerServices
;
20 using System
.Runtime
.InteropServices
;
21 using System
.Threading
.Tasks
;
22 using Internal
.Runtime
.CompilerServices
;
24 namespace System
.Threading
26 internal static class ThreadPoolGlobals
28 public static readonly int processorCount
= Environment
.ProcessorCount
;
30 public static volatile bool threadPoolInitialized
;
31 public static bool enableWorkerTracking
;
33 public static readonly ThreadPoolWorkQueue workQueue
= new ThreadPoolWorkQueue();
35 /// <summary>Shim used to invoke <see cref="IAsyncStateMachineBox.MoveNext"/> of the supplied <see cref="IAsyncStateMachineBox"/>.</summary>
36 internal static readonly Action
<object?> s_invokeAsyncStateMachineBox
= state
=>
38 if (!(state
is IAsyncStateMachineBox box
))
40 ThrowHelper
.ThrowArgumentOutOfRangeException(ExceptionArgument
.state
);
48 [StructLayout(LayoutKind
.Sequential
)] // enforce layout so that padding reduces false sharing
49 internal sealed class ThreadPoolWorkQueue
51 internal static class WorkStealingQueueList
53 #pragma warning disable CA1825 // avoid the extra generic instantiation for Array.Empty<T>(); this is the only place we'll ever create this array
54 private static volatile WorkStealingQueue
[] _queues
= new WorkStealingQueue
[0];
55 #pragma warning restore CA1825
/// <summary>
/// Gets the array currently held in <c>_queues</c>. Add and Remove publish a
/// freshly allocated array rather than editing the existing one.
/// </summary>
public static WorkStealingQueue[] Queues
{
    get { return _queues; }
}
59 public static void Add(WorkStealingQueue queue
)
61 Debug
.Assert(queue
!= null);
64 WorkStealingQueue
[] oldQueues
= _queues
;
65 Debug
.Assert(Array
.IndexOf(oldQueues
, queue
) == -1);
67 var newQueues
= new WorkStealingQueue
[oldQueues
.Length
+ 1];
68 Array
.Copy(oldQueues
, 0, newQueues
, 0, oldQueues
.Length
);
69 newQueues
[newQueues
.Length
- 1] = queue
;
70 if (Interlocked
.CompareExchange(ref _queues
, newQueues
, oldQueues
) == oldQueues
)
77 public static void Remove(WorkStealingQueue queue
)
79 Debug
.Assert(queue
!= null);
82 WorkStealingQueue
[] oldQueues
= _queues
;
83 if (oldQueues
.Length
== 0)
88 int pos
= Array
.IndexOf(oldQueues
, queue
);
91 Debug
.Fail("Should have found the queue");
95 var newQueues
= new WorkStealingQueue
[oldQueues
.Length
- 1];
98 Array
.Copy(oldQueues
, 1, newQueues
, 0, newQueues
.Length
);
100 else if (pos
== oldQueues
.Length
- 1)
102 Array
.Copy(oldQueues
, 0, newQueues
, 0, newQueues
.Length
);
106 Array
.Copy(oldQueues
, 0, newQueues
, 0, pos
);
107 Array
.Copy(oldQueues
, pos
+ 1, newQueues
, pos
, newQueues
.Length
- pos
);
110 if (Interlocked
.CompareExchange(ref _queues
, newQueues
, oldQueues
) == oldQueues
)
118 internal sealed class WorkStealingQueue
120 private const int INITIAL_SIZE
= 32;
121 internal volatile object?[] m_array
= new object[INITIAL_SIZE
]; // SOS's ThreadPool command depends on this name
122 private volatile int m_mask
= INITIAL_SIZE
- 1;
125 // in debug builds, start at the end so we exercise the index reset logic.
126 private const int START_INDEX
= int.MaxValue
;
128 private const int START_INDEX
= 0;
131 private volatile int m_headIndex
= START_INDEX
;
132 private volatile int m_tailIndex
= START_INDEX
;
134 private SpinLock m_foreignLock
= new SpinLock(enableThreadOwnerTracking
: false);
136 public void LocalPush(object obj
)
138 int tail
= m_tailIndex
;
140 // We're going to increment the tail; if we'll overflow, then we need to reset our counts
141 if (tail
== int.MaxValue
)
143 bool lockTaken
= false;
146 m_foreignLock
.Enter(ref lockTaken
);
148 if (m_tailIndex
== int.MaxValue
)
151 // Rather than resetting to zero, we'll just mask off the bits we don't care about.
152 // This way we don't need to rearrange the items already in the queue; they'll be found
153 // correctly exactly where they are. One subtlety here is that we need to make sure that
154 // if head is currently < tail, it remains that way. This happens to just fall out from
155 // the bit-masking, because we only do this if tail == int.MaxValue, meaning that all
156 // bits are set, so all of the bits we're keeping will also be set. Thus it's impossible
157 // for the head to end up > than the tail, since you can't set any more bits than all of
160 m_headIndex
= m_headIndex
& m_mask
;
161 m_tailIndex
= tail
= m_tailIndex
& m_mask
;
162 Debug
.Assert(m_headIndex
<= m_tailIndex
);
168 m_foreignLock
.Exit(useMemoryBarrier
: true);
172 // When there are at least 2 elements' worth of space, we can take the fast path.
173 if (tail
< m_headIndex
+ m_mask
)
175 Volatile
.Write(ref m_array
[tail
& m_mask
], obj
);
176 m_tailIndex
= tail
+ 1;
180 // We need to contend with foreign pops, so we lock.
181 bool lockTaken
= false;
184 m_foreignLock
.Enter(ref lockTaken
);
186 int head
= m_headIndex
;
187 int count
= m_tailIndex
- m_headIndex
;
189 // If there is still space (one left), just add the element.
192 // We're full; expand the queue by doubling its size.
193 var newArray
= new object?[m_array
.Length
<< 1];
194 for (int i
= 0; i
< m_array
.Length
; i
++)
195 newArray
[i
] = m_array
[(i
+ head
) & m_mask
];
197 // Reset the field values, incl. the mask.
200 m_tailIndex
= tail
= count
;
201 m_mask
= (m_mask
<< 1) | 1;
204 Volatile
.Write(ref m_array
[tail
& m_mask
], obj
);
205 m_tailIndex
= tail
+ 1;
210 m_foreignLock
.Exit(useMemoryBarrier
: false);
215 [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification
= "Reviewed for thread safety")]
216 public bool LocalFindAndPop(object obj
)
218 // Fast path: check the tail. If equal, we can skip the lock.
219 if (m_array
[(m_tailIndex
- 1) & m_mask
] == obj
)
221 object? unused
= LocalPop();
222 Debug
.Assert(unused
== null || unused
== obj
);
223 return unused
!= null;
226 // Else, do an O(N) search for the work item. The theory of work stealing and our
227 // inlining logic is that most waits will happen on recently queued work. And
228 // since recently queued work will be close to the tail end (which is where we
229 // begin our search), we will likely find it quickly. In the worst case, we
230 // will traverse the whole local queue; this is typically not going to be a
231 // problem (although degenerate cases are clearly an issue) because local work
232 // queues tend to be somewhat shallow in length, and because if we fail to find
233 // the work item, we are about to block anyway (which is very expensive).
234 for (int i
= m_tailIndex
- 2; i
>= m_headIndex
; i
--)
236 if (m_array
[i
& m_mask
] == obj
)
238 // If we found the element, block out steals to avoid interference.
239 bool lockTaken
= false;
242 m_foreignLock
.Enter(ref lockTaken
);
244 // If we encountered a race condition, bail.
245 if (m_array
[i
& m_mask
] == null)
248 // Otherwise, null out the element.
249 Volatile
.Write(ref m_array
[i
& m_mask
], null);
251 // And then check to see if we can fix up the indexes (if we're at
252 // the edge). If we can't, we just leave nulls in the array and they'll
253 // get filtered out eventually (but may lead to superfluous resizing).
254 if (i
== m_tailIndex
)
256 else if (i
== m_headIndex
)
264 m_foreignLock
.Exit(useMemoryBarrier
: false);
/// <summary>
/// Pops an item via <see cref="LocalPopCore"/> when the head index is below the tail index.
/// </summary>
/// <returns>The result of <see cref="LocalPopCore"/>, or <c>null</c> when head is not below tail.</returns>
public object? LocalPop()
{
    // Head is read before tail, matching the operand order of the comparison.
    if (m_headIndex < m_tailIndex)
    {
        return LocalPopCore();
    }

    return null;
}
274 [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification
= "Reviewed for thread safety")]
275 private object? LocalPopCore()
279 int tail
= m_tailIndex
;
280 if (m_headIndex
>= tail
)
285 // Decrement the tail using a fence to ensure subsequent read doesn't come before.
287 Interlocked
.Exchange(ref m_tailIndex
, tail
);
289 // If there is no interaction with a take, we can head down the fast path.
290 if (m_headIndex
<= tail
)
292 int idx
= tail
& m_mask
;
293 object? obj
= Volatile
.Read(ref m_array
[idx
]);
295 // Check for nulls in the array.
296 if (obj
== null) continue;
303 // Interaction with takes: 0 or 1 elements left.
304 bool lockTaken
= false;
307 m_foreignLock
.Enter(ref lockTaken
);
309 if (m_headIndex
<= tail
)
311 // Element still available. Take it.
312 int idx
= tail
& m_mask
;
313 object? obj
= Volatile
.Read(ref m_array
[idx
]);
315 // Check for nulls in the array.
316 if (obj
== null) continue;
323 // If we encountered a race condition and element was stolen, restore the tail.
324 m_tailIndex
= tail
+ 1;
331 m_foreignLock
.Exit(useMemoryBarrier
: false);
/// <summary>Gets whether the head index is currently below the tail index.</summary>
public bool CanSteal
{
    get { return m_headIndex < m_tailIndex; }
}
339 public object? TrySteal(ref bool missedSteal
)
348 m_foreignLock
.TryEnter(ref taken
);
351 // Increment head, and ensure read of tail doesn't move before it (fence).
352 int head
= m_headIndex
;
353 Interlocked
.Exchange(ref m_headIndex
, head
+ 1);
355 if (head
< m_tailIndex
)
357 int idx
= head
& m_mask
;
358 object? obj
= Volatile
.Read(ref m_array
[idx
]);
360 // Check for nulls in the array.
361 if (obj
== null) continue;
368 // Failed, restore head.
376 m_foreignLock
.Exit(useMemoryBarrier
: false);
390 bool lockTaken
= false;
393 m_foreignLock
.Enter(ref lockTaken
);
394 return Math
.Max(0, m_tailIndex
- m_headIndex
);
400 m_foreignLock
.Exit(useMemoryBarrier
: false);
407 internal bool loggingEnabled
;
408 internal readonly ConcurrentQueue
<object> workItems
= new ConcurrentQueue
<object>(); // SOS's ThreadPool command depends on this name
410 private readonly Internal
.PaddingFor32 pad1
;
412 private volatile int numOutstandingThreadRequests
= 0;
414 private readonly Internal
.PaddingFor32 pad2
;
/// <summary>
/// Creates the work queue and records in <c>loggingEnabled</c> whether the framework event source
/// is enabled at Verbose level for the ThreadPool or ThreadTransfer keywords.
/// </summary>
public ThreadPoolWorkQueue()
{
    FrameworkEventSource eventSource = FrameworkEventSource.Log;
    var keywords = FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer;

    loggingEnabled = eventSource.IsEnabled(EventLevel.Verbose, keywords);
}
/// <summary>
/// Returns <see cref="ThreadPoolWorkQueueThreadLocals.threadLocals"/> when it is non-null;
/// otherwise creates it through <see cref="CreateThreadLocals"/>.
/// </summary>
public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals()
{
    ThreadPoolWorkQueueThreadLocals? existing = ThreadPoolWorkQueueThreadLocals.threadLocals;
    if (existing != null)
    {
        return existing;
    }

    return CreateThreadLocals();
}
/// <summary>
/// Allocates a <see cref="ThreadPoolWorkQueueThreadLocals"/> bound to this queue, stores it in
/// <see cref="ThreadPoolWorkQueueThreadLocals.threadLocals"/>, and returns it.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
private ThreadPoolWorkQueueThreadLocals CreateThreadLocals()
{
    // Callers are expected to reach this only when nothing has been stored yet.
    Debug.Assert(ThreadPoolWorkQueueThreadLocals.threadLocals == null);

    var created = new ThreadPoolWorkQueueThreadLocals(this);
    ThreadPoolWorkQueueThreadLocals.threadLocals = created;
    return created;
}
432 internal void EnsureThreadRequested()
435 // If we have not yet requested #procs threads, then request a new thread.
437 // CoreCLR: Note that there is a separate count in the VM which has already been incremented
438 // by the VM by the time we reach this point.
440 int count
= numOutstandingThreadRequests
;
441 while (count
< ThreadPoolGlobals
.processorCount
)
443 int prev
= Interlocked
.CompareExchange(ref numOutstandingThreadRequests
, count
+ 1, count
);
446 ThreadPool
.RequestWorkerThread();
453 internal void MarkThreadRequestSatisfied()
456 // One of our outstanding thread requests has been satisfied.
457 // Decrement the count so that future calls to EnsureThreadRequested will succeed.
459 // CoreCLR: Note that there is a separate count in the VM which has already been decremented
460 // by the VM by the time we reach this point.
462 int count
= numOutstandingThreadRequests
;
465 int prev
= Interlocked
.CompareExchange(ref numOutstandingThreadRequests
, count
- 1, count
);
474 public void Enqueue(object callback
, bool forceGlobal
)
476 Debug
.Assert((callback
is IThreadPoolWorkItem
) ^
(callback
is Task
));
479 System
.Diagnostics
.Tracing
.FrameworkEventSource
.Log
.ThreadPoolEnqueueWorkObject(callback
);
481 ThreadPoolWorkQueueThreadLocals
? tl
= null;
483 tl
= ThreadPoolWorkQueueThreadLocals
.threadLocals
;
487 tl
.workStealingQueue
.LocalPush(callback
);
491 workItems
.Enqueue(callback
);
494 EnsureThreadRequested();
/// <summary>
/// Tries to remove <paramref name="callback"/> from the work-stealing queue referenced by
/// <see cref="ThreadPoolWorkQueueThreadLocals.threadLocals"/>.
/// </summary>
/// <param name="callback">The work item to look for.</param>
/// <returns>
/// <c>false</c> when no thread locals exist; otherwise the result of the queue's own LocalFindAndPop.
/// </returns>
internal bool LocalFindAndPop(object callback)
{
    ThreadPoolWorkQueueThreadLocals? locals = ThreadPoolWorkQueueThreadLocals.threadLocals;
    if (locals == null)
    {
        return false;
    }

    return locals.workStealingQueue.LocalFindAndPop(callback);
}
503 public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl
, ref bool missedSteal
)
505 WorkStealingQueue localWsq
= tl
.workStealingQueue
;
508 if ((callback
= localWsq
.LocalPop()) == null && // first try the local queue
509 !workItems
.TryDequeue(out callback
)) // then try the global queue
511 // finally try to steal from another thread's local queue
512 WorkStealingQueue
[] queues
= WorkStealingQueueList
.Queues
;
513 int c
= queues
.Length
;
514 Debug
.Assert(c
> 0, "There must at least be a queue for this thread.");
515 int maxIndex
= c
- 1;
516 int i
= tl
.random
.Next(c
);
519 i
= (i
< maxIndex
) ? i
+ 1 : 0;
520 WorkStealingQueue otherQueue
= queues
[i
];
521 if (otherQueue
!= localWsq
&& otherQueue
.CanSteal
)
523 callback
= otherQueue
.TrySteal(ref missedSteal
);
524 if (callback
!= null)
536 public long LocalCount
541 foreach (WorkStealingQueue workStealingQueue
in WorkStealingQueueList
.Queues
)
543 count
+= workStealingQueue
.Count
;
/// <summary>Gets the number of items currently in the <c>workItems</c> queue.</summary>
public long GlobalCount
{
    get { return workItems.Count; }
}
552 /// Dispatches work items to this thread.
555 /// <c>true</c> if this thread did as much work as was available or its quantum expired.
556 /// <c>false</c> if this thread stopped working early.
558 internal static bool Dispatch()
560 ThreadPoolWorkQueue outerWorkQueue
= ThreadPoolGlobals
.workQueue
;
563 // Save the start time
565 int startTickCount
= Environment
.TickCount
;
568 // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
569 // From this point on, we are responsible for requesting another thread if we stop working for any
570 // reason, and we believe there might still be work in the queue.
572 // CoreCLR: Note that if this thread is aborted before we get a chance to request another one, the VM will
573 // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
575 outerWorkQueue
.MarkThreadRequestSatisfied();
577 // Has the desire for logging changed since the last time we entered?
578 outerWorkQueue
.loggingEnabled
= FrameworkEventSource
.Log
.IsEnabled(EventLevel
.Verbose
, FrameworkEventSource
.Keywords
.ThreadPool
| FrameworkEventSource
.Keywords
.ThreadTransfer
);
581 // Assume that we're going to need another thread if this one returns to the VM. We'll set this to
582 // false later, but only if we're absolutely certain that the queue is empty.
584 bool needAnotherThread
= true;
588 // Set up our thread-local data
590 // Operate on a workQueue local inside the try block so it can be enregistered
591 ThreadPoolWorkQueue workQueue
= outerWorkQueue
;
592 ThreadPoolWorkQueueThreadLocals tl
= workQueue
.GetOrCreateThreadLocals();
593 Thread currentThread
= tl
.currentThread
;
595 // Start on clean ExecutionContext and SynchronizationContext
596 currentThread
._executionContext
= null;
597 currentThread
._synchronizationContext
= null;
600 // Loop until our quantum expires or there is no work.
602 while (ThreadPool
.KeepDispatching(startTickCount
))
604 bool missedSteal
= false;
605 // Operate on a workItem local inside the try block so it can be enregistered
606 object? workItem
= workQueue
.Dequeue(tl
, ref missedSteal
);
608 if (workItem
== null)
612 // If we missed a steal, though, there may be more work in the queue.
613 // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread
614 // that owns the contended work-stealing queue will pick up its own workitems in the meantime,
615 // which will be more efficient than this thread doing it anyway.
617 needAnotherThread
= missedSteal
;
619 // Tell the VM we're returning normally, not because Hill Climbing asked us to return.
623 if (workQueue
.loggingEnabled
)
624 System
.Diagnostics
.Tracing
.FrameworkEventSource
.Log
.ThreadPoolDequeueWorkObject(workItem
);
627 // If we found work, there may be more work. Ask for another thread so that the other work can be processed
628 // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
630 workQueue
.EnsureThreadRequested();
633 // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
635 if (ThreadPoolGlobals
.enableWorkerTracking
)
637 bool reportedStatus
= false;
640 ThreadPool
.ReportThreadStatus(isWorking
: true);
641 reportedStatus
= true;
642 if (workItem
is Task task
)
644 task
.ExecuteFromThreadPool(currentThread
);
648 Debug
.Assert(workItem
is IThreadPoolWorkItem
);
649 Unsafe
.As
<IThreadPoolWorkItem
>(workItem
).Execute();
655 ThreadPool
.ReportThreadStatus(isWorking
: false);
658 else if (workItem
is Task task
)
660 // Check for Task first as it's currently faster to type check
661 // for Task and then Unsafe.As for the interface, rather than
662 // vice versa, in particular when the object implements a bunch
664 task
.ExecuteFromThreadPool(currentThread
);
668 Debug
.Assert(workItem
is IThreadPoolWorkItem
);
669 Unsafe
.As
<IThreadPoolWorkItem
>(workItem
).Execute();
672 currentThread
.ResetThreadPoolThread();
677 // Return to clean ExecutionContext and SynchronizationContext
678 ExecutionContext
.ResetThreadPoolThread(currentThread
);
681 // Notify the VM that we executed this workitem. This is also our opportunity to ask whether Hill Climbing wants
682 // us to return the thread to the pool or not.
684 if (!ThreadPool
.NotifyWorkItemComplete())
688 // If we get here, it's because our quantum expired. Tell the VM we're returning normally.
694 // If we are exiting for any reason other than that the queue is definitely empty, ask for another
695 // thread to pick up where we left off.
697 if (needAnotherThread
)
698 outerWorkQueue
.EnsureThreadRequested();
703 // Simple random number generator. We don't need great randomness, we just need a little and for it to be fast.
704 internal struct FastRandom
// xorshift prng
706 private uint _w
, _x
, _y
, _z
;
708 public FastRandom(int seed
)
/// <summary>
/// Advances the xorshift state by one step and maps the new value into <c>[0, maxValue)</c>.
/// </summary>
/// <param name="maxValue">Exclusive upper bound; asserted to be positive.</param>
/// <returns>The new <c>_w</c> state reduced modulo <paramref name="maxValue"/>.</returns>
public int Next(int maxValue)
{
    Debug.Assert(maxValue > 0);

    // Scramble the oldest state word: t = x ^ (x << 11), then t ^ (t >> 8).
    uint mixed = _x;
    mixed ^= mixed << 11;
    mixed ^= mixed >> 8;

    // Shift the state window down by one word; _z receives the old _w.
    _x = _y;
    _y = _z;
    _z = _w;

    // _w still holds its previous value here.
    uint next = _w ^ (_w >> 19) ^ mixed;
    _w = next;

    return (int)(next % (uint)maxValue);
}
728 // Holds a WorkStealingQueue, and removes it from the list when this object is no longer referenced.
729 internal sealed class ThreadPoolWorkQueueThreadLocals
732 public static ThreadPoolWorkQueueThreadLocals
? threadLocals
;
734 public readonly ThreadPoolWorkQueue workQueue
;
735 public readonly ThreadPoolWorkQueue
.WorkStealingQueue workStealingQueue
;
736 public readonly Thread currentThread
;
737 public FastRandom random
= new FastRandom(Thread
.CurrentThread
.ManagedThreadId
); // mutable struct, do not copy or make readonly
739 public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq
)
742 workStealingQueue
= new ThreadPoolWorkQueue
.WorkStealingQueue();
743 ThreadPoolWorkQueue
.WorkStealingQueueList
.Add(workStealingQueue
);
744 currentThread
= Thread
.CurrentThread
;
747 ~
ThreadPoolWorkQueueThreadLocals()
749 // Transfer any pending workitems into the global queue so that they will be executed by another thread
750 if (null != workStealingQueue
)
752 if (null != workQueue
)
755 while ((cb
= workStealingQueue
.LocalPop()) != null)
757 Debug
.Assert(null != cb
);
758 workQueue
.Enqueue(cb
, forceGlobal
: true);
762 ThreadPoolWorkQueue
.WorkStealingQueueList
.Remove(workStealingQueue
);
767 public delegate void WaitCallback(object? state
);
769 public delegate void WaitOrTimerCallback(object? state
, bool timedOut
); // signaled or timed out
771 internal abstract class QueueUserWorkItemCallbackBase
: IThreadPoolWorkItem
774 private int executed
;
776 [System
.Diagnostics
.CodeAnalysis
.SuppressMessage("Microsoft.Performance", "CA1821:RemoveEmptyFinalizers")]
777 ~
QueueUserWorkItemCallbackBase()
779 Interlocked
.MemoryBarrier(); // ensure that an old cached value is not read below
781 executed
!= 0, "A QueueUserWorkItemCallback was never called!");
785 public virtual void Execute()
788 GC
.SuppressFinalize(this);
790 0 == Interlocked
.Exchange(ref executed
, 1),
791 "A QueueUserWorkItemCallback was called twice!");
796 internal sealed class QueueUserWorkItemCallback
: QueueUserWorkItemCallbackBase
798 private WaitCallback
? _callback
; // SOS's ThreadPool command depends on this name
799 private readonly object? _state
;
800 private readonly ExecutionContext _context
;
/// <summary>
/// Shim that takes the callback out of the supplied work item, clears the item's
/// <c>_callback</c> field, and then invokes the callback with the item's <c>_state</c>.
/// </summary>
private static readonly Action<QueueUserWorkItemCallback> s_executionContextShim = workItem =>
{
    Debug.Assert(workItem._callback != null);

    // Capture the delegate first; the field is nulled before the call is made.
    WaitCallback toInvoke = workItem._callback;
    workItem._callback = null;

    toInvoke(workItem._state);
};
811 internal QueueUserWorkItemCallback(WaitCallback callback
, object? state
, ExecutionContext context
)
813 Debug
.Assert(context
!= null);
815 _callback
= callback
;
820 public override void Execute()
824 ExecutionContext
.RunForThreadPoolUnsafe(_context
, s_executionContextShim
, this);
828 internal sealed class QueueUserWorkItemCallback
<TState
> : QueueUserWorkItemCallbackBase
830 private Action
<TState
>? _callback
; // SOS's ThreadPool command depends on this name
831 private readonly TState _state
;
832 private readonly ExecutionContext _context
;
834 internal QueueUserWorkItemCallback(Action
<TState
> callback
, TState state
, ExecutionContext context
)
836 Debug
.Assert(callback
!= null);
838 _callback
= callback
;
843 public override void Execute()
847 Debug
.Assert(_callback
!= null);
848 Action
<TState
> callback
= _callback
;
851 ExecutionContext
.RunForThreadPoolUnsafe(_context
, callback
, in _state
);
855 internal sealed class QueueUserWorkItemCallbackDefaultContext
: QueueUserWorkItemCallbackBase
857 private WaitCallback
? _callback
; // SOS's ThreadPool command depends on this name
858 private readonly object? _state
;
860 internal QueueUserWorkItemCallbackDefaultContext(WaitCallback callback
, object? state
)
862 Debug
.Assert(callback
!= null);
864 _callback
= callback
;
868 public override void Execute()
870 ExecutionContext
.CheckThreadPoolAndContextsAreDefault();
873 Debug
.Assert(_callback
!= null);
874 WaitCallback callback
= _callback
;
879 // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
883 internal sealed class QueueUserWorkItemCallbackDefaultContext
<TState
> : QueueUserWorkItemCallbackBase
885 private Action
<TState
>? _callback
; // SOS's ThreadPool command depends on this name
886 private readonly TState _state
;
888 internal QueueUserWorkItemCallbackDefaultContext(Action
<TState
> callback
, TState state
)
890 Debug
.Assert(callback
!= null);
892 _callback
= callback
;
896 public override void Execute()
898 ExecutionContext
.CheckThreadPoolAndContextsAreDefault();
901 Debug
.Assert(_callback
!= null);
902 Action
<TState
> callback
= _callback
;
907 // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
911 internal sealed class _ThreadPoolWaitOrTimerCallback
913 private readonly WaitOrTimerCallback _waitOrTimerCallback
;
914 private readonly ExecutionContext
? _executionContext
;
915 private readonly object? _state
;
916 private static readonly ContextCallback _ccbt
= new ContextCallback(WaitOrTimerCallback_Context_t
);
917 private static readonly ContextCallback _ccbf
= new ContextCallback(WaitOrTimerCallback_Context_f
);
919 internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback
, object? state
, bool flowExecutionContext
)
921 _waitOrTimerCallback
= waitOrTimerCallback
;
924 if (flowExecutionContext
)
926 // capture the execution context
927 _executionContext
= ExecutionContext
.Capture();
/// <summary>Forwards to <see cref="WaitOrTimerCallback_Context"/> with <c>timedOut</c> set to <c>true</c>.</summary>
private static void WaitOrTimerCallback_Context_t(object? state)
{
    WaitOrTimerCallback_Context(state, timedOut: true);
}
/// <summary>Forwards to <see cref="WaitOrTimerCallback_Context"/> with <c>timedOut</c> set to <c>false</c>.</summary>
private static void WaitOrTimerCallback_Context_f(object? state)
{
    WaitOrTimerCallback_Context(state, timedOut: false);
}
/// <summary>
/// Casts <paramref name="state"/> to <see cref="_ThreadPoolWaitOrTimerCallback"/> and invokes its
/// stored callback with the helper's own state and <paramref name="timedOut"/>.
/// </summary>
/// <param name="state">The helper instance, passed as an untyped object.</param>
/// <param name="timedOut">Value forwarded to the callback's <c>timedOut</c> argument.</param>
private static void WaitOrTimerCallback_Context(object? state, bool timedOut)
{
    var target = (_ThreadPoolWaitOrTimerCallback)state!;
    WaitOrTimerCallback userCallback = target._waitOrTimerCallback;

    userCallback(target._state, timedOut);
}
944 internal static void PerformWaitOrTimerCallback(_ThreadPoolWaitOrTimerCallback helper
, bool timedOut
)
946 Debug
.Assert(helper
!= null, "Null state passed to PerformWaitOrTimerCallback!");
947 // call directly if it is an unsafe call OR EC flow is suppressed
948 ExecutionContext
? context
= helper
._executionContext
;
951 WaitOrTimerCallback callback
= helper
._waitOrTimerCallback
;
952 callback(helper
._state
, timedOut
);
956 ExecutionContext
.Run(context
, timedOut
? _ccbt
: _ccbf
, helper
);
961 public static partial class ThreadPool
963 [CLSCompliant(false)]
964 public static RegisteredWaitHandle
RegisterWaitForSingleObject(
965 WaitHandle waitObject
,
966 WaitOrTimerCallback callBack
,
968 uint millisecondsTimeOutInterval
,
969 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
972 if (millisecondsTimeOutInterval
> (uint)int.MaxValue
&& millisecondsTimeOutInterval
!= uint.MaxValue
)
973 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_LessEqualToIntegerMaxVal
);
974 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, millisecondsTimeOutInterval
, executeOnlyOnce
, true);
977 [CLSCompliant(false)]
978 public static RegisteredWaitHandle
UnsafeRegisterWaitForSingleObject(
979 WaitHandle waitObject
,
980 WaitOrTimerCallback callBack
,
982 uint millisecondsTimeOutInterval
,
983 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
986 if (millisecondsTimeOutInterval
> (uint)int.MaxValue
&& millisecondsTimeOutInterval
!= uint.MaxValue
)
987 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
988 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, millisecondsTimeOutInterval
, executeOnlyOnce
, false);
991 public static RegisteredWaitHandle
RegisterWaitForSingleObject(
992 WaitHandle waitObject
,
993 WaitOrTimerCallback callBack
,
995 int millisecondsTimeOutInterval
,
996 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
999 if (millisecondsTimeOutInterval
< -1)
1000 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
1001 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)millisecondsTimeOutInterval
, executeOnlyOnce
, true);
1004 public static RegisteredWaitHandle
UnsafeRegisterWaitForSingleObject(
1005 WaitHandle waitObject
,
1006 WaitOrTimerCallback callBack
,
1008 int millisecondsTimeOutInterval
,
1009 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
1012 if (millisecondsTimeOutInterval
< -1)
1013 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
1014 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)millisecondsTimeOutInterval
, executeOnlyOnce
, false);
1017 public static RegisteredWaitHandle
RegisterWaitForSingleObject(
1018 WaitHandle waitObject
,
1019 WaitOrTimerCallback callBack
,
1021 long millisecondsTimeOutInterval
,
1022 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
1025 if (millisecondsTimeOutInterval
< -1)
1026 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
1027 if (millisecondsTimeOutInterval
> (uint)int.MaxValue
)
1028 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_LessEqualToIntegerMaxVal
);
1029 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)millisecondsTimeOutInterval
, executeOnlyOnce
, true);
/// <summary>
/// Registers a wait on <paramref name="waitObject"/> using a signed 64-bit timeout in milliseconds.
/// </summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback handed to the core registration routine.</param>
/// <param name="state">Caller-supplied object forwarded unchanged to the core registration routine.</param>
/// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must lie in [-1, <see cref="int.MaxValue"/>].</param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration routine.</param>
/// <returns>The <see cref="RegisteredWaitHandle"/> produced by the core registration routine.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="millisecondsTimeOutInterval"/> is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    long millisecondsTimeOutInterval,
    bool executeOnlyOnce) // NOTE: no further options are exposed that would let the callback be queued as an APC
{
    // Lower bound first, then upper bound: each failure reports its own resource message.
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (millisecondsTimeOutInterval > int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    uint timeoutMs = (uint)millisecondsTimeOutInterval;

    // The "Unsafe" flavor passes false as the final argument of the core overload.
    return RegisterWaitForSingleObject(waitObject, callBack, state, timeoutMs, executeOnlyOnce, false);
}
/// <summary>
/// Registers a wait on <paramref name="waitObject"/> using a <see cref="TimeSpan"/> timeout.
/// </summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback handed to the core registration routine.</param>
/// <param name="state">Caller-supplied object forwarded unchanged to the core registration routine.</param>
/// <param name="timeout">
/// Timeout; its total milliseconds, truncated to a whole number, must lie in [-1, <see cref="int.MaxValue"/>].
/// </param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration routine.</param>
/// <returns>The <see cref="RegisteredWaitHandle"/> produced by the core registration routine.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// The truncated millisecond value of <paramref name="timeout"/> is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    TimeSpan timeout,
    bool executeOnlyOnce)
{
    // Range checks are performed on the truncated whole-millisecond value.
    long totalMs = (long)timeout.TotalMilliseconds;

    if (totalMs < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (totalMs > int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    // The non-"Unsafe" flavor passes true as the final argument of the core overload.
    return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)totalMs, executeOnlyOnce, true);
}
/// <summary>
/// Registers a wait on <paramref name="waitObject"/> using a <see cref="TimeSpan"/> timeout.
/// </summary>
/// <param name="waitObject">The handle to wait on.</param>
/// <param name="callBack">The callback handed to the core registration routine.</param>
/// <param name="state">Caller-supplied object forwarded unchanged to the core registration routine.</param>
/// <param name="timeout">
/// Timeout; its total milliseconds, truncated to a whole number, must lie in [-1, <see cref="int.MaxValue"/>].
/// </param>
/// <param name="executeOnlyOnce">Forwarded unchanged to the core registration routine.</param>
/// <returns>The <see cref="RegisteredWaitHandle"/> produced by the core registration routine.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// The truncated millisecond value of <paramref name="timeout"/> is less than -1 or greater than <see cref="int.MaxValue"/>.
/// </exception>
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    object? state,
    TimeSpan timeout,
    bool executeOnlyOnce)
{
    // Range checks are performed on the truncated whole-millisecond value.
    long totalMs = (long)timeout.TotalMilliseconds;

    if (totalMs < -1)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
    }

    if (totalMs > int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
    }

    // The "Unsafe" flavor passes false as the final argument of the core overload.
    return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)totalMs, executeOnlyOnce, false);
}
/// <summary>
/// Queues <paramref name="callBack"/> with no state object.
/// </summary>
/// <param name="callBack">The callback to queue.</param>
/// <returns>Whatever the two-argument overload returns.</returns>
public static bool QueueUserWorkItem(WaitCallback callBack)
{
    // Convenience overload: identical to the two-argument form with a null state.
    return QueueUserWorkItem(callBack, null);
}
/// <summary>
/// Queues <paramref name="callBack"/> to the thread pool's global queue, capturing the caller's
/// <see cref="ExecutionContext"/>.
/// </summary>
/// <param name="callBack">The callback to queue; must not be null.</param>
/// <param name="state">Object stored in the work item alongside the callback.</param>
/// <returns><c>true</c> once the work item has been enqueued.</returns>
public static bool QueueUserWorkItem(WaitCallback callBack, object? state)
{
    if (callBack is null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ExecutionContext? capturedContext = ExecutionContext.Capture();

    // Only a non-default captured context needs the context-carrying wrapper;
    // a missing or default context uses the lighter default-context work item.
    object workItem;
    if (capturedContext != null && !capturedContext.IsDefault)
    {
        workItem = new QueueUserWorkItemCallback(callBack!, state, capturedContext);
    }
    else
    {
        workItem = new QueueUserWorkItemCallbackDefaultContext(callBack!, state);
    }

    ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: true);

    return true;
}
/// <summary>
/// Queues a strongly typed callback to the thread pool, capturing the caller's
/// <see cref="ExecutionContext"/>.
/// </summary>
/// <typeparam name="TState">Type of the state value passed to <paramref name="callBack"/>.</typeparam>
/// <param name="callBack">The callback to queue; must not be null.</param>
/// <param name="state">Value stored in the work item alongside the callback.</param>
/// <param name="preferLocal">
/// When <c>true</c> the item is enqueued with <c>forceGlobal: false</c>; when <c>false</c> it is
/// forced onto the global queue.
/// </param>
/// <returns><c>true</c> once the work item has been enqueued.</returns>
public static bool QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
{
    if (callBack is null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    ExecutionContext? capturedContext = ExecutionContext.Capture();

    // Only a non-default captured context needs the context-carrying wrapper;
    // a missing or default context uses the lighter default-context work item.
    object workItem;
    if (capturedContext != null && !capturedContext.IsDefault)
    {
        workItem = new QueueUserWorkItemCallback<TState>(callBack!, state, capturedContext);
    }
    else
    {
        workItem = new QueueUserWorkItemCallbackDefaultContext<TState>(callBack!, state);
    }

    ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: !preferLocal);

    return true;
}
/// <summary>
/// Queues a strongly typed callback to the thread pool without capturing an
/// <see cref="ExecutionContext"/>.
/// </summary>
/// <typeparam name="TState">Type of the state value passed to <paramref name="callBack"/>.</typeparam>
/// <param name="callBack">The callback to queue; must not be null.</param>
/// <param name="state">
/// Value stored in the work item. When <paramref name="callBack"/> is the runtime's state-machine-box
/// shim, this must be an <c>IAsyncStateMachineBox</c>.
/// </param>
/// <param name="preferLocal">
/// When <c>true</c> the item is enqueued with <c>forceGlobal: false</c>; when <c>false</c> it is
/// forced onto the global queue.
/// </param>
/// <returns><c>true</c> once the work item has been enqueued.</returns>
public static bool UnsafeQueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
{
    if (callBack is null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    bool isStateMachineBoxShim = ReferenceEquals(callBack, ThreadPoolGlobals.s_invokeAsyncStateMachineBox);
    if (!isStateMachineBoxShim)
    {
        // Ordinary callback: wrap it in a default-context work item and enqueue it.
        EnsureInitialized();

        ThreadPoolGlobals.workQueue.Enqueue(
            new QueueUserWorkItemCallbackDefaultContext<TState>(callBack!, state), forceGlobal: !preferLocal);

        return true;
    }

    // The callback is the runtime-supplied shim that invokes an IAsyncStateMachineBox, so the
    // state object itself can be queued directly rather than being wrapped in a work-item callback.
    //
    // This path is hit when user code queues its provided continuation to the ThreadPool;
    // for Tasks the runtime calls UnsafeQueueUserWorkItemInternal directly.
    if (!(state is IAsyncStateMachineBox))
    {
        // With the shim, the state has to be the internal IAsyncStateMachineBox (Task) type.
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state);
    }

    UnsafeQueueUserWorkItemInternal((object)state!, preferLocal);
    return true;
}
/// <summary>
/// Queues <paramref name="callBack"/> to the thread pool's global queue without capturing an
/// <see cref="ExecutionContext"/>.
/// </summary>
/// <param name="callBack">The callback to queue; must not be null.</param>
/// <param name="state">Object stored in the work item alongside the callback.</param>
/// <returns><c>true</c> once the work item has been enqueued.</returns>
public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object? state)
{
    if (callBack is null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    EnsureInitialized();

    // No context capture here: the default-context wrapper is always used.
    ThreadPoolGlobals.workQueue.Enqueue(
        new QueueUserWorkItemCallbackDefaultContext(callBack!, state),
        forceGlobal: true);

    return true;
}
/// <summary>
/// Queues a caller-implemented <see cref="IThreadPoolWorkItem"/> to the thread pool.
/// </summary>
/// <param name="callBack">The work item to queue; must not be null and must not be a <see cref="Task"/>.</param>
/// <param name="preferLocal">Forwarded to <c>UnsafeQueueUserWorkItemInternal</c>.</param>
/// <returns><c>true</c> once the work item has been handed to the internal queueing routine.</returns>
public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool preferLocal)
{
    if (callBack is null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
    }

    bool isTask = callBack is Task;
    if (isTask)
    {
        // A Task-derived type that also implements the interface is rejected: queueing it
        // here would sidestep Task.Start and the safety checks it performs.
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.callBack);
    }

    UnsafeQueueUserWorkItemInternal(callBack!, preferLocal);
    return true;
}
/// <summary>
/// Enqueues an already-validated work item object directly onto the thread pool work queue.
/// </summary>
/// <param name="callBack">
/// The work item; asserted (debug builds) to be exactly one of <see cref="IThreadPoolWorkItem"/> or <see cref="Task"/>.
/// </param>
/// <param name="preferLocal">
/// When <c>true</c> the item is enqueued with <c>forceGlobal: false</c>; when <c>false</c> it is
/// forced onto the global queue.
/// </param>
internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal)
{
    // Exactly one of the two must hold - never both, never neither.
    bool isWorkItem = callBack is IThreadPoolWorkItem;
    bool isTask = callBack is Task;
    Debug.Assert(isWorkItem != isTask);

    EnsureInitialized();

    bool forceGlobal = !preferLocal;
    ThreadPoolGlobals.workQueue.Enqueue(callBack, forceGlobal: forceGlobal);
}
/// <summary>
/// Attempts to remove <paramref name="workItem"/> from the current thread's queue.
/// </summary>
/// <param name="workItem">The work item to look for; asserted non-null in debug builds.</param>
/// <returns>
/// <c>false</c> if the thread pool has not been initialized; otherwise the result of
/// <c>LocalFindAndPop</c> on the work queue.
/// </returns>
internal static bool TryPopCustomWorkItem(object workItem)
{
    Debug.Assert(workItem != null);

    if (!ThreadPoolGlobals.threadPoolInitialized)
    {
        // Without initialization there is no way this work item was ever queued.
        return false;
    }

    return ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem);
}
/// <summary>
/// Lazily enumerates every queued work item: first the global queue, then each work-stealing queue.
/// Called by TaskScheduler in its debugger hooks.
/// </summary>
/// <returns>A sequence of the non-null work items found.</returns>
internal static IEnumerable<object> GetQueuedWorkItems()
{
    // Pass 1: the global queue.
    foreach (object globalItem in ThreadPoolGlobals.workQueue.workItems)
    {
        yield return globalItem;
    }

    // Pass 2: every per-thread (local) queue.
    foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
    {
        if (wsq == null || wsq.m_array == null)
        {
            continue;
        }

        object?[] slots = wsq.m_array;
        foreach (object? slot in slots)
        {
            // Null slots are skipped.
            if (slot != null)
            {
                yield return slot;
            }
        }
    }
}
/// <summary>
/// Lazily enumerates the work items held in the current thread's work-stealing queue, if it has one.
/// </summary>
/// <returns>A sequence of the non-null work items found; empty when the thread has no local queue.</returns>
internal static IEnumerable<object> GetLocallyQueuedWorkItems()
{
    ThreadPoolWorkQueue.WorkStealingQueue? localQueue = ThreadPoolWorkQueueThreadLocals.threadLocals?.workStealingQueue;
    if (localQueue == null || localQueue.m_array == null)
    {
        yield break;
    }

    object?[] slots = localQueue.m_array;
    foreach (object? slot in slots)
    {
        // Null slots are skipped.
        if (slot != null)
        {
            yield return slot;
        }
    }
}
/// <summary>
/// Exposes the global queue's work items as a sequence.
/// </summary>
/// <returns>The <c>workItems</c> collection of the global work queue.</returns>
internal static IEnumerable<object> GetGloballyQueuedWorkItems()
{
    return ThreadPoolGlobals.workQueue.workItems;
}
/// <summary>
/// Copies the supplied work items into a newly allocated array.
/// </summary>
/// <param name="workitems">The sequence to copy; it is enumerated exactly once.</param>
/// <returns>An array holding every item produced by that single enumeration, in order.</returns>
/// <remarks>
/// The sequence may be a lazy view over queues that are changing while it is read. Counting in one
/// pass and copying in a second can therefore disagree, leaving null slots at the end of the array
/// or dropping items. Copying in a single pass keeps the result consistent with what was observed.
/// </remarks>
private static object[] ToObjectArray(IEnumerable<object> workitems)
{
    var snapshot = new List<object>();
    foreach (object item in workitems)
    {
        snapshot.Add(item);
    }

    return snapshot.ToArray();
}
/// <summary>
/// Returns all queued work items as an array. This is the method the debugger actually calls if it
/// ends up calling into ThreadPool directly; tests can also use it to simulate a debugger.
/// </summary>
/// <returns>The items from <c>GetQueuedWorkItems</c>, converted by <c>ToObjectArray</c>.</returns>
internal static object[] GetQueuedWorkItemsForDebugger()
{
    IEnumerable<object> allItems = GetQueuedWorkItems();
    return ToObjectArray(allItems);
}
/// <summary>
/// Returns the globally queued work items as an array.
/// </summary>
/// <returns>The items from <c>GetGloballyQueuedWorkItems</c>, converted by <c>ToObjectArray</c>.</returns>
internal static object[] GetGloballyQueuedWorkItemsForDebugger()
{
    IEnumerable<object> globalItems = GetGloballyQueuedWorkItems();
    return ToObjectArray(globalItems);
}
/// <summary>
/// Returns the current thread's locally queued work items as an array.
/// </summary>
/// <returns>The items from <c>GetLocallyQueuedWorkItems</c>, converted by <c>ToObjectArray</c>.</returns>
internal static object[] GetLocallyQueuedWorkItemsForDebugger()
{
    IEnumerable<object> localItems = GetLocallyQueuedWorkItems();
    return ToObjectArray(localItems);
}
/// <summary>
/// Gets the number of work items that are currently queued to be processed.
/// </summary>
/// <remarks>
/// For a thread pool implementation that may have different types of work items, the count includes
/// all types that can be tracked, which may only be the user work items including tasks. Some
/// implementations may also include queued timer and wait callbacks in the count. On Windows, the
/// count is unlikely to include the number of pending IO completions, as they get posted directly
/// to an IO completion port.
/// </remarks>
public static long PendingWorkItemCount
{
    get
    {
        ThreadPoolWorkQueue queue = ThreadPoolGlobals.workQueue;

        // Local and global counts are summed first, then the unmanaged pending count is added.
        var managedCount = queue.LocalCount + queue.GlobalCount;
        return managedCount + PendingUnmanagedWorkItemCount;
    }
}