1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // Specialized producer/consumer queues.
12 // ************<IMPORTANT NOTE>*************
14 // src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
15 // src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
16 // Keep both of them consistent by changing the other file when you change this one, also avoid:
17 // 1- To reference internal types in mscorlib
18 // 2- To reference any dataflow specific types
19 // This should be fixed post Dev11 when this class becomes public.
21 // ************</IMPORTANT NOTE>*************
22 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
24 using System
.Collections
;
25 using System
.Collections
.Concurrent
;
26 using System
.Collections
.Generic
;
27 using System
.Diagnostics
;
28 using System
.Diagnostics
.CodeAnalysis
;
29 using System
.Runtime
.InteropServices
;
31 namespace System
.Threading
.Tasks
/// <summary>Represents a producer/consumer queue used internally by dataflow blocks.</summary>
/// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
internal interface IProducerConsumerQueue<T> : IEnumerable<T>
{
    /// <summary>Enqueues an item into the queue.</summary>
    /// <param name="item">The item to enqueue.</param>
    /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
    void Enqueue(T item);

    /// <summary>Attempts to dequeue an item from the queue.</summary>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
    bool TryDequeue([MaybeNullWhen(false)] out T result);

    /// <summary>Gets whether the collection is currently empty.</summary>
    /// <remarks>This property may or may not be thread-safe.</remarks>
    bool IsEmpty { get; }

    /// <summary>Gets the number of items in the collection.</summary>
    /// <remarks>In many implementations, this property will not be thread-safe.</remarks>
    int Count { get; }
}
/// <summary>
/// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
/// </summary>
/// <remarks>Every member forwards directly to the <see cref="ConcurrentQueue{T}"/> base class.</remarks>
/// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
[DebuggerDisplay("Count = {Count}")]
internal sealed class MultiProducerMultiConsumerQueue<T> : ConcurrentQueue<T>, IProducerConsumerQueue<T>
{
    /// <summary>Adds an item to the underlying concurrent queue.</summary>
    /// <param name="item">The item to enqueue.</param>
    void IProducerConsumerQueue<T>.Enqueue(T item) => base.Enqueue(item);

    /// <summary>Tries to remove an item from the underlying concurrent queue.</summary>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    bool IProducerConsumerQueue<T>.TryDequeue([MaybeNullWhen(false)] out T result) => base.TryDequeue(out result);

    /// <summary>Gets whether the underlying concurrent queue is currently empty.</summary>
    bool IProducerConsumerQueue<T>.IsEmpty
    {
        get { return base.IsEmpty; }
    }

    /// <summary>Gets the number of items in the underlying concurrent queue.</summary>
    int IProducerConsumerQueue<T>.Count
    {
        get { return base.Count; }
    }
}
81 /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
83 /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
84 [DebuggerDisplay("Count = {Count}")]
85 [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue
<>.SingleProducerSingleConsumerQueue_DebugView
))]
86 internal sealed class SingleProducerSingleConsumerQueue
<T
> : IProducerConsumerQueue
<T
>
90 // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
91 // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
92 // multiple producer threads concurrently or multiple consumer threads concurrently.
94 // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
95 // as an array with two indexes: m_first and m_last. m_first is the index of the array slot for the consumer
96 // to read next, and m_last is the slot for the producer to write next. The circular buffer is empty when
97 // (m_first == m_last), and full when ((m_last+1) % m_array.Length == m_first).
99 // Since m_first is only ever modified by the consumer thread and m_last by the producer, the two indices can
100 // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
101 // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
102 // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
103 // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first
104 // empty out the old buffer and only then follow the producer into the new (larger) buffer.
106 // As described above, the enqueue operation on the fast path only modifies the m_last field of the current segment.
107 // However, it also needs to read m_first in order to verify that there is room in the current segment. Similarly, the
108 // dequeue operation on the fast path only needs to modify m_first, but also needs to read m_last to verify that the
109 // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
111 // The cache line sharing issue can be mitigated by having a possibly stale copy of m_first that is owned by the producer,
112 // and a possibly stale copy of m_last that is owned by the consumer. So, the consumer state is described using
113 // (m_first, m_lastCopy) and the producer state using (m_firstCopy, m_last). The consumer state is separated from
114 // the producer state by padding, which prevents fast-path enqueues and dequeues from hitting shared cache lines.
115 // m_lastCopy is the consumer's copy of m_last. Whenever the consumer can tell that there is data in the buffer
116 // simply by observing m_lastCopy, the consumer thread does not need to read m_last and thus encounter a cache miss. Only
117 // when the buffer appears to be empty will the consumer refresh m_lastCopy from m_last. m_firstCopy is used by the producer
118 // in the same way to avoid reading m_first on the hot path.
/// <summary>Number of elements in the first segment the queue allocates.</summary>
/// <remarks>Must be a power of 2, because slot indices wrap using a bit mask of (array length - 1).</remarks>
private const int INIT_SEGMENT_SIZE = 32;
/// <summary>Upper bound on the number of elements in any one segment.</summary>
/// <remarks>This could be made as large as int.MaxValue / 2.</remarks>
private const int MAX_SEGMENT_SIZE = 1 << 24; // 0x1000000

/// <summary>First segment in the linked list; dequeues read from this segment.</summary>
private volatile Segment m_head;
/// <summary>Last segment in the linked list; enqueues write to this segment.</summary>
private volatile Segment m_tail;
/// <summary>Creates an empty queue whose head and tail share one segment of the initial size.</summary>
internal SingleProducerSingleConsumerQueue()
{
    // The constants are checked here rather than in an explicit static constructor,
    // because an explicit cctor would cause perf degradation.
    Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
    Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
    Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
    Debug.Assert(MAX_SEGMENT_SIZE < int.MaxValue / 2, "Max segment size * 2 must be < int.MaxValue, or else overflow could occur.");

    // Start with a single segment that serves as both tail and head.
    var initialSegment = new Segment(INIT_SEGMENT_SIZE);
    m_tail = initialSegment;
    m_head = initialSegment;
}
/// <summary>Enqueues an item into the queue.</summary>
/// <param name="item">The item to enqueue.</param>
/// <remarks>
/// Writes to the tail segment. If the producer's cached copy of the head index suggests
/// the segment is full, the work is handed to <see cref="EnqueueSlow"/>.
/// </remarks>
public void Enqueue(T item)
{
    Segment segment = m_tail;
    T[] array = segment.m_array;
    int last = segment.m_state.m_last; // local copy to avoid multiple volatile reads

    // Fast path: there's obviously room in the current segment
    int tail2 = (last + 1) & (array.Length - 1);
    if (tail2 != segment.m_state.m_firstCopy)
    {
        // Store the item before advancing m_last. m_last is volatile, so a reader that
        // observes the new index also observes the stored element.
        array[last] = item;
        segment.m_state.m_last = tail2;
    }
    // Slow path: there may not be room in the current segment.
    else
    {
        EnqueueSlow(item, ref segment);
    }
}
/// <summary>Enqueues an item into the queue when the fast path could not prove there was room.</summary>
/// <param name="item">The item to enqueue.</param>
/// <param name="segment">The segment in which to first attempt to store the item.</param>
/// <remarks>
/// First refreshes the producer's cached head index and retries. If the segment really is full,
/// allocates a segment twice as large (capped at MAX_SEGMENT_SIZE), stores the item in it,
/// links it after the current tail and makes it the new tail.
/// </remarks>
private void EnqueueSlow(T item, ref Segment segment)
{
    Debug.Assert(segment != null, "Expected a non-null segment.");

    if (segment.m_state.m_firstCopy != segment.m_state.m_first)
    {
        // The cached head index was stale: refresh it and retry the fast path.
        segment.m_state.m_firstCopy = segment.m_state.m_first;
        Enqueue(item); // will only recur once for this enqueue operation
        return; // the retry stored the item; falling through would enqueue it a second time
    }

    int newSegmentSize = m_tail.m_array.Length << 1; // double size
    Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
    if (newSegmentSize > MAX_SEGMENT_SIZE)
    {
        newSegmentSize = MAX_SEGMENT_SIZE;
    }

    // Fully populate the new segment before it becomes reachable from the list.
    var newSegment = new Segment(newSegmentSize);
    newSegment.m_array[0] = item;
    newSegment.m_state.m_last = 1;
    newSegment.m_state.m_lastCopy = 1;

    try { }
    finally
    {
        // Finally block to protect against corruption due to a thread abort
        // between setting m_next and setting m_tail.
        Volatile.Write(ref m_tail.m_next, newSegment); // ensure segment not published until item is fully stored
        m_tail = newSegment; // later enqueues must target the new segment
    }
}
/// <summary>Attempts to dequeue an item from the queue.</summary>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
/// <remarks>
/// Reads from the head segment. If the consumer's cached copy of the tail index suggests
/// the segment is empty, the work is handed to <see cref="TryDequeueSlow"/>.
/// </remarks>
public bool TryDequeue([MaybeNullWhen(false)] out T result)
{
    Segment segment = m_head;
    T[] array = segment.m_array;
    int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads

    // Fast path: there's obviously data available in the current segment
    if (first != segment.m_state.m_lastCopy)
    {
        result = array[first];
        array[first] = default!; // Clear the slot to release the element
        segment.m_state.m_first = (first + 1) & (array.Length - 1);
        return true;
    }
    // Slow path: there may not be data available in the current segment
    else
    {
        return TryDequeueSlow(ref segment, ref array, out result);
    }
}
/// <summary>Attempts to dequeue an item when the fast path could not prove data was available.</summary>
/// <param name="segment">The segment from which the item was dequeued.</param>
/// <param name="array">The array from which the item was dequeued.</param>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
/// <remarks>
/// First refreshes the consumer's cached tail index and retries. If the current segment is drained
/// and a later segment exists, moves the head to that segment before reading from it.
/// </remarks>
private bool TryDequeueSlow(ref Segment segment, ref T[] array, [MaybeNullWhen(false)] out T result)
{
    Debug.Assert(segment != null, "Expected a non-null segment.");
    Debug.Assert(array != null, "Expected a non-null item array.");

    if (segment.m_state.m_last != segment.m_state.m_lastCopy)
    {
        // The cached tail index was stale: refresh it and retry the fast path.
        segment.m_state.m_lastCopy = segment.m_state.m_last;
        return TryDequeue(out result); // will only recur once for this dequeue operation
    }

    if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
    {
        // This segment is drained and a newer one exists: move on to it.
        segment = segment.m_next;
        array = segment.m_array;
        m_head = segment; // advance the head so later calls start at the new segment
    }

    int first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads

    if (first == segment.m_state.m_last)
    {
        // Nothing to dequeue in the segment being read.
        result = default;
        return false;
    }

    result = array[first];
    array[first] = default!; // Clear the slot to release the element
    segment.m_state.m_first = (first + 1) & (segment.m_array.Length - 1);
    segment.m_state.m_lastCopy = segment.m_state.m_last; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy

    return true;
}
/// <summary>Gets whether the collection is currently empty.</summary>
/// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks>
public bool IsEmpty
{
    // This implementation is optimized for calls from the consumer.
    get
    {
        Segment head = m_head;

        // Check the cached tail index first, then the real one. Any difference means the head segment has items.
        if (head.m_state.m_first != head.m_state.m_lastCopy) return false; // m_first is volatile, so the read of m_lastCopy cannot get reordered
        if (head.m_state.m_first != head.m_state.m_last) return false;

        // The head segment is drained; the queue is empty only if no later segment follows it.
        return head.m_next == null;
    }
}
/// <summary>Gets an enumerator that walks every segment from the head and yields its stored items in order.</summary>
/// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
public IEnumerator<T> GetEnumerator()
{
    Segment? current = m_head;
    while (current != null)
    {
        T[] items = current.m_array;
        int mask = items.Length - 1; // segment sizes are powers of 2, so this wraps the index

        int index = current.m_state.m_first;
        while (index != current.m_state.m_last)
        {
            yield return items[index];
            index = (index + 1) & mask;
        }

        current = current.m_next;
    }
}
/// <summary>Gets a non-generic enumerator by forwarding to the generic <see cref="GetEnumerator"/>.</summary>
/// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
/// <summary>Gets the number of items in the collection.</summary>
/// <remarks>WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.</remarks>
public int Count
{
    get
    {
        int count = 0;
        for (Segment? segment = m_head; segment != null; segment = segment.m_next)
        {
            int arraySize = segment.m_array.Length;
            int first, last;
            while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
            {
                // Re-read m_first after reading m_last; retry if it changed in between.
                first = segment.m_state.m_first;
                last = segment.m_state.m_last;
                if (first == segment.m_state.m_first) break;
            }

            // The mask handles the case where m_last has wrapped around behind m_first.
            count += (last - first) & (arraySize - 1);
        }
        return count;
    }
}
/// <summary>One node of the queue's linked list, holding a fixed-size item array and its indices.</summary>
[StructLayout(LayoutKind.Sequential)]
private sealed class Segment
{
    /// <summary>The segment that follows this one in the linked list, if any.</summary>
    internal Segment? m_next;
    /// <summary>The items held by this segment.</summary>
    internal readonly T[] m_array;
    /// <summary>The head/tail indices for this segment.</summary>
    internal SegmentState m_state; // separated out to enable StructLayout attribute to take effect

    /// <summary>Allocates the item array for a new segment.</summary>
    /// <param name="size">The number of slots in the segment; asserted to be a power of 2.</param>
    internal Segment(int size)
    {
        Debug.Assert((size & (size - 1)) == 0, "Size must be a power of 2");
        m_array = new T[size];
    }
}
/// <summary>Stores information about a segment.</summary>
/// <remarks>
/// The consumer-side fields (m_first, m_lastCopy) and the producer-side fields (m_firstCopy, m_last)
/// are separated by padding. Field order is significant because the layout is sequential.
/// </remarks>
[StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
private struct SegmentState
{
    /// <summary>Padding to reduce false sharing between the segment's array and m_first.</summary>
    internal Internal.PaddingFor32 m_pad0;

    /// <summary>The index of the current head in the segment.</summary>
    internal volatile int m_first;
    /// <summary>A copy of the current tail index.</summary>
    internal int m_lastCopy; // not volatile as read and written by the consumer (the producer only initializes it on a new segment before publishing it), except for IsEmpty, and there m_lastCopy is only read after reading the volatile m_first

    /// <summary>Padding to reduce false sharing between the first and last.</summary>
    internal Internal.PaddingFor32 m_pad1;

    /// <summary>A copy of the current head index.</summary>
    internal int m_firstCopy; // not volatile as only read and written by the producer thread
    /// <summary>The index of the current tail in the segment.</summary>
    internal volatile int m_last;

    /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
    internal Internal.PaddingFor32 m_pad2;
}
354 /// <summary>Debugger type proxy for a SingleProducerSingleConsumerQueue of T.</summary>
355 private sealed class SingleProducerSingleConsumerQueue_DebugView
357 /// <summary>The queue being visualized.</summary>
358 private readonly SingleProducerSingleConsumerQueue
<T
> m_queue
;
360 /// <summary>Initializes the debug view.</summary>
361 /// <param name="queue">The queue being debugged.</param>
362 public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue
<T
> queue
)
364 Debug
.Assert(queue
!= null, "Expected a non-null queue.");