6 // Jérémie "Garuma" Laval <jeremie.laval@gmail.com>
8 // Copyright (c) 2010 Jérémie "Garuma" Laval
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
11 // of this software and associated documentation files (the "Software"), to deal
12 // in the Software without restriction, including without limitation the rights
13 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 // copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
17 // The above copyright notice and this permission notice shall be included in
18 // all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System
.Threading
;
30 using System
.Threading
.Tasks
;
31 using System
.Collections
;
32 using System
.Collections
.Generic
;
33 using System
.Collections
.Concurrent
;
// TODO: Refactor each of the Process methods into one big entity
// Check the CancellationToken's cancellation state (IsCancellationRequested) in the Task's action body too
39 internal static class ParallelExecuter
// Convenience overload: forwards to the (node, bool) overload with
// blocking set to false and returns the QueryOptions it produces.
internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode)
{
	const bool blocking = false;

	return CheckQuery<T> (startingNode, blocking);
}
// Picks the partition count with GetBestWorkerNumber (blocking) and
// forwards to the (node, int) overload.
internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, bool blocking)
{
	int partitionCount = GetBestWorkerNumber (blocking);

	return CheckQuery<T> (startingNode, partitionCount);
}
// Runs a QueryCheckerVisitor built for partitionCount over the query
// starting at startingNode and returns the visitor's Options.
internal static QueryOptions CheckQuery<T> (QueryBaseNode<T> startingNode, int partitionCount)
{
	var checker = new QueryCheckerVisitor (partitionCount);
	startingNode.Visit (checker);

	return checker.Options;
}
// Builds a token linked to both `self` and `other.Token`: the returned token
// is cancelled as soon as either of the two sources is cancelled.
//
// Usage from an operator implementation:
//   QueryOptions.ImplementerToken = QueryOptions.ImplementerToken.Chain (myOperatorSource);
//
// Throws ArgumentNullException when `other` is null.
// NOTE: the linked CancellationTokenSource is not disposed here because the
// returned token must stay linked after this method returns.
internal static CancellationToken Chain (this CancellationToken self, CancellationTokenSource other)
{
	if (other == null)
		throw new ArgumentNullException ("other");

	CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource (self, other.Token);

	return linked.Token;
}
// Parameterless overload: same as GetBestWorkerNumber (false).
internal static int GetBestWorkerNumber ()
{
	const bool blocking = false;

	return GetBestWorkerNumber (blocking);
}
// Returns Environment.ProcessorCount, plus one extra worker when
// `blocking` is true.
internal static int GetBestWorkerNumber (bool blocking)
{
	int workers = Environment.ProcessorCount;
	if (blocking)
		workers += 1;

	return workers;
}
// Starts the processing tasks for `node` without any end-of-partition action.
//   node            - query node passed through to the next overload.
//   call            - callback taking a single element.
//   acquisitionFunc - produces the list of enumerables to process.
//   options         - query options passed through unchanged.
// Returns the Task array produced by the overload it forwards to.
internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
                                                   Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
                                                   QueryOptions options)
{
	// The cast makes it explicit that the overload taking a parameterless
	// end action is the one being called, with no end action supplied.
	return Process<TSource, TElement> (node, call, acquisitionFunc, (Action)null, options);
}
// Adapts an index-less element callback and an index-less end action to the
// overload that works with (element, int) and (int) callbacks.
//   node            - query node passed through to the next overload.
//   call            - callback taking a single element; the int argument
//                     supplied by the next overload is ignored.
//   acquisitionFunc - produces the list of enumerables to process.
//   endAction       - optional parameterless action; may be null.
//   options         - query options passed through unchanged.
// Returns the Task array produced by the overload it forwards to.
internal static Task[] Process<TSource, TElement> (QueryBaseNode<TSource> node, Action<TElement> call,
                                                   Func<QueryBaseNode<TSource>, QueryOptions, IList<IEnumerable<TElement>>> acquisitionFunc,
                                                   Action endAction, QueryOptions options)
{
	// Only wrap endAction when there is one: wrapping a null delegate in a
	// lambda would yield a non-null Action<int> that throws
	// NullReferenceException as soon as it is invoked.
	Action<int> indexedEndAction = null;
	if (endAction != null)
		indexedEndAction = (i) => endAction ();

	return Process<TSource, TElement> (node, (e, i) => call (e), acquisitionFunc, indexedEndAction, options);
}
// Core Process overload: obtains the enumerables from
// acquisitionFunc (node, options), allocates one Task slot per enumerable and
// starts a task for each slot through Task.Factory.StartNew.
//   node            - query node handed to acquisitionFunc.
//   call            - callback typed Action<TElement, int>; its invocation is
//                     not part of the visible text of this block.
//   acquisitionFunc - produces the list of enumerables to iterate.
//   endAction       - optional Action<int>; it is null-checked at the end of
//                     the visible text.
// NOTE(review): `options` and `index` are used below but their declarations
// are not visible here; presumably `options` is a trailing QueryOptions
// parameter and `index` a per-iteration copy of `i` - confirm against the
// full file.
91 internal static Task
[] Process
<TSource
, TElement
> (QueryBaseNode
<TSource
> node
, Action
<TElement
, int> call
,
92 Func
<QueryBaseNode
<TSource
>, QueryOptions
, IList
<IEnumerable
<TElement
>>> acquisitionFunc
,
93 Action
<int> endAction
,
// Ask the caller-supplied function for the enumerables to process.
96 IList
<IEnumerable
<TElement
>> enumerables
= acquisitionFunc (node
, options
);
// One task slot per enumerable.
98 Task
[] tasks
= new Task
[enumerables
.Count
];
100 for (int i
= 0; i
< tasks
.Length
; i
++) {
// Start the task that iterates enumerables [index].
102 tasks
[i
] = Task
.Factory
.StartNew (() => {
103 foreach (TElement item
in enumerables
[index
]) {
104 // This is from specific operators
105 if (options
.ImplementerToken
.IsCancellationRequested
)
// NOTE(review): the statement guarded by the check above is not visible
// here - confirm what happens when the implementer token is cancelled.
// Cancellation requested through options.Token is reported by throwing
// OperationCanceledException carrying that token.
107 if (options
.Token
.IsCancellationRequested
)
108 throw new OperationCanceledException (options
.Token
);
// endAction is optional, hence the null check.
112 if (endAction
!= null)
// Checks the query in blocking mode, starts the processing tasks over
// node.GetEnumerables (options) and waits for all of them, observing
// options.Token while waiting.
internal static void ProcessAndBlock<T> (QueryBaseNode<T> node, Action<T> call)
{
	QueryOptions queryOptions = CheckQuery (node, true);
	Func<QueryBaseNode<T>, QueryOptions, IList<IEnumerable<T>>> acquire = (n, o) => n.GetEnumerables (o);

	Task[] workers = Process (node, call, acquire, queryOptions);
	Task.WaitAll (workers, queryOptions.Token);
}
// Starts the processing tasks over node.GetEnumerables (options). When
// `callback` is not null it is scheduled to run once all tasks are done.
// Returns an Action that waits for every task, observing options.Token.
internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<T> call,
                                              Action callback, QueryOptions options)
{
	Task[] workers = Process (node, call, (n, o) => n.GetEnumerables (o), options);

	if (callback != null) {
		Task.Factory.ContinueWhenAll (workers, (finished) => callback ());
	}

	Action waitForAll = () => Task.WaitAll (workers, options.Token);
	return waitForAll;
}
// Overload without an end action: forwards to the five-argument overload
// with a null Action<int>.
internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, int> call,
                                              Action callback, QueryOptions options)
{
	Action<int> noEndAction = null;

	return ProcessAndCallback<T> (node, call, noEndAction, callback, options);
}
// Starts the processing tasks over node.GetOrderedEnumerables (options),
// passing `endAction` through to Process. When `callback` is not null it is
// scheduled to run once all tasks are done. Returns an Action that waits for
// every task, observing options.Token.
internal static Action ProcessAndCallback<T> (QueryBaseNode<T> node, Action<KeyValuePair<long, T>, int> call,
                                              Action<int> endAction,
                                              Action callback, QueryOptions options)
{
	Task[] workers = Process (node, call, (n, o) => n.GetOrderedEnumerables (o), endAction, options);

	if (callback != null) {
		Task.Factory.ContinueWhenAll (workers, (finished) => callback ());
	}

	Action waitForAll = () => Task.WaitAll (workers, options.Token);
	return waitForAll;
}
155 internal static void ProcessAndAggregate
<T
, U
> (QueryBaseNode
<T
> node
,
157 Func
<U
, T
, U
> localCall
,
158 Action
<IList
<U
>> call
)
160 QueryOptions options
= CheckQuery (node
, true);
162 IList
<IEnumerable
<T
>> enumerables
= node
.GetEnumerables (options
);
163 U
[] locals
= new U
[enumerables
.Count
];
164 Task
[] tasks
= new Task
[enumerables
.Count
];
167 if (seedFunc
!= null) {
168 for (int i
= 0; i
< locals
.Length
; i
++)
169 locals
[i
] = seedFunc ();
173 for (int i
= 0; i
< tasks
.Length
; i
++) {
175 tasks
[i
] = Task
.Factory
.StartNew (() => {
176 foreach (T item
in enumerables
[index
]) {
177 // This is from specific operators
178 if (options
.ImplementerToken
.IsCancellationRequested
)
180 if (options
.Token
.IsCancellationRequested
)
181 throw new OperationCanceledException (options
.Token
);
185 // HACK: TODO: omfwtfitsomuchsucks
186 locals
[index
] = (U
)(object)item
;
190 U acc
= locals
[index
];
191 locals
[index
] = localCall (acc
, item
);
196 Task
.WaitAll (tasks
, options
.Token
);