3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // FirstQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System
.Collections
.Generic
;
15 using System
.Diagnostics
.Contracts
;
16 using System
.Threading
;
18 namespace System
.Linq
.Parallel
/// <summary>
/// First tries to discover the first element in the source, optionally matching a
/// predicate. All partitions search in parallel, publish the lowest index for a
/// candidate match, and reach a barrier. Only the partition that "wins" the race,
/// i.e. the one that found the candidate with the smallest index, will yield an element.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
27 internal sealed class FirstQueryOperator
<TSource
> : UnaryQueryOperator
<TSource
, TSource
>
// Optional filter applied during the search. A null predicate accepts any element.
private readonly Func<TSource, bool> m_predicate;

// True when the child's ordinal index state is worse than Increasing, in which case
// the input is merged and reindexed before the search runs.
private readonly bool m_prematureMergeNeeded;
//---------------------------------------------------------------------------------------
// Initializes a new first operator.
//
// Arguments:
//     child      - the child whose data we will search for its first element
//     predicate  - optional filter a candidate must satisfy; null accepts any element
//

internal FirstQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
    : base(child)
{
    Contract.Assert(child != null, "child data source cannot be null");

    m_predicate = predicate;

    // Remember whether the child's index is worse than increasing; if so,
    // WrapPartitionedStream has to reindex the input before searching it.
    m_prematureMergeNeeded = Child.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
}
//---------------------------------------------------------------------------------------
// Opens this operator: the child is opened first, and its results are then wrapped
// so that partitions are produced through this operator as needed.
//

internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
{
    // The child is always opened with preferStriping = false; the caller's preference
    // is forwarded only to the wrapping results object.
    return new UnaryQueryOperatorResults(Child.Open(settings, false), this, settings, preferStriping);
}
//---------------------------------------------------------------------------------------
// Wraps the child's partitioned stream with first-element enumerators and hands the
// result to the recipient. Exactly one stream is delivered: either the input stream
// as-is, or a reindexed (int-keyed) stream when a premature merge is required.
//

internal override void WrapPartitionedStream<TKey>(
    PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient,
    bool preferStriping, QuerySettings settings)
{
    if (!m_prematureMergeNeeded)
    {
        // The existing keys are usable as they are; search the input directly.
        WrapHelper<TKey>(inputStream, recipient, settings);
        return;
    }

    // If the index is not at least increasing, we need to reindex: collect the input
    // through ExecuteAndCollectResults and search the int-keyed stream it exposes.
    ListQueryResults<TSource> listResults = ExecuteAndCollectResults(
        inputStream, inputStream.PartitionCount, Child.OutputOrdered, preferStriping, settings);
    WrapHelper<int>(listResults.GetPartitionedStream(), recipient, settings);
}
//---------------------------------------------------------------------------------------
// Creates one FirstQueryOperatorEnumerator per input partition, all sharing a single
// state object and a single barrier, and passes the resulting stream to the recipient.
//

private void WrapHelper<TKey>(
    PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient,
    QuerySettings settings)
{
    int count = inputStream.PartitionCount;

    // Data shared by every partition: the current best candidate, and a barrier sized
    // so that each partition contributes exactly one signal.
    FirstQueryOperatorState<TKey> sharedState = new FirstQueryOperatorState<TKey>();
    CountdownEvent barrier = new CountdownEvent(count);

    // The output is keyed by int and marked Shuffled.
    PartitionedStream<TSource, int> result =
        new PartitionedStream<TSource, int>(count, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);

    int partition = 0;
    while (partition < count)
    {
        result[partition] = new FirstQueryOperatorEnumerator<TKey>(
            inputStream[partition],
            m_predicate,
            sharedState,
            barrier,
            settings.CancellationState.MergedCancellationToken,
            inputStream.KeyComparer,
            partition);
        partition++;
    }

    recipient.Receive(result);
}
//---------------------------------------------------------------------------------------
// Would return an enumerable representing the query executing sequentially, but this
// operator never supports that path: it asserts and then throws NotSupportedException.
//

internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
{
    const string reason =
        "This method should never be called as fallback to sequential is handled in ParallelEnumerable.First().";

    Contract.Assert(false, reason);
    throw new NotSupportedException();
}
//---------------------------------------------------------------------------------------
// Reports whether this operator performs a premature merge that a similar sequential
// operation (i.e., LINQ to Objects) would not perform. Always false for this operator.
//

internal override bool LimitsParallelism
{
    get
    {
        return false;
    }
}
118 //---------------------------------------------------------------------------------------
119 // The enumerator type responsible for executing the first operation.
122 class FirstQueryOperatorEnumerator
<TKey
> : QueryOperatorEnumerator
<TSource
, int>
// Per-partition data. Everything except m_alreadySearched is fixed at construction,
// so those fields are readonly, matching the enclosing operator's own fields.
private readonly QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.
private bool m_alreadySearched; // Set once the enumerator has performed the search.
private readonly int m_partitionId; // ID of this partition.

// Data shared among partitions.
private readonly FirstQueryOperatorState<TKey> m_operatorState; // The current first candidate and its partition index.
private readonly CountdownEvent m_sharedBarrier; // Shared barrier, signaled when partitions find their 1st element.
private readonly CancellationToken m_cancellationToken; // Token used to cancel this operator.
private readonly IComparer<TKey> m_keyComparer; // Comparer for the order keys.
//---------------------------------------------------------------------------------------
// Instantiates a new enumerator.
//
// Arguments:
//     source            - the partition to search
//     predicate         - optional filter; null accepts any element
//     operatorState     - best-candidate record shared by all partitions
//     sharedBarrier     - barrier shared by all partitions, signaled once per partition
//     cancellationToken - token polled during the search and observed while waiting
//     keyComparer       - comparer used to order candidate keys
//     partitionId       - ID of this partition
//

internal FirstQueryOperatorEnumerator(
    QueryOperatorEnumerator<TSource, TKey> source, Func<TSource, bool> predicate,
    FirstQueryOperatorState<TKey> operatorState, CountdownEvent sharedBarrier, CancellationToken cancellationToken,
    IComparer<TKey> keyComparer, int partitionId)
{
    Contract.Assert(source != null);
    Contract.Assert(operatorState != null);
    Contract.Assert(sharedBarrier != null);
    Contract.Assert(keyComparer != null);

    // MoveNext asserts that m_source is non-null, so the source must be stored here.
    m_source = source;
    m_predicate = predicate;
    m_operatorState = operatorState;
    m_sharedBarrier = sharedBarrier;
    m_cancellationToken = cancellationToken;
    m_keyComparer = keyComparer;
    m_partitionId = partitionId;
}
//---------------------------------------------------------------------------------------
// Searches this partition for its first matching element, publishes it to the shared
// state if it beats the current best candidate, and yields it only if this partition
// still holds the best candidate once every partition has finished searching.
//
// Arguments:
//     currentElement - receives the winning element when true is returned
//     currentKey     - receives 0 when true is returned
//
// Return Value:
//     True at most once, and only on the winning partition; false otherwise, including
//     on every call after the first.
//

internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
{
    Contract.Assert(m_source != null);

    // The search runs only once per enumerator.
    if (m_alreadySearched)
    {
        return false;
    }

    // Look for the lowest element.
    TSource candidate = default(TSource);
    TKey candidateKey = default(TKey);

    try
    {
        TSource value = default(TSource);
        TKey key = default(TKey);
        int i = 0;

        while (m_source.MoveNext(ref value, ref key))
        {
            // Poll for cancellation periodically rather than on every element.
            if ((i++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // If the predicate is null or the current element satisfies it, we have found the
            // current partition's "candidate" for the first element. Note it.
            if (m_predicate == null || m_predicate(value))
            {
                candidate = value;
                candidateKey = key;

                // Publish the candidate if nobody has published yet (-1) or ours has the
                // smaller key.
                lock (m_operatorState)
                {
                    if (m_operatorState.m_partitionId == -1
                        || m_keyComparer.Compare(candidateKey, m_operatorState.m_key) < 0)
                    {
                        m_operatorState.m_key = candidateKey;
                        m_operatorState.m_partitionId = m_partitionId;
                    }
                }

                // Only this partition's first match matters; stop searching.
                break;
            }
        }
    }
    finally
    {
        // No matter whether we exit due to an exception or normal completion, we must ensure
        // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
        m_sharedBarrier.Signal();
    }

    m_alreadySearched = true;

    // Wait only if we may have the result.
    if (m_partitionId == m_operatorState.m_partitionId)
    {
        m_sharedBarrier.Wait(m_cancellationToken);

        // Now re-read the shared index. If it's the same as ours, we won and return true.
        if (m_partitionId == m_operatorState.m_partitionId)
        {
            currentElement = candidate;
            currentKey = 0; // 1st (and only) element, so we hardcode the output index to 0.
            return true;
        }
    }

    // If we got here, we didn't win. Return false.
    return false;
}
// Releases the wrapped source enumerator. The barrier and the operator state are shared
// with the other partitions' enumerators, so they are deliberately left alone here.
protected override void Dispose(bool disposing)
{
    m_source.Dispose();
}
// Best-candidate record shared by all partitions of one first-element search.
// The enumerators read and write it while holding a lock on the instance.
class FirstQueryOperatorState<TKey>
{
    // Order key of the best candidate published so far. Meaningful only once
    // m_partitionId is no longer -1.
    internal TKey m_key;

    // ID of the partition that published the best candidate; -1 until one does.
    internal int m_partitionId = -1;
}