3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // ReverseQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System
.Collections
.Generic
;
15 using System
.Diagnostics
.Contracts
;
16 using System
.Threading
;
namespace System.Linq.Parallel
{
    /// <summary>
    /// Query operator that yields the elements of its child in reverse order.
    ///
    /// When the child's results are indexible, no buffering is required: the results object
    /// simply maps a requested index onto the mirrored index of the child
    /// (<c>count - index - 1</c>). Otherwise each partition runs in two phases: it first
    /// drains its input into a private buffer, and then yields the buffered elements from
    /// last to first. The keys travel with the elements unchanged; overall ordering is
    /// re-established by handing the output stream a <c>ReverseComparer</c> wrapped around
    /// the input stream's key comparer.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements being reversed.</typeparam>
    internal sealed class ReverseQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
    {
        //---------------------------------------------------------------------------------------
        // Initializes a new reverse operator.
        //
        // Arguments:
        //     child - the child whose data we will reverse
        //
        // NOTE(review): the child is forwarded to the base class because the Child property is
        // read immediately below; confirm against the UnaryQueryOperator constructor overloads.
        //

        internal ReverseQueryOperator(IEnumerable<TSource> child)
            : base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");

            // Reversing an indexible source is still indexible (see ReverseQueryOperatorResults);
            // any other source ends up with shuffled ordinal indices. The two cases must be
            // mutually exclusive, otherwise the indexible state would always be overwritten.
            if (Child.OrdinalIndexState == OrdinalIndexState.Indexible)
            {
                SetOrdinalIndexState(OrdinalIndexState.Indexible);
            }
            else
            {
                SetOrdinalIndexState(OrdinalIndexState.Shuffled);
            }
        }

        //---------------------------------------------------------------------------------------
        // Wraps every partition of the input stream in a reversing enumerator and hands the
        // resulting stream to the recipient. Only used when the child is not indexible.
        //
        // Arguments:
        //     inputStream    - the partitioned stream produced by the child
        //     recipient      - receives the wrapped output stream
        //     preferStriping - not consulted by this operator
        //     settings       - supplies the merged cancellation token for the enumerators
        //

        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient,
            bool preferStriping, QuerySettings settings)
        {
            Contract.Assert(Child.OrdinalIndexState != OrdinalIndexState.Indexible, "Don't take this code path if the child is indexible.");

            int partitionCount = inputStream.PartitionCount;

            // Keys are passed through untouched; wrapping the comparer is what reverses the
            // relative order of elements coming from different partitions.
            PartitionedStream<TSource, TKey> outputStream = new PartitionedStream<TSource, TKey>(
                partitionCount, new ReverseComparer<TKey>(inputStream.KeyComparer), OrdinalIndexState.Shuffled);

            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new ReverseQueryOperatorEnumerator<TKey>(
                    inputStream[i], settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //

        internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
        {
            // The child is always opened with preferStriping == false; the caller's preference
            // is only forwarded to the results object.
            QueryResults<TSource> childQueryResults = Child.Open(settings, false);
            return ReverseQueryOperatorResults.NewResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TSource> wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);
            return wrappedChild.Reverse();
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the reverse operation.
        //

        class ReverseQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, TKey>
        {
            private readonly QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to reverse.
            private readonly CancellationToken m_cancellationToken; // Polled while the buffer is being filled.
            private List<Pair<TSource, TKey>> m_buffer; // Our buffer. [allocate in moveNext to avoid false-sharing]
            private Shared<int> m_bufferIndex; // Our current index within the buffer. [allocate in moveNext to avoid false-sharing]

            //---------------------------------------------------------------------------------------
            // Instantiates a new reverse enumerator.
            //
            // Arguments:
            //     source            - the partition whose elements will be yielded in reverse
            //     cancellationToken - token checked periodically while buffering
            //

            internal ReverseQueryOperatorEnumerator(QueryOperatorEnumerator<TSource, TKey> source,
                CancellationToken cancellationToken)
            {
                Contract.Assert(source != null);

                m_source = source;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Straightforward IEnumerator<T> methods.
            //
            // The first call drains the whole source into the buffer; that call and every later
            // one then hands out one buffered element, walking from the end of the buffer towards
            // the front. Returns false once the buffer has been exhausted.
            //

            internal override bool MoveNext(ref TSource currentElement, ref TKey currentKey)
            {
                // If the buffer has not been created, we will generate it lazily on demand.
                if (m_buffer == null)
                {
                    m_bufferIndex = new Shared<int>(0);

                    // Buffer all of our data.
                    m_buffer = new List<Pair<TSource, TKey>>();
                    TSource current = default(TSource);
                    TKey key = default(TKey);
                    int i = 0;

                    while (m_source.MoveNext(ref current, ref key))
                    {
                        // Only check for cancellation when the masked counter wraps to zero,
                        // rather than on every single element.
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        m_buffer.Add(new Pair<TSource, TKey>(current, key));
                        m_bufferIndex.Value++;
                    }

                    // At this point m_bufferIndex.Value equals the number of buffered elements,
                    // i.e. one past the last valid position.
                }

                // Continue yielding elements from our buffer.
                if (--m_bufferIndex.Value >= 0)
                {
                    currentElement = m_buffer[m_bufferIndex.Value].First;
                    currentKey = m_buffer[m_bufferIndex.Value].Second;
                    return true;
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Releases the wrapped source enumerator, which is the only disposable state held here.
            //

            protected override void Dispose(bool disposing)
            {
                m_source.Dispose();
            }
        }

        //-----------------------------------------------------------------------------------
        // Query results for a Reverse operator. The results are indexible if the child
        // results were indexible.
        //

        class ReverseQueryOperatorResults : UnaryQueryOperatorResults
        {
            private int m_count; // The number of elements in child results

            //-----------------------------------------------------------------------------------
            // Factory: returns index-mirroring results when the child results are indexible,
            // and plain unary operator results otherwise.
            //

            public static QueryResults<TSource> NewResults(
                QueryResults<TSource> childQueryResults, ReverseQueryOperator<TSource> op,
                QuerySettings settings, bool preferStriping)
            {
                if (childQueryResults.IsIndexible)
                {
                    return new ReverseQueryOperatorResults(
                        childQueryResults, op, settings, preferStriping);
                }

                return new UnaryQueryOperatorResults(
                    childQueryResults, op, settings, preferStriping);
            }

            //-----------------------------------------------------------------------------------
            // Private so that instances are only created through NewResults, which has already
            // checked that the child results are indexible.
            //

            private ReverseQueryOperatorResults(
                QueryResults<TSource> childQueryResults, ReverseQueryOperator<TSource> op,
                QuerySettings settings, bool preferStriping)
                : base(childQueryResults, op, settings, preferStriping)
            {
                Contract.Assert(m_childQueryResults.IsIndexible);
                m_count = m_childQueryResults.ElementsCount;
            }

            // Always true: this type is only instantiated over indexible child results.
            internal override bool IsIndexible
            {
                get { return true; }
            }

            // The element count captured from the child results at construction time.
            internal override int ElementsCount
            {
                get
                {
                    Contract.Assert(m_count >= 0);
                    return m_count;
                }
            }

            // Returns the element at the mirrored position of the child results, so that
            // index 0 maps to the child's last element.
            internal override TSource GetElement(int index)
            {
                Contract.Assert(m_count >= 0);
                Contract.Assert(index >= 0);
                Contract.Assert(index < m_count);

                return m_childQueryResults.GetElement(m_count - index - 1);
            }
        }
    }
}