3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // BinaryQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System
.Collections
.Generic
;
15 using System
.Diagnostics
.Contracts
;
17 namespace System
.Linq
.Parallel
20 /// The base class from which all binary query operators derive, that is, those that
21 /// have two child operators. This introduces some convenience methods for those
22 /// classes, as well as any state common to all subclasses.
24 /// <typeparam name="TLeftInput"></typeparam>
25 /// <typeparam name="TRightInput"></typeparam>
26 /// <typeparam name="TOutput"></typeparam>
27 internal abstract class BinaryQueryOperator
<TLeftInput
, TRightInput
, TOutput
> : QueryOperator
<TOutput
>
29 // A set of child operators for the current node.
30 private readonly QueryOperator
<TLeftInput
> m_leftChild
;
31 private readonly QueryOperator
<TRightInput
> m_rightChild
;
32 private OrdinalIndexState m_indexState
= OrdinalIndexState
.Shuffled
;
34 //---------------------------------------------------------------------------------------
35 // Stores a set of child operators on this query node.
38 internal BinaryQueryOperator(ParallelQuery
<TLeftInput
> leftChild
, ParallelQuery
<TRightInput
> rightChild
)
39 :this(QueryOperator
<TLeftInput
>.AsQueryOperator(leftChild
), QueryOperator
<TRightInput
>.AsQueryOperator(rightChild
))
43 internal BinaryQueryOperator(QueryOperator
<TLeftInput
> leftChild
, QueryOperator
<TRightInput
> rightChild
)
44 : base(false, leftChild
.SpecifiedQuerySettings
.Merge(rightChild
.SpecifiedQuerySettings
))
46 Contract
.Assert(leftChild
!= null && rightChild
!= null);
47 m_leftChild
= leftChild
;
48 m_rightChild
= rightChild
;
51 internal QueryOperator
<TLeftInput
> LeftChild
53 get { return m_leftChild; }
56 internal QueryOperator
<TRightInput
> RightChild
58 get { return m_rightChild; }
61 internal override sealed OrdinalIndexState OrdinalIndexState
63 get { return m_indexState; }
66 protected void SetOrdinalIndex(OrdinalIndexState indexState
)
68 m_indexState
= indexState
;
72 //---------------------------------------------------------------------------------------
73 // This method wraps accepts two child partitioned streams, and constructs an output
74 // partitioned stream. However, instead of returning the transformed partitioned
75 // stream, we pass it to a recipient object by calling recipient.Give<TNewKey>(..). That
76 // way, we can "return" a partitioned stream that uses an order key selected by the operator.
78 public abstract void WrapPartitionedStream
<TLeftKey
, TRightKey
>(
79 PartitionedStream
<TLeftInput
, TLeftKey
> leftPartitionedStream
, PartitionedStream
<TRightInput
, TRightKey
> rightPartitionedStream
,
80 IPartitionedStreamRecipient
<TOutput
> outputRecipient
, bool preferStriping
, QuerySettings settings
);
82 //---------------------------------------------------------------------------------------
83 // Implementation of QueryResults for a binary operator. The results will not be indexible
84 // unless a derived class provides that functionality.
87 internal class BinaryQueryOperatorResults
: QueryResults
<TOutput
>
89 protected QueryResults
<TLeftInput
> m_leftChildQueryResults
; // Results of the left child query
90 protected QueryResults
<TRightInput
> m_rightChildQueryResults
; // Results of the right child query
91 private BinaryQueryOperator
<TLeftInput
, TRightInput
, TOutput
> m_op
; // Operator that generated these results
92 private QuerySettings m_settings
; // Settings collected from the query
93 private bool m_preferStriping
; // If the results are indexible, should we use striping when partitioning them
95 internal BinaryQueryOperatorResults(
96 QueryResults
<TLeftInput
> leftChildQueryResults
, QueryResults
<TRightInput
> rightChildQueryResults
,
97 BinaryQueryOperator
<TLeftInput
, TRightInput
, TOutput
> op
, QuerySettings settings
,
100 m_leftChildQueryResults
= leftChildQueryResults
;
101 m_rightChildQueryResults
= rightChildQueryResults
;
103 m_settings
= settings
;
104 m_preferStriping
= preferStriping
;
107 internal override void GivePartitionedStream(IPartitionedStreamRecipient
<TOutput
> recipient
)
109 Contract
.Assert(IsIndexible
== (m_op
.OrdinalIndexState
== OrdinalIndexState
.Indexible
));
111 if (m_settings
.ExecutionMode
.Value
== ParallelExecutionMode
.Default
&& m_op
.LimitsParallelism
)
113 // We need to run the query sequentially up to and including this operator
114 IEnumerable
<TOutput
> opSequential
= m_op
.AsSequentialQuery(m_settings
.CancellationState
.ExternalCancellationToken
);
115 PartitionedStream
<TOutput
, int> result
= ExchangeUtilities
.PartitionDataSource(
116 opSequential
, m_settings
.DegreeOfParallelism
.Value
, m_preferStriping
);
117 recipient
.Receive
<int>(result
);
119 else if (IsIndexible
)
121 // The output of this operator is indexible. Pass the partitioned output into the IPartitionedStreamRecipient.
122 PartitionedStream
<TOutput
, int> result
= ExchangeUtilities
.PartitionDataSource(this, m_settings
.DegreeOfParallelism
.Value
, m_preferStriping
);
123 recipient
.Receive
<int>(result
);
127 // The common case: get partitions from the child and wrap each partition.
128 m_leftChildQueryResults
.GivePartitionedStream(new LeftChildResultsRecipient(recipient
, this, m_preferStriping
, m_settings
));
132 //---------------------------------------------------------------------------------------
133 // LeftChildResultsRecipient is a recipient of a partitioned stream. It receives a partitioned
134 // stream from the left child operator, and passes the results along to a
135 // RightChildResultsRecipient.
138 private class LeftChildResultsRecipient
: IPartitionedStreamRecipient
<TLeftInput
>
140 IPartitionedStreamRecipient
<TOutput
> m_outputRecipient
;
141 BinaryQueryOperatorResults m_results
;
142 bool m_preferStriping
;
143 QuerySettings m_settings
;
145 internal LeftChildResultsRecipient(IPartitionedStreamRecipient
<TOutput
> outputRecipient
, BinaryQueryOperatorResults results
,
146 bool preferStriping
, QuerySettings settings
)
148 m_outputRecipient
= outputRecipient
;
150 m_preferStriping
= preferStriping
;
151 m_settings
= settings
;
154 public void Receive
<TLeftKey
>(PartitionedStream
<TLeftInput
, TLeftKey
> source
)
156 RightChildResultsRecipient
<TLeftKey
> rightChildRecipient
=
157 new RightChildResultsRecipient
<TLeftKey
>(m_outputRecipient
, m_results
.m_op
, source
, m_preferStriping
, m_settings
);
158 m_results
.m_rightChildQueryResults
.GivePartitionedStream(rightChildRecipient
);
162 //---------------------------------------------------------------------------------------
163 // RightChildResultsRecipient receives a partitioned from the right child operator. Also,
164 // the partitioned stream from the left child operator is passed into the constructor.
165 // So, Receive has partitioned streams for both child operators, and also is called in
166 // a context where it has access to both TLeftKey and TRightKey. Then, it passes both
167 // streams (as arguments) and key types (as type arguments) to the operator's
168 // WrapPartitionedStream method.
171 private class RightChildResultsRecipient
<TLeftKey
> : IPartitionedStreamRecipient
<TRightInput
>
173 IPartitionedStreamRecipient
<TOutput
> m_outputRecipient
;
174 PartitionedStream
<TLeftInput
, TLeftKey
> m_leftPartitionedStream
;
175 BinaryQueryOperator
<TLeftInput
, TRightInput
, TOutput
> m_op
;
176 bool m_preferStriping
;
177 QuerySettings m_settings
;
179 internal RightChildResultsRecipient(
180 IPartitionedStreamRecipient
<TOutput
> outputRecipient
, BinaryQueryOperator
<TLeftInput
, TRightInput
, TOutput
> op
,
181 PartitionedStream
<TLeftInput
, TLeftKey
> leftPartitionedStream
, bool preferStriping
, QuerySettings settings
)
183 m_outputRecipient
= outputRecipient
;
185 m_preferStriping
= preferStriping
;
186 m_leftPartitionedStream
= leftPartitionedStream
;
187 m_settings
= settings
;
190 public void Receive
<TRightKey
>(PartitionedStream
<TRightInput
, TRightKey
> rightPartitionedStream
)
192 m_op
.WrapPartitionedStream(m_leftPartitionedStream
, rightPartitionedStream
, m_outputRecipient
, m_preferStriping
, m_settings
);