3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // ProducerConsumerQueues.cs
10 // <OWNER>Microsoft, Microsoft</OWNER>
12 // Specialized producer/consumer queues.
15 // ************<IMPORTANT NOTE>*************
17 // There are two exact copies of this file:
18 // src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
19 // src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
20 // Keep both of them consistent by changing the other file when you change this one, also avoid:
21 // 1- To reference interneal types in mscorlib
22 // 2- To reference any dataflow specific types
23 // This should be fixed post Dev11 when this class becomes public.
25 // ************</IMPORTANT NOTE>*************
26 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
28 using System
.Collections
;
29 using System
.Collections
.Concurrent
;
30 using System
.Collections
.Generic
;
31 using System
.Diagnostics
;
32 using System
.Diagnostics
.Contracts
;
33 using System
.Runtime
.InteropServices
;
35 namespace System
.Threading
.Tasks
37 /// <summary>Represents a producer/consumer queue used internally by dataflow blocks.</summary>
38 /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
39 internal interface IProducerConsumerQueue
<T
> : IEnumerable
<T
>
41 /// <summary>Enqueues an item into the queue.</summary>
42 /// <param name="item">The item to enqueue.</param>
43 /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
46 /// <summary>Attempts to dequeue an item from the queue.</summary>
47 /// <param name="result">The dequeued item.</param>
48 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
49 /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
50 bool TryDequeue(out T result
);
52 /// <summary>Gets whether the collection is currently empty.</summary>
53 /// <remarks>This method may or may not be thread-safe.</remarks>
56 /// <summary>Gets the number of items in the collection.</summary>
57 /// <remarks>In many implementations, this method will not be thread-safe.</remarks>
60 /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
61 /// <param name="syncObj">The sync object used to lock</param>
62 /// <returns>The collection count</returns>
63 int GetCountSafe(object syncObj
);
67 /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
69 /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
70 [DebuggerDisplay("Count = {Count}")]
71 internal sealed class MultiProducerMultiConsumerQueue
<T
> : ConcurrentQueue
<T
>, IProducerConsumerQueue
<T
>
73 /// <summary>Enqueues an item into the queue.</summary>
74 /// <param name="item">The item to enqueue.</param>
75 void IProducerConsumerQueue
<T
>.Enqueue(T item
) { base.Enqueue(item); }
77 /// <summary>Attempts to dequeue an item from the queue.</summary>
78 /// <param name="result">The dequeued item.</param>
79 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
80 bool IProducerConsumerQueue
<T
>.TryDequeue(out T result
) { return base.TryDequeue(out result); }
82 /// <summary>Gets whether the collection is currently empty.</summary>
83 bool IProducerConsumerQueue
<T
>.IsEmpty { get { return base.IsEmpty; }
}
85 /// <summary>Gets the number of items in the collection.</summary>
86 int IProducerConsumerQueue
<T
>.Count { get { return base.Count; }
}
88 /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
89 /// <remarks>ConcurrentQueue.Count is thread safe, no need to acquire the lock.</remarks>
90 int IProducerConsumerQueue
<T
>.GetCountSafe(object syncObj
) { return base.Count; }
94 /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
96 /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
97 [DebuggerDisplay("Count = {Count}")]
98 [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue
<>.SingleProducerSingleConsumerQueue_DebugView
))]
99 internal sealed class SingleProducerSingleConsumerQueue
<T
> : IProducerConsumerQueue
<T
>
103 // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
104 // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
105 // multiple producer threads concurrently or multiple consumer threads concurrently.
107 // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
108 // as an array with two indexes: m_first and m_last. m_first is the index of the array slot for the consumer
109 // to read next, and m_last is the slot for the producer to write next. The circular buffer is empty when
110 // (m_first == m_last), and full when ((m_last+1) % m_array.Length == m_first).
112 // Since m_first is only ever modified by the consumer thread and m_last by the producer, the two indices can
113 // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
114 // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
115 // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
116 // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first
117 // empty out the old buffer and only then follow the producer into the new (larger) buffer.
119 // As described above, the enqueue operation on the fast path only modifies the m_first field of the current segment.
120 // However, it also needs to read m_last in order to verify that there is room in the current segment. Similarly, the
121 // dequeue operation on the fast path only needs to modify m_last, but also needs to read m_first to verify that the
122 // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
124 // The cache line sharing issue can be mitigating by having a possibly stale copy of m_first that is owned by the producer,
125 // and a possibly stale copy of m_last that is owned by the consumer. So, the consumer state is described using
126 // (m_first, m_lastCopy) and the producer state using (m_firstCopy, m_last). The consumer state is separated from
127 // the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines.
128 // m_lastCopy is the consumer's copy of m_last. Whenever the consumer can tell that there is room in the buffer
129 // simply by observing m_lastCopy, the consumer thread does not need to read m_last and thus encounter a cache miss. Only
130 // when the buffer appears to be empty will the consumer refresh m_lastCopy from m_last. m_firstCopy is used by the producer
131 // in the same way to avoid reading m_first on the hot path.
133 /// <summary>The initial size to use for segments (in number of elements).</summary>
134 private const int INIT_SEGMENT_SIZE
= 32; // must be a power of 2
135 /// <summary>The maximum size to use for segments (in number of elements).</summary>
136 private const int MAX_SEGMENT_SIZE
= 0x1000000; // this could be made as large as Int32.MaxValue / 2
138 /// <summary>The head of the linked list of segments.</summary>
139 private volatile Segment m_head
;
140 /// <summary>The tail of the linked list of segments.</summary>
141 private volatile Segment m_tail
;
143 /// <summary>Initializes the queue.</summary>
144 internal SingleProducerSingleConsumerQueue()
146 // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
147 Contract
.Assert(INIT_SEGMENT_SIZE
> 0, "Initial segment size must be > 0.");
148 Contract
.Assert((INIT_SEGMENT_SIZE
& (INIT_SEGMENT_SIZE
- 1)) == 0, "Initial segment size must be a power of 2");
149 Contract
.Assert(INIT_SEGMENT_SIZE
<= MAX_SEGMENT_SIZE
, "Initial segment size should be <= maximum.");
150 Contract
.Assert(MAX_SEGMENT_SIZE
< Int32
.MaxValue
/ 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
152 // Initialize the queue
153 m_head
= m_tail
= new Segment(INIT_SEGMENT_SIZE
);
156 /// <summary>Enqueues an item into the queue.</summary>
157 /// <param name="item">The item to enqueue.</param>
158 public void Enqueue(T item
)
160 Segment segment
= m_tail
;
161 var array
= segment
.m_array
;
162 int last
= segment
.m_state
.m_last
; // local copy to avoid multiple volatile reads
164 // Fast path: there's obviously room in the current segment
165 int tail2
= (last
+ 1) & (array
.Length
- 1);
166 if (tail2
!= segment
.m_state
.m_firstCopy
)
169 segment
.m_state
.m_last
= tail2
;
171 // Slow path: there may not be room in the current segment.
172 else EnqueueSlow(item
, ref segment
);
175 /// <summary>Enqueues an item into the queue.</summary>
176 /// <param name="item">The item to enqueue.</param>
177 /// <param name="segment">The segment in which to first attempt to store the item.</param>
178 private void EnqueueSlow(T item
, ref Segment segment
)
180 Contract
.Requires(segment
!= null, "Expected a non-null segment.");
182 if (segment
.m_state
.m_firstCopy
!= segment
.m_state
.m_first
)
184 segment
.m_state
.m_firstCopy
= segment
.m_state
.m_first
;
185 Enqueue(item
); // will only recur once for this enqueue operation
189 int newSegmentSize
= m_tail
.m_array
.Length
<< 1; // double size
190 Contract
.Assert(newSegmentSize
> 0, "The max size should always be small enough that we don't overflow.");
191 if (newSegmentSize
> MAX_SEGMENT_SIZE
) newSegmentSize
= MAX_SEGMENT_SIZE
;
193 var newSegment
= new Segment(newSegmentSize
);
194 newSegment
.m_array
[0] = item
;
195 newSegment
.m_state
.m_last
= 1;
196 newSegment
.m_state
.m_lastCopy
= 1;
200 // Finally block to protect against corruption due to a thread abort
201 // between setting m_next and setting m_tail.
202 Volatile
.Write(ref m_tail
.m_next
, newSegment
); // ensure segment not published until item is fully stored
207 /// <summary>Attempts to dequeue an item from the queue.</summary>
208 /// <param name="result">The dequeued item.</param>
209 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
210 public bool TryDequeue(out T result
)
212 Segment segment
= m_head
;
213 var array
= segment
.m_array
;
214 int first
= segment
.m_state
.m_first
; // local copy to avoid multiple volatile reads
216 // Fast path: there's obviously data available in the current segment
217 if (first
!= segment
.m_state
.m_lastCopy
)
219 result
= array
[first
];
220 array
[first
] = default(T
); // Clear the slot to release the element
221 segment
.m_state
.m_first
= (first
+ 1) & (array
.Length
- 1);
224 // Slow path: there may not be data available in the current segment
225 else return TryDequeueSlow(ref segment
, ref array
, out result
);
228 /// <summary>Attempts to dequeue an item from the queue.</summary>
229 /// <param name="array">The array from which the item was dequeued.</param>
230 /// <param name="segment">The segment from which the item was dequeued.</param>
231 /// <param name="result">The dequeued item.</param>
232 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
233 private bool TryDequeueSlow(ref Segment segment
, ref T
[] array
, out T result
)
235 Contract
.Requires(segment
!= null, "Expected a non-null segment.");
236 Contract
.Requires(array
!= null, "Expected a non-null item array.");
238 if (segment
.m_state
.m_last
!= segment
.m_state
.m_lastCopy
)
240 segment
.m_state
.m_lastCopy
= segment
.m_state
.m_last
;
241 return TryDequeue(out result
); // will only recur once for this dequeue operation
244 if (segment
.m_next
!= null && segment
.m_state
.m_first
== segment
.m_state
.m_last
)
246 segment
= segment
.m_next
;
247 array
= segment
.m_array
;
251 var first
= segment
.m_state
.m_first
; // local copy to avoid extraneous volatile reads
253 if (first
== segment
.m_state
.m_last
)
259 result
= array
[first
];
260 array
[first
] = default(T
); // Clear the slot to release the element
261 segment
.m_state
.m_first
= (first
+ 1) & (segment
.m_array
.Length
- 1);
262 segment
.m_state
.m_lastCopy
= segment
.m_state
.m_last
; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy
267 /// <summary>Attempts to peek at an item in the queue.</summary>
268 /// <param name="result">The peeked item.</param>
269 /// <returns>true if an item could be peeked; otherwise, false.</returns>
270 public bool TryPeek(out T result
)
272 Segment segment
= m_head
;
273 var array
= segment
.m_array
;
274 int first
= segment
.m_state
.m_first
; // local copy to avoid multiple volatile reads
276 // Fast path: there's obviously data available in the current segment
277 if (first
!= segment
.m_state
.m_lastCopy
)
279 result
= array
[first
];
282 // Slow path: there may not be data available in the current segment
283 else return TryPeekSlow(ref segment
, ref array
, out result
);
286 /// <summary>Attempts to peek at an item in the queue.</summary>
287 /// <param name="array">The array from which the item is peeked.</param>
288 /// <param name="segment">The segment from which the item is peeked.</param>
289 /// <param name="result">The peeked item.</param>
290 /// <returns>true if an item could be peeked; otherwise, false.</returns>
291 private bool TryPeekSlow(ref Segment segment
, ref T
[] array
, out T result
)
293 Contract
.Requires(segment
!= null, "Expected a non-null segment.");
294 Contract
.Requires(array
!= null, "Expected a non-null item array.");
296 if (segment
.m_state
.m_last
!= segment
.m_state
.m_lastCopy
)
298 segment
.m_state
.m_lastCopy
= segment
.m_state
.m_last
;
299 return TryPeek(out result
); // will only recur once for this peek operation
302 if (segment
.m_next
!= null && segment
.m_state
.m_first
== segment
.m_state
.m_last
)
304 segment
= segment
.m_next
;
305 array
= segment
.m_array
;
309 var first
= segment
.m_state
.m_first
; // local copy to avoid extraneous volatile reads
311 if (first
== segment
.m_state
.m_last
)
317 result
= array
[first
];
321 /// <summary>Attempts to dequeue an item from the queue.</summary>
322 /// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
323 /// <param name="result">The dequeued item.</param>
324 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
325 public bool TryDequeueIf(Predicate
<T
> predicate
, out T result
)
327 Segment segment
= m_head
;
328 var array
= segment
.m_array
;
329 int first
= segment
.m_state
.m_first
; // local copy to avoid multiple volatile reads
331 // Fast path: there's obviously data available in the current segment
332 if (first
!= segment
.m_state
.m_lastCopy
)
334 result
= array
[first
];
335 if (predicate
== null || predicate(result
))
337 array
[first
] = default(T
); // Clear the slot to release the element
338 segment
.m_state
.m_first
= (first
+ 1) & (array
.Length
- 1);
347 // Slow path: there may not be data available in the current segment
348 else return TryDequeueIfSlow(predicate
, ref segment
, ref array
, out result
);
351 /// <summary>Attempts to dequeue an item from the queue.</summary>
352 /// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
353 /// <param name="array">The array from which the item was dequeued.</param>
354 /// <param name="segment">The segment from which the item was dequeued.</param>
355 /// <param name="result">The dequeued item.</param>
356 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
357 private bool TryDequeueIfSlow(Predicate
<T
> predicate
, ref Segment segment
, ref T
[] array
, out T result
)
359 Contract
.Requires(segment
!= null, "Expected a non-null segment.");
360 Contract
.Requires(array
!= null, "Expected a non-null item array.");
362 if (segment
.m_state
.m_last
!= segment
.m_state
.m_lastCopy
)
364 segment
.m_state
.m_lastCopy
= segment
.m_state
.m_last
;
365 return TryDequeueIf(predicate
, out result
); // will only recur once for this dequeue operation
368 if (segment
.m_next
!= null && segment
.m_state
.m_first
== segment
.m_state
.m_last
)
370 segment
= segment
.m_next
;
371 array
= segment
.m_array
;
375 var first
= segment
.m_state
.m_first
; // local copy to avoid extraneous volatile reads
377 if (first
== segment
.m_state
.m_last
)
383 result
= array
[first
];
384 if (predicate
== null || predicate(result
))
386 array
[first
] = default(T
); // Clear the slot to release the element
387 segment
.m_state
.m_first
= (first
+ 1) & (segment
.m_array
.Length
- 1);
388 segment
.m_state
.m_lastCopy
= segment
.m_state
.m_last
; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy
401 while (TryDequeue(out ignored
)) ;
404 /// <summary>Gets whether the collection is currently empty.</summary>
405 /// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks>
408 // This implementation is optimized for calls from the consumer.
412 if (head
.m_state
.m_first
!= head
.m_state
.m_lastCopy
) return false; // m_first is volatile, so the read of m_lastCopy cannot get reordered
413 if (head
.m_state
.m_first
!= head
.m_state
.m_last
) return false;
414 return head
.m_next
== null;
418 /// <summary>Gets an enumerable for the collection.</summary>
419 /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
420 public IEnumerator
<T
> GetEnumerator()
422 for (Segment segment
= m_head
; segment
!= null; segment
= segment
.m_next
)
424 for (int pt
= segment
.m_state
.m_first
;
425 pt
!= segment
.m_state
.m_last
;
426 pt
= (pt
+ 1) & (segment
.m_array
.Length
- 1))
428 yield return segment
.m_array
[pt
];
432 /// <summary>Gets an enumerable for the collection.</summary>
433 /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
434 IEnumerator IEnumerable
.GetEnumerator() { return GetEnumerator(); }
436 /// <summary>Gets the number of items in the collection.</summary>
437 /// <remarks>WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.</remarks>
443 for (Segment segment
= m_head
; segment
!= null; segment
= segment
.m_next
)
445 int arraySize
= segment
.m_array
.Length
;
447 while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
449 first
= segment
.m_state
.m_first
;
450 last
= segment
.m_state
.m_last
;
451 if (first
== segment
.m_state
.m_first
) break;
453 count
+= (last
- first
) & (arraySize
- 1);
459 /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
460 /// <remarks>The Count is not thread safe, so we need to acquire the lock.</remarks>
461 int IProducerConsumerQueue
<T
>.GetCountSafe(object syncObj
)
463 Contract
.Assert(syncObj
!= null, "The syncObj parameter is null.");
470 /// <summary>A segment in the queue containing one or more items.</summary>
471 [StructLayout(LayoutKind
.Sequential
)]
472 private sealed class Segment
474 /// <summary>The next segment in the linked list of segments.</summary>
475 internal Segment m_next
;
476 /// <summary>The data stored in this segment.</summary>
477 internal readonly T
[] m_array
;
478 /// <summary>Details about the segment.</summary>
479 internal SegmentState m_state
; // separated out to enable StructLayout attribute to take effect
481 /// <summary>Initializes the segment.</summary>
482 /// <param name="size">The size to use for this segment.</param>
483 internal Segment(int size
)
485 Contract
.Requires((size
& (size
- 1)) == 0, "Size must be a power of 2");
486 m_array
= new T
[size
];
490 /// <summary>Stores information about a segment.</summary>
491 [StructLayout(LayoutKind
.Sequential
)] // enforce layout so that padding reduces false sharing
492 private struct SegmentState
494 /// <summary>Padding to reduce false sharing between the segment's array and m_first.</summary>
495 internal PaddingFor32 m_pad0
;
497 /// <summary>The index of the current head in the segment.</summary>
498 internal volatile int m_first
;
499 /// <summary>A copy of the current tail index.</summary>
500 internal int m_lastCopy
; // not volatile as read and written by the producer, except for IsEmpty, and there m_lastCopy is only read after reading the volatile m_first
502 /// <summary>Padding to reduce false sharing between the first and last.</summary>
503 internal PaddingFor32 m_pad1
;
505 /// <summary>A copy of the current head index.</summary>
506 internal int m_firstCopy
; // not voliatle as only read and written by the consumer thread
507 /// <summary>The index of the current tail in the segment.</summary>
508 internal volatile int m_last
;
510 /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
511 internal PaddingFor32 m_pad2
;
514 /// <summary>Debugger type proxy for a SingleProducerSingleConsumerQueue of T.</summary>
515 private sealed class SingleProducerSingleConsumerQueue_DebugView
517 /// <summary>The queue being visualized.</summary>
518 private readonly SingleProducerSingleConsumerQueue
<T
> m_queue
;
520 /// <summary>Initializes the debug view.</summary>
521 /// <param name="enumerable">The queue being debugged.</param>
522 public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue
<T
> queue
)
524 Contract
.Requires(queue
!= null, "Expected a non-null queue.");
528 /// <summary>Gets the contents of the list.</summary>
529 [DebuggerBrowsable(DebuggerBrowsableState
.RootHidden
)]
534 List
<T
> list
= new List
<T
>();
535 foreach (T item
in m_queue
)
537 return list
.ToArray();
544 /// <summary>A placeholder class for common padding constants and eventually routines.</summary>
545 static class PaddingHelpers
547 /// <summary>A size greater than or equal to the size of the most common CPU cache lines.</summary>
548 internal const int CACHE_LINE_SIZE
= 128;
551 /// <summary>Padding structure used to minimize false sharing in SingleProducerSingleConsumerQueue{T}.</summary>
552 [StructLayout(LayoutKind
.Explicit
, Size
= PaddingHelpers
.CACHE_LINE_SIZE
- sizeof(Int32
))] // Based on common case of 64-byte cache lines