Bump corefx
[mono-project.git] / mcs / class / referencesource / System.Workflow.Runtime / WorkBatch.cs
blob6b241d3f126658742974534dacab4ec3ea8280a5
1 #pragma warning disable 1634, 1691
2 using System;
3 using System.Diagnostics;
4 using System.Transactions;
5 using System.Collections;
6 using System.Collections.Generic;
7 using System.Workflow.Runtime.Hosting;
9 namespace System.Workflow.Runtime
11 #region Runtime Batch Implementation
13 #region WorkBatch
15 internal enum WorkBatchState
17 Usable,
18 Merged,
19 Completed
22 /// <summary>
23 /// Summary description for Work Batching.
24 /// </summary>
25 internal sealed class WorkBatch : IWorkBatch, IDisposable
27 private PendingWorkCollection _pendingWorkCollection;
28 private object mutex = new object();
29 private WorkBatchState _state;
30 private WorkBatchCollection _collection = null;
32 private WorkBatch()
36 internal WorkBatch(WorkBatchCollection workBackCollection)
38 _pendingWorkCollection = new PendingWorkCollection();
39 _state = WorkBatchState.Usable;
40 _collection = workBackCollection;
43 internal int Count
45 get { return _pendingWorkCollection.WorkItems.Count; }
48 internal void SetWorkBatchCollection(WorkBatchCollection workBatchCollection)
50 _collection = workBatchCollection;
53 #region IWorkBatch Implementation
54 /// <summary>
55 /// Add Work to Batch
56 /// </summary>
57 /// <param name="work"></param>
58 /// <param name="workItem"></param>
59 void IWorkBatch.Add(IPendingWork work, object workItem)
61 if (_pendingWorkCollection == null)
62 throw new ObjectDisposedException("WorkBatch");
64 lock (this.mutex)
66 System.Diagnostics.Debug.Assert(this._state == WorkBatchState.Usable, "Trying to add to unusable batch.");
68 _pendingWorkCollection.Add(work, _collection.GetNextWorkItemOrderId(work), workItem);
71 #endregion
73 #region Internal Implementation
75 internal bool IsDirty
77 get
79 return this._pendingWorkCollection.IsDirty;
82 /// <summary>
83 /// This one commits all the pending work and its items
84 /// added so far in this batch.
85 /// </summary>
86 /// <param name="transaction"></param>
87 internal void Commit(Transaction transaction)
89 lock (this.mutex)
91 _pendingWorkCollection.Commit(transaction);
96 /// <summary>
97 /// This one completes the pending work
98 /// </summary>
99 /// <param name="succeeded"></param>
100 internal void Complete(bool succeeded)
102 lock (this.mutex)
104 if (this._pendingWorkCollection.IsUsable)
106 _pendingWorkCollection.Complete(succeeded);
107 _pendingWorkCollection.Dispose();
108 this._state = WorkBatchState.Completed;
113 /// <summary>
114 /// API for Runtime to call to do Merge Operation: Right now
115 /// we dont use this because we dont support incoming work collection.
116 /// </summary>
117 /// <param name="batch"></param>
118 internal void Merge(WorkBatch batch)
120 if (batch == null)
121 return; //nothing to merge
123 if (_pendingWorkCollection == null)
124 throw new ObjectDisposedException("WorkBatch");
126 lock (this.mutex)
128 lock (batch.mutex)
130 foreach (KeyValuePair<IPendingWork, SortedList<long, object>> item in batch._pendingWorkCollection.WorkItems)
132 //_pendingWorkCollection.AddRange(item.Key, item.Value);
133 SortedList<long, object> newItems = item.Value;
134 foreach (KeyValuePair<long, object> kvp in newItems)
135 _pendingWorkCollection.Add(item.Key, kvp.Key, kvp.Value);
139 this._state = WorkBatchState.Merged;
142 #endregion
144 #region IDisposable Members
145 public void Dispose()
147 Dispose(true);
148 GC.SuppressFinalize(this);
151 private void Dispose(bool disposing)
153 if (disposing)
155 _pendingWorkCollection.Dispose();
156 _pendingWorkCollection = null;
159 #endregion
161 #region PendingWorkCollection implementation
163 /// <summary>
164 /// Pending Work Implementation
165 /// </summary>
166 internal sealed class PendingWorkCollection : IDisposable
168 Dictionary<IPendingWork, SortedList<long, object>> Items;
170 #region Internal Implementation
171 internal PendingWorkCollection()
173 Items = new Dictionary<IPendingWork, SortedList<long, object>>();
176 internal Dictionary<IPendingWork, SortedList<long, object>> WorkItems
180 return Items;
184 internal bool IsUsable
188 return this.Items != null;
192 internal bool IsDirty
196 if (!this.IsUsable)
197 return false;
200 // Loop through all pending work items in the collection
201 // If any of them assert that they need to commit than the batch is dirty
202 foreach (KeyValuePair<IPendingWork, SortedList<long, object>> workItem in this.WorkItems)
206 IPendingWork work = workItem.Key;
207 if (work.MustCommit(workItem.Value))
208 return true;
210 catch (Exception e)
212 if (WorkflowExecutor.IsIrrecoverableException(e))
214 #pragma warning disable 56503
215 throw;
216 #pragma warning restore 56503
218 else
220 // Ignore exceptions and treat condition as false return value;
225 // If no one asserted that they need to commit we're not dirty
226 return false;
230 internal void Add(IPendingWork work, long orderId, object workItem)
232 SortedList<long, object> workItems = null;
234 if (!Items.TryGetValue(work, out workItems))
236 workItems = new SortedList<long, object>();
237 Items.Add(work, workItems);
239 Debug.Assert(!workItems.ContainsKey(orderId), string.Format(System.Globalization.CultureInfo.InvariantCulture, "List already contains key {0}", orderId));
240 workItems.Add(orderId, workItem);
241 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0, "pending work hc {0} added workItem hc {1}", work.GetHashCode(), workItem.GetHashCode());
244 //Commit All Pending Work
245 internal void Commit(Transaction transaction)
247 //ignore items param
248 foreach (KeyValuePair<IPendingWork, SortedList<long, object>> workItem in Items)
250 IPendingWork work = workItem.Key;
251 List<object> values = new List<object>(workItem.Value.Values);
252 work.Commit(transaction, values);
256 //Complete All Pending Work
257 internal void Complete(bool succeeded)
259 foreach (KeyValuePair<IPendingWork, SortedList<long, object>> workItem in Items)
261 IPendingWork work = workItem.Key;
262 List<object> values = new List<object>(workItem.Value.Values);
265 work.Complete(succeeded, values);
267 catch (Exception e)
269 if (WorkflowExecutor.IsIrrecoverableException(e))
271 throw;
273 else
275 WorkflowTrace.Runtime.TraceEvent(TraceEventType.Warning, 0, "Work Item {0} threw exception on complete notification", workItem.GetType());
281 #endregion
283 #region IDisposable Members
284 public void Dispose()
286 Dispose(true);
287 GC.SuppressFinalize(this);
290 private void Dispose(bool disposing)
292 if (disposing && Items != null)
294 Items.Clear();
295 Items = null;
299 #endregion
301 #endregion
303 #endregion
305 #region WorkBatchCollection
306 /// <summary>
307 /// collection of name to Batch
308 /// </summary>
309 internal sealed class WorkBatchCollection : Dictionary<object, WorkBatch>
311 object transientBatchID = new object();
312 private object mutex = new object();
314 // All access must be through Interlocked.* methods
315 private long _workItemOrderId = 0;
317 internal long WorkItemOrderId
321 return Threading.Interlocked.Read(ref _workItemOrderId);
325 Debug.Assert(value >= _workItemOrderId, "New value for WorkItemOrderId must be greater than the current value");
326 lock (mutex)
328 _workItemOrderId = value;
333 internal long GetNextWorkItemOrderId(IPendingWork pendingWork)
335 return Threading.Interlocked.Increment(ref _workItemOrderId);
337 /// <summary>
338 /// A new batch is created per atomic scope or any
339 /// required sub batches. An example of an optional sub batch
340 /// could be a batch created for Send activities
341 /// </summary>
342 /// <param name="id"></param>
343 /// <returns></returns>
344 internal IWorkBatch GetBatch(object id)
346 WorkBatch batch = null;
348 lock (mutex)
350 if (this.TryGetValue(id, out batch))
351 return batch;
353 batch = new WorkBatch(this);
354 Add(id, batch);
357 return batch;
360 /// <summary>
361 /// Find a batch for a given id without creating it.
362 /// </summary>
363 /// <param name="id">batch key</param>
364 /// <returns>batch or null if not found</returns>
365 private WorkBatch FindBatch(object id)
367 WorkBatch batch = null;
368 lock (mutex)
370 TryGetValue(id, out batch);
373 return batch;
376 internal IWorkBatch GetTransientBatch()
378 return GetBatch(transientBatchID);
381 internal WorkBatch GetMergedBatch()
383 lock (mutex)
385 WorkBatch batch = new WorkBatch(this);
387 foreach (WorkBatch existingBatch in this.Values)
389 batch.Merge(existingBatch);
391 //Copy of all the items merged in one batch.
392 //Order is preserved in the same way batches are created.
393 return batch;
397 internal void RollbackBatch(object id)
399 lock (mutex)
401 WorkBatch batch = FindBatch(id);
402 if (batch != null)
404 batch.Complete(false);
405 batch.Dispose();
406 Remove(id);
411 // Rollback all sub batches, calling "complete(false)" on all entries.
412 internal void RollbackAllBatchedWork()
414 lock (mutex)
416 foreach (WorkBatch batch in this.Values)
418 batch.Complete(false);
419 batch.Dispose();
421 Clear(); // clear the collection
425 // Clear sub batches after successful commit/complete.
426 internal void ClearSubBatches()
428 lock (mutex)
430 foreach (WorkBatch existingBatch in this.Values)
432 existingBatch.Dispose();
434 Clear(); // clear the collection
438 internal void ClearTransientBatch()
440 RollbackBatch(transientBatchID);
443 #endregion
445 #endregion