Updates referencesource to .NET 4.7
[mono-project.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / FirstQueryOperator.cs
blob8181237ca9f2a124dba2b70d4ee8e751654b1b82
1 // ==++==
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // FirstQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
18 namespace System.Linq.Parallel
20 /// <summary>
21 /// First tries to discover the first element in the source, optionally matching a
22 /// predicate. All partitions search in parallel, publish the lowest index for a
23 /// candidate match, and reach a barrier. Only the partition that "wins" the ----,
24 /// i.e. who found the candidate with the smallest index, will yield an element.
25 /// </summary>
26 /// <typeparam name="TSource"></typeparam>
27 internal sealed class FirstQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
30 private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.
31 private readonly bool m_prematureMergeNeeded; // Whether to prematurely merge the input of this operator.
33 //---------------------------------------------------------------------------------------
34 // Initializes a new first operator.
36 // Arguments:
37 // child - the child whose data we will reverse
40 internal FirstQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
41 :base(child)
43 Contract.Assert(child != null, "child data source cannot be null");
44 m_predicate = predicate;
45 m_prematureMergeNeeded = Child.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
48 //---------------------------------------------------------------------------------------
49 // Just opens the current operator, including opening the child and wrapping it with
50 // partitions as needed.
53 internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
55 // We just open the child operator.
56 QueryResults<TSource> childQueryResults = Child.Open(settings, false);
57 return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
60 internal override void WrapPartitionedStream<TKey>(
61 PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
63 // If the index is not at least increasing, we need to reindex.
64 if (m_prematureMergeNeeded)
66 ListQueryResults<TSource> listResults = ExecuteAndCollectResults(inputStream, inputStream.PartitionCount, Child.OutputOrdered, preferStriping, settings);
67 WrapHelper<int>(listResults.GetPartitionedStream(), recipient, settings);
69 else
71 WrapHelper<TKey>(inputStream, recipient, settings);
75 private void WrapHelper<TKey>(
76 PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
78 int partitionCount = inputStream.PartitionCount;
80 // Generate the shared data.
81 FirstQueryOperatorState<TKey> operatorState = new FirstQueryOperatorState<TKey>();
82 CountdownEvent sharedBarrier = new CountdownEvent(partitionCount);
84 PartitionedStream<TSource, int> outputStream = new PartitionedStream<TSource, int>(
85 partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);
87 for (int i = 0; i < partitionCount; i++)
89 outputStream[i] = new FirstQueryOperatorEnumerator<TKey>(
90 inputStream[i], m_predicate, operatorState, sharedBarrier,
91 settings.CancellationState.MergedCancellationToken, inputStream.KeyComparer, i);
94 recipient.Receive(outputStream);
98 //---------------------------------------------------------------------------------------
99 // Returns an enumerable that represents the query executing sequentially.
102 internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
104 Contract.Assert(false, "This method should never be called as fallback to sequential is handled in ParallelEnumerable.First().");
105 throw new NotSupportedException();
108 //---------------------------------------------------------------------------------------
109 // Whether this operator performs a premature merge that would not be performed in
110 // a similar sequential operation (i.e., in LINQ to Objects).
113 internal override bool LimitsParallelism
115 get { return false; }
118 //---------------------------------------------------------------------------------------
119 // The enumerator type responsible for executing the first operation.
122 class FirstQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
125 private QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
126 private Func<TSource, bool> m_predicate; // The optional predicate used during the search.
127 private bool m_alreadySearched; // Set once the enumerator has performed the search.
128 private int m_partitionId; // ID of this partition
130 // Data shared among partitions.
131 private FirstQueryOperatorState<TKey> m_operatorState; // The current first candidate and its partition index.
132 private CountdownEvent m_sharedBarrier; // Shared barrier, signaled when partitions find their 1st element.
133 private CancellationToken m_cancellationToken; // Token used to cancel this operator.
134 private IComparer<TKey> m_keyComparer; // Comparer for the order keys
136 //---------------------------------------------------------------------------------------
137 // Instantiates a new enumerator.
140 internal FirstQueryOperatorEnumerator(
141 QueryOperatorEnumerator<TSource, TKey> source, Func<TSource, bool> predicate,
142 FirstQueryOperatorState<TKey> operatorState, CountdownEvent sharedBarrier, CancellationToken cancellationToken,
143 IComparer<TKey> keyComparer, int partitionId)
145 Contract.Assert(source != null);
146 Contract.Assert(operatorState != null);
147 Contract.Assert(sharedBarrier != null);
148 Contract.Assert(keyComparer != null);
150 m_source = source;
151 m_predicate = predicate;
152 m_operatorState = operatorState;
153 m_sharedBarrier = sharedBarrier;
154 m_cancellationToken = cancellationToken;
155 m_keyComparer = keyComparer;
156 m_partitionId = partitionId;
159 //---------------------------------------------------------------------------------------
160 // Straightforward IEnumerator<T> methods.
163 internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
165 Contract.Assert(m_source != null);
167 if (m_alreadySearched)
169 return false;
172 // Look for the lowest element.
173 TSource candidate = default(TSource);
174 TKey candidateKey = default(TKey);
177 TSource value = default(TSource);
178 TKey key = default(TKey);
179 int i = 0;
180 while (m_source.MoveNext(ref value, ref key))
182 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
183 CancellationState.ThrowIfCanceled(m_cancellationToken);
185 // If the predicate is null or the current element satisfies it, we have found the
186 // current partition's "candidate" for the first element. Note it.
187 if (m_predicate == null || m_predicate(value))
189 candidate = value;
190 candidateKey = key;
192 lock (m_operatorState)
194 if (m_operatorState.m_partitionId == -1 || m_keyComparer.Compare(candidateKey, m_operatorState.m_key) < 0)
196 m_operatorState.m_key = candidateKey;
197 m_operatorState.m_partitionId = m_partitionId;
201 break;
205 finally
207 // No matter whether we exit due to an exception or normal completion, we must ensure
208 // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
209 m_sharedBarrier.Signal();
212 m_alreadySearched = true;
214 // Wait only if we may have the result
215 if (m_partitionId == m_operatorState.m_partitionId)
217 m_sharedBarrier.Wait(m_cancellationToken);
219 // Now re-read the shared index. If it's the same as ours, we won and return true.
220 if (m_partitionId == m_operatorState.m_partitionId)
222 currentElement = candidate;
223 currentKey = 0; // 1st (and only) element, so we hardcode the output index to 0.
224 return true;
228 // If we got here, we didn't win. Return false.
229 return false;
232 protected override void Dispose(bool disposing)
234 m_source.Dispose();
238 class FirstQueryOperatorState<TKey>
240 internal TKey m_key;
241 internal int m_partitionId = -1;