Track the end of the underlying enumerator in EnumerablePartitioner
[mono-project.git] / mcs / class / corlib / System.Collections.Concurrent.Partitioners / EnumerablePartitioner.cs
blob6ca6e8e6fc5cb5710163da9aa04f732dd6efec06
1 //
2 // EnumerablePartitioner.cs
3 //
4 // Author:
5 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
6 //
7 // Copyright (c) 2009 Jérémie "Garuma" Laval
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining a copy
10 // of this software and associated documentation files (the "Software"), to deal
11 // in the Software without restriction, including without limitation the rights
12 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 // copies of the Software, and to permit persons to whom the Software is
14 // furnished to do so, subject to the following conditions:
15 //
16 // The above copyright notice and this permission notice shall be included in
17 // all copies or substantial portions of the Software.
18 //
19 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 // THE SOFTWARE.
27 #if NET_4_0
29 using System;
30 using System.Threading;
31 using System.Threading.Tasks;
32 using System.Collections.Generic;
34 namespace System.Collections.Concurrent.Partitioners
36 // Represent a chunk partitioner
// Orderable partitioner over an arbitrary IEnumerable<T>.
//
// All partitions returned by one GetOrderablePartitions call pull from a single
// shared enumerator of the source, protected by a lock. Each partition takes a
// chunk of consecutive elements at a time; the chunk size starts at
// initialPartitionSize and is multiplied by partitionMultiplier after every
// chunk. Keys are the 0-based positions of the elements in the source.
internal class EnumerablePartitioner<T> : OrderablePartitioner<T>
{
	readonly IEnumerable<T> source;

	const int InitialPartitionSize = 1;
	const int PartitionMultiplier = 2;

	readonly int initialPartitionSize;
	readonly int partitionMultiplier;

	// Creates a partitioner using the default chunk growth (1, 2, 4, ...).
	public EnumerablePartitioner (IEnumerable<T> source)
		: this (source, InitialPartitionSize, PartitionMultiplier)
	{
	}

	// Creates a partitioner with an explicit chunk policy.
	// Passing 1 for both values gives striped partitioning, one element at a
	// time (used for Take and Skip for instance).
	//
	// Throws ArgumentNullException when source is null and
	// ArgumentOutOfRangeException when either size argument is smaller than 1:
	// with such values a partition would never take an element and would spin
	// forever.
	public EnumerablePartitioner (IEnumerable<T> source, int initialPartitionSize, int partitionMultiplier)
		: base (true, false, true) // ordered in each partition, not across partitions, keys normalized
	{
		if (source == null)
			throw new ArgumentNullException ("source");
		if (initialPartitionSize < 1)
			throw new ArgumentOutOfRangeException ("initialPartitionSize");
		if (partitionMultiplier < 1)
			throw new ArgumentOutOfRangeException ("partitionMultiplier");

		this.source = source;
		this.initialPartitionSize = initialPartitionSize;
		this.partitionMultiplier = partitionMultiplier;
	}

	// Returns partitionCount enumerators that cooperatively consume one shared
	// enumerator of the source.
	// Throws ArgumentOutOfRangeException when partitionCount is not positive.
	public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions (int partitionCount)
	{
		if (partitionCount <= 0)
			throw new ArgumentOutOfRangeException ("partitionCount");

		IEnumerator<KeyValuePair<long, T>>[] enumerators
			= new IEnumerator<KeyValuePair<long, T>>[partitionCount];

		PartitionerState state = new PartitionerState (partitionCount);
		IEnumerator<T> src = source.GetEnumerator ();
		bool isSimple = initialPartitionSize == 1 && partitionMultiplier == 1;

		for (int i = 0; i < enumerators.Length; i++)
			enumerators[i] = isSimple ? GetPartitionEnumeratorSimple (src, state) : GetPartitionEnumerator (src, state);

		return enumerators;
	}

	// Simpler variant of the general case (no intermediate list), used when
	// initialPartitionSize == partitionMultiplier == 1.
	IEnumerator<KeyValuePair<long, T>> GetPartitionEnumeratorSimple (IEnumerator<T> src, PartitionerState state)
	{
		long index = -1;
		var value = default (T);

		try {
			while (true) {
				lock (state.SyncLock) {
					if (state.Finished)
						break;

					if (!src.MoveNext ()) {
						// Nobody touches src once Finished is set, so it can be released now.
						state.Finished = true;
						DisposeSource (src, state);
						break;
					}

					index = state.Index++;
					value = src.Current;
				}

				// Yield outside of the lock so other partitions can progress
				// while the consumer processes this element.
				yield return new KeyValuePair<long, T> (index, value);
			}
		} finally {
			ReleasePartition (src, state);
		}
	}

	// General case: grabs a growing chunk of elements under the lock, then
	// yields them outside of it.
	IEnumerator<KeyValuePair<long, T>> GetPartitionEnumerator (IEnumerator<T> src, PartitionerState state)
	{
		int count = initialPartitionSize;
		List<T> list = new List<T> ();

		try {
			// Unlocked read is only a shortcut: Finished never goes back to
			// false and is checked again under the lock.
			while (!state.Finished) {
				list.Clear ();
				long ind = -1;

				lock (state.SyncLock) {
					if (state.Finished)
						break;

					// Key of the first element of this chunk
					ind = state.Index;

					for (int i = 0; i < count; i++) {
						if (!src.MoveNext ()) {
							state.Finished = true;
							DisposeSource (src, state);
							break;
						}

						list.Add (src.Current);
						state.Index++;
					}
				}

				// Empty when the source ended before this chunk got an element
				for (int i = 0; i < list.Count; i++)
					yield return new KeyValuePair<long, T> (ind + i, list[i]);

				count = NextChunkSize (count);
			}
		} finally {
			ReleasePartition (src, state);
		}
	}

	// Grows the chunk size, saturating at int.MaxValue. A plain int
	// multiplication would eventually wrap around to 0, after which a partition
	// would take nothing and loop forever.
	int NextChunkSize (int current)
	{
		long next = (long) current * partitionMultiplier;
		return next > int.MaxValue ? int.MaxValue : (int) next;
	}

	// Disposes the shared source enumerator at most once.
	// Must be called with state.SyncLock held.
	static void DisposeSource (IEnumerator<T> src, PartitionerState state)
	{
		if (state.SourceDisposed)
			return;

		state.SourceDisposed = true;
		src.Dispose ();
	}

	// Called when a started partition completes or is disposed. The source is
	// released once every partition is gone, which covers consumers that stop
	// before the source is exhausted.
	static void ReleasePartition (IEnumerator<T> src, PartitionerState state)
	{
		lock (state.SyncLock) {
			if (--state.PendingPartitions == 0)
				DisposeSource (src, state);
		}
	}

	// State shared by all the partitions created by one GetOrderablePartitions call.
	class PartitionerState
	{
		// True once the source enumerator reported its end
		public bool Finished;
		// Key that will be given to the next element read from the source
		public long Index = 0;
		// Number of partitions that have not been released yet
		public int PendingPartitions;
		// True once the source enumerator has been disposed
		public bool SourceDisposed;
		// Guards the source enumerator and every field above
		public readonly object SyncLock = new object ();

		public PartitionerState (int partitionCount)
		{
			PendingPartitions = partitionCount;
		}
	}
}
153 #endif