3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // OrderPreservingMergeHelper.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System
.Collections
.Generic
;
15 using System
.Threading
.Tasks
;
16 using System
.Diagnostics
.Contracts
;
namespace System.Linq.Parallel
{
    /// <summary>
    /// The order preserving merge helper guarantees the output stream is in a specific order. This is done
    /// by comparing keys from a set of already-sorted input partitions, and coalescing output data using
    /// incremental key comparisons.
    /// </summary>
    /// <typeparam name="TInputOutput">The type of the elements consumed from the partitions and produced as output.</typeparam>
    /// <typeparam name="TKey">The key type of the source <c>PartitionedStream</c>.</typeparam>
    internal class OrderPreservingMergeHelper<TInputOutput, TKey> : IMergeHelper<TInputOutput>
    {
        // All of the fields below are assigned exactly once, in the constructor.
        private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
        private readonly PartitionedStream<TInputOutput, TKey> m_partitions; // Source partitions.
        private readonly Shared<TInputOutput[]> m_results; // The array where results are stored.
        private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query.

        //-----------------------------------------------------------------------------------
        // Instantiates a new merge helper.
        //
        // Arguments:
        //     partitions        - the source partitions from which to consume data.
        //     taskScheduler     - the scheduler handed to the spooling task when the merge executes.
        //     cancellationState - forwarded to the QueryTaskGroupState created for this merge.
        //     queryId           - forwarded to the QueryTaskGroupState created for this merge.
        //

        internal OrderPreservingMergeHelper(PartitionedStream<TInputOutput, TKey> partitions, TaskScheduler taskScheduler,
            CancellationState cancellationState, int queryId)
        {
            Contract.Assert(partitions != null);

            TraceHelpers.TraceInfo("OrderPreservingMergeHelper::.ctor(..): creating an order preserving merge helper");

            m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
            m_partitions = partitions;

            // The holder starts out wrapping null; it is handed to the spooling task in Execute().
            m_results = new Shared<TInputOutput[]>(null);
            m_taskScheduler = taskScheduler;
        }

        //-----------------------------------------------------------------------------------
        // Schedules execution of the merge itself, by handing the task group state, the source
        // partitions, the shared results holder and the scheduler to the spooling task.
        //

        void IMergeHelper<TInputOutput>.Execute()
        {
            OrderPreservingSpoolingTask<TInputOutput, TKey>.Spool(m_taskGroupState, m_partitions, m_results, m_taskScheduler);
        }

        //-----------------------------------------------------------------------------------
        // Gets the enumerator from which to enumerate output results.
        //
        // The results array is expected to be non-null by the time this is called.
        // NOTE(review): presumably Execute() must have populated it first -- confirm against Spool.
        //

        IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
        {
            Contract.Assert(m_results.Value != null);
            return ((IEnumerable<TInputOutput>)m_results.Value).GetEnumerator();
        }

        //-----------------------------------------------------------------------------------
        // Returns the results as an array.
        //
        // This hands back the array currently held by the shared results holder as-is (no copy);
        // the holder is created wrapping null in the constructor.
        //

        public TInputOutput[] GetResultsAsArray()
        {
            return m_results.Value;
        }
    }
}