3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // OrderedHashRepartitionStream.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System
.Collections
.Generic
;
15 using System
.Threading
;
17 namespace System
.Linq
.Parallel
19 internal class OrderedHashRepartitionStream
<TInputOutput
, THashKey
, TOrderKey
> : HashRepartitionStream
<TInputOutput
, THashKey
, TOrderKey
>
21 internal OrderedHashRepartitionStream(
22 PartitionedStream
<TInputOutput
, TOrderKey
> inputStream
, Func
<TInputOutput
, THashKey
> hashKeySelector
,
23 IEqualityComparer
<THashKey
> hashKeyComparer
, IEqualityComparer
<TInputOutput
> elementComparer
, CancellationToken cancellationToken
)
24 : base(inputStream
.PartitionCount
, inputStream
.KeyComparer
, hashKeyComparer
, elementComparer
)
27 new OrderedHashRepartitionEnumerator
<TInputOutput
, THashKey
, TOrderKey
>[inputStream
.PartitionCount
];
29 // Initialize state shared among the partitions. A latch and a matrix of buffers. Note that
30 // the actual elements in the buffer array are lazily allocated if needed.
31 CountdownEvent barrier
= new CountdownEvent(inputStream
.PartitionCount
);
32 ListChunk
<Pair
<TInputOutput
, THashKey
>>[,] valueExchangeMatrix
=
33 new ListChunk
<Pair
<TInputOutput
, THashKey
>>[inputStream
.PartitionCount
, inputStream
.PartitionCount
];
34 ListChunk
<TOrderKey
>[,] keyExchangeMatrix
= new ListChunk
<TOrderKey
>[inputStream
.PartitionCount
, inputStream
.PartitionCount
];
36 // Now construct each partition object.
37 for (int i
= 0; i
< inputStream
.PartitionCount
; i
++)
39 m_partitions
[i
] = new OrderedHashRepartitionEnumerator
<TInputOutput
, THashKey
, TOrderKey
>(
40 inputStream
[i
], inputStream
.PartitionCount
, i
, hashKeySelector
, this, barrier
,
41 valueExchangeMatrix
, keyExchangeMatrix
, cancellationToken
);