4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 using System
.Collections
.Generic
;
28 using System
.Collections
.Concurrent
;
29 using System
.Threading
;
31 namespace System
.Threading
.Tasks
33 public static class Parallel
35 internal static int GetBestWorkerNumber ()
37 return GetBestWorkerNumber (TaskScheduler
.Current
);
40 internal static int GetBestWorkerNumber (TaskScheduler scheduler
)
42 return scheduler
.MaximumConcurrencyLevel
;
45 static int GetBestWorkerNumber (int from, int to
, ParallelOptions options
, out int step
)
47 int num
= Math
.Min (GetBestWorkerNumber (),
48 options
!= null && options
.MaxDegreeOfParallelism
!= -1 ? options
.MaxDegreeOfParallelism
: int.MaxValue
);
49 // Integer range that each task process
50 step
= Math
.Min (5, (to
- from) / num
);
57 static void HandleExceptions (IEnumerable
<Task
> tasks
)
59 HandleExceptions (tasks
, null);
62 static void HandleExceptions (IEnumerable
<Task
> tasks
, ParallelLoopState
.ExternalInfos infos
)
64 List
<Exception
> exs
= new List
<Exception
> ();
65 foreach (Task t
in tasks
) {
66 if (t
.Exception
!= null)
67 exs
.Add (t
.Exception
);
72 infos
.IsExceptional
= true;
74 throw new AggregateException (exs
);
78 static void InitTasks (Task
[] tasks
, int count
, Action action
, ParallelOptions options
)
80 TaskCreationOptions creation
= TaskCreationOptions
.LongRunning
;
82 for (int i
= 0; i
< count
; i
++) {
84 tasks
[i
] = Task
.Factory
.StartNew (action
, creation
);
86 tasks
[i
] = Task
.Factory
.StartNew (action
, options
.CancellationToken
, creation
, options
.TaskScheduler
);
91 public static ParallelLoopResult
For (int from, int to
, Action
<int> action
)
93 return For (from, to
, null, action
);
96 public static ParallelLoopResult
For (int from, int to
, Action
<int, ParallelLoopState
> action
)
98 return For (from, to
, null, action
);
101 public static ParallelLoopResult
For (int from, int to
, ParallelOptions options
, Action
<int> action
)
103 return For (from, to
, options
, (index
, state
) => action (index
));
106 public static ParallelLoopResult
For (int from, int to
, ParallelOptions options
, Action
<int, ParallelLoopState
> action
)
108 return For
<object> (from, to
, options
, null, (i
, s
, l
) => { action (i, s); return null; }
, null);
111 public static ParallelLoopResult For
<TLocal
> (int from, int to
, Func
<TLocal
> init
,
112 Func
<int, ParallelLoopState
, TLocal
, TLocal
> action
, Action
<TLocal
> destruct
)
114 return For
<TLocal
> (from, to
, null, init
, action
, destruct
);
118 public static ParallelLoopResult For
<TLocal
> (int from, int to
, ParallelOptions options
,
120 Func
<int, ParallelLoopState
, TLocal
, TLocal
> action
,
121 Action
<TLocal
> destruct
)
124 throw new ArgumentNullException ("action");
126 // Number of task to be launched (normally == Env.ProcessorCount)
128 int num
= GetBestWorkerNumber (from, to
, options
, out step
);
130 // Each worker put the indexes it's responsible for here
131 // so that other worker may steal if they starve.
132 SimpleConcurrentBag
<int> bag
= new SimpleConcurrentBag
<int> (num
);
133 Task
[] tasks
= new Task
[num
];
134 ParallelLoopState
.ExternalInfos infos
= new ParallelLoopState
.ExternalInfos ();
136 Func
<ParallelLoopState
, bool> cancellationTokenTest
= (s
) => {
137 if (options
!= null && options
.CancellationToken
.IsCancellationRequested
) {
144 Func
<int, bool> breakTest
= (i
) => infos
.LowestBreakIteration
!= null && infos
.LowestBreakIteration
> i
;
146 int currentIndex
= from;
148 Action workerMethod
= delegate {
150 TLocal local
= (init
== null) ? default (TLocal
) : init ();
152 ParallelLoopState state
= new ParallelLoopState (infos
);
153 int workIndex
= bag
.GetNextIndex ();
156 while (currentIndex
< to
&& (index
= Interlocked
.Add (ref currentIndex
, step
) - step
) < to
) {
157 if (infos
.IsStopped
.Value
)
160 if (cancellationTokenTest (state
))
163 for (int i
= index
; i
< to
&& i
< index
+ step
; i
++)
164 bag
.Add (workIndex
, i
);
166 for (int i
= index
; i
< to
&& i
< index
+ step
&& bag
.TryTake (workIndex
, out actual
); i
++) {
167 if (infos
.IsStopped
.Value
)
170 if (cancellationTokenTest (state
))
173 if (breakTest (actual
))
176 state
.CurrentIteration
= actual
;
177 local
= action (actual
, state
, local
);
181 while (bag
.TrySteal (workIndex
, out actual
)) {
182 if (infos
.IsStopped
.Value
)
185 if (cancellationTokenTest (state
))
188 if (breakTest (actual
))
191 state
.CurrentIteration
= actual
;
192 local
= action (actual
, state
, local
);
195 if (destruct
!= null)
200 InitTasks (tasks
, num
, workerMethod
, options
);
203 Task
.WaitAll (tasks
);
205 HandleExceptions (tasks
, infos
);
208 return new ParallelLoopResult (infos
.LowestBreakIteration
, !(infos
.IsStopped
.Value
|| infos
.IsExceptional
));
214 static ParallelLoopResult ForEach
<TSource
, TLocal
> (Func
<int, IList
<IEnumerator
<TSource
>>> enumerable
, ParallelOptions options
,
215 Func
<TLocal
> init
, Func
<TSource
, ParallelLoopState
, TLocal
, TLocal
> action
,
216 Action
<TLocal
> destruct
)
218 int num
= Math
.Min (GetBestWorkerNumber (),
219 options
!= null && options
.MaxDegreeOfParallelism
!= -1 ? options
.MaxDegreeOfParallelism
: int.MaxValue
);
221 Task
[] tasks
= new Task
[num
];
222 ParallelLoopState
.ExternalInfos infos
= new ParallelLoopState
.ExternalInfos ();
224 SimpleConcurrentBag
<TSource
> bag
= new SimpleConcurrentBag
<TSource
> (num
);
225 const int bagCount
= 5;
227 IList
<IEnumerator
<TSource
>> slices
= enumerable (num
);
231 Func
<ParallelLoopState
, bool> cancellationTokenTest
= (s
) => {
232 if (options
!= null && options
.CancellationToken
.IsCancellationRequested
) {
239 Action workerMethod
= delegate {
240 IEnumerator
<TSource
> slice
= slices
[Interlocked
.Increment (ref sliceIndex
) - 1];
242 TLocal local
= (init
!= null) ? init () : default (TLocal
);
243 ParallelLoopState state
= new ParallelLoopState (infos
);
244 int workIndex
= bag
.GetNextIndex ();
251 if (infos
.IsStopped
.Value
)
254 if (cancellationTokenTest (state
))
257 for (int i
= 0; i
< bagCount
&& (cont
= slice
.MoveNext ()); i
++) {
258 bag
.Add (workIndex
, slice
.Current
);
261 for (int i
= 0; i
< bagCount
&& bag
.TryTake (workIndex
, out element
); i
++) {
262 if (infos
.IsStopped
.Value
)
265 if (cancellationTokenTest (state
))
268 local
= action (element
, state
, local
);
272 while (bag
.TrySteal (workIndex
, out element
)) {
273 if (infos
.IsStopped
.Value
)
276 if (cancellationTokenTest (state
))
279 local
= action (element
, state
, local
);
282 if (destruct
!= null)
287 InitTasks (tasks
, num
, workerMethod
, options
);
290 Task
.WaitAll (tasks
);
292 HandleExceptions (tasks
, infos
);
295 return new ParallelLoopResult (infos
.LowestBreakIteration
, !(infos
.IsStopped
.Value
|| infos
.IsExceptional
));
299 public static ParallelLoopResult ForEach
<TSource
> (IEnumerable
<TSource
> enumerable
, Action
<TSource
> action
)
301 return ForEach
<TSource
, object> (Partitioner
.Create (enumerable
), ParallelOptions
.Default
, null,
302 (e
, s
, l
) => { action (e); return null; }
, null);
305 public static ParallelLoopResult ForEach
<TSource
> (IEnumerable
<TSource
> enumerable
, Action
<TSource
, ParallelLoopState
> action
)
307 return ForEach
<TSource
, object> (Partitioner
.Create (enumerable
), ParallelOptions
.Default
, null,
308 (e
, s
, l
) => { action (e, s); return null; }
, null);
311 public static ParallelLoopResult ForEach
<TSource
> (IEnumerable
<TSource
> enumerable
,
312 Action
<TSource
, ParallelLoopState
, long> action
)
314 return ForEach
<TSource
, object> (Partitioner
.Create (enumerable
), ParallelOptions
.Default
, null,
315 (e
, s
, l
) => { action (e, s, -1); return null; }
, null);
318 public static ParallelLoopResult ForEach
<TSource
> (Partitioner
<TSource
> source
,
319 Action
<TSource
, ParallelLoopState
> body
)
321 return ForEach
<TSource
, object> (source
, ParallelOptions
.Default
, null, (e
, s
, l
) => { body (e, s); return null; }
, null);
324 public static ParallelLoopResult ForEach
<TSource
> (OrderablePartitioner
<TSource
> source
,
325 Action
<TSource
, ParallelLoopState
, long> body
)
328 return ForEach
<TSource
, object> (source
, ParallelOptions
.Default
, null, (e
, s
, i
, l
) => { body (e, s, i); return null; }
, null);
331 public static ParallelLoopResult ForEach
<TSource
> (Partitioner
<TSource
> source
,
332 Action
<TSource
> body
)
335 return ForEach
<TSource
, object> (source
, ParallelOptions
.Default
, null, (e
, s
, l
) => { body (e); return null; }
, null);
338 public static ParallelLoopResult ForEach
<TSource
> (IEnumerable
<TSource
> source
, ParallelOptions parallelOptions
,
339 Action
<TSource
> body
)
341 return ForEach
<TSource
, object> (Partitioner
.Create (source
), parallelOptions
, null,
342 (e
, s
, l
) => { body (e); return null; }
, null);
345 public static ParallelLoopResult ForEach
<TSource
> (IEnumerable
<TSource
> source
, ParallelOptions parallelOptions
,
346 Action
<TSource
, ParallelLoopState
> body
)
348 return ForEach
<TSource
, object> (Partitioner
.Create (source
), parallelOptions
, null,
349 (e
, s
, l
) => { body (e, s); return null; }
, null);
352 public static ParallelLoopResult ForEach
<TSource
> (IEnumerable
<TSource
> source
, ParallelOptions parallelOptions
,
353 Action
<TSource
, ParallelLoopState
, long> body
)
355 return ForEach
<TSource
, object> (Partitioner
.Create (source
), parallelOptions
,
356 null, (e
, s
, i
, l
) => { body (e, s, i); return null; }
, null);
359 public static ParallelLoopResult ForEach
<TSource
> (OrderablePartitioner
<TSource
> source
, ParallelOptions parallelOptions
,
360 Action
<TSource
, ParallelLoopState
, long> body
)
363 return ForEach
<TSource
, object> (source
, parallelOptions
, null, (e
, s
, i
, l
) => { body (e, s, i); return null; }
, null);
366 public static ParallelLoopResult ForEach
<TSource
> (Partitioner
<TSource
> source
, ParallelOptions parallelOptions
,
367 Action
<TSource
> body
)
369 return ForEach
<TSource
, object> (source
, parallelOptions
, null, (e
, s
, l
) => {body (e); return null; }
, null);
372 public static ParallelLoopResult ForEach
<TSource
> (Partitioner
<TSource
> source
, ParallelOptions parallelOptions
,
373 Action
<TSource
, ParallelLoopState
> body
)
375 return ForEach
<TSource
, object> (source
, parallelOptions
, null, (e
, s
, l
) => { body (e, s); return null; }
, null);
378 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (IEnumerable
<TSource
> source
, Func
<TLocal
> localInit
,
379 Func
<TSource
, ParallelLoopState
, TLocal
, TLocal
> body
,
380 Action
<TLocal
> localFinally
)
382 return ForEach
<TSource
, TLocal
> ((Partitioner
<TSource
>)Partitioner
.Create (source
), null, localInit
, body
, localFinally
);
385 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (IEnumerable
<TSource
> source
, Func
<TLocal
> localInit
,
386 Func
<TSource
, ParallelLoopState
, long, TLocal
, TLocal
> body
,
387 Action
<TLocal
> localFinally
)
389 return ForEach
<TSource
, TLocal
> (Partitioner
.Create (source
), null, localInit
, body
, localFinally
);
392 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (OrderablePartitioner
<TSource
> source
, Func
<TLocal
> localInit
,
393 Func
<TSource
, ParallelLoopState
, long, TLocal
, TLocal
> body
,
394 Action
<TLocal
> localFinally
)
396 return ForEach
<TSource
, TLocal
> (source
, ParallelOptions
.Default
, localInit
, body
, localFinally
);
399 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (Partitioner
<TSource
> source
, Func
<TLocal
> localInit
,
400 Func
<TSource
, ParallelLoopState
, TLocal
, TLocal
> body
,
401 Action
<TLocal
> localFinally
)
403 return ForEach
<TSource
, TLocal
> (source
, ParallelOptions
.Default
, localInit
, body
, localFinally
);
406 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (IEnumerable
<TSource
> source
, ParallelOptions parallelOptions
,
407 Func
<TLocal
> localInit
,
408 Func
<TSource
, ParallelLoopState
, TLocal
, TLocal
> body
,
409 Action
<TLocal
> localFinally
)
411 return ForEach
<TSource
, TLocal
> (Partitioner
.Create (source
), parallelOptions
, localInit
, body
, localFinally
);
414 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (IEnumerable
<TSource
> source
, ParallelOptions parallelOptions
,
415 Func
<TLocal
> localInit
,
416 Func
<TSource
, ParallelLoopState
, long, TLocal
, TLocal
> body
,
417 Action
<TLocal
> localFinally
)
419 return ForEach
<TSource
, TLocal
> (Partitioner
.Create (source
), parallelOptions
, localInit
, body
, localFinally
);
422 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (Partitioner
<TSource
> enumerable
, ParallelOptions options
,
424 Func
<TSource
, ParallelLoopState
, TLocal
, TLocal
> action
,
425 Action
<TLocal
> destruct
)
427 return ForEach
<TSource
, TLocal
> (enumerable
.GetPartitions
, options
, init
, action
, destruct
);
430 public static ParallelLoopResult ForEach
<TSource
, TLocal
> (OrderablePartitioner
<TSource
> enumerable
, ParallelOptions options
,
432 Func
<TSource
, ParallelLoopState
, long, TLocal
, TLocal
> action
,
433 Action
<TLocal
> destruct
)
435 return ForEach
<KeyValuePair
<long, TSource
>, TLocal
> (enumerable
.GetOrderablePartitions
, options
,
436 init
, (e
, s
, l
) => action (e
.Value
, s
, e
.Key
, l
), destruct
);
441 public static void Invoke (params Action
[] actions
)
444 throw new ArgumentNullException ("actions");
446 Invoke (actions
, (Action a
) => Task
.Factory
.StartNew (a
));
449 public static void Invoke (ParallelOptions parallelOptions
, params Action
[] actions
)
451 if (parallelOptions
== null)
452 throw new ArgumentNullException ("parallelOptions");
454 throw new ArgumentNullException ("actions");
456 Invoke (actions
, (Action a
) => Task
.Factory
.StartNew (a
, CancellationToken
.None
, TaskCreationOptions
.None
, parallelOptions
.TaskScheduler
));
459 static void Invoke (Action
[] actions
, Func
<Action
, Task
> taskCreator
)
461 if (actions
.Length
== 0)
462 throw new ArgumentException ("actions is empty");
464 // Execute it directly
465 if (actions
.Length
== 1 && actions
[0] != null)
468 bool shouldThrow
= false;
469 Task
[] ts
= Array
.ConvertAll (actions
, delegate (Action a
) {
475 return taskCreator (a
);
479 throw new ArgumentException ("One action in actions is null", "actions");
484 HandleExceptions (ts
);
489 #region SpawnBestNumber, used by PLinq
490 internal static Task
[] SpawnBestNumber (Action action
, Action callback
)
492 return SpawnBestNumber (action
, -1, callback
);
495 internal static Task
[] SpawnBestNumber (Action action
, int dop
, Action callback
)
497 return SpawnBestNumber (action
, dop
, false, callback
);
500 internal static Task
[] SpawnBestNumber (Action action
, int dop
, bool wait
, Action callback
)
502 // Get the optimum amount of worker to create
503 int num
= dop
== -1 ? (wait
? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop
;
506 CountdownEvent evt
= new CountdownEvent (num
);
507 Task
[] tasks
= new Task
[num
];
508 for (int i
= 0; i
< num
; i
++) {
509 tasks
[i
] = Task
.Factory
.StartNew (() => {
512 if (callback
!= null && evt
.IsSet
)
517 // If explicitely told, wait for all workers to complete
518 // and thus let main thread participate in the processing
520 Task
.WaitAll (tasks
);