From 748587020d36fd9f3f0aac161e985c2ce2b39423 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Laval?= Date: Wed, 10 Aug 2011 17:37:43 +0200 Subject: [PATCH] Add standard module directory layout and ConcurrentExclusiveSchedulerPair --- .../Assembly/AssemblyInfo.cs | 18 ++ mcs/class/System.Threading.Tasks.Dataflow/Makefile | 9 + .../System.Threading.Tasks.Dataflow.dll.sources | 4 + ...ystem.Threading.Tasks.Dataflow_test.dll.sources | 1 + .../ConcurrentExclusiveSchedulerPair.cs | 259 +++++++++++++++++++++ .../ConcurrentExclusiveSchedulerPairTest.cs | 179 ++++++++++++++ 6 files changed, 470 insertions(+) create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/Makefile create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks/ConcurrentExclusiveSchedulerPair.cs create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs b/mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs new file mode 100644 index 00000000000..83267087b61 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs @@ -0,0 +1,18 @@ +// +// AssemblyInfo.cs +// +// Author: +// Andreas Nahr (ClassDevelopment@A-SoftTech.com) +// +// (C) 2003 Ximian, Inc. http://www.ximian.com +// (C) 2004 Novell (http://www.novell.com) +// + +using System; +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyVersion (Consts.FxVersion)] + +[assembly: AssemblyDelaySign (true)] +[assembly: AssemblyKeyFile ("../mono.pub")] diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Makefile b/mcs/class/System.Threading.Tasks.Dataflow/Makefile new file mode 100644 index 00000000000..3f82d310c84 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/Makefile @@ -0,0 +1,9 @@ +thisdir = class/System.Threading.Tasks.Dataflow +SUBDIRS = +include ../../build/rules.make + +LIBRARY = System.Threading.Tasks.Dataflow.dll + +include ../../build/library.make + +LIB_MCS_FLAGS += -r:$(corlib) -r:System.Core.dll -r:System.dll diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources new file mode 100644 index 00000000000..3978945e162 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources @@ -0,0 +1,4 @@ +../../build/common/Consts.cs +../../build/common/Locale.cs +Assembly/AssemblyInfo.cs +System.Threading.Tasks/ConcurrentExclusiveSchedulerPair.cs diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources new file mode 100644 index 00000000000..4db25b8a48d --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources @@ -0,0 +1 @@ +System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks/ConcurrentExclusiveSchedulerPair.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks/ConcurrentExclusiveSchedulerPair.cs new file mode 100644 index 00000000000..b09ce5d3864 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks/ConcurrentExclusiveSchedulerPair.cs @@ -0,0 +1,259 @@ +// ConcurrentExclusiveSchedulerPair.cs +// +// Copyright (c) 2011 Jérémie "garuma" Laval +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +// +// + +#if NET_4_0 || MOBILE + +using System; +using System.Threading; +using System.Collections.Generic; +using System.Collections.Concurrent; + +namespace System.Threading.Tasks +{ + public class ConcurrentExclusiveSchedulerPair : IDisposable + { + readonly int maxConcurrencyLevel; + readonly int maxItemsPerTask; + + readonly TaskScheduler target; + readonly TaskFactory factory; + readonly Action taskHandler; + + readonly ConcurrentQueue concurrentTasks = new ConcurrentQueue (); + readonly ConcurrentQueue exclusiveTasks = new ConcurrentQueue (); + + readonly ReaderWriterLockSlim rwl = new ReaderWriterLockSlim (); + readonly TaskCompletionSource completion = new TaskCompletionSource (); + readonly ConcurrentTaskScheduler concurrent; + readonly ExclusiveTaskScheduler exclusive; + + int numTask; + + class ExclusiveTaskScheduler : TaskScheduler + { + ConcurrentExclusiveSchedulerPair scheduler; + + public ExclusiveTaskScheduler (ConcurrentExclusiveSchedulerPair scheduler) + { + this.scheduler = scheduler; + } + + public override int MaximumConcurrencyLevel { + get { + return scheduler.maxConcurrencyLevel; + } + } + + protected override void QueueTask (Task t) + { + scheduler.QueueExclusive (t); + } + + protected override bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued) + { + if (task.Status != TaskStatus.Created) + return false; + + task.RunSynchronously (scheduler.target); + return true; + } + + protected override IEnumerable GetScheduledTasks () + { + throw new NotImplementedException (); + } + } + + class ConcurrentTaskScheduler : TaskScheduler + { + ConcurrentExclusiveSchedulerPair scheduler; + + public ConcurrentTaskScheduler (ConcurrentExclusiveSchedulerPair scheduler) + { + this.scheduler = scheduler; + } + + public override int MaximumConcurrencyLevel { + get { + return scheduler.maxConcurrencyLevel; + } + } + + protected override void QueueTask (Task t) + { + scheduler.QueueConcurrent (t); + } + + protected override bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued) + { + if (task.Status != TaskStatus.Created) + return false; + + task.RunSynchronously (scheduler.target); + return true; + } + + public void Execute (Task t) + { + TryExecuteTask (t); + } + + protected override IEnumerable GetScheduledTasks () + { + throw new NotImplementedException (); + } + } + + public ConcurrentExclusiveSchedulerPair () : this (TaskScheduler.Current) + { + } + + public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler) : this (taskScheduler, taskScheduler.MaximumConcurrencyLevel) + { + } + + public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel) + : this (taskScheduler, maxConcurrencyLevel, -1) + { + } + + public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask) + { + this.target = taskScheduler; + this.maxConcurrencyLevel = maxConcurrencyLevel; + this.maxItemsPerTask = maxItemsPerTask; + this.factory = new TaskFactory (taskScheduler); + this.taskHandler = InternalTaskProcesser; + this.concurrent = new ConcurrentTaskScheduler (this); + this.exclusive = new ExclusiveTaskScheduler (this); + } + + public void Complete () + { + completion.SetResult (null); + } + + public TaskScheduler ConcurrentScheduler { + get { + return concurrent; + } + } + + public TaskScheduler ExclusiveScheduler { + get { + return exclusive; + } + } + + public Task Completion { + get { + return completion.Task; + } + } + + public void Dispose () + { + Dispose (true); + } + + protected virtual void Dispose (bool disposing) + { + throw new NotImplementedException (); + } + + void QueueExclusive (Task task) + { + exclusiveTasks.Enqueue (task); + SpinUpTasks (); + } + + void QueueConcurrent (Task task) + { + concurrentTasks.Enqueue (task); + SpinUpTasks (); + } + + void InternalTaskProcesser () + { + Task task; + int times = 0; + const int lockWaitTime = 2; + + while (!concurrentTasks.IsEmpty || !exclusiveTasks.IsEmpty) { + if (maxItemsPerTask != -1 && ++times == maxItemsPerTask) + break; + + bool locked = false; + + try { + if (!concurrentTasks.IsEmpty && rwl.TryEnterReadLock (lockWaitTime)) { + locked = true; + while (concurrentTasks.TryDequeue (out task)) { + RunTask (task); + } + } + } finally { + if (locked) { + rwl.ExitReadLock (); + locked = false; + } + } + + try { + if (!exclusiveTasks.IsEmpty && rwl.TryEnterWriteLock (lockWaitTime)) { + locked = true; + while (exclusiveTasks.TryDequeue (out task)) { + RunTask (task); + } + } + } finally { + if (locked) { + rwl.ExitWriteLock (); + } + } + } + // TODO: there's a race here, task adding + spinup check may be done while here + Interlocked.Decrement (ref numTask); + } + + void SpinUpTasks () + { + int currentTaskNumber; + do { + currentTaskNumber = numTask; + if (currentTaskNumber >= maxConcurrencyLevel) + return; + } while (Interlocked.CompareExchange (ref numTask, currentTaskNumber + 1, currentTaskNumber) != currentTaskNumber); + + factory.StartNew (taskHandler); + } + + void RunTask (Task task) + { + concurrent.Execute (task); + } + } +} + +#endif \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs new file mode 100644 index 00000000000..c0685e43ce8 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs @@ -0,0 +1,179 @@ +// +// ConcurrentExclusiveSchedulerPairTest.cs +// +// Author: +// Jérémie "garuma" Laval +// +// Copyright (c) 2011 Jérémie "garuma" Laval +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +#if NET_4_0 + +using System; +using System.Threading; +using System.Threading.Tasks; + +using NUnit.Framework; + +namespace MonoTests.System.Threading.Tasks +{ + [TestFixture] + public class ConcurrentExclusiveSchedulerPairTest + { + ConcurrentExclusiveSchedulerPair schedPair; + TaskFactory factory; + + [Test] + public void BasicExclusiveUsageTest () + { + schedPair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, 4); + factory = new TaskFactory (schedPair.ExclusiveScheduler); + + bool launched = false; + factory.StartNew (() => launched = true); + Thread.Sleep (600); + + Assert.IsTrue (launched); + } + + [Test] + public void BasicConcurrentUsageTest () + { + schedPair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, 4); + factory = new TaskFactory (schedPair.ConcurrentScheduler); + + bool launched = false; + factory.StartNew (() => launched = true); + Thread.Sleep (600); + + Assert.IsTrue (launched); + } + + [Test] + public void ExclusiveUsageTest () + { + schedPair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, 4); + factory = new TaskFactory (schedPair.ExclusiveScheduler); + + int count = 0; + ManualResetEventSlim mreFinish = new ManualResetEventSlim (false); + ManualResetEventSlim mreStart = new ManualResetEventSlim (false); + + factory.StartNew (() => { + mreStart.Set (); + Interlocked.Increment (ref count); + mreFinish.Wait (); + }); + mreStart.Wait (); + factory.StartNew (() => Interlocked.Increment (ref count)); + Thread.Sleep (100); + + Assert.AreEqual (1, count); + mreFinish.Set (); + } + + [Test] + public void ConcurrentUsageTest () + { + schedPair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, 4); + factory = new TaskFactory (schedPair.ConcurrentScheduler); + + int count = 0; + ManualResetEventSlim mreFinish = new ManualResetEventSlim (false); + CountdownEvent cntd = new CountdownEvent (2); + + factory.StartNew (() => { + Interlocked.Increment (ref count); + cntd.Signal (); + mreFinish.Wait (); + }); + factory.StartNew (() => { + Interlocked.Increment (ref count); + cntd.Signal (); + mreFinish.Wait (); + }); + + cntd.Wait (); + Assert.AreEqual (2, count); + mreFinish.Set (); + } + + [Test] + public void ConcurrentUsageWithExclusiveExecutingTest () + { + schedPair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, 4); + TaskFactory exclFact = new TaskFactory (schedPair.ExclusiveScheduler); + TaskFactory concFact = new TaskFactory (schedPair.ConcurrentScheduler); + + int count = 0; + bool exclStarted = false; + ManualResetEventSlim mreStart = new ManualResetEventSlim (false); + ManualResetEventSlim mreFinish = new ManualResetEventSlim (false); + + exclFact.StartNew (() => { + exclStarted = true; + mreStart.Set (); + mreFinish.Wait (); + exclStarted = false; + }); + + mreStart.Wait (); + + concFact.StartNew (() => Interlocked.Increment (ref count)); + concFact.StartNew (() => Interlocked.Increment (ref count)); + Thread.Sleep (100); + + Assert.IsTrue (exclStarted); + Assert.AreEqual (0, count); + mreFinish.Set (); + } + + [Test] + public void ExclusiveUsageWithConcurrentExecutingTest () + { + schedPair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, 4); + TaskFactory exclFact = new TaskFactory (schedPair.ExclusiveScheduler); + TaskFactory concFact = new TaskFactory (schedPair.ConcurrentScheduler); + + int count = 0; + bool started = false; + ManualResetEventSlim mreStart = new ManualResetEventSlim (false); + ManualResetEventSlim mreFinish = new ManualResetEventSlim (false); + + concFact.StartNew (() => { + started = true; + mreStart.Set (); + mreFinish.Wait (); + started = false; + }); + + mreStart.Wait (); + + exclFact.StartNew (() => Interlocked.Increment (ref count)); + Thread.Sleep (100); + + Assert.IsTrue (started); + Assert.AreEqual (0, count); + mreFinish.Set (); + } + } +} + +#endif -- 2.11.4.GIT