1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 using System
.Collections
.Generic
;
6 using System
.Diagnostics
;
7 using System
.Diagnostics
.CodeAnalysis
;
8 using System
.Threading
;
10 namespace System
.Collections
.Concurrent
13 /// Represents a thread-safe first-in, first-out collection of objects.
15 /// <typeparam name="T">Specifies the type of elements in the queue.</typeparam>
17 /// All public and protected members of <see cref="ConcurrentQueue{T}"/> are thread-safe and may be used
18 /// concurrently from multiple threads.
20 [DebuggerDisplay("Count = {Count}")]
21 [DebuggerTypeProxy(typeof(IProducerConsumerCollectionDebugView
<>))]
22 public class ConcurrentQueue
<T
> : IProducerConsumerCollection
<T
>, IReadOnlyCollection
<T
>
24 // This implementation provides an unbounded, multi-producer multi-consumer queue
25 // that supports the standard Enqueue/TryDequeue operations, as well as support for
26 // snapshot enumeration (GetEnumerator, ToArray, CopyTo), peeking, and Count/IsEmpty.
27 // It is composed of a linked list of bounded ring buffers, each of which has a head
28 // and a tail index, isolated from each other to minimize false sharing. As long as
29 // the number of elements in the queue remains less than the size of the current
30 // buffer (Segment), no additional allocations are required for enqueued items. When
31 // the number of items exceeds the size of the current segment, the current segment is
32 // "frozen" to prevent further enqueues, and a new segment is linked from it and set
33 // as the new tail segment for subsequent enqueues. As old segments are consumed by
34 // dequeues, the head reference is updated to point to the segment that dequeuers should
35 // try next. To support snapshot enumeration, segments also support the notion of
36 // preserving for observation, whereby they avoid overwriting state as part of dequeues.
37 // Any operation that requires a snapshot results in all current segments being
38 // both frozen for enqueues and preserved for observation: any new enqueues will go
39 // to new segments, and dequeuers will consume from the existing segments but without
40 // overwriting the existing data.
42 /// <summary>Initial length of the segments used in the queue.</summary>
43 private const int InitialSegmentLength
= 32;
45 /// Maximum length of the segments used in the queue. This is a somewhat arbitrary limit:
46 /// larger means that as long as we don't exceed the size, we avoid allocating more segments,
47 /// but if we do exceed it, then the segment becomes garbage.
49 private const int MaxSegmentLength
= 1024 * 1024;
52 /// Lock used to protect cross-segment operations, including any updates to <see cref="_tail"/> or <see cref="_head"/>
53 /// and any operations that need to get a consistent view of them.
55 private readonly object _crossSegmentLock
;
56 /// <summary>The current tail segment.</summary>
57 private volatile ConcurrentQueueSegment
<T
> _tail
;
58 /// <summary>The current head segment.</summary>
59 private volatile ConcurrentQueueSegment
<T
> _head
; // SOS's ThreadPool command depends on this name
62 /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
64 public ConcurrentQueue()
66 _crossSegmentLock
= new object();
67 _tail
= _head
= new ConcurrentQueueSegment
<T
>(InitialSegmentLength
);
71 /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class that contains elements copied
72 /// from the specified collection.
74 /// <param name="collection">
75 /// The collection whose elements are copied to the new <see cref="ConcurrentQueue{T}"/>.
77 /// <exception cref="System.ArgumentNullException">The <paramref name="collection"/> argument is null.</exception>
78 public ConcurrentQueue(IEnumerable
<T
> collection
)
80 if (collection
== null)
82 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.collection
);
85 _crossSegmentLock
= new object();
87 // Determine the initial segment size. We'll use the default,
88 // unless the collection is known to be larger than that, in which
89 // case we round its length up to a power of 2, as all segments must
90 // be a power of 2 in length.
91 int length
= InitialSegmentLength
;
92 if (collection
is ICollection
<T
> c
)
97 length
= Math
.Min(ConcurrentQueueSegment
<T
>.RoundUpToPowerOf2(count
), MaxSegmentLength
);
101 // Initialize the segment and add all of the data to it.
102 _tail
= _head
= new ConcurrentQueueSegment
<T
>(length
);
103 foreach (T item
in collection
)
110 /// Copies the elements of the <see cref="ICollection"/> to an <see
111 /// cref="Array"/>, starting at a particular <see cref="Array"/> index.
113 /// <param name="array">
114 /// The one-dimensional <see cref="Array">Array</see> that is the destination of the
115 /// elements copied from the <see cref="ConcurrentQueue{T}"/>. <paramref name="array"/> must have
116 /// zero-based indexing.
118 /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
119 /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
120 /// Visual Basic).</exception>
121 /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
122 /// zero.</exception>
123 /// <exception cref="ArgumentException">
124 /// <paramref name="array"/> is multidimensional. -or-
125 /// <paramref name="array"/> does not have zero-based indexing. -or-
126 /// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/>
127 /// -or- The number of elements in the source <see cref="ICollection"/> is
128 /// greater than the available space from <paramref name="index"/> to the end of the destination
129 /// <paramref name="array"/>. -or- The type of the source <see
130 /// cref="ICollection"/> cannot be cast automatically to the type of the
131 /// destination <paramref name="array"/>.
133 void ICollection
.CopyTo(Array array
, int index
)
135 // Special-case when the Array is actually a T[], taking a faster path
136 if (array
is T
[] szArray
)
138 CopyTo(szArray
, index
);
142 // Validate arguments.
145 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.array
);
148 // Otherwise, fall back to the slower path that first copies the contents
149 // to an array, and then uses that array's non-generic CopyTo to do the copy.
150 ToArray().CopyTo(array
, index
);
154 /// Gets a value indicating whether access to the <see cref="ICollection"/> is
155 /// synchronized with the SyncRoot.
157 /// <value>true if access to the <see cref="ICollection"/> is synchronized
158 /// with the SyncRoot; otherwise, false. For <see cref="ConcurrentQueue{T}"/>, this property always
159 /// returns false.</value>
160 bool ICollection
.IsSynchronized
=> false; // always false, as true implies synchronization via SyncRoot
163 /// Gets an object that can be used to synchronize access to the <see
164 /// cref="ICollection"/>. This property is not supported.
166 /// <exception cref="NotSupportedException">The SyncRoot property is not supported.</exception>
167 object ICollection
.SyncRoot { get { ThrowHelper.ThrowNotSupportedException(ExceptionResource.ConcurrentCollection_SyncRoot_NotSupported); return default; }
}
169 /// <summary>Returns an enumerator that iterates through a collection.</summary>
170 /// <returns>An <see cref="IEnumerator"/> that can be used to iterate through the collection.</returns>
171 IEnumerator IEnumerable
.GetEnumerator() => ((IEnumerable
<T
>)this).GetEnumerator();
174 /// Attempts to add an object to the <see cref="Concurrent.IProducerConsumerCollection{T}"/>.
176 /// <param name="item">The object to add to the <see
177 /// cref="Concurrent.IProducerConsumerCollection{T}"/>. The value can be a null
178 /// reference (Nothing in Visual Basic) for reference types.
180 /// <returns>true if the object was added successfully; otherwise, false.</returns>
181 /// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation will always add the object to the
182 /// end of the <see cref="ConcurrentQueue{T}"/>
183 /// and return true.</remarks>
184 bool IProducerConsumerCollection
<T
>.TryAdd(T item
)
191 /// Attempts to remove and return an object from the <see cref="Concurrent.IProducerConsumerCollection{T}"/>.
193 /// <param name="item">
194 /// When this method returns, if the operation was successful, <paramref name="item"/> contains the
195 /// object removed. If no object was available to be removed, the value is unspecified.
197 /// <returns>true if an element was removed and returned successfully; otherwise, false.</returns>
198 /// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation will attempt to remove the object
199 /// from the beginning of the <see cref="ConcurrentQueue{T}"/>.
201 bool IProducerConsumerCollection
<T
>.TryTake(out T item
) => TryDequeue(out item
);
204 /// Gets a value that indicates whether the <see cref="ConcurrentQueue{T}"/> is empty.
206 /// <value>true if the <see cref="ConcurrentQueue{T}"/> is empty; otherwise, false.</value>
208 /// For determining whether the collection contains any items, use of this property is recommended
209 /// rather than retrieving the number of items from the <see cref="Count"/> property and comparing it
210 /// to 0. However, as this collection is intended to be accessed concurrently, it may be the case
211 /// that another thread will modify the collection after <see cref="IsEmpty"/> returns, thus invalidating
214 public bool IsEmpty
=>
215 // IsEmpty == !TryPeek. We use a "resultUsed:false" peek in order to avoid marking
216 // segments as preserved for observation, making IsEmpty a cheaper way than either
217 // TryPeek(out T) or Count == 0 to check whether any elements are in the queue.
218 !TryPeek(out _
, resultUsed
: false);
220 /// <summary>Copies the elements stored in the <see cref="ConcurrentQueue{T}"/> to a new array.</summary>
221 /// <returns>A new array containing a snapshot of elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
224 // Snap the current contents for enumeration.
225 ConcurrentQueueSegment
<T
> head
, tail
;
226 int headHead
, tailTail
;
227 SnapForObservation(out head
, out headHead
, out tail
, out tailTail
);
229 // Count the number of items in that snapped set, and use it to allocate an
230 // array of the right size.
231 long count
= GetCount(head
, headHead
, tail
, tailTail
);
232 T
[] arr
= new T
[count
];
234 // Now enumerate the contents, copying each element into the array.
235 using (IEnumerator
<T
> e
= Enumerate(head
, headHead
, tail
, tailTail
))
240 arr
[i
++] = e
.Current
;
242 Debug
.Assert(count
== i
);
250 /// Gets the number of elements contained in the <see cref="ConcurrentQueue{T}"/>.
252 /// <value>The number of elements contained in the <see cref="ConcurrentQueue{T}"/>.</value>
254 /// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/>
255 /// property is recommended rather than retrieving the number of items from the <see cref="Count"/>
256 /// property and comparing it to 0.
262 var spinner
= new SpinWait();
265 // Capture the head and tail, as well as the head's head and tail.
266 ConcurrentQueueSegment
<T
> head
= _head
;
267 ConcurrentQueueSegment
<T
> tail
= _tail
;
268 int headHead
= Volatile
.Read(ref head
._headAndTail
.Head
);
269 int headTail
= Volatile
.Read(ref head
._headAndTail
.Tail
);
273 // There was a single segment in the queue. If the captured segments still
274 // match, then we can trust the values to compute the segment's count. (It's
275 // theoretically possible the values could have looped around and still exactly match,
276 // but that would required at least ~4 billion elements to have been enqueued and
277 // dequeued between the reads.)
280 headHead
== Volatile
.Read(ref head
._headAndTail
.Head
) &&
281 headTail
== Volatile
.Read(ref head
._headAndTail
.Tail
))
283 return GetCount(head
, headHead
, headTail
);
286 else if (head
._nextSegment
== tail
)
288 // There were two segments in the queue. Get the positions from the tail, and as above,
289 // if the captured values match the previous reads, return the sum of the counts from both segments.
290 int tailHead
= Volatile
.Read(ref tail
._headAndTail
.Head
);
291 int tailTail
= Volatile
.Read(ref tail
._headAndTail
.Tail
);
294 headHead
== Volatile
.Read(ref head
._headAndTail
.Head
) &&
295 headTail
== Volatile
.Read(ref head
._headAndTail
.Tail
) &&
296 tailHead
== Volatile
.Read(ref tail
._headAndTail
.Head
) &&
297 tailTail
== Volatile
.Read(ref tail
._headAndTail
.Tail
))
299 return GetCount(head
, headHead
, headTail
) + GetCount(tail
, tailHead
, tailTail
);
304 // There were more than two segments in the queue. Fall back to taking the cross-segment lock,
305 // which will ensure that the head and tail segments we read are stable (since the lock is needed to change them);
306 // for the two-segment case above, we can simply rely on subsequent comparisons, but for the two+ case, we need
307 // to be able to trust the internal segments between the head and tail.
308 lock (_crossSegmentLock
)
310 // Now that we hold the lock, re-read the previously captured head and tail segments and head positions.
311 // If either has changed, start over.
312 if (head
== _head
&& tail
== _tail
)
314 // Get the positions from the tail, and as above, if the captured values match the previous reads,
315 // we can use the values to compute the count of the head and tail segments.
316 int tailHead
= Volatile
.Read(ref tail
._headAndTail
.Head
);
317 int tailTail
= Volatile
.Read(ref tail
._headAndTail
.Tail
);
318 if (headHead
== Volatile
.Read(ref head
._headAndTail
.Head
) &&
319 headTail
== Volatile
.Read(ref head
._headAndTail
.Tail
) &&
320 tailHead
== Volatile
.Read(ref tail
._headAndTail
.Head
) &&
321 tailTail
== Volatile
.Read(ref tail
._headAndTail
.Tail
))
323 // We got stable values for the head and tail segments, so we can just compute the sizes
324 // based on those and add them. Note that this and the below additions to count may overflow: previous
325 // implementations allowed that, so we don't check, either, and it is theoretically possible for the
326 // queue to store more than int.MaxValue items.
327 int count
= GetCount(head
, headHead
, headTail
) + GetCount(tail
, tailHead
, tailTail
);
329 // Now add the counts for each internal segment. Since there were segments before these,
330 // for counting purposes we consider them to start at the 0th element, and since there is at
331 // least one segment after each, each was frozen, so we can count until each's frozen tail.
332 // With the cross-segment lock held, we're guaranteed that all of these internal segments are
333 // consistent, as the head and tail segment can't be changed while we're holding the lock, and
334 // dequeueing and enqueueing can only be done from the head and tail segments, which these aren't.
335 for (ConcurrentQueueSegment
<T
> s
= head
._nextSegment
!; s
!= tail
; s
= s
._nextSegment
!)
337 Debug
.Assert(s
._frozenForEnqueues
, "Internal segment must be frozen as there's a following segment.");
338 count
+= s
._headAndTail
.Tail
- s
.FreezeOffset
;
347 // We raced with enqueues/dequeues and captured an inconsistent picture of the queue.
348 // Spin and try again.
354 /// <summary>Computes the number of items in a segment based on a fixed head and tail in that segment.</summary>
355 private static int GetCount(ConcurrentQueueSegment
<T
> s
, int head
, int tail
)
357 if (head
!= tail
&& head
!= tail
- s
.FreezeOffset
)
359 head
&= s
._slotsMask
;
360 tail
&= s
._slotsMask
;
361 return head
< tail
? tail
- head
: s
._slots
.Length
- head
+ tail
;
366 /// <summary>Gets the number of items in snapped region.</summary>
367 private static long GetCount(ConcurrentQueueSegment
<T
> head
, int headHead
, ConcurrentQueueSegment
<T
> tail
, int tailTail
)
369 // All of the segments should have been both frozen for enqueues and preserved for observation.
370 // Validate that here for head and tail; we'll validate it for intermediate segments later.
371 Debug
.Assert(head
._preservedForObservation
);
372 Debug
.Assert(head
._frozenForEnqueues
);
373 Debug
.Assert(tail
._preservedForObservation
);
374 Debug
.Assert(tail
._frozenForEnqueues
);
378 // Head segment. We've already marked it as frozen for enqueues, so its tail position is fixed,
379 // and we've already marked it as preserved for observation (before we grabbed the head), so we
380 // can safely enumerate from its head to its tail and access its elements.
381 int headTail
= (head
== tail
? tailTail
: Volatile
.Read(ref head
._headAndTail
.Tail
)) - head
.FreezeOffset
;
382 if (headHead
< headTail
)
384 // Mask the head and tail for the head segment
385 headHead
&= head
._slotsMask
;
386 headTail
&= head
._slotsMask
;
388 // Increase the count by either the one or two regions, based on whether tail
389 // has wrapped to be less than head.
390 count
+= headHead
< headTail
?
391 headTail
- headHead
:
392 head
._slots
.Length
- headHead
+ headTail
;
395 // We've enumerated the head. If the tail is different from the head, we need to
396 // enumerate the remaining segments.
399 // Count the contents of each segment between head and tail, not including head and tail.
400 // Since there were segments before these, for our purposes we consider them to start at
401 // the 0th element, and since there is at least one segment after each, each was frozen
402 // by the time we snapped it, so we can iterate until each's frozen tail.
403 for (ConcurrentQueueSegment
<T
> s
= head
._nextSegment
!; s
!= tail
; s
= s
._nextSegment
!)
405 Debug
.Assert(s
._preservedForObservation
);
406 Debug
.Assert(s
._frozenForEnqueues
);
407 count
+= s
._headAndTail
.Tail
- s
.FreezeOffset
;
410 // Finally, enumerate the tail. As with the intermediate segments, there were segments
411 // before this in the snapped region, so we can start counting from the beginning. Unlike
412 // the intermediate segments, we can't just go until the Tail, as that could still be changing;
413 // instead we need to go until the tail we snapped for observation.
414 count
+= tailTail
- tail
.FreezeOffset
;
417 // Return the computed count.
422 /// Copies the <see cref="ConcurrentQueue{T}"/> elements to an existing one-dimensional <see
423 /// cref="Array">Array</see>, starting at the specified array index.
425 /// <param name="array">The one-dimensional <see cref="Array">Array</see> that is the
426 /// destination of the elements copied from the
427 /// <see cref="ConcurrentQueue{T}"/>. The <see cref="Array">Array</see> must have zero-based
428 /// indexing.</param>
429 /// <param name="index">The zero-based index in <paramref name="array"/> at which copying
431 /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
432 /// Visual Basic).</exception>
433 /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
434 /// zero.</exception>
435 /// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the
436 /// length of the <paramref name="array"/>
437 /// -or- The number of elements in the source <see cref="ConcurrentQueue{T}"/> is greater than the
438 /// available space from <paramref name="index"/> to the end of the destination <paramref
441 public void CopyTo(T
[] array
, int index
)
445 ThrowHelper
.ThrowArgumentNullException(ExceptionArgument
.array
);
449 ThrowHelper
.ThrowArgumentOutOfRangeException(ExceptionArgument
.index
);
452 // Snap for enumeration
453 ConcurrentQueueSegment
<T
> head
, tail
;
454 int headHead
, tailTail
;
455 SnapForObservation(out head
, out headHead
, out tail
, out tailTail
);
457 // Get the number of items to be enumerated
458 long count
= GetCount(head
, headHead
, tail
, tailTail
);
459 if (index
> array
.Length
- count
)
461 ThrowHelper
.ThrowArgumentException(ExceptionResource
.Arg_ArrayPlusOffTooSmall
);
464 // Copy the items to the target array
466 using (IEnumerator
<T
> e
= Enumerate(head
, headHead
, tail
, tailTail
))
470 array
[i
++] = e
.Current
;
473 Debug
.Assert(count
== i
- index
);
476 /// <summary>Returns an enumerator that iterates through the <see cref="ConcurrentQueue{T}"/>.</summary>
477 /// <returns>An enumerator for the contents of the <see
478 /// cref="ConcurrentQueue{T}"/>.</returns>
480 /// The enumeration represents a moment-in-time snapshot of the contents
481 /// of the queue. It does not reflect any updates to the collection after
482 /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use
483 /// concurrently with reads from and writes to the queue.
485 public IEnumerator
<T
> GetEnumerator()
487 ConcurrentQueueSegment
<T
> head
, tail
;
488 int headHead
, tailTail
;
489 SnapForObservation(out head
, out headHead
, out tail
, out tailTail
);
490 return Enumerate(head
, headHead
, tail
, tailTail
);
494 /// Gets the head and tail information of the current contents of the queue.
495 /// After this call returns, the specified region can be enumerated any number
496 /// of times and will not change.
498 private void SnapForObservation(out ConcurrentQueueSegment
<T
> head
, out int headHead
, out ConcurrentQueueSegment
<T
> tail
, out int tailTail
)
500 lock (_crossSegmentLock
) // _head and _tail may only change while the lock is held.
502 // Snap the head and tail
505 Debug
.Assert(head
!= null);
506 Debug
.Assert(tail
!= null);
507 Debug
.Assert(tail
._nextSegment
== null);
509 // Mark them and all segments in between as preserving, and ensure no additional items
510 // can be added to the tail.
511 for (ConcurrentQueueSegment
<T
> s
= head
; ; s
= s
._nextSegment
!)
513 s
._preservedForObservation
= true;
514 if (s
== tail
) break;
515 Debug
.Assert(s
._frozenForEnqueues
); // any non-tail should already be marked
517 tail
.EnsureFrozenForEnqueues(); // we want to prevent the tailTail from moving
519 // At this point, any dequeues from any segment won't overwrite the value, and
520 // none of the existing segments can have new items enqueued.
522 headHead
= Volatile
.Read(ref head
._headAndTail
.Head
);
523 tailTail
= Volatile
.Read(ref tail
._headAndTail
.Tail
);
527 /// <summary>Gets the item stored in the <paramref name="i"/>th entry in <paramref name="segment"/>.</summary>
528 private T
GetItemWhenAvailable(ConcurrentQueueSegment
<T
> segment
, int i
)
530 Debug
.Assert(segment
._preservedForObservation
);
532 // Get the expected value for the sequence number
533 int expectedSequenceNumberAndMask
= (i
+ 1) & segment
._slotsMask
;
535 // If the expected sequence number is not yet written, we're still waiting for
536 // an enqueuer to finish storing it. Spin until it's there.
537 if ((segment
._slots
[i
].SequenceNumber
& segment
._slotsMask
) != expectedSequenceNumberAndMask
)
539 var spinner
= new SpinWait();
540 while ((Volatile
.Read(ref segment
._slots
[i
].SequenceNumber
) & segment
._slotsMask
) != expectedSequenceNumberAndMask
)
546 // Return the value from the slot.
547 return segment
._slots
[i
].Item
;
550 private IEnumerator
<T
> Enumerate(ConcurrentQueueSegment
<T
> head
, int headHead
, ConcurrentQueueSegment
<T
> tail
, int tailTail
)
552 Debug
.Assert(head
._preservedForObservation
);
553 Debug
.Assert(head
._frozenForEnqueues
);
554 Debug
.Assert(tail
._preservedForObservation
);
555 Debug
.Assert(tail
._frozenForEnqueues
);
557 // Head segment. We've already marked it as not accepting any more enqueues,
558 // so its tail position is fixed, and we've already marked it as preserved for
559 // enumeration (before we grabbed its head), so we can safely enumerate from
560 // its head to its tail.
561 int headTail
= (head
== tail
? tailTail
: Volatile
.Read(ref head
._headAndTail
.Tail
)) - head
.FreezeOffset
;
562 if (headHead
< headTail
)
564 headHead
&= head
._slotsMask
;
565 headTail
&= head
._slotsMask
;
567 if (headHead
< headTail
)
569 for (int i
= headHead
; i
< headTail
; i
++) yield return GetItemWhenAvailable(head
, i
);
573 for (int i
= headHead
; i
< head
._slots
.Length
; i
++) yield return GetItemWhenAvailable(head
, i
);
574 for (int i
= 0; i
< headTail
; i
++) yield return GetItemWhenAvailable(head
, i
);
578 // We've enumerated the head. If the tail is the same, we're done.
581 // Each segment between head and tail, not including head and tail. Since there were
582 // segments before these, for our purposes we consider it to start at the 0th element.
583 for (ConcurrentQueueSegment
<T
> s
= head
._nextSegment
!; s
!= tail
; s
= s
._nextSegment
!)
585 Debug
.Assert(s
._preservedForObservation
, "Would have had to been preserved as a segment part of enumeration");
586 Debug
.Assert(s
._frozenForEnqueues
, "Would have had to be frozen for enqueues as it's intermediate");
588 int sTail
= s
._headAndTail
.Tail
- s
.FreezeOffset
;
589 for (int i
= 0; i
< sTail
; i
++)
591 yield return GetItemWhenAvailable(s
, i
);
595 // Enumerate the tail. Since there were segments before this, we can just start at
596 // its beginning, and iterate until the tail we already grabbed.
597 tailTail
-= tail
.FreezeOffset
;
598 for (int i
= 0; i
< tailTail
; i
++)
600 yield return GetItemWhenAvailable(tail
, i
);
605 /// <summary>Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.</summary>
606 /// <param name="item">
607 /// The object to add to the end of the <see cref="ConcurrentQueue{T}"/>.
608 /// The value can be a null reference (Nothing in Visual Basic) for reference types.
610 public void Enqueue(T item
)
612 // Try to enqueue to the current tail.
613 if (!_tail
.TryEnqueue(item
))
615 // If we're unable to, we need to take a slow path that will
616 // try to add a new tail segment.
621 /// <summary>Adds to the end of the queue, adding a new segment if necessary.</summary>
622 private void EnqueueSlow(T item
)
626 ConcurrentQueueSegment
<T
> tail
= _tail
;
628 // Try to append to the existing tail.
629 if (tail
.TryEnqueue(item
))
634 // If we were unsuccessful, take the lock so that we can compare and manipulate
635 // the tail. Assuming another enqueuer hasn't already added a new segment,
636 // do so, then loop around to try enqueueing again.
637 lock (_crossSegmentLock
)
641 // Make sure no one else can enqueue to this segment.
642 tail
.EnsureFrozenForEnqueues();
644 // We determine the new segment's length based on the old length.
645 // In general, we double the size of the segment, to make it less likely
646 // that we'll need to grow again. However, if the tail segment is marked
647 // as preserved for observation, something caused us to avoid reusing this
648 // segment, and if that happens a lot and we grow, we'll end up allocating
649 // lots of wasted space. As such, in such situations we reset back to the
650 // initial segment length; if these observations are happening frequently,
651 // this will help to avoid wasted memory, and if they're not, we'll
652 // relatively quickly grow again to a larger size.
653 int nextSize
= tail
._preservedForObservation
? InitialSegmentLength
: Math
.Min(tail
.Capacity
* 2, MaxSegmentLength
);
654 var newTail
= new ConcurrentQueueSegment
<T
>(nextSize
);
656 // Hook up the new tail.
657 tail
._nextSegment
= newTail
;
665 /// Attempts to remove and return the object at the beginning of the <see
666 /// cref="ConcurrentQueue{T}"/>.
668 /// <param name="result">
669 /// When this method returns, if the operation was successful, <paramref name="result"/> contains the
670 /// object removed. If no object was available to be removed, the value is unspecified.
673 /// true if an element was removed and returned from the beginning of the
674 /// <see cref="ConcurrentQueue{T}"/> successfully; otherwise, false.
676 public bool TryDequeue([MaybeNullWhen(false)] out T result
) =>
677 _head
.TryDequeue(out result
) || // fast-path that operates just on the head segment
678 TryDequeueSlow(out result
); // slow path that needs to fix up segments
680 /// <summary>Tries to dequeue an item, removing empty segments as needed.</summary>
681 private bool TryDequeueSlow([MaybeNullWhen(false)] out T item
)
685 // Get the current head
686 ConcurrentQueueSegment
<T
> head
= _head
;
688 // Try to take. If we're successful, we're done.
689 if (head
.TryDequeue(out item
))
694 // Check to see whether this segment is the last. If it is, we can consider
695 // this to be a moment-in-time empty condition (even though between the TryDequeue
696 // check and this check, another item could have arrived).
697 if (head
._nextSegment
== null)
703 // At this point we know that head.Next != null, which means
704 // this segment has been frozen for additional enqueues. But between
705 // the time that we ran TryDequeue and checked for a next segment,
706 // another item could have been added. Try to dequeue one more time
707 // to confirm that the segment is indeed empty.
708 Debug
.Assert(head
._frozenForEnqueues
);
709 if (head
.TryDequeue(out item
))
714 // This segment is frozen (nothing more can be added) and empty (nothing is in it).
715 // Update head to point to the next segment in the list, assuming no one's beat us to it.
716 lock (_crossSegmentLock
)
720 _head
= head
._nextSegment
;
727 /// Attempts to return an object from the beginning of the <see cref="ConcurrentQueue{T}"/>
728 /// without removing it.
730 /// <param name="result">
731 /// When this method returns, <paramref name="result"/> contains an object from
732 /// the beginning of the <see cref="Concurrent.ConcurrentQueue{T}"/> or default(T)
733 /// if the operation failed.
735 /// <returns>true if and object was returned successfully; otherwise, false.</returns>
737 /// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/>
738 /// property is recommended rather than peeking.
740 public bool TryPeek([MaybeNullWhen(false)] out T result
) => TryPeek(out result
, resultUsed
: true);
742 /// <summary>Attempts to retrieve the value for the first element in the queue.</summary>
743 /// <param name="result">The value of the first element, if found.</param>
744 /// <param name="resultUsed">true if the result is needed; otherwise false if only the true/false outcome is needed.</param>
745 /// <returns>true if an element was found; otherwise, false.</returns>
746 private bool TryPeek([MaybeNullWhen(false)] out T result
, bool resultUsed
)
748 // Starting with the head segment, look through all of the segments
749 // for the first one we can find that's not empty.
750 ConcurrentQueueSegment
<T
> s
= _head
;
753 // Grab the next segment from this one, before we peek.
754 // This is to be able to see whether the value has changed
755 // during the peek operation.
756 ConcurrentQueueSegment
<T
>? next
= Volatile
.Read(ref s
._nextSegment
);
758 // Peek at the segment. If we find an element, we're done.
759 if (s
.TryPeek(out result
, resultUsed
))
764 // The current segment was empty at the moment we checked.
768 // If prior to the peek there was already a next segment, then
769 // during the peek no additional items could have been enqueued
770 // to it and we can just move on to check the next segment.
771 Debug
.Assert(next
== s
._nextSegment
);
774 else if (Volatile
.Read(ref s
._nextSegment
) == null)
776 // The next segment is null. Nothing more to peek at.
780 // The next segment was null before we peeked but non-null after.
781 // That means either when we peeked the first segment had
782 // already been frozen but the new segment not yet added,
783 // or that the first segment was empty and between the time
784 // that we peeked and then checked _nextSegment, so many items
785 // were enqueued that we filled the first segment and went
786 // into the next. Since we need to peek in order, we simply
787 // loop around again to peek on the same segment. The next
788 // time around on this segment we'll then either successfully
789 // peek or we'll find that next was non-null before peeking,
790 // and we'll traverse to that segment.
798 /// Removes all objects from the <see cref="ConcurrentQueue{T}"/>.
802 lock (_crossSegmentLock
)
804 // Simply substitute a new segment for the existing head/tail,
805 // as is done in the constructor. Operations currently in flight
806 // may still read from or write to an existing segment that's
807 // getting dropped, meaning that in flight operations may not be
808 // linear with regards to this clear operation. To help mitigate
809 // in-flight operations enqueuing onto the tail that's about to
810 // be dropped, we first freeze it; that'll force enqueuers to take
811 // this lock to synchronize and see the new tail.
812 _tail
.EnsureFrozenForEnqueues();
813 _tail
= _head
= new ConcurrentQueueSegment
<T
>(InitialSegmentLength
);