1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 /*=============================================================================
9 ** Purpose: Class for creating and managing a threadpool
12 =============================================================================*/
15 using System
.Collections
.Concurrent
;
16 using System
.Collections
.Generic
;
17 using System
.Diagnostics
;
18 using System
.Diagnostics
.CodeAnalysis
;
19 using System
.Diagnostics
.Tracing
;
20 using System
.Runtime
.CompilerServices
;
21 using System
.Runtime
.InteropServices
;
22 using System
.Threading
.Tasks
;
23 using Internal
.Runtime
.CompilerServices
;
25 namespace System
.Threading
27 internal static class ThreadPoolGlobals
29 public static readonly int processorCount
= Environment
.ProcessorCount
;
31 public static volatile bool threadPoolInitialized
;
32 public static bool enableWorkerTracking
;
34 public static readonly ThreadPoolWorkQueue workQueue
= new ThreadPoolWorkQueue();
36 /// <summary>Shim used to invoke <see cref="IAsyncStateMachineBox.MoveNext"/> of the supplied <see cref="IAsyncStateMachineBox"/>.</summary>
37 internal static readonly Action
<object> s_invokeAsyncStateMachineBox
= state
=>
39 if (!(state
is IAsyncStateMachineBox box
))
41 ThrowHelper
.ThrowArgumentOutOfRangeException(ExceptionArgument
.state
);
49 [StructLayout(LayoutKind
.Sequential
)] // enforce layout so that padding reduces false sharing
50 internal sealed class ThreadPoolWorkQueue
52 internal static class WorkStealingQueueList
54 private static volatile WorkStealingQueue
[] _queues
= new WorkStealingQueue
[0];
56 public static WorkStealingQueue
[] Queues
=> _queues
;
58 public static void Add(WorkStealingQueue queue
)
60 Debug
.Assert(queue
!= null);
63 WorkStealingQueue
[] oldQueues
= _queues
;
64 Debug
.Assert(Array
.IndexOf(oldQueues
, queue
) == -1);
66 var newQueues
= new WorkStealingQueue
[oldQueues
.Length
+ 1];
67 Array
.Copy(oldQueues
, 0, newQueues
, 0, oldQueues
.Length
);
68 newQueues
[newQueues
.Length
- 1] = queue
;
69 if (Interlocked
.CompareExchange(ref _queues
, newQueues
, oldQueues
) == oldQueues
)
76 public static void Remove(WorkStealingQueue queue
)
78 Debug
.Assert(queue
!= null);
81 WorkStealingQueue
[] oldQueues
= _queues
;
82 if (oldQueues
.Length
== 0)
87 int pos
= Array
.IndexOf(oldQueues
, queue
);
90 Debug
.Fail("Should have found the queue");
94 var newQueues
= new WorkStealingQueue
[oldQueues
.Length
- 1];
97 Array
.Copy(oldQueues
, 1, newQueues
, 0, newQueues
.Length
);
99 else if (pos
== oldQueues
.Length
- 1)
101 Array
.Copy(oldQueues
, 0, newQueues
, 0, newQueues
.Length
);
105 Array
.Copy(oldQueues
, 0, newQueues
, 0, pos
);
106 Array
.Copy(oldQueues
, pos
+ 1, newQueues
, pos
, newQueues
.Length
- pos
);
109 if (Interlocked
.CompareExchange(ref _queues
, newQueues
, oldQueues
) == oldQueues
)
117 internal sealed class WorkStealingQueue
119 private const int INITIAL_SIZE
= 32;
120 internal volatile object?[] m_array
= new object[INITIAL_SIZE
]; // SOS's ThreadPool command depends on this name
121 private volatile int m_mask
= INITIAL_SIZE
- 1;
124 // in debug builds, start at the end so we exercise the index reset logic.
125 private const int START_INDEX
= int.MaxValue
;
127 private const int START_INDEX
= 0;
130 private volatile int m_headIndex
= START_INDEX
;
131 private volatile int m_tailIndex
= START_INDEX
;
133 private SpinLock m_foreignLock
= new SpinLock(enableThreadOwnerTracking
: false);
135 public void LocalPush(object obj
)
137 int tail
= m_tailIndex
;
139 // We're going to increment the tail; if we'll overflow, then we need to reset our counts
140 if (tail
== int.MaxValue
)
142 bool lockTaken
= false;
145 m_foreignLock
.Enter(ref lockTaken
);
147 if (m_tailIndex
== int.MaxValue
)
150 // Rather than resetting to zero, we'll just mask off the bits we don't care about.
151 // This way we don't need to rearrange the items already in the queue; they'll be found
152 // correctly exactly where they are. One subtlety here is that we need to make sure that
153 // if head is currently < tail, it remains that way. This happens to just fall out from
154 // the bit-masking, because we only do this if tail == int.MaxValue, meaning that all
155 // bits are set, so all of the bits we're keeping will also be set. Thus it's impossible
156 // for the head to end up > than the tail, since you can't set any more bits than all of
159 m_headIndex
= m_headIndex
& m_mask
;
160 m_tailIndex
= tail
= m_tailIndex
& m_mask
;
161 Debug
.Assert(m_headIndex
<= m_tailIndex
);
167 m_foreignLock
.Exit(useMemoryBarrier
: true);
171 // When there are at least 2 elements' worth of space, we can take the fast path.
172 if (tail
< m_headIndex
+ m_mask
)
174 Volatile
.Write(ref m_array
[tail
& m_mask
], obj
);
175 m_tailIndex
= tail
+ 1;
179 // We need to contend with foreign pops, so we lock.
180 bool lockTaken
= false;
183 m_foreignLock
.Enter(ref lockTaken
);
185 int head
= m_headIndex
;
186 int count
= m_tailIndex
- m_headIndex
;
188 // If there is still space (one left), just add the element.
191 // We're full; expand the queue by doubling its size.
192 var newArray
= new object?[m_array
.Length
<< 1];
193 for (int i
= 0; i
< m_array
.Length
; i
++)
194 newArray
[i
] = m_array
[(i
+ head
) & m_mask
];
196 // Reset the field values, incl. the mask.
199 m_tailIndex
= tail
= count
;
200 m_mask
= (m_mask
<< 1) | 1;
203 Volatile
.Write(ref m_array
[tail
& m_mask
], obj
);
204 m_tailIndex
= tail
+ 1;
209 m_foreignLock
.Exit(useMemoryBarrier
: false);
214 [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification
= "Reviewed for thread safety")]
215 public bool LocalFindAndPop(object obj
)
217 // Fast path: check the tail. If equal, we can skip the lock.
218 if (m_array
[(m_tailIndex
- 1) & m_mask
] == obj
)
220 object? unused
= LocalPop();
221 Debug
.Assert(unused
== null || unused
== obj
);
222 return unused
!= null;
225 // Else, do an O(N) search for the work item. The theory of work stealing and our
226 // inlining logic is that most waits will happen on recently queued work. And
227 // since recently queued work will be close to the tail end (which is where we
228 // begin our search), we will likely find it quickly. In the worst case, we
229 // will traverse the whole local queue; this is typically not going to be a
230 // problem (although degenerate cases are clearly an issue) because local work
231 // queues tend to be somewhat shallow in length, and because if we fail to find
232 // the work item, we are about to block anyway (which is very expensive).
233 for (int i
= m_tailIndex
- 2; i
>= m_headIndex
; i
--)
235 if (m_array
[i
& m_mask
] == obj
)
237 // If we found the element, block out steals to avoid interference.
238 bool lockTaken
= false;
241 m_foreignLock
.Enter(ref lockTaken
);
243 // If we encountered a race condition, bail.
244 if (m_array
[i
& m_mask
] == null)
247 // Otherwise, null out the element.
248 Volatile
.Write(ref m_array
[i
& m_mask
], null);
250 // And then check to see if we can fix up the indexes (if we're at
251 // the edge). If we can't, we just leave nulls in the array and they'll
252 // get filtered out eventually (but may lead to superfluous resizing).
253 if (i
== m_tailIndex
)
255 else if (i
== m_headIndex
)
263 m_foreignLock
.Exit(useMemoryBarrier
: false);
271 public object? LocalPop() => m_headIndex
< m_tailIndex
? LocalPopCore() : null;
273 [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification
= "Reviewed for thread safety")]
274 private object? LocalPopCore()
278 int tail
= m_tailIndex
;
279 if (m_headIndex
>= tail
)
284 // Decrement the tail using a fence to ensure subsequent read doesn't come before.
286 Interlocked
.Exchange(ref m_tailIndex
, tail
);
288 // If there is no interaction with a take, we can head down the fast path.
289 if (m_headIndex
<= tail
)
291 int idx
= tail
& m_mask
;
292 object? obj
= Volatile
.Read(ref m_array
[idx
]);
294 // Check for nulls in the array.
295 if (obj
== null) continue;
302 // Interaction with takes: 0 or 1 elements left.
303 bool lockTaken
= false;
306 m_foreignLock
.Enter(ref lockTaken
);
308 if (m_headIndex
<= tail
)
310 // Element still available. Take it.
311 int idx
= tail
& m_mask
;
312 object? obj
= Volatile
.Read(ref m_array
[idx
]);
314 // Check for nulls in the array.
315 if (obj
== null) continue;
322 // If we encountered a race condition and element was stolen, restore the tail.
323 m_tailIndex
= tail
+ 1;
330 m_foreignLock
.Exit(useMemoryBarrier
: false);
336 public bool CanSteal
=> m_headIndex
< m_tailIndex
;
338 public object? TrySteal(ref bool missedSteal
)
347 m_foreignLock
.TryEnter(ref taken
);
350 // Increment head, and ensure read of tail doesn't move before it (fence).
351 int head
= m_headIndex
;
352 Interlocked
.Exchange(ref m_headIndex
, head
+ 1);
354 if (head
< m_tailIndex
)
356 int idx
= head
& m_mask
;
357 object? obj
= Volatile
.Read(ref m_array
[idx
]);
359 // Check for nulls in the array.
360 if (obj
== null) continue;
367 // Failed, restore head.
375 m_foreignLock
.Exit(useMemoryBarrier
: false);
386 internal bool loggingEnabled
;
387 internal readonly ConcurrentQueue
<object> workItems
= new ConcurrentQueue
<object>(); // SOS's ThreadPool command depends on this name
389 private Internal
.PaddingFor32 pad1
;
391 private volatile int numOutstandingThreadRequests
= 0;
393 private Internal
.PaddingFor32 pad2
;
395 public ThreadPoolWorkQueue()
397 loggingEnabled
= FrameworkEventSource
.Log
.IsEnabled(EventLevel
.Verbose
, FrameworkEventSource
.Keywords
.ThreadPool
| FrameworkEventSource
.Keywords
.ThreadTransfer
);
400 public ThreadPoolWorkQueueThreadLocals
GetOrCreateThreadLocals() =>
401 ThreadPoolWorkQueueThreadLocals
.threadLocals
?? CreateThreadLocals();
403 [MethodImpl(MethodImplOptions
.NoInlining
)]
404 private ThreadPoolWorkQueueThreadLocals
CreateThreadLocals()
406 Debug
.Assert(ThreadPoolWorkQueueThreadLocals
.threadLocals
== null);
408 return (ThreadPoolWorkQueueThreadLocals
.threadLocals
= new ThreadPoolWorkQueueThreadLocals(this));
411 internal void EnsureThreadRequested()
414 // If we have not yet requested #procs threads, then request a new thread.
416 // CoreCLR: Note that there is a separate count in the VM which has already been incremented
417 // by the VM by the time we reach this point.
419 int count
= numOutstandingThreadRequests
;
420 while (count
< ThreadPoolGlobals
.processorCount
)
422 int prev
= Interlocked
.CompareExchange(ref numOutstandingThreadRequests
, count
+ 1, count
);
425 ThreadPool
.RequestWorkerThread();
432 internal void MarkThreadRequestSatisfied()
435 // One of our outstanding thread requests has been satisfied.
436 // Decrement the count so that future calls to EnsureThreadRequested will succeed.
438 // CoreCLR: Note that there is a separate count in the VM which has already been decremented
439 // by the VM by the time we reach this point.
441 int count
= numOutstandingThreadRequests
;
444 int prev
= Interlocked
.CompareExchange(ref numOutstandingThreadRequests
, count
- 1, count
);
453 public void Enqueue(object callback
, bool forceGlobal
)
455 Debug
.Assert((callback
is IThreadPoolWorkItem
) ^
(callback
is Task
));
458 System
.Diagnostics
.Tracing
.FrameworkEventSource
.Log
.ThreadPoolEnqueueWorkObject(callback
);
460 ThreadPoolWorkQueueThreadLocals
? tl
= null;
462 tl
= ThreadPoolWorkQueueThreadLocals
.threadLocals
;
466 tl
.workStealingQueue
.LocalPush(callback
);
470 workItems
.Enqueue(callback
);
473 EnsureThreadRequested();
476 internal bool LocalFindAndPop(object callback
)
478 ThreadPoolWorkQueueThreadLocals tl
= ThreadPoolWorkQueueThreadLocals
.threadLocals
;
479 return tl
!= null && tl
.workStealingQueue
.LocalFindAndPop(callback
);
482 public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl
, ref bool missedSteal
)
484 WorkStealingQueue localWsq
= tl
.workStealingQueue
;
487 if ((callback
= localWsq
.LocalPop()) == null && // first try the local queue
488 !workItems
.TryDequeue(out callback
)) // then try the global queue
490 // finally try to steal from another thread's local queue
491 WorkStealingQueue
[] queues
= WorkStealingQueueList
.Queues
;
492 int c
= queues
.Length
;
493 Debug
.Assert(c
> 0, "There must at least be a queue for this thread.");
494 int maxIndex
= c
- 1;
495 int i
= tl
.random
.Next(c
);
498 i
= (i
< maxIndex
) ? i
+ 1 : 0;
499 WorkStealingQueue otherQueue
= queues
[i
];
500 if (otherQueue
!= localWsq
&& otherQueue
.CanSteal
)
502 callback
= otherQueue
.TrySteal(ref missedSteal
);
503 if (callback
!= null)
516 /// Dispatches work items to this thread.
519 /// <c>true</c> if this thread did as much work as was available or its quantum expired.
520 /// <c>false</c> if this thread stopped working early.
522 internal static bool Dispatch()
524 ThreadPoolWorkQueue outerWorkQueue
= ThreadPoolGlobals
.workQueue
;
527 // Save the start time
529 int startTickCount
= Environment
.TickCount
;
532 // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
533 // From this point on, we are responsible for requesting another thread if we stop working for any
534 // reason, and we believe there might still be work in the queue.
536 // CoreCLR: Note that if this thread is aborted before we get a chance to request another one, the VM will
537 // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
539 outerWorkQueue
.MarkThreadRequestSatisfied();
541 // Has the desire for logging changed since the last time we entered?
542 outerWorkQueue
.loggingEnabled
= FrameworkEventSource
.Log
.IsEnabled(EventLevel
.Verbose
, FrameworkEventSource
.Keywords
.ThreadPool
| FrameworkEventSource
.Keywords
.ThreadTransfer
);
545 // Assume that we're going to need another thread if this one returns to the VM. We'll set this to
546 // false later, but only if we're absolutely certain that the queue is empty.
548 bool needAnotherThread
= true;
549 object? outerWorkItem
= null;
553 // Set up our thread-local data
555 // Use operate on workQueue local to try block so it can be enregistered
556 ThreadPoolWorkQueue workQueue
= outerWorkQueue
;
557 ThreadPoolWorkQueueThreadLocals tl
= workQueue
.GetOrCreateThreadLocals();
558 Thread currentThread
= tl
.currentThread
;
560 // Start on clean ExecutionContext and SynchronizationContext
561 currentThread
._executionContext
= null;
562 currentThread
._synchronizationContext
= null;
565 // Loop until our quantum expires or there is no work.
567 while (ThreadPool
.KeepDispatching(startTickCount
))
569 bool missedSteal
= false;
570 // Use operate on workItem local to try block so it can be enregistered
571 object? workItem
= outerWorkItem
= workQueue
.Dequeue(tl
, ref missedSteal
);
573 if (workItem
== null)
577 // If we missed a steal, though, there may be more work in the queue.
578 // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread
579 // that owns the contended work-stealing queue will pick up its own workitems in the meantime,
580 // which will be more efficient than this thread doing it anyway.
582 needAnotherThread
= missedSteal
;
584 // Tell the VM we're returning normally, not because Hill Climbing asked us to return.
588 if (workQueue
.loggingEnabled
)
589 System
.Diagnostics
.Tracing
.FrameworkEventSource
.Log
.ThreadPoolDequeueWorkObject(workItem
);
592 // If we found work, there may be more work. Ask for another thread so that the other work can be processed
593 // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
595 workQueue
.EnsureThreadRequested();
598 // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
600 if (ThreadPoolGlobals
.enableWorkerTracking
)
602 bool reportedStatus
= false;
605 ThreadPool
.ReportThreadStatus(isWorking
: true);
606 reportedStatus
= true;
607 if (workItem
is Task task
)
609 task
.ExecuteFromThreadPool(currentThread
);
613 Debug
.Assert(workItem
is IThreadPoolWorkItem
);
614 Unsafe
.As
<IThreadPoolWorkItem
>(workItem
).Execute();
620 ThreadPool
.ReportThreadStatus(isWorking
: false);
623 else if (workItem
is Task task
)
625 // Check for Task first as it's currently faster to type check
626 // for Task and then Unsafe.As for the interface, rather than
627 // vice versa, in particular when the object implements a bunch
629 task
.ExecuteFromThreadPool(currentThread
);
633 Debug
.Assert(workItem
is IThreadPoolWorkItem
);
634 Unsafe
.As
<IThreadPoolWorkItem
>(workItem
).Execute();
637 currentThread
.ResetThreadPoolThread();
640 outerWorkItem
= workItem
= null;
642 // Return to clean ExecutionContext and SynchronizationContext
643 ExecutionContext
.ResetThreadPoolThread(currentThread
);
646 // Notify the VM that we executed this workitem. This is also our opportunity to ask whether Hill Climbing wants
647 // us to return the thread to the pool or not.
649 if (!ThreadPool
.NotifyWorkItemComplete())
653 // If we get here, it's because our quantum expired. Tell the VM we're returning normally.
659 // If we are exiting for any reason other than that the queue is definitely empty, ask for another
660 // thread to pick up where we left off.
662 if (needAnotherThread
)
663 outerWorkQueue
.EnsureThreadRequested();
668 // Simple random number generator. We don't need great randomness, we just need a little and for it to be fast.
669 internal struct FastRandom
// xorshift prng
671 private uint _w
, _x
, _y
, _z
;
673 public FastRandom(int seed
)
681 public int Next(int maxValue
)
683 Debug
.Assert(maxValue
> 0);
685 uint t
= _x ^
(_x
<< 11);
686 _x
= _y
; _y
= _z
; _z
= _w
;
687 _w
= _w ^
(_w
>> 19) ^
(t ^
(t
>> 8));
689 return (int)(_w
% (uint)maxValue
);
693 // Holds a WorkStealingQueue, and removes it from the list when this object is no longer referenced.
694 internal sealed class ThreadPoolWorkQueueThreadLocals
697 public static ThreadPoolWorkQueueThreadLocals threadLocals
;
699 public readonly ThreadPoolWorkQueue workQueue
;
700 public readonly ThreadPoolWorkQueue
.WorkStealingQueue workStealingQueue
;
701 public readonly Thread currentThread
;
702 public FastRandom random
= new FastRandom(Thread
.CurrentThread
.ManagedThreadId
); // mutable struct, do not copy or make readonly
704 public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq
)
707 workStealingQueue
= new ThreadPoolWorkQueue
.WorkStealingQueue();
708 ThreadPoolWorkQueue
.WorkStealingQueueList
.Add(workStealingQueue
);
709 currentThread
= Thread
.CurrentThread
;
712 private void CleanUp()
714 if (null != workStealingQueue
)
716 if (null != workQueue
)
719 while ((cb
= workStealingQueue
.LocalPop()) != null)
721 Debug
.Assert(null != cb
);
722 workQueue
.Enqueue(cb
, forceGlobal
: true);
726 ThreadPoolWorkQueue
.WorkStealingQueueList
.Remove(workStealingQueue
);
730 ~
ThreadPoolWorkQueueThreadLocals()
732 // Since the purpose of calling CleanUp is to transfer any pending workitems into the global
733 // queue so that they will be executed by another thread, there's no point in doing this cleanup
734 // if we're in the process of shutting down or unloading the AD. In those cases, the work won't
735 // execute anyway. And there are subtle race conditions involved there that would lead us to do the wrong
736 // thing anyway. So we'll only clean up if this is a "normal" finalization.
737 if (!Environment
.HasShutdownStarted
)
742 public delegate void WaitCallback(object? state
);
744 public delegate void WaitOrTimerCallback(object? state
, bool timedOut
); // signaled or timed out
746 internal abstract class QueueUserWorkItemCallbackBase
: IThreadPoolWorkItem
749 private volatile int executed
;
751 [System
.Diagnostics
.CodeAnalysis
.SuppressMessage("Microsoft.Performance", "CA1821:RemoveEmptyFinalizers")]
752 ~
QueueUserWorkItemCallbackBase()
755 executed
!= 0 || Environment
.HasShutdownStarted
,
756 "A QueueUserWorkItemCallback was never called!");
760 public virtual void Execute()
763 GC
.SuppressFinalize(this);
765 0 == Interlocked
.Exchange(ref executed
, 1),
766 "A QueueUserWorkItemCallback was called twice!");
771 internal sealed class QueueUserWorkItemCallback
: QueueUserWorkItemCallbackBase
773 private WaitCallback
? _callback
; // SOS's ThreadPool command depends on this name
774 private readonly object? _state
;
775 private readonly ExecutionContext _context
;
777 private static readonly Action
<QueueUserWorkItemCallback
> s_executionContextShim
= quwi
=>
779 Debug
.Assert(quwi
._callback
!= null);
780 WaitCallback callback
= quwi
._callback
;
781 quwi
._callback
= null;
783 callback(quwi
._state
);
786 internal QueueUserWorkItemCallback(WaitCallback callback
, object? state
, ExecutionContext context
)
788 Debug
.Assert(context
!= null);
790 _callback
= callback
;
795 public override void Execute()
799 ExecutionContext
.RunForThreadPoolUnsafe(_context
, s_executionContextShim
, this);
803 internal sealed class QueueUserWorkItemCallback
<TState
> : QueueUserWorkItemCallbackBase
805 private Action
<TState
>? _callback
; // SOS's ThreadPool command depends on this name
806 private readonly TState _state
;
807 private readonly ExecutionContext _context
;
809 internal QueueUserWorkItemCallback(Action
<TState
> callback
, TState state
, ExecutionContext context
)
811 Debug
.Assert(callback
!= null);
813 _callback
= callback
;
818 public override void Execute()
822 Debug
.Assert(_callback
!= null);
823 Action
<TState
> callback
= _callback
;
826 ExecutionContext
.RunForThreadPoolUnsafe(_context
, callback
, in _state
);
830 internal sealed class QueueUserWorkItemCallbackDefaultContext
: QueueUserWorkItemCallbackBase
832 private WaitCallback
? _callback
; // SOS's ThreadPool command depends on this name
833 private readonly object? _state
;
835 internal QueueUserWorkItemCallbackDefaultContext(WaitCallback callback
, object? state
)
837 Debug
.Assert(callback
!= null);
839 _callback
= callback
;
843 public override void Execute()
845 ExecutionContext
.CheckThreadPoolAndContextsAreDefault();
848 Debug
.Assert(_callback
!= null);
849 WaitCallback callback
= _callback
;
854 // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
858 internal sealed class QueueUserWorkItemCallbackDefaultContext
<TState
> : QueueUserWorkItemCallbackBase
860 private Action
<TState
>? _callback
; // SOS's ThreadPool command depends on this name
861 private readonly TState _state
;
863 internal QueueUserWorkItemCallbackDefaultContext(Action
<TState
> callback
, TState state
)
865 Debug
.Assert(callback
!= null);
867 _callback
= callback
;
871 public override void Execute()
873 ExecutionContext
.CheckThreadPoolAndContextsAreDefault();
876 Debug
.Assert(_callback
!= null);
877 Action
<TState
> callback
= _callback
;
882 // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default
886 internal sealed class _ThreadPoolWaitOrTimerCallback
888 private WaitOrTimerCallback _waitOrTimerCallback
;
889 private ExecutionContext
? _executionContext
;
890 private object? _state
;
891 private static readonly ContextCallback _ccbt
= new ContextCallback(WaitOrTimerCallback_Context_t
);
892 private static readonly ContextCallback _ccbf
= new ContextCallback(WaitOrTimerCallback_Context_f
);
894 internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback
, object? state
, bool flowExecutionContext
)
896 _waitOrTimerCallback
= waitOrTimerCallback
;
899 if (flowExecutionContext
)
901 // capture the exection context
902 _executionContext
= ExecutionContext
.Capture();
906 private static void WaitOrTimerCallback_Context_t(object state
) =>
907 WaitOrTimerCallback_Context(state
, timedOut
: true);
909 private static void WaitOrTimerCallback_Context_f(object state
) =>
910 WaitOrTimerCallback_Context(state
, timedOut
: false);
912 private static void WaitOrTimerCallback_Context(object state
, bool timedOut
)
914 _ThreadPoolWaitOrTimerCallback helper
= (_ThreadPoolWaitOrTimerCallback
)state
;
915 helper
._waitOrTimerCallback(helper
._state
, timedOut
);
919 internal static void PerformWaitOrTimerCallback(_ThreadPoolWaitOrTimerCallback helper
, bool timedOut
)
921 Debug
.Assert(helper
!= null, "Null state passed to PerformWaitOrTimerCallback!");
922 // call directly if it is an unsafe call OR EC flow is suppressed
923 ExecutionContext
? context
= helper
._executionContext
;
926 WaitOrTimerCallback callback
= helper
._waitOrTimerCallback
;
927 callback(helper
._state
, timedOut
);
931 ExecutionContext
.Run(context
, timedOut
? _ccbt
: _ccbf
, helper
);
936 public static partial class ThreadPool
938 [CLSCompliant(false)]
939 public static RegisteredWaitHandle
RegisterWaitForSingleObject(
940 WaitHandle waitObject
,
941 WaitOrTimerCallback callBack
,
943 uint millisecondsTimeOutInterval
,
944 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
947 if (millisecondsTimeOutInterval
> (uint)int.MaxValue
&& millisecondsTimeOutInterval
!= uint.MaxValue
)
948 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_LessEqualToIntegerMaxVal
);
949 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, millisecondsTimeOutInterval
, executeOnlyOnce
, true);
952 [CLSCompliant(false)]
953 public static RegisteredWaitHandle
UnsafeRegisterWaitForSingleObject(
954 WaitHandle waitObject
,
955 WaitOrTimerCallback callBack
,
957 uint millisecondsTimeOutInterval
,
958 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
961 if (millisecondsTimeOutInterval
> (uint)int.MaxValue
&& millisecondsTimeOutInterval
!= uint.MaxValue
)
962 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
963 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, millisecondsTimeOutInterval
, executeOnlyOnce
, false);
966 public static RegisteredWaitHandle
RegisterWaitForSingleObject(
967 WaitHandle waitObject
,
968 WaitOrTimerCallback callBack
,
970 int millisecondsTimeOutInterval
,
971 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
974 if (millisecondsTimeOutInterval
< -1)
975 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
976 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)millisecondsTimeOutInterval
, executeOnlyOnce
, true);
979 public static RegisteredWaitHandle
UnsafeRegisterWaitForSingleObject(
980 WaitHandle waitObject
,
981 WaitOrTimerCallback callBack
,
983 int millisecondsTimeOutInterval
,
984 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
987 if (millisecondsTimeOutInterval
< -1)
988 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
989 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)millisecondsTimeOutInterval
, executeOnlyOnce
, false);
992 public static RegisteredWaitHandle
RegisterWaitForSingleObject(
993 WaitHandle waitObject
,
994 WaitOrTimerCallback callBack
,
996 long millisecondsTimeOutInterval
,
997 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
1000 if (millisecondsTimeOutInterval
< -1)
1001 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
1002 if (millisecondsTimeOutInterval
> (uint)int.MaxValue
)
1003 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_LessEqualToIntegerMaxVal
);
1004 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)millisecondsTimeOutInterval
, executeOnlyOnce
, true);
1007 public static RegisteredWaitHandle
UnsafeRegisterWaitForSingleObject(
1008 WaitHandle waitObject
,
1009 WaitOrTimerCallback callBack
,
1011 long millisecondsTimeOutInterval
,
1012 bool executeOnlyOnce
// NOTE: we do not allow other options that allow the callback to be queued as an APC
1015 if (millisecondsTimeOutInterval
< -1)
1016 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
1017 if (millisecondsTimeOutInterval
> (uint)int.MaxValue
)
1018 throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval
), SR
.ArgumentOutOfRange_LessEqualToIntegerMaxVal
);
1019 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)millisecondsTimeOutInterval
, executeOnlyOnce
, false);
1022 public static RegisteredWaitHandle
RegisterWaitForSingleObject(
1023 WaitHandle waitObject
,
1024 WaitOrTimerCallback callBack
,
1027 bool executeOnlyOnce
1030 long tm
= (long)timeout
.TotalMilliseconds
;
1032 throw new ArgumentOutOfRangeException(nameof(timeout
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
1033 if (tm
> (long)int.MaxValue
)
1034 throw new ArgumentOutOfRangeException(nameof(timeout
), SR
.ArgumentOutOfRange_LessEqualToIntegerMaxVal
);
1035 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)tm
, executeOnlyOnce
, true);
1038 public static RegisteredWaitHandle
UnsafeRegisterWaitForSingleObject(
1039 WaitHandle waitObject
,
1040 WaitOrTimerCallback callBack
,
1043 bool executeOnlyOnce
1046 long tm
= (long)timeout
.TotalMilliseconds
;
1048 throw new ArgumentOutOfRangeException(nameof(timeout
), SR
.ArgumentOutOfRange_NeedNonNegOrNegative1
);
1049 if (tm
> (long)int.MaxValue
)
1050 throw new ArgumentOutOfRangeException(nameof(timeout
), SR
.ArgumentOutOfRange_LessEqualToIntegerMaxVal
);
1051 return RegisterWaitForSingleObject(waitObject
, callBack
, state
, (uint)tm
, executeOnlyOnce
, false);
1054 public static bool QueueUserWorkItem(WaitCallback callBack
) =>
1055 QueueUserWorkItem(callBack
, null);
1057 public static bool QueueUserWorkItem(WaitCallback callBack
, object? state
)
1059 if (callBack
== null)
1061 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.callBack
);
1064 EnsureInitialized();
1066 ExecutionContext context
= ExecutionContext
.Capture();
1068 object tpcallBack
= (context
== null || context
.IsDefault
) ?
1069 new QueueUserWorkItemCallbackDefaultContext(callBack
!, state
) :
1070 (object)new QueueUserWorkItemCallback(callBack
!, state
, context
);
1072 ThreadPoolGlobals
.workQueue
.Enqueue(tpcallBack
, forceGlobal
: true);
1077 public static bool QueueUserWorkItem
<TState
>(Action
<TState
> callBack
, TState state
, bool preferLocal
)
1079 if (callBack
== null)
1081 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.callBack
);
1084 EnsureInitialized();
1086 ExecutionContext context
= ExecutionContext
.Capture();
1088 object tpcallBack
= (context
== null || context
.IsDefault
) ?
1089 new QueueUserWorkItemCallbackDefaultContext
<TState
>(callBack
!, state
) :
1090 (object)new QueueUserWorkItemCallback
<TState
>(callBack
!, state
, context
);
1092 ThreadPoolGlobals
.workQueue
.Enqueue(tpcallBack
, forceGlobal
: !preferLocal
);
1097 public static bool UnsafeQueueUserWorkItem
<TState
>(Action
<TState
> callBack
, TState state
, bool preferLocal
)
1099 if (callBack
== null)
1101 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.callBack
);
1104 // If the callback is the runtime-provided invocation of an IAsyncStateMachineBox,
1105 // then we can queue the Task state directly to the ThreadPool instead of
1106 // wrapping it in a QueueUserWorkItemCallback.
1108 // This occurs when user code queues its provided continuation to the ThreadPool;
1109 // internally we call UnsafeQueueUserWorkItemInternal directly for Tasks.
1110 if (ReferenceEquals(callBack
, ThreadPoolGlobals
.s_invokeAsyncStateMachineBox
))
1112 if (!(state
is IAsyncStateMachineBox
))
1114 // The provided state must be the internal IAsyncStateMachineBox (Task) type
1115 ThrowHelper
.ThrowArgumentOutOfRangeException(ExceptionArgument
.state
);
1118 UnsafeQueueUserWorkItemInternal((object)state
!, preferLocal
);
1122 EnsureInitialized();
1124 ThreadPoolGlobals
.workQueue
.Enqueue(
1125 new QueueUserWorkItemCallbackDefaultContext
<TState
>(callBack
!, state
), forceGlobal
: !preferLocal
);
1130 public static bool UnsafeQueueUserWorkItem(WaitCallback callBack
, object? state
)
1132 if (callBack
== null)
1134 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.callBack
);
1137 EnsureInitialized();
1139 object tpcallBack
= new QueueUserWorkItemCallbackDefaultContext(callBack
!, state
);
1141 ThreadPoolGlobals
.workQueue
.Enqueue(tpcallBack
, forceGlobal
: true);
1146 public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack
, bool preferLocal
)
1148 if (callBack
== null)
1150 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.callBack
);
1152 if (callBack
is Task
)
1154 // Prevent code from queueing a derived Task that also implements the interface,
1155 // as that would bypass Task.Start and its safety checks.
1156 ThrowHelper
.ThrowArgumentOutOfRangeException(ExceptionArgument
.callBack
);
1159 UnsafeQueueUserWorkItemInternal(callBack
!, preferLocal
);
1163 internal static void UnsafeQueueUserWorkItemInternal(object callBack
, bool preferLocal
)
1165 Debug
.Assert((callBack
is IThreadPoolWorkItem
) ^
(callBack
is Task
));
1167 EnsureInitialized();
1169 ThreadPoolGlobals
.workQueue
.Enqueue(callBack
, forceGlobal
: !preferLocal
);
1172 // This method tries to take the target callback out of the current thread's queue.
1173 internal static bool TryPopCustomWorkItem(object workItem
)
1175 Debug
.Assert(null != workItem
);
1177 ThreadPoolGlobals
.threadPoolInitialized
&& // if not initialized, so there's no way this workitem was ever queued.
1178 ThreadPoolGlobals
.workQueue
.LocalFindAndPop(workItem
);
1181 // Get all workitems. Called by TaskScheduler in its debugger hooks.
1182 internal static IEnumerable
<object> GetQueuedWorkItems()
1184 // Enumerate global queue
1185 foreach (object workItem
in ThreadPoolGlobals
.workQueue
.workItems
)
1187 yield return workItem
;
1190 // Enumerate each local queue
1191 foreach (ThreadPoolWorkQueue
.WorkStealingQueue wsq
in ThreadPoolWorkQueue
.WorkStealingQueueList
.Queues
)
1193 if (wsq
!= null && wsq
.m_array
!= null)
1195 object?[] items
= wsq
.m_array
;
1196 for (int i
= 0; i
< items
.Length
; i
++)
1198 object? item
= items
[i
];
1208 internal static IEnumerable
<object> GetLocallyQueuedWorkItems()
1210 ThreadPoolWorkQueue
.WorkStealingQueue wsq
= ThreadPoolWorkQueueThreadLocals
.threadLocals
.workStealingQueue
;
1211 if (wsq
!= null && wsq
.m_array
!= null)
1213 object?[] items
= wsq
.m_array
;
1214 for (int i
= 0; i
< items
.Length
; i
++)
1216 object? item
= items
[i
];
1223 internal static IEnumerable
<object> GetGloballyQueuedWorkItems() => ThreadPoolGlobals
.workQueue
.workItems
;
1225 private static object[] ToObjectArray(IEnumerable
<object> workitems
)
1228 foreach (object item
in workitems
)
1233 object[] result
= new object[i
];
1235 foreach (object item
in workitems
)
1237 if (i
< result
.Length
) //just in case someone calls us while the queues are in motion
1245 // This is the method the debugger will actually call, if it ends up calling
1246 // into ThreadPool directly. Tests can use this to simulate a debugger, as well.
1247 internal static object[] GetQueuedWorkItemsForDebugger() =>
1248 ToObjectArray(GetQueuedWorkItems());
1250 internal static object[] GetGloballyQueuedWorkItemsForDebugger() =>
1251 ToObjectArray(GetGloballyQueuedWorkItems());
1253 internal static object[] GetLocallyQueuedWorkItemsForDebugger() =>
1254 ToObjectArray(GetLocallyQueuedWorkItems());