Updates referencesource to .NET 4.7
[mono-project.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / BinaryQueryOperator.cs
blob3beed03e1ec4c1c4cc39e12349843414d00968e2
1 // ==++==
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // BinaryQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
17 namespace System.Linq.Parallel
19 /// <summary>
20 /// The base class from which all binary query operators derive, that is, those that
21 /// have two child operators. This introduces some convenience methods for those
22 /// classes, as well as any state common to all subclasses.
23 /// </summary>
24 /// <typeparam name="TLeftInput"></typeparam>
25 /// <typeparam name="TRightInput"></typeparam>
26 /// <typeparam name="TOutput"></typeparam>
27 internal abstract class BinaryQueryOperator<TLeftInput, TRightInput, TOutput> : QueryOperator<TOutput>
// The two child operators of this node. Both are assigned once, by the constructor.
private readonly QueryOperator<TLeftInput> m_leftChild;
private readonly QueryOperator<TRightInput> m_rightChild;

// The ordinal index state reported by this operator. It starts out as Shuffled and
// is replaced whenever SetOrdinalIndex is called.
private OrdinalIndexState m_indexState = OrdinalIndexState.Shuffled;
//---------------------------------------------------------------------------------------
// Builds a binary operator from two parallel queries. Each query is converted with
// QueryOperator<T>.AsQueryOperator and the result is handed to the constructor that
// takes the two child operators directly.
//

internal BinaryQueryOperator(ParallelQuery<TLeftInput> leftChild, ParallelQuery<TRightInput> rightChild)
    : this(
        QueryOperator<TLeftInput>.AsQueryOperator(leftChild),
        QueryOperator<TRightInput>.AsQueryOperator(rightChild))
{
}
//---------------------------------------------------------------------------------------
// Stores the two child operators on this query node. The base class is constructed
// with 'false' as its first argument and with the left child's specified query
// settings merged with the right child's.
//

internal BinaryQueryOperator(QueryOperator<TLeftInput> leftChild, QueryOperator<TRightInput> rightChild)
    : base(false, MergeChildSettings(leftChild, rightChild))
{
    m_leftChild = leftChild;
    m_rightChild = rightChild;
}

//---------------------------------------------------------------------------------------
// Produces the merged settings passed to the base constructor. The null check lives
// here because the base-constructor argument is evaluated before the constructor body
// runs: an assert placed in the body would only execute after both children had
// already been dereferenced, so it could never catch a null child.
//

private static QuerySettings MergeChildSettings(
    QueryOperator<TLeftInput> leftChild, QueryOperator<TRightInput> rightChild)
{
    Contract.Assert(leftChild != null && rightChild != null);
    return leftChild.SpecifiedQuerySettings.Merge(rightChild.SpecifiedQuerySettings);
}
// The left child operator that was supplied to the constructor.
internal QueryOperator<TLeftInput> LeftChild
{
    get
    {
        return m_leftChild;
    }
}
// The right child operator that was supplied to the constructor.
internal QueryOperator<TRightInput> RightChild
{
    get
    {
        return m_rightChild;
    }
}
// The ordinal index state currently recorded for this operator. Sealed, so classes
// deriving from this one cannot override it.
internal sealed override OrdinalIndexState OrdinalIndexState
{
    get
    {
        return m_indexState;
    }
}
// Records the ordinal index state that the OrdinalIndexState property will report
// from now on.
protected void SetOrdinalIndex(OrdinalIndexState indexState)
{
    this.m_indexState = indexState;
}
//---------------------------------------------------------------------------------------
// Accepts the partitioned streams of both children and constructs an output
// partitioned stream. Rather than returning that stream, the implementation hands it
// to the recipient object by calling outputRecipient.Receive<TNewKey>(..). That way the
// operator can "return" a partitioned stream whose order key type it selects itself.
//

public abstract void WrapPartitionedStream<TLeftKey, TRightKey>(
    PartitionedStream<TLeftInput, TLeftKey> leftPartitionedStream,
    PartitionedStream<TRightInput, TRightKey> rightPartitionedStream,
    IPartitionedStreamRecipient<TOutput> outputRecipient,
    bool preferStriping,
    QuerySettings settings);
82 //---------------------------------------------------------------------------------------
83 // Implementation of QueryResults for a binary operator. The results will not be indexible
84 // unless a derived class provides that functionality.
87 internal class BinaryQueryOperatorResults : QueryResults<TOutput>
// Results of the left child query.
protected QueryResults<TLeftInput> m_leftChildQueryResults;

// Results of the right child query.
protected QueryResults<TRightInput> m_rightChildQueryResults;

// Operator that generated these results.
private BinaryQueryOperator<TLeftInput, TRightInput, TOutput> m_op;

// Settings collected from the query.
private QuerySettings m_settings;

// If the results are indexible, should we use striping when partitioning them.
private bool m_preferStriping;
// Captures the results of both children together with the operator that produced
// these results, the query settings, and the striping preference.
internal BinaryQueryOperatorResults(
    QueryResults<TLeftInput> leftChildQueryResults,
    QueryResults<TRightInput> rightChildQueryResults,
    BinaryQueryOperator<TLeftInput, TRightInput, TOutput> op,
    QuerySettings settings,
    bool preferStriping)
{
    this.m_op = op;
    this.m_settings = settings;
    this.m_preferStriping = preferStriping;
    this.m_leftChildQueryResults = leftChildQueryResults;
    this.m_rightChildQueryResults = rightChildQueryResults;
}
// Delivers this operator's output to the recipient as a partitioned stream. One of
// three paths is taken: sequential execution of the operator, partitioning of these
// (indexible) results, or wrapping of the children's partitions.
internal override void GivePartitionedStream(IPartitionedStreamRecipient<TOutput> recipient)
{
    Contract.Assert(IsIndexible == (m_op.OrdinalIndexState == OrdinalIndexState.Indexible));

    bool runSequentially =
        m_settings.ExecutionMode.Value == ParallelExecutionMode.Default && m_op.LimitsParallelism;

    if (runSequentially)
    {
        // We need to run the query sequentially up to and including this operator,
        // and then partition the sequential output.
        IEnumerable<TOutput> sequentialOutput =
            m_op.AsSequentialQuery(m_settings.CancellationState.ExternalCancellationToken);
        PartitionedStream<TOutput, int> sequentialPartitions = ExchangeUtilities.PartitionDataSource(
            sequentialOutput, m_settings.DegreeOfParallelism.Value, m_preferStriping);
        recipient.Receive<int>(sequentialPartitions);
        return;
    }

    if (IsIndexible)
    {
        // The output of this operator is indexible, so these results are partitioned
        // directly and passed into the IPartitionedStreamRecipient.
        PartitionedStream<TOutput, int> indexedPartitions = ExchangeUtilities.PartitionDataSource(
            this, m_settings.DegreeOfParallelism.Value, m_preferStriping);
        recipient.Receive<int>(indexedPartitions);
        return;
    }

    // The common case: get partitions from the child and wrap each partition. The
    // left child goes first; its recipient then requests the right child's stream.
    LeftChildResultsRecipient leftRecipient =
        new LeftChildResultsRecipient(recipient, this, m_preferStriping, m_settings);
    m_leftChildQueryResults.GivePartitionedStream(leftRecipient);
}
//---------------------------------------------------------------------------------------
// LeftChildResultsRecipient is a recipient of a partitioned stream. It receives a
// partitioned stream from the left child operator, and passes the results along to a
// RightChildResultsRecipient.
//

private class LeftChildResultsRecipient : IPartitionedStreamRecipient<TLeftInput>
{
    // All state is captured once in the constructor and never reassigned.
    private readonly IPartitionedStreamRecipient<TOutput> m_outputRecipient;
    private readonly BinaryQueryOperatorResults m_results;
    private readonly bool m_preferStriping;
    private readonly QuerySettings m_settings;

    internal LeftChildResultsRecipient(
        IPartitionedStreamRecipient<TOutput> outputRecipient,
        BinaryQueryOperatorResults results,
        bool preferStriping,
        QuerySettings settings)
    {
        m_outputRecipient = outputRecipient;
        m_results = results;
        m_preferStriping = preferStriping;
        m_settings = settings;
    }

    // Called with the left child's partitioned stream. Wraps it, together with the
    // operator and the final output recipient, in a RightChildResultsRecipient and
    // asks the right child's results to deliver their stream to that recipient.
    public void Receive<TLeftKey>(PartitionedStream<TLeftInput, TLeftKey> source)
    {
        RightChildResultsRecipient<TLeftKey> rightChildRecipient =
            new RightChildResultsRecipient<TLeftKey>(
                m_outputRecipient, m_results.m_op, source, m_preferStriping, m_settings);

        m_results.m_rightChildQueryResults.GivePartitionedStream(rightChildRecipient);
    }
}
//---------------------------------------------------------------------------------------
// RightChildResultsRecipient receives a partitioned stream from the right child
// operator. The partitioned stream from the left child operator is passed into the
// constructor, so Receive has the partitioned streams of both child operators and is
// called in a context where both TLeftKey and TRightKey are known. It passes both
// streams (as arguments) and both key types (as type arguments) to the operator's
// WrapPartitionedStream method.
//

private class RightChildResultsRecipient<TLeftKey> : IPartitionedStreamRecipient<TRightInput>
{
    // All state is captured once in the constructor and never reassigned.
    private readonly IPartitionedStreamRecipient<TOutput> m_outputRecipient;
    private readonly PartitionedStream<TLeftInput, TLeftKey> m_leftPartitionedStream;
    private readonly BinaryQueryOperator<TLeftInput, TRightInput, TOutput> m_op;
    private readonly bool m_preferStriping;
    private readonly QuerySettings m_settings;

    internal RightChildResultsRecipient(
        IPartitionedStreamRecipient<TOutput> outputRecipient,
        BinaryQueryOperator<TLeftInput, TRightInput, TOutput> op,
        PartitionedStream<TLeftInput, TLeftKey> leftPartitionedStream,
        bool preferStriping,
        QuerySettings settings)
    {
        m_outputRecipient = outputRecipient;
        m_op = op;
        m_preferStriping = preferStriping;
        m_leftPartitionedStream = leftPartitionedStream;
        m_settings = settings;
    }

    // Called with the right child's partitioned stream. Forwards it, along with the
    // stored left stream, to the operator's WrapPartitionedStream.
    public void Receive<TRightKey>(PartitionedStream<TRightInput, TRightKey> rightPartitionedStream)
    {
        m_op.WrapPartitionedStream(
            m_leftPartitionedStream, rightPartitionedStream, m_outputRecipient, m_preferStriping, m_settings);
    }
}