From 0dbc6130699aae7a0d16149fee717b595eb68807 Mon Sep 17 00:00:00 2001 From: Phillip Clark Date: Thu, 20 Aug 2009 23:39:15 +0000 Subject: [PATCH] Refactored IEventListener and IEventSink to handle garbage data according to configurable strategies - see the "Sample Configuration.config" for details. Modified the NArrange settings to remove regions from interface definitions, it was becoming a hassle. git-svn-id: https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-dotnet/trunk@206 a2f82657-cdd2-4550-bd36-68a8e7111808 --- Lwes-test-listener-console/App.config | 2 +- Lwes-test-listener-console/Program.cs | 33 +- Org.Lwes.Tests/EventListenerTests.cs | 7 +- Org.Lwes.Tests/SerializerTests.cs | 185 +++---- Org.Lwes.Tests/StatusTests.cs | 12 +- Org.Lwes/Config/ListenerConfigurationSection.cs | 24 + Org.Lwes/DB/IEventTemplateDB.cs | 8 - Org.Lwes/Emitter/EventEmitterBase.cs | 8 +- Org.Lwes/Emitter/IEventEmitter.cs | 8 - Org.Lwes/EventAttribute.cs | 12 - Org.Lwes/EventUtils.cs | 2 +- Org.Lwes/Exceptions.cs | 53 +- Org.Lwes/Factory/IEventFactory.cs | 8 - Org.Lwes/Journaler/IJournaler.cs | 8 - Org.Lwes/Listener/Enums.cs | 89 ++++ Org.Lwes/Listener/EventListener.cs | 6 +- Org.Lwes/Listener/EventListenerBase.cs | 665 ++++++++++++++++-------- Org.Lwes/Listener/IEventListener.cs | 19 + Org.Lwes/Listener/IEventSink.cs | 120 ++--- Org.Lwes/Listener/IEventSinkRegistrationKey.cs | 63 +++ Org.Lwes/Listener/MulticastEventListener.cs | 6 +- Org.Lwes/Listener/UnicastEventListener.cs | 6 +- Org.Lwes/LwesSerializer.cs | 2 +- Org.Lwes/Org.Lwes.csproj | 2 + Org.Lwes/Sample Configuration.config | 22 +- Org.Lwes/Status.cs | 43 +- narrange.xml | 10 +- 27 files changed, 914 insertions(+), 509 deletions(-) create mode 100644 Org.Lwes/Listener/Enums.cs rewrite Org.Lwes/Listener/IEventSink.cs (89%) create mode 100644 Org.Lwes/Listener/IEventSinkRegistrationKey.cs diff --git a/Lwes-test-listener-console/App.config b/Lwes-test-listener-console/App.config index 66ca687..38b87f6 100644 --- a/Lwes-test-listener-console/App.config +++ b/Lwes-test-listener-console/App.config @@ -8,7 +8,7 @@ + parallel ="false" /> diff --git a/Lwes-test-listener-console/Program.cs b/Lwes-test-listener-console/Program.cs index 49e6955..9d3c369 100644 --- a/Lwes-test-listener-console/Program.cs +++ b/Lwes-test-listener-console/Program.cs @@ -14,7 +14,7 @@ #region Methods static void Main(string[] args) - { + { using (IEventListener listener = EventListener.CreateDefault()) { EventSink sink = new EventSink(); @@ -76,32 +76,47 @@ } } + #endregion Methods + + #region Nested Types + class EventSink : IEventSink { + #region Fields + SimpleLockFreeQueue _incomingEvents = new SimpleLockFreeQueue(); - #region IEventSink Members + + #endregion Fields + + #region Properties + + public SimpleLockFreeQueue Events + { + get { return _incomingEvents; } + } public bool IsThreadSafe { get { return false; } } + #endregion Properties + + #region Methods + public void HandleEventArrival(IEventSinkRegistrationKey key, Event ev) { _incomingEvents.Enqueue(ev); } - public GarbageHandlingStrategy HandleGarbageArrival(IEventSinkRegistrationKey key, System.Net.EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage) + public GarbageHandlingVote HandleGarbageData(IEventSinkRegistrationKey key, System.Net.EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage) { - return GarbageHandlingStrategy.FailfastForTrafficOnEndpoint; + return GarbageHandlingVote.IgnoreAllTrafficFromEndpoint; } - #endregion - - public SimpleLockFreeQueue Events { get { return _incomingEvents; } } + #endregion Methods } - - #endregion Methods + #endregion Nested Types } } \ No newline at end of file diff --git a/Org.Lwes.Tests/EventListenerTests.cs b/Org.Lwes.Tests/EventListenerTests.cs index 40c7d15..2ff59c3 100644 --- a/Org.Lwes.Tests/EventListenerTests.cs +++ b/Org.Lwes.Tests/EventListenerTests.cs @@ -6,14 +6,14 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + using Org.Lwes.Emitter; using Org.Lwes.Listener; - using Moq; [TestClass] public class EventListenerTests { - #region Methods [TestMethod] @@ -32,7 +32,7 @@ }; bool done = false; Event receivedEvent = default(Event); - + // Mock an IEventSink that records the incomming event... Mock mock = new Mock(); mock.SetupGet(sink => sink.IsThreadSafe).Returns(true); @@ -75,7 +75,6 @@ Console.Write(receivedEvent.ToString()); } - #endregion Methods } } \ No newline at end of file diff --git a/Org.Lwes.Tests/SerializerTests.cs b/Org.Lwes.Tests/SerializerTests.cs index a7fd69a..c3995e6 100644 --- a/Org.Lwes.Tests/SerializerTests.cs +++ b/Org.Lwes.Tests/SerializerTests.cs @@ -2,12 +2,14 @@ { using System; using System.Collections.Generic; + using System.Net; + using System.Text; using Microsoft.VisualStudio.TestTools.UnitTesting; - using System.Text; + using Moq; + using Org.Lwes.DB; - using System.Net; [TestClass] public class SerializerTests @@ -95,6 +97,79 @@ } [TestMethod] + [ExpectedException(typeof(BadLwesDataException))] + public void RoundTripSerialize_DeserializeGarbageData() + { + byte[] bytes = new byte[400]; + Random _rand = new Random(Environment.TickCount); + _rand.NextBytes(bytes); + + var mock = new Mock(); + Event dummy; + mock.Setup(db => db.TryCreateEvent(It.IsAny(), out dummy, It.IsAny(), It.IsAny())) + .Returns(false); + + LwesSerializer.Deserialize(bytes, 0, bytes.Length, mock.Object); + } + + [TestMethod] + public void RoundTripSerialize_Event() + { + var e = new + { + EventName = "UserLogin", + Attributes = new + { + UserName = new { Name = "username", Token = TypeToken.STRING, Value = "bob" }, + Password = new { Name = "password", Token = TypeToken.UINT64, Value = 0xfeedabbadeadbeefUL }, + ClientIP = new { Name = "clientIP", Token = TypeToken.IP_ADDR, Value = IPAddress.Parse("127.0.0.1") }, + Successful = new { Name = "successful", Token = TypeToken.BOOLEAN, Value = false } + } + }; + + // Create the event... + Event ev = new Event(e.EventName) + .SetValue(e.Attributes.UserName.Name, e.Attributes.UserName.Value) + .SetValue(e.Attributes.Password.Name, e.Attributes.Password.Value) + .SetValue(e.Attributes.ClientIP.Name, e.Attributes.ClientIP.Value) + .SetValue(e.Attributes.Successful.Name, e.Attributes.Successful.Value); + + // Ensure the token types match... + Assert.AreEqual(e.Attributes.UserName.Token, ev[e.Attributes.UserName.Name].TypeToken); + Assert.AreEqual(e.Attributes.Password.Token, ev[e.Attributes.Password.Name].TypeToken); + Assert.AreEqual(e.Attributes.ClientIP.Token, ev[e.Attributes.ClientIP.Name].TypeToken); + Assert.AreEqual(e.Attributes.Successful.Token, ev[e.Attributes.Successful.Name].TypeToken); + + // Ensure the values match... + Assert.AreEqual(e.Attributes.UserName.Value, ev[e.Attributes.UserName.Name].GetValue()); + Assert.AreEqual(e.Attributes.Password.Value, ev[e.Attributes.Password.Name].GetValue()); + Assert.AreEqual(e.Attributes.ClientIP.Value, ev[e.Attributes.ClientIP.Name].GetValue()); + Assert.AreEqual(e.Attributes.Successful.Value, ev[e.Attributes.Successful.Name].GetValue()); + + // Serialization requires an IEventTemplateDB, we're gonna mock it here... + var mock = new Mock(); + Event dummy; + mock.Setup(db => db.TryCreateEvent(It.IsAny(), out dummy, It.IsAny(), It.IsAny())) + .Returns(false); + + // Perform a roundtrip serialization... + byte[] data = LwesSerializer.Serialize(ev); + Event ev2 = LwesSerializer.Deserialize(data, 0, data.Length, mock.Object); + + // Ensure the token types match on the deserialized object... + Assert.AreEqual(e.Attributes.UserName.Token, ev2[e.Attributes.UserName.Name].TypeToken); + Assert.AreEqual(e.Attributes.Password.Token, ev2[e.Attributes.Password.Name].TypeToken); + Assert.AreEqual(e.Attributes.ClientIP.Token, ev2[e.Attributes.ClientIP.Name].TypeToken); + Assert.AreEqual(e.Attributes.Successful.Token, ev2[e.Attributes.Successful.Name].TypeToken); + + // Ensure the values match on the deserialized object... + Assert.AreEqual(e.Attributes.UserName.Value, ev2[e.Attributes.UserName.Name].GetValue()); + Assert.AreEqual(e.Attributes.Password.Value, ev2[e.Attributes.Password.Name].GetValue()); + Assert.AreEqual(e.Attributes.ClientIP.Value, ev2[e.Attributes.ClientIP.Name].GetValue()); + Assert.AreEqual(e.Attributes.Successful.Value, ev2[e.Attributes.Successful.Name].GetValue()); + } + + [TestMethod] public void RoundTripSerialize_Int16() { var values = new Int16[] @@ -218,13 +293,13 @@ } [TestMethod] - public void RoundTripSerialize_String_UTF_8() + public void RoundTripSerialize_String_ISO_8859_1() { var values = new string[] - { EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) - , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) - , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) - , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) + { EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) + , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) + , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) + , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) }; byte[] data = new byte[values.Length * 400 * sizeof(UInt64)]; @@ -234,10 +309,10 @@ // Serialize all of the values... for (int i = 0; i < values.Length; i++) { - encodedSize = Constants.DefaultEncoding.GetByteCount(values[i]) + sizeof(UInt16); + encodedSize = Constants.ISO8859_1Encoding.GetByteCount(values[i]) + sizeof(UInt16); accumulatedSize += encodedSize; // Write the value and assert the length written... - Assert.AreEqual(encodedSize, LwesSerializer.Write(data, ref cursor, values[i], Constants.DefaultEncoding.GetEncoder())); + Assert.AreEqual(encodedSize, LwesSerializer.Write(data, ref cursor, values[i], Constants.ISO8859_1Encoding.GetEncoder())); // Verify that the cursor moved appropriately... Assert.AreEqual(accumulatedSize, cursor); encodedSizes.Enqueue(accumulatedSize); @@ -249,20 +324,20 @@ for (int i = 0; i < values.Length; i++) { // Read the and assert the value... - Assert.AreEqual(values[i], LwesSerializer.ReadString(data, ref cursor, Constants.DefaultEncoding.GetDecoder())); + Assert.AreEqual(values[i], LwesSerializer.ReadString(data, ref cursor, Constants.ISO8859_1Encoding.GetDecoder())); // Verify that the cursor moved appropriately... Assert.AreEqual(encodedSizes.Dequeue(), cursor); } } [TestMethod] - public void RoundTripSerialize_String_ISO_8859_1() + public void RoundTripSerialize_String_UTF_8() { var values = new string[] - { EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) - , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) - , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) - , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.ISO_8859_1) + { EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) + , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) + , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) + , EventUtils.GenerateRandomString(_rand.Next(1, 400), SupportedEncoding.UTF_8) }; byte[] data = new byte[values.Length * 400 * sizeof(UInt64)]; @@ -272,10 +347,10 @@ // Serialize all of the values... for (int i = 0; i < values.Length; i++) { - encodedSize = Constants.ISO8859_1Encoding.GetByteCount(values[i]) + sizeof(UInt16); + encodedSize = Constants.DefaultEncoding.GetByteCount(values[i]) + sizeof(UInt16); accumulatedSize += encodedSize; // Write the value and assert the length written... - Assert.AreEqual(encodedSize, LwesSerializer.Write(data, ref cursor, values[i], Constants.ISO8859_1Encoding.GetEncoder())); + Assert.AreEqual(encodedSize, LwesSerializer.Write(data, ref cursor, values[i], Constants.DefaultEncoding.GetEncoder())); // Verify that the cursor moved appropriately... Assert.AreEqual(accumulatedSize, cursor); encodedSizes.Enqueue(accumulatedSize); @@ -287,7 +362,7 @@ for (int i = 0; i < values.Length; i++) { // Read the and assert the value... - Assert.AreEqual(values[i], LwesSerializer.ReadString(data, ref cursor, Constants.ISO8859_1Encoding.GetDecoder())); + Assert.AreEqual(values[i], LwesSerializer.ReadString(data, ref cursor, Constants.DefaultEncoding.GetDecoder())); // Verify that the cursor moved appropriately... Assert.AreEqual(encodedSizes.Dequeue(), cursor); } @@ -420,80 +495,6 @@ } } - [TestMethod] - public void RoundTripSerialize_Event() - { - var e = new - { - EventName = "UserLogin", - Attributes = new - { - UserName = new { Name = "username", Token = TypeToken.STRING, Value = "bob" }, - Password = new { Name = "password", Token = TypeToken.UINT64, Value = 0xfeedabbadeadbeefUL }, - ClientIP = new { Name = "clientIP", Token = TypeToken.IP_ADDR, Value = IPAddress.Parse("127.0.0.1") }, - Successful = new { Name = "successful", Token = TypeToken.BOOLEAN, Value = false } - } - }; - - // Create the event... - Event ev = new Event(e.EventName) - .SetValue(e.Attributes.UserName.Name, e.Attributes.UserName.Value) - .SetValue(e.Attributes.Password.Name, e.Attributes.Password.Value) - .SetValue(e.Attributes.ClientIP.Name, e.Attributes.ClientIP.Value) - .SetValue(e.Attributes.Successful.Name, e.Attributes.Successful.Value); - - // Ensure the token types match... - Assert.AreEqual(e.Attributes.UserName.Token, ev[e.Attributes.UserName.Name].TypeToken); - Assert.AreEqual(e.Attributes.Password.Token, ev[e.Attributes.Password.Name].TypeToken); - Assert.AreEqual(e.Attributes.ClientIP.Token, ev[e.Attributes.ClientIP.Name].TypeToken); - Assert.AreEqual(e.Attributes.Successful.Token, ev[e.Attributes.Successful.Name].TypeToken); - - // Ensure the values match... - Assert.AreEqual(e.Attributes.UserName.Value, ev[e.Attributes.UserName.Name].GetValue()); - Assert.AreEqual(e.Attributes.Password.Value, ev[e.Attributes.Password.Name].GetValue()); - Assert.AreEqual(e.Attributes.ClientIP.Value, ev[e.Attributes.ClientIP.Name].GetValue()); - Assert.AreEqual(e.Attributes.Successful.Value, ev[e.Attributes.Successful.Name].GetValue()); - - // Serialization requires an IEventTemplateDB, we're gonna mock it here... - var mock = new Mock(); - Event dummy; - mock.Setup(db => db.TryCreateEvent(It.IsAny(), out dummy, It.IsAny(), It.IsAny())) - .Returns(false); - - // Perform a roundtrip serialization... - byte[] data = LwesSerializer.Serialize(ev); - Event ev2 = LwesSerializer.Deserialize(data, 0, data.Length, mock.Object); - - // Ensure the token types match on the deserialized object... - Assert.AreEqual(e.Attributes.UserName.Token, ev2[e.Attributes.UserName.Name].TypeToken); - Assert.AreEqual(e.Attributes.Password.Token, ev2[e.Attributes.Password.Name].TypeToken); - Assert.AreEqual(e.Attributes.ClientIP.Token, ev2[e.Attributes.ClientIP.Name].TypeToken); - Assert.AreEqual(e.Attributes.Successful.Token, ev2[e.Attributes.Successful.Name].TypeToken); - - // Ensure the values match on the deserialized object... - Assert.AreEqual(e.Attributes.UserName.Value, ev2[e.Attributes.UserName.Name].GetValue()); - Assert.AreEqual(e.Attributes.Password.Value, ev2[e.Attributes.Password.Name].GetValue()); - Assert.AreEqual(e.Attributes.ClientIP.Value, ev2[e.Attributes.ClientIP.Name].GetValue()); - Assert.AreEqual(e.Attributes.Successful.Value, ev2[e.Attributes.Successful.Name].GetValue()); - - } - - [TestMethod] - [ExpectedException(typeof(BadLwesDataException))] - public void RoundTripSerialize_DeserializeGarbageData() - { - byte[] bytes = new byte[400]; - Random _rand = new Random(Environment.TickCount); - _rand.NextBytes(bytes); - - var mock = new Mock(); - Event dummy; - mock.Setup(db => db.TryCreateEvent(It.IsAny(), out dummy, It.IsAny(), It.IsAny())) - .Returns(false); - - LwesSerializer.Deserialize(bytes, 0, bytes.Length, mock.Object); - } - #endregion Methods } } \ No newline at end of file diff --git a/Org.Lwes.Tests/StatusTests.cs b/Org.Lwes.Tests/StatusTests.cs index e25afeb..afb12a9 100644 --- a/Org.Lwes.Tests/StatusTests.cs +++ b/Org.Lwes.Tests/StatusTests.cs @@ -76,12 +76,12 @@ Interlocked.Increment(ref onThreadsStarted); while (state.IsLessThan(TestStates.ShutdownSignaled)) { - state.TrySetState(TestStates.On, TestStates.Undecided, () => + state.TryTransition(TestStates.On, TestStates.Undecided, () => { Interlocked.Increment(ref transitionsOn); }); } - state.TrySetState(TestStates.OnStateDone, TestStates.ShutdownSignaled, () => + state.TryTransition(TestStates.OnStateDone, TestStates.ShutdownSignaled, () => { Interlocked.Increment(ref transitionsOnStateDone); }); @@ -95,12 +95,12 @@ while(state.IsLessThan(TestStates.OnStateDone)) { - state.TrySetState(TestStates.Off, TestStates.On, () => + state.TryTransition(TestStates.Off, TestStates.On, () => { Interlocked.Increment(ref transitionsOff); }); } - state.TrySetState(TestStates.OffStateDone, TestStates.OnStateDone, () => + state.TryTransition(TestStates.OffStateDone, TestStates.OnStateDone, () => { Interlocked.Increment(ref transitionsOffStateDone); }); @@ -114,13 +114,13 @@ while (state.IsLessThan(TestStates.OffStateDone)) { - state.TrySetState(TestStates.Undecided, TestStates.Off, () => + state.TryTransition(TestStates.Undecided, TestStates.Off, () => { Interlocked.Increment(ref transitionsUndecided); }); } - state.TrySetState(TestStates.Done, TestStates.OffStateDone, () => + state.TryTransition(TestStates.Done, TestStates.OffStateDone, () => { Interlocked.Increment(ref transitionsDone); }); diff --git a/Org.Lwes/Config/ListenerConfigurationSection.cs b/Org.Lwes/Config/ListenerConfigurationSection.cs index 5a0400e..84fd1c8 100644 --- a/Org.Lwes/Config/ListenerConfigurationSection.cs +++ b/Org.Lwes/Config/ListenerConfigurationSection.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Net; using System.Text; + using Org.Lwes.Listener; /// /// Configuration section for a listener. @@ -39,6 +40,11 @@ /// public const string PropertyName_useMulticast = "multicast"; + /// + /// Property name for garbage handling. + /// + public const string PropertyName_garbageHandling = "garbageHandling"; + #endregion Fields #region Properties @@ -105,6 +111,24 @@ } /// + /// Indicates whether the listener listens on a multicast port. + /// + [ConfigurationProperty(PropertyName_garbageHandling + , IsRequired = false + , DefaultValue = ListenerGarbageHandling.FailSilently)] + public ListenerGarbageHandling GarbageHandling + { + get + { + return (ListenerGarbageHandling)this[PropertyName_garbageHandling]; + } + set + { + this[PropertyName_garbageHandling] = value; + } + } + + /// /// Indicates whether the listener uses a parallel strategy. /// [ConfigurationProperty(PropertyName_parallel diff --git a/Org.Lwes/DB/IEventTemplateDB.cs b/Org.Lwes/DB/IEventTemplateDB.cs index e3d3982..1b99a87 100644 --- a/Org.Lwes/DB/IEventTemplateDB.cs +++ b/Org.Lwes/DB/IEventTemplateDB.cs @@ -12,8 +12,6 @@ /// public interface IEventTemplateDB { - #region Properties - /// /// An enumerable containing the names of defined events. /// @@ -30,10 +28,6 @@ get; } - #endregion Properties - - #region Methods - /// /// Gets the named event template. /// @@ -84,7 +78,5 @@ /// reference to a variable that will contain the template upon success /// true if the named event is retreived; otherwise false bool TryGetEventTemplate(string evName, out EventTemplate template); - - #endregion Methods } } \ No newline at end of file diff --git a/Org.Lwes/Emitter/EventEmitterBase.cs b/Org.Lwes/Emitter/EventEmitterBase.cs index dab229c..4fc9dc4 100644 --- a/Org.Lwes/Emitter/EventEmitterBase.cs +++ b/Org.Lwes/Emitter/EventEmitterBase.cs @@ -67,15 +67,11 @@ interface IEmitter : IDisposable { - #region Methods - void Emit(Event ev); void Start(IEventTemplateDB db , IPEndPoint sendToEP , Action finishSocket); - - #endregion Methods } #endregion Nested Interfaces @@ -317,7 +313,7 @@ private void Dispose(bool p) { // Signal background threads... - _senderState.TrySetState(EmitterState.StopSignaled, EmitterState.Active, () => + _senderState.TryTransition(EmitterState.StopSignaled, EmitterState.Active, () => { Util.Dispose(ref _emitEP); BufferManager.ReleaseBuffer(_buffer); @@ -424,7 +420,7 @@ private void Dispose(bool p) { // Signal background threads... - _emitterState.TrySetState(EmitterState.StopSignaled, EmitterState.Active, () => + _emitterState.TryTransition(EmitterState.StopSignaled, EmitterState.Active, () => { while (Thread.VolatileRead(ref _serializers) > 0) { diff --git a/Org.Lwes/Emitter/IEventEmitter.cs b/Org.Lwes/Emitter/IEventEmitter.cs index 5e91e94..5948564 100644 --- a/Org.Lwes/Emitter/IEventEmitter.cs +++ b/Org.Lwes/Emitter/IEventEmitter.cs @@ -16,8 +16,6 @@ /// public interface IEventEmitter : IDisposable { - #region Properties - /// /// The character encoding used when performing event IO. /// @@ -51,10 +49,6 @@ get; } - #endregion Properties - - #region Methods - /// /// Creates an event type identified by the event name. /// @@ -92,7 +86,5 @@ /// /// the event being emitted void Emit(Event evt); - - #endregion Methods } } \ No newline at end of file diff --git a/Org.Lwes/EventAttribute.cs b/Org.Lwes/EventAttribute.cs index 45e80e4..d85f1a4 100644 --- a/Org.Lwes/EventAttribute.cs +++ b/Org.Lwes/EventAttribute.cs @@ -11,8 +11,6 @@ /// public interface IEventAttribute { - #region Properties - /// /// Indicates the attribute's value equals the default value for the attribute. /// @@ -37,10 +35,6 @@ get; } - #endregion Properties - - #region Methods - /// /// Used by the LWES event system during serialization. Encodes /// the event attribute into the byte buffer. @@ -204,20 +198,14 @@ /// reference to a variable that will hold the value upon success /// true if the value is retrieved; otherwise false bool TryGetValue(out bool value); - - #endregion Methods } internal interface IEventAttribute : IEventAttribute { - #region Properties - T Value { get; } - - #endregion Properties } /// diff --git a/Org.Lwes/EventUtils.cs b/Org.Lwes/EventUtils.cs index b93fa32..af1d64e 100644 --- a/Org.Lwes/EventUtils.cs +++ b/Org.Lwes/EventUtils.cs @@ -2,9 +2,9 @@ { using System; using System.Net; + using System.Text; using Org.Lwes.ESF; - using System.Text; /// /// Utility class. diff --git a/Org.Lwes/Exceptions.cs b/Org.Lwes/Exceptions.cs index b3f0960..9a23374 100644 --- a/Org.Lwes/Exceptions.cs +++ b/Org.Lwes/Exceptions.cs @@ -29,10 +29,10 @@ } /// - /// Base class for exceptions thrown by the event system. + /// Exception thrown when bad data is encountered during serialization. /// [Serializable] - public class EventSystemException : Exception + public class BadLwesDataException : EventSystemException { #region Constructors @@ -40,27 +40,17 @@ /// Creates a new instance. /// /// error message - public EventSystemException(string errorMessage) + public BadLwesDataException(string errorMessage) : base(errorMessage) { } /// - /// Creates a new instance. - /// - /// error message - /// exception that caused this exception - public EventSystemException(string errorMessage, Exception innerException) - : base(errorMessage, innerException) - { - } - - /// /// Creates a new instance from serialization. /// /// /// - public EventSystemException(SerializationInfo info, StreamingContext context) + public BadLwesDataException(SerializationInfo info, StreamingContext context) : base(info, context) { } @@ -69,10 +59,10 @@ } /// - /// Exception thrown when an attriubte does not exist in an Event. + /// Base class for exceptions thrown by the event system. /// [Serializable] - public class NoSuchAttributeException : EventSystemException + public class EventSystemException : Exception { #region Constructors @@ -80,17 +70,27 @@ /// Creates a new instance. /// /// error message - public NoSuchAttributeException(string errorMessage) + public EventSystemException(string errorMessage) : base(errorMessage) { } /// + /// Creates a new instance. + /// + /// error message + /// exception that caused this exception + public EventSystemException(string errorMessage, Exception innerException) + : base(errorMessage, innerException) + { + } + + /// /// Creates a new instance from serialization. /// /// /// - public NoSuchAttributeException(SerializationInfo info, StreamingContext context) + public EventSystemException(SerializationInfo info, StreamingContext context) : base(info, context) { } @@ -99,10 +99,10 @@ } /// - /// Exception thrown when an attribute type does not exist. + /// Exception thrown when an attriubte does not exist in an Event. /// [Serializable] - public class NoSuchAttributeTypeException : EventSystemException + public class NoSuchAttributeException : EventSystemException { #region Constructors @@ -110,7 +110,7 @@ /// Creates a new instance. /// /// error message - public NoSuchAttributeTypeException(string errorMessage) + public NoSuchAttributeException(string errorMessage) : base(errorMessage) { } @@ -120,7 +120,7 @@ /// /// /// - public NoSuchAttributeTypeException(SerializationInfo info, StreamingContext context) + public NoSuchAttributeException(SerializationInfo info, StreamingContext context) : base(info, context) { } @@ -129,10 +129,10 @@ } /// - /// Exception thrown when bad data is encountered during serialization. + /// Exception thrown when an attribute type does not exist. /// [Serializable] - public class BadLwesDataException : EventSystemException + public class NoSuchAttributeTypeException : EventSystemException { #region Constructors @@ -140,7 +140,7 @@ /// Creates a new instance. /// /// error message - public BadLwesDataException(string errorMessage) + public NoSuchAttributeTypeException(string errorMessage) : base(errorMessage) { } @@ -150,12 +150,11 @@ /// /// /// - public BadLwesDataException(SerializationInfo info, StreamingContext context) + public NoSuchAttributeTypeException(SerializationInfo info, StreamingContext context) : base(info, context) { } #endregion Constructors } - } \ No newline at end of file diff --git a/Org.Lwes/Factory/IEventFactory.cs b/Org.Lwes/Factory/IEventFactory.cs index fb4f858..71ff846 100644 --- a/Org.Lwes/Factory/IEventFactory.cs +++ b/Org.Lwes/Factory/IEventFactory.cs @@ -9,8 +9,6 @@ /// public interface IEventFactory { - #region Properties - /// /// The character encoding used when performing event IO. /// @@ -44,10 +42,6 @@ get; } - #endregion Properties - - #region Methods - /// /// Creates an event type identified by the event name. /// @@ -87,7 +81,5 @@ /// Indicates whether validation should occur by default /// event template database void Initialize(SupportedEncoding enc, bool performValidation, IEventTemplateDB db); - - #endregion Methods } } \ No newline at end of file diff --git a/Org.Lwes/Journaler/IJournaler.cs b/Org.Lwes/Journaler/IJournaler.cs index 13e5e19..3f884ea 100644 --- a/Org.Lwes/Journaler/IJournaler.cs +++ b/Org.Lwes/Journaler/IJournaler.cs @@ -42,8 +42,6 @@ /// public interface IJournaler : IDisposable { - #region Properties - /// /// Gets the journaler's state. /// @@ -52,10 +50,6 @@ get; } - #endregion Properties - - #region Methods - /// /// Initializes the journaler. /// @@ -70,7 +64,5 @@ /// Stops the journaler. /// void Stop(); - - #endregion Methods } } \ No newline at end of file diff --git a/Org.Lwes/Listener/Enums.cs b/Org.Lwes/Listener/Enums.cs new file mode 100644 index 0000000..dca715c --- /dev/null +++ b/Org.Lwes/Listener/Enums.cs @@ -0,0 +1,89 @@ +namespace Org.Lwes.Listener +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Text; + + #region Enumerations + + /// + /// Represents the state of an event sink. + /// + public enum EventSinkStatus + { + /// + /// Indicates the event sink has been suspended and should not receive + /// events until activated. This is the initial state for new IEventSinkRegistrationKeys. + /// + Suspended = 0, + /// + /// Indicates the event sink is active and may be notified of events + /// at any time. + /// + Active = 1, + /// + /// For event sinks that are not thread-safe, indicates that the sink + /// is currently being invoked. + /// + Notifying = 2, + /// + /// Indicates the event sink has been canceled. + /// + Canceled = 3 + } + + /// + /// Strategies to be taken when garbage data arrives from an endpoint. + /// + public enum GarbageHandlingVote + { + /// + /// No opinion on garbage handling. Continue as normal. + /// + None, + /// + /// Causes all traffic from an endpoint to be handled by the event + /// sink's HandleGarbageData. + /// + TreatTrafficFromEndpointAsGarbage, + /// + /// Causes any data received from the endpoint to be discarded as soon + /// as possible. + /// + IgnoreAllTrafficFromEndpoint, + /// + /// The default strategy. Same as None + /// + Default = None + } + + /// + /// Enumeration for garbage handling strategy employed by Listeners. + /// + public enum ListenerGarbageHandling + { + /// + /// Indicates the listener should silently handle garbage data + /// but continue to accept data from all endpoints. + /// + FailSilently = 0, + /// + /// Indicates the listener should ignore data from endpoints + /// from which garbage has been received. + /// + IgnoreEndpointsThatSendGarbage = 1, + /// + /// Indicates the listener should ask the event sinks to vote + /// on the strategy on a per-endpoint basis. + /// + /// + AskEventSinksToVoteOnStrategy = 2, + /// + /// The default value: FailSilently + /// + Default = FailSilently + } + + #endregion Enumerations +} \ No newline at end of file diff --git a/Org.Lwes/Listener/EventListener.cs b/Org.Lwes/Listener/EventListener.cs index d967150..e813fe8 100644 --- a/Org.Lwes/Listener/EventListener.cs +++ b/Org.Lwes/Listener/EventListener.cs @@ -67,7 +67,8 @@ mee.Initialize(EventTemplateDB.CreateDefault(), IPAddress.Parse(namedListenerConfig.AddressString), namedListenerConfig.Port, - namedListenerConfig.UseParallelEmit); + namedListenerConfig.UseParallelEmit, + namedListenerConfig.GarbageHandling); return mee; } else @@ -105,7 +106,8 @@ emitter.Initialize(EventTemplateDB.CreateDefault() , Constants.DefaultMulticastAddress , Constants.CDefaultMulticastPort - , true); + , true + , ListenerGarbageHandling.Default); return emitter; } diff --git a/Org.Lwes/Listener/EventListenerBase.cs b/Org.Lwes/Listener/EventListenerBase.cs index 91a02e2..6e31902 100644 --- a/Org.Lwes/Listener/EventListenerBase.cs +++ b/Org.Lwes/Listener/EventListenerBase.cs @@ -1,6 +1,7 @@ namespace Org.Lwes.Listener { using System; + using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; @@ -8,7 +9,6 @@ using Org.Lwes.DB; using Org.Lwes.Properties; - using System.Collections.Generic; /// /// Base class for event listeners. @@ -18,10 +18,19 @@ #region Fields const int CDisposeBackgroundThreadWaitTimeMS = 200; + const int Leader = 1; + List _additions = new List(); + int _consolidationVotes = 0; IEventTemplateDB _db; IPEndPoint _endpoint; + ListenerGarbageHandling _garbageHandling; + Dictionary _garbageTracking; + Object _garbageTrackingLock; IListener _listener; + ReaderWriterLockSlim _notifications = new ReaderWriterLockSlim(); + int _notifiers = 0; + RegistrationKey[] _registrations = new RegistrationKey[0]; #endregion Fields @@ -35,6 +44,14 @@ Dispose(false); } + /// + /// Creates a new instance. + /// + protected EventListenerBase() + { + _cacheHandleErrorsDelegate = new Action(HandleErrorsOnEventSink); + } + #endregion Constructors #region Enumerations @@ -56,14 +73,10 @@ interface IListener : IDisposable { - #region Methods - void Start(IEventTemplateDB db , IPEndPoint listenEP , Action finishSocket , EventListenerBase listener); - - #endregion Methods } #endregion Nested Interfaces @@ -92,6 +105,100 @@ } /// + /// Registers an event sink with the listener without activating the + /// event sink. + /// + /// the event sink to register + /// A registration key for the event sink + public IEventSinkRegistrationKey RegisterEventSink(IEventSink sink) + { + if (sink == null) throw new ArgumentNullException("sink"); + RegistrationKey key = new RegistrationKey(this, sink); + AddRegistration(key); + return key; + } + + private void HandleErrorsOnEventSink(RegistrationKey key, Exception e) + { + // TODO: Strategies for event sinks that cause exceptions. + } + + Action _cacheHandleErrorsDelegate; + internal void PerformEventArrival(Event ev) + { + int n = Interlocked.Increment(ref _notifiers); + try + { + if (n == Leader) _notifications.EnterUpgradeableReadLock(); + else _notifications.EnterReadLock(); + try + { + foreach (var r in _registrations) + { + if (r.PerformEventArrival(ev, _cacheHandleErrorsDelegate)) + { + Interlocked.Increment(ref _consolidationVotes); + } + } + if (n == Leader && Thread.VolatileRead(ref _consolidationVotes) > 0) + { + SafeConsolidateRegistrations(); + } + } + finally + { + if (n == Leader) _notifications.ExitUpgradeableReadLock(); + else _notifications.ExitReadLock(); + } + } + finally + { + Interlocked.Decrement(ref _notifiers); + } + } + + internal GarbageHandlingVote PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage) + { + GarbageHandlingVote strategy = GarbageHandlingVote.None; + int n = Interlocked.Increment(ref _notifiers); + try + { + if (n == Leader) _notifications.EnterUpgradeableReadLock(); + else _notifications.EnterReadLock(); + try + { + foreach (var r in _registrations) + { + GarbageHandlingVote strategyVote = r.PerformGarbageArrival( + remoteEndPoint, + priorGarbageCountForEndpoint, + garbage, + _cacheHandleErrorsDelegate + ); + if (strategyVote > strategy) + { + strategy = strategyVote; + } + } + if (n == Leader && Thread.VolatileRead(ref _consolidationVotes) > 0) + { + SafeConsolidateRegistrations(); + } + } + finally + { + if (n == Leader) _notifications.ExitUpgradeableReadLock(); + else _notifications.ExitReadLock(); + } + } + finally + { + Interlocked.Decrement(ref _notifiers); + } + return strategy; + } + + /// /// Ensures the emitter has been initialized. /// /// thrown if the emitter has not yet been initialized. @@ -115,9 +222,14 @@ /// template database used when creating events /// an IP endpoint where listening will occur /// whether the listener will listen and dispatch events in parallel + /// indicates the garbage handling strategy the listener will use /// callback method used to complete the setup of the socket /// connected to the given - protected void Initialize(IEventTemplateDB db, IPEndPoint endpoint, bool parallel, Action finishSocket) + protected void Initialize(IEventTemplateDB db + , IPEndPoint endpoint + , bool parallel + , ListenerGarbageHandling garbageHandling + , Action finishSocket) { if (db == null) throw new ArgumentNullException("db"); if (endpoint == null) throw new ArgumentNullException("endpoint"); @@ -129,14 +241,164 @@ ? (IListener)new ParallelListener() : (IListener)new BackgroundThreadListener(); + _garbageHandling = garbageHandling; + if (_garbageHandling > ListenerGarbageHandling.FailSilently) + { + _garbageTracking = new Dictionary(); + _garbageTrackingLock = new Object(); + } + listener.Start(db, endpoint, finishSocket, this); _listener = listener; } + private void AddRegistration(RegistrationKey key) + { + int n = Interlocked.Increment(ref _notifiers); + + try + { + if (n == Leader) + { + if (_notifications.TryEnterWriteLock(20)) + { + try + { + lock (_additions) + { + _additions.Add(key); + UnsafeConsolidateRegistrations(); + } + return; + } + finally + { + _notifications.ExitWriteLock(); + } + } + } + + // We couldn't get the writelock so we're gonna have to schedule + // the key to be added later... + lock (_additions) + { + _additions.Add(key); + Interlocked.Increment(ref _consolidationVotes); + } + } + finally + { + Interlocked.Decrement(ref _notifiers); + } + } + + private GarbageHandlingVote GetTrafficStrategyForEndpoint(EndPoint ep) + { + if (_garbageHandling == ListenerGarbageHandling.FailSilently) + { + return GarbageHandlingVote.None; + } + else + { + IPEndPoint ipep = (IPEndPoint)ep; + TrafficTrackingKey key = new TrafficTrackingKey(ep); + TrafficTrackingRec tracking; + lock (_garbageTrackingLock) + { + if (_garbageTracking.TryGetValue(key, out tracking)) + { + return tracking.Strategy; + } + } + return GarbageHandlingVote.Default; + } + } + + private void HandleGarbageData(EndPoint ep, byte[] buffer, int offset, int bytesTransferred) + { + if (_garbageHandling > ListenerGarbageHandling.FailSilently) + { + IPEndPoint ipep = (IPEndPoint)ep; + TrafficTrackingKey key = new TrafficTrackingKey(ep); + TrafficTrackingRec tracking; + lock (_garbageTrackingLock) + { + if (!_garbageTracking.TryGetValue(key, out tracking)) + { + tracking = new TrafficTrackingRec(ep); + _garbageTracking.Add(key, tracking); + } + } + if (_garbageHandling == ListenerGarbageHandling.AskEventSinksToVoteOnStrategy + && tracking.Strategy != GarbageHandlingVote.IgnoreAllTrafficFromEndpoint) + { + PerformGarbageDataNotification(tracking, ep, buffer, offset, bytesTransferred); + } + } + } + + private void PerformGarbageDataNotification(TrafficTrackingRec tracking, EndPoint rcep, byte[] buffer, int offset, int bytesTransferred) + { + byte[] copy = new byte[bytesTransferred]; + Array.Copy(buffer, copy, bytesTransferred); + tracking.Strategy = PerformGarbageArrival(rcep, tracking.IncrementGarbageCount(), copy); + } + + private void SafeConsolidateRegistrations() + { + _notifications.EnterWriteLock(); + try + { + lock (_additions) + { + UnsafeConsolidateRegistrations(); + } + } + finally + { + _notifications.ExitWriteLock(); + } + } + + private void UnsafeConsolidateRegistrations() + { + _registrations = (from r in _registrations + where r.Status != EventSinkStatus.Canceled + select r).Concat(from r in _additions + where r.Status != EventSinkStatus.Canceled + select r).ToArray(); + _additions.Clear(); + + Thread.VolatileWrite(ref _consolidationVotes, 0); + } + #endregion Methods #region Nested Types + struct TrafficTrackingKey + { + #region Fields + + public uint Address; + public int AddressFamily; + public int Port; + + #endregion Fields + + #region Constructors + + public TrafficTrackingKey(EndPoint ep) + { + IPEndPoint ipep = (IPEndPoint)ep; + Address = BitConverter.ToUInt32(ipep.Address.GetAddressBytes(), 0); + Port = ipep.Port; + AddressFamily = (int)ipep.AddressFamily; + } + + #endregion Constructors + } + /// /// Uses background threads to receive events from LWES. This class uses two /// threads, one to listen and deserialize the events and another to perform @@ -148,10 +410,10 @@ EndPoint _anyEP; byte[] _buffer; - EventListenerBase _listener; IEventTemplateDB _db; SimpleLockFreeQueue _eventQueue = new SimpleLockFreeQueue(); UdpEndpoint _listenEP; + EventListenerBase _listener; Thread _notifier; Status _notifierState; Object _notifierWaitObject; @@ -214,7 +476,7 @@ internal void Stop() { - if (_recieverState.TrySetState(ListenerState.StopSignaled, ListenerState.Active)) + if (_recieverState.TryTransition(ListenerState.StopSignaled, ListenerState.Active)) { // Close the listener, this will cause the receiver thread to wakeup // if it is blocked waiting for IO on the socket. @@ -240,6 +502,7 @@ { _notifierState.SetState(ListenerState.Suspended); Monitor.Wait(_notifierWaitObject); + continue; } // If the stop signal arrived during a wait then bail out... if (_notifierState.CurrentState == ListenerState.StopSignaled) @@ -257,7 +520,7 @@ private void Background_Receiver(object unused_state) { - if (_recieverState.TrySetState(ListenerState.Active, ListenerState.Unknown)) + if (_recieverState.TryTransition(ListenerState.Active, ListenerState.Unknown)) { try { @@ -266,11 +529,20 @@ { EndPoint rcep = _anyEP; // Perform a blocking receive... - int bytes = _listenEP.ReceiveFrom(ref rcep, _buffer, 0, _buffer.Length); - if (bytes > 0) + int bytesTransferred = _listenEP.ReceiveFrom(ref rcep, _buffer, 0, _buffer.Length); + if (bytesTransferred > 0) { - // Data was received, deserialize the event... - PerformEventDeserializationAndQueueForNotification(rcep, _buffer, 0, bytes); + GarbageHandlingVote handling = _listener.GetTrafficStrategyForEndpoint(rcep); + if (handling == GarbageHandlingVote.None) + { + PerformEventDeserializationAndQueueForNotification(rcep, _buffer, 0, bytesTransferred); + } + else if (handling == GarbageHandlingVote.TreatTrafficFromEndpointAsGarbage) + { + _listener.HandleGarbageData(rcep, _buffer, 0, bytesTransferred); + } + // Otherwise the handling was GarbageHandlingStrategy.FailfastForTrafficOnEndpoint + // and we're going to ignore it altogether. } } } @@ -279,7 +551,7 @@ if (se.ErrorCode != 10004) throw se; } - if (_recieverState.TrySetState(ListenerState.Stopping, ListenerState.StopSignaled)) + if (_recieverState.TryTransition(ListenerState.Stopping, ListenerState.StopSignaled)) { // Cascade the stop signal to the notifier and wait for it to exit... _notifierState.SetState(ListenerState.StopSignaled); @@ -291,7 +563,7 @@ private void Dispose(bool disposing) { // Signal background threads... - _recieverState.TrySetState(ListenerState.StopSignaled, ListenerState.Active, () => + _recieverState.TryTransition(ListenerState.StopSignaled, ListenerState.Active, () => { Util.Dispose(ref _listenEP); _reciever.Join(CDisposeBackgroundThreadWaitTimeMS); @@ -302,13 +574,13 @@ private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep , byte[] buffer - , int p, int bytes) + , int offset, int bytesTransferred) { IPEndPoint ep = (IPEndPoint)rcep; try { // For received events, set MetaEventInfo.ReciptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort... - Event ev = Event.BinaryDecode(_db, buffer, 0, bytes) + Event ev = Event.BinaryDecode(_db, buffer, offset, bytesTransferred) .SetValue(Constants.MetaEventInfoAttributes.ReceiptTime.Name, Constants.DateTimeToLwesTimeTicks(DateTime.UtcNow)) .SetValue(Constants.MetaEventInfoAttributes.SenderIP.Name, ep.Address) .SetValue(Constants.MetaEventInfoAttributes.SenderPort.Name, ep.Port); @@ -316,15 +588,12 @@ } catch (BadLwesDataException) { - byte[] copy = new byte[bytes]; - Array.Copy(buffer, copy, bytes); - GarbageHandlingStrategy strategy = _listener.PerformGarbageArrival(rcep, 0, copy); - // TODO: Implement the garbage handling strategies + _listener.HandleGarbageData(rcep, buffer, offset, bytesTransferred); } if (_notifierState.CurrentState > ListenerState.Active) { - // notifier thread is either suspending or suspended; + // notifier thread is suspended; // wake it up... lock (_notifierWaitObject) { @@ -351,11 +620,11 @@ #region Fields EndPoint _anyEP; - EventListenerBase _listener; IEventTemplateDB _db; int _deserializers; SimpleLockFreeQueue _eventQueue = new SimpleLockFreeQueue(); UdpEndpoint _listenEP; + EventListenerBase _listener; Status _listenerState; int _notifiers; SimpleLockFreeQueue _receiveQueue; @@ -385,14 +654,14 @@ /// an event template DB /// the listening endpoint /// a callback method that is called upon to finish the listening socket - /// a callback method to receive events + /// the owner public void Start(IEventTemplateDB db , IPEndPoint listenEP , Action finishSocket - , EventListenerBase listener) + , EventListenerBase owner) { _db = db; - _listener = listener; + _listener = owner; _anyEP = (listenEP.AddressFamily == AddressFamily.InterNetworkV6) ? new IPEndPoint(IPAddress.IPv6Any, 0) : new IPEndPoint(IPAddress.Any, 0); @@ -405,7 +674,7 @@ internal void Stop() { - _listenerState.TrySetState(ListenerState.StopSignaled, ListenerState.Active, () => + _listenerState.TryTransition(ListenerState.StopSignaled, ListenerState.Active, () => { while (Thread.VolatileRead(ref _deserializers) > 0) { @@ -435,7 +704,17 @@ ReceiveCapture input; while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _receiveQueue.Dequeue(out input)) { - PerformEventDeserializationAndQueueForNotification(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred); + GarbageHandlingVote handling = _listener.GetTrafficStrategyForEndpoint(input.RemoteEndPoint); + if (handling == GarbageHandlingVote.None) + { + PerformEventDeserializationAndQueueForNotification(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred); + } + else if (handling == GarbageHandlingVote.TreatTrafficFromEndpointAsGarbage) + { + _listener.HandleGarbageData(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred); + } + // Otherwise the handling was GarbageHandlingStrategy.FailfastForTrafficOnEndpoint + // and we're going to ignore it altogether. } } finally @@ -484,10 +763,10 @@ if (op.SocketError == SocketError.Success) { // Reschedule the receiver before pulling the buffer out, we want to catch receives - // in the tightest loop possible, although we don't want to keep a threadpool thread - // *forever* and possibly cause thread-starvation in for other jobs so we continually - // put the job back in the queue - this way our parallelism plays nicely with other - // jobs - now, if only the other jobs were programmed to give up their threads periodically + // in the tightest loop possible, although we don't want to keep a threadpool thread + // *forever* and possibly cause thread-starvation in for other jobs so we continually + // put the job back in the queue - this way our parallelism plays nicely with other + // jobs - now, if only the other jobs were programmed to give up their threads periodically // too... hmmm! ThreadPool.QueueUserWorkItem(new WaitCallback(Background_ParallelReceiver)); if (op.BytesTransferred > 0) @@ -516,7 +795,7 @@ private void CascadeStopSignal() { - _listenerState.TrySetState(ListenerState.Stopping, ListenerState.StopSignaled, () => + _listenerState.TryTransition(ListenerState.Stopping, ListenerState.StopSignaled, () => { while (Thread.VolatileRead(ref _deserializers) > 0) { @@ -577,7 +856,7 @@ private void ParallelReceiver() { // Only startup once. - if (_listenerState.TrySetState(ListenerState.Active, ListenerState.Unknown)) + if (_listenerState.TryTransition(ListenerState.Active, ListenerState.Unknown)) { Background_ParallelReceiver(null); } @@ -585,13 +864,13 @@ private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep , byte[] buffer - , int p, int bytes) + , int offset, int bytesTransferred) { IPEndPoint ep = (IPEndPoint)rcep; try { // For received events, set MetaEventInfo.ReciptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort... - Event ev = Event.BinaryDecode(_db, buffer, 0, bytes) + Event ev = Event.BinaryDecode(_db, buffer, offset, bytesTransferred) .SetValue(Constants.MetaEventInfoAttributes.ReceiptTime.Name, Constants.DateTimeToLwesTimeTicks(DateTime.UtcNow)) .SetValue(Constants.MetaEventInfoAttributes.SenderIP.Name, ep.Address) .SetValue(Constants.MetaEventInfoAttributes.SenderPort.Name, ep.Port); @@ -599,10 +878,7 @@ } catch (BadLwesDataException) { - byte[] copy = new byte[bytes]; - Array.Copy(buffer, copy, bytes); - GarbageHandlingStrategy strategy = _listener.PerformGarbageArrival(rcep, 0, copy); - // TODO: Implement the garbage handling strategies + _listener.HandleGarbageData(rcep, buffer, offset, bytesTransferred); } BufferManager.ReleaseBuffer(buffer); @@ -638,21 +914,18 @@ #endregion Nested Types } - #endregion Nested Types - - #region Other - - //bool _parallel; - - #endregion Other - - #region IEventListener Members - class RegistrationKey : IEventSinkRegistrationKey { + #region Fields + Status _status = new Status(EventSinkStatus.Suspended); + bool _disableGarbageNotification; bool _threadSafe; + #endregion Fields + + #region Constructors + public RegistrationKey(EventListenerBase listener, IEventSink sink) { Listener = listener; @@ -660,255 +933,217 @@ _threadSafe = sink.IsThreadSafe; } - public IEventListener Listener { get; private set; } - public IEventSink Sink { get; private set; } + #endregion Constructors + + #region Properties + + public object Handback + { + get; + set; + } + + public IEventListener Listener + { + get; + private set; + } + + public IEventSink Sink + { + get; + private set; + } public EventSinkStatus Status { get { return _status.CurrentState; } } + #endregion Properties + + #region Methods + public bool Activate() { return _status.SetStateIfLessThan(EventSinkStatus.Active, EventSinkStatus.Canceled); } - public bool Suspend() - { - return _status.SetStateIfLessThan(EventSinkStatus.Suspended, EventSinkStatus.Canceled); - } - public void Cancel() { _status.SetState(EventSinkStatus.Canceled); } - public object Handback { get; set; } - + public bool Suspend() + { + return _status.SetStateIfLessThan(EventSinkStatus.Suspended, EventSinkStatus.Canceled); + } - internal bool PerformEventArrival(Event ev) + internal bool PerformEventArrival(Event ev, Action errorHandler) { if (!_threadSafe) { - if (_status.SpinToggleState(EventSinkStatus.Active, EventSinkStatus.Notifying)) + if (_status.SpinToggleState(EventSinkStatus.Notifying, EventSinkStatus.Active)) { try { Sink.HandleEventArrival(this, ev); + _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying); } catch (Exception e) { - // TODO: Trace the exception + errorHandler(this, e); } - _status.TrySetState(EventSinkStatus.Active, EventSinkStatus.Notifying); } } else { try { - if (_status.TrySetState(EventSinkStatus.Notifying, EventSinkStatus.Active)) + EventSinkStatus s = _status.CompareExchange(EventSinkStatus.Notifying, EventSinkStatus.Active); + if (s == EventSinkStatus.Active || s == EventSinkStatus.Notifying) { Sink.HandleEventArrival(this, ev); - _status.TrySetState(EventSinkStatus.Active, EventSinkStatus.Notifying); + _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying); } } catch (Exception e) { - // TODO: Trace the exception + errorHandler(this, e); } } - _status.TrySetState(EventSinkStatus.Active, EventSinkStatus.Notifying); return _status.CurrentState == EventSinkStatus.Canceled; } - internal GarbageHandlingStrategy PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage) + internal GarbageHandlingVote PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage, + Action errorHandler) { - GarbageHandlingStrategy strategy = GarbageHandlingStrategy.None; - if (!_threadSafe) - { - if (_status.SpinToggleState(EventSinkStatus.Active, EventSinkStatus.Notifying)) - { - try - { - strategy = Sink.HandleGarbageArrival(this, remoteEndPoint, priorGarbageCountForEndpoint, garbage); - } - catch (Exception e) - { - // TODO: Trace the exception - } - _status.TrySetState(EventSinkStatus.Active, EventSinkStatus.Notifying); - } - } - else + Thread.MemoryBarrier(); + bool ignoring = _disableGarbageNotification; + + GarbageHandlingVote strategy = GarbageHandlingVote.None; + if (!ignoring) { - try + if (!_threadSafe) { - if (_status.TrySetState(EventSinkStatus.Notifying, EventSinkStatus.Active)) + if (_status.SpinToggleState(EventSinkStatus.Active, EventSinkStatus.Notifying)) { - strategy = Sink.HandleGarbageArrival(this, remoteEndPoint, priorGarbageCountForEndpoint, garbage); - _status.TrySetState(EventSinkStatus.Active, EventSinkStatus.Notifying); + try + { + strategy = Sink.HandleGarbageData(this, remoteEndPoint, priorGarbageCountForEndpoint, garbage); + } + catch (Exception e) + { + errorHandler(this, e); + } + _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying); } } - catch (Exception e) - { - // TODO: Trace the exception - } - } - return strategy; - } - } - - public IEventSinkRegistrationKey RegisterEventSink(IEventSink sink) - { - if (sink == null) throw new ArgumentNullException("sink"); - RegistrationKey key = new RegistrationKey(this, sink); - AddRegistration(key); - return key; - } - - private void AddRegistration(RegistrationKey key) - { - int n = Interlocked.Increment(ref _notifiers); - - try - { - if (n == Leader) - { - if (_notifications.TryEnterWriteLock(20)) + else { try { - lock (_additions) + if (_status.TryTransition(EventSinkStatus.Notifying, EventSinkStatus.Active)) { - _additions.Add(key); - UnsafeConsolidateRegistrations(); + strategy = Sink.HandleGarbageData(this, remoteEndPoint, priorGarbageCountForEndpoint, garbage); + _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying); } - return; } - finally + catch (Exception e) { - _notifications.ExitWriteLock(); + errorHandler(this, e); } } } - - // We couldn't get the writelock so we're gonna have to schedule - // the key to be added later... - lock (_additions) - { - _additions.Add(key); - Interlocked.Increment(ref _consolidationVotes); - } + return strategy; } - finally + + public void DisableGarbageNotification() { - Interlocked.Decrement(ref _notifiers); + Thread.MemoryBarrier(); + _disableGarbageNotification = true; + Thread.MemoryBarrier(); } - } - const int Leader = 1; - int _notifiers = 0; - int _consolidationVotes = 0; - ReaderWriterLockSlim _notifications = new ReaderWriterLockSlim(); - List _additions = new List(); - RegistrationKey[] _registrations = new RegistrationKey[0]; + #endregion + } - internal void PerformEventArrival(Event ev) + class TrafficTrackingRec { - int n = Interlocked.Increment(ref _notifiers); - try + #region Fields + + int _garbageCount = 0; + + #endregion Fields + + #region Constructors + + public TrafficTrackingRec(EndPoint ep) { - if (n == Leader) _notifications.EnterUpgradeableReadLock(); - else _notifications.EnterReadLock(); - try - { - foreach (var r in _registrations) - { - if (r.PerformEventArrival(ev)) - { - Interlocked.Increment(ref _consolidationVotes); - } - } - if (n == Leader && Thread.VolatileRead(ref _consolidationVotes) > 0) - { - SafeConsolidateRegistrations(); - } - } - finally - { - if (n == Leader) _notifications.ExitUpgradeableReadLock(); - else _notifications.ExitReadLock(); - } + RemoteEndPoint = ep; } - finally + + #endregion Constructors + + #region Properties + + public bool IsEmpty { - Interlocked.Decrement(ref _notifiers); + get { return RemoteEndPoint == null; } } - } - internal GarbageHandlingStrategy PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage) - { - GarbageHandlingStrategy strategy = GarbageHandlingStrategy.None; - int n = Interlocked.Increment(ref _notifiers); - try + public int PreviousGargageDataCount { - if (n == Leader) _notifications.EnterUpgradeableReadLock(); - else _notifications.EnterReadLock(); - try - { - foreach (var r in _registrations) - { - GarbageHandlingStrategy strategyVote = r.PerformGarbageArrival(remoteEndPoint, priorGarbageCountForEndpoint, garbage); - if (strategyVote > strategy) - { - strategy = strategyVote; - } - } - if (n == Leader && Thread.VolatileRead(ref _consolidationVotes) > 0) - { - SafeConsolidateRegistrations(); - } - } - finally - { - if (n == Leader) _notifications.ExitUpgradeableReadLock(); - else _notifications.ExitReadLock(); - } + get { return _garbageCount; } } - finally + + public EndPoint RemoteEndPoint { - Interlocked.Decrement(ref _notifiers); + get; + private set; } - return strategy; - } - private void SafeConsolidateRegistrations() - { - _notifications.EnterWriteLock(); - try + public GarbageHandlingVote Strategy { - lock (_additions) - { - UnsafeConsolidateRegistrations(); - } + get; + set; } - finally + + #endregion Properties + + #region Methods + + public int IncrementGarbageCount() { - _notifications.ExitWriteLock(); + return Interlocked.Increment(ref _garbageCount); } + + #endregion Methods } - private void UnsafeConsolidateRegistrations() - { - _registrations = (from r in _registrations - where r.Status != EventSinkStatus.Canceled - select r).Concat(from r in _additions - where r.Status != EventSinkStatus.Canceled - select r).ToArray(); - _additions.Clear(); + #endregion Nested Types - Thread.VolatileWrite(ref _consolidationVotes, 0); + #region Other + + //bool _parallel; + + #endregion Other + + #region IEventListener Members + + /// + /// Registers an event sink and activates it. + /// + /// the event sink to register + /// a handback object - this object is opaque to the listener + /// and will be attached to the registration key prior to activation + /// A registration key for the event sink. + public IEventSinkRegistrationKey RegisterAndActivateEventSink(IEventSink sink, object handback) + { + IEventSinkRegistrationKey key = RegisterEventSink(sink); + key.Handback = handback; + key.Activate(); + return key; } #endregion diff --git a/Org.Lwes/Listener/IEventListener.cs b/Org.Lwes/Listener/IEventListener.cs index fbabffe..a2d9f67 100644 --- a/Org.Lwes/Listener/IEventListener.cs +++ b/Org.Lwes/Listener/IEventListener.cs @@ -8,6 +8,25 @@ /// public interface IEventListener : IDisposable { + /// + /// Registers an event sink with the listener without activating the + /// event sink. + /// + /// the event sink to register + /// A registration key for the event sink + /// Event sinks will not begin to recieve event or garbage + /// notification until *after* the registration key's Active method + /// is called. IEventSinkRegistrationKey RegisterEventSink(IEventSink sink); + + /// + /// Registers an event sink and activates it. + /// + /// the event sink to register + /// a handback object - this object is opaque to the listener + /// and will be attached to the registration key prior to activation + /// A registration key for the event sink. + IEventSinkRegistrationKey RegisterAndActivateEventSink(IEventSink sink, object handback); + } } \ No newline at end of file diff --git a/Org.Lwes/Listener/IEventSink.cs b/Org.Lwes/Listener/IEventSink.cs dissimilarity index 89% index be21299..2d3c71e 100644 --- a/Org.Lwes/Listener/IEventSink.cs +++ b/Org.Lwes/Listener/IEventSink.cs @@ -1,78 +1,42 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Net; - -namespace Org.Lwes.Listener -{ - /// - /// Strategies to be taken when garbage data arrives from an endpoint. - /// - public enum GarbageHandlingStrategy - { - /// - /// No opinion on garbage handling. - /// - None, - /// - /// Continue handling traffic from the endpoint as if no garbage - /// has arrived. - /// - ContinueNormalHandlingForEndpoint, - /// - /// Treats all traffic from the endpoint as garbage without trying to - /// deserialize the data. - /// - TreatAllTrafficFromEndpointAsGarbage, - /// - /// Causes any data received from the endpoint to be discarded as soon - /// as possible. - /// - FailfastForTrafficOnEndpoint, - /// - /// The default strategy. Same as FailfastForTrafficOnEndpoint - /// - Default = None - } - /// - /// Represents the state of an event sink. - /// - public enum EventSinkStatus - { - /// - /// Indicates the event sink has been suspended and should not receive - /// events until activated. This is the initial state for new IEventSinkRegistrationKeys. - /// - Suspended = 0, - /// - /// Indicates the event sink is active and may be notified of events - /// at any time. - /// - Active = 1, - /// - /// For event sinks that are not thread-safe, indicates that the sink - /// is currently being invoked. - /// - Notifying = 2, - /// - /// Indicates the event sink has been canceled. - /// - Canceled = 3 - } - public interface IEventSinkRegistrationKey - { - IEventListener Listener { get; } - EventSinkStatus Status { get; } - bool Activate(); - bool Suspend(); - void Cancel(); - Object Handback { get; set; } - } - public interface IEventSink - { - bool IsThreadSafe { get; } - void HandleEventArrival(IEventSinkRegistrationKey key, Event ev); - GarbageHandlingStrategy HandleGarbageArrival(IEventSinkRegistrationKey key, EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage); - } -} +namespace Org.Lwes.Listener +{ + using System; + using System.Net; + + /// + /// Interface for event sinks that handle events and data received by the event listeners. + /// + public interface IEventSink + { + /// + /// Indicates whether the event sink is thread-safe. + /// + /// A sink is thread-safe if BOTH HandleEventArrival and HandleGarbageData + /// can be called multiple times concurrently without failure. + bool IsThreadSafe + { + get; + } + + /// + /// Callback method invoked by event listeners when LWES events arrive. + /// + /// Registration key for controlling the registration and status of + /// an event sink + /// a newly arrived LWES event + void HandleEventArrival(IEventSinkRegistrationKey key, Event ev); + + /// + /// Callback method invoked by event listeners when garbage data arrives on an endpoint. + /// + /// Registration key for controlling the registration and status of + /// an event sink + /// endpoint that sent the garbage data + /// number of times garbage data has arrived from the + /// endpoint + /// byte array containing the garbage + /// a vote as to how future traffic from the endpoint should be handled + GarbageHandlingVote HandleGarbageData(IEventSinkRegistrationKey key, EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage); + } + +} \ No newline at end of file diff --git a/Org.Lwes/Listener/IEventSinkRegistrationKey.cs b/Org.Lwes/Listener/IEventSinkRegistrationKey.cs new file mode 100644 index 0000000..dd52be0 --- /dev/null +++ b/Org.Lwes/Listener/IEventSinkRegistrationKey.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Org.Lwes.Listener +{ + /// + /// Registration key for event sinks. + /// + public interface IEventSinkRegistrationKey + { + /// + /// An opaque handback object. + /// + Object Handback + { + get; + set; + } + + /// + /// The listener with which the event sink is registered. + /// + IEventListener Listener + { + get; + } + + /// + /// The status of the event sink. + /// + EventSinkStatus Status + { + get; + } + + /// + /// Activates the event sink. + /// + /// true if the event sink has not already been canceled; otherwise false + bool Activate(); + + /// + /// Cancels a registration. An event sink whose registration is canceled will no longer + /// receive event or garbage notification. + /// + void Cancel(); + + /// + /// Suspends the event sink. An event sink whose registration is suspended will not + /// receive event or garbage notification until it is re-activated. + /// + /// true if the event sink has not already been canceled; otherwise false + bool Suspend(); + + /// + /// Disables garbage notification for a sink. + /// + void DisableGarbageNotification(); + } + +} diff --git a/Org.Lwes/Listener/MulticastEventListener.cs b/Org.Lwes/Listener/MulticastEventListener.cs index 766bc9c..21e8a24 100644 --- a/Org.Lwes/Listener/MulticastEventListener.cs +++ b/Org.Lwes/Listener/MulticastEventListener.cs @@ -24,14 +24,16 @@ /// /// /// + /// public void Initialize(IEventTemplateDB db , IPAddress multicastAddress , int multicastPort - , bool parallel) + , bool parallel + , ListenerGarbageHandling garbageHandling) { if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized); - base.Initialize(db, new IPEndPoint(multicastAddress, multicastPort), parallel, + base.Initialize(db, new IPEndPoint(multicastAddress, multicastPort), parallel, garbageHandling, (s, e) => { s.SetSocketOption(SocketOptionLevel.Udp, SocketOptionName.NoDelay, 1); diff --git a/Org.Lwes/Listener/UnicastEventListener.cs b/Org.Lwes/Listener/UnicastEventListener.cs index 30c2766..160d998 100644 --- a/Org.Lwes/Listener/UnicastEventListener.cs +++ b/Org.Lwes/Listener/UnicastEventListener.cs @@ -24,14 +24,16 @@ /// /// /// + /// public void Initialize(IEventTemplateDB db , IPAddress address , int port - , bool parallel) + , bool parallel + , ListenerGarbageHandling garbageHandling) { if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized); - base.Initialize(db, new IPEndPoint(address, port), parallel, + base.Initialize(db, new IPEndPoint(address, port), parallel, garbageHandling, (s, e) => { s.SetSocketOption(SocketOptionLevel.Udp, SocketOptionName.NoDelay, 1); diff --git a/Org.Lwes/LwesSerializer.cs b/Org.Lwes/LwesSerializer.cs index 97e1651..ff8d042 100644 --- a/Org.Lwes/LwesSerializer.cs +++ b/Org.Lwes/LwesSerializer.cs @@ -540,7 +540,7 @@ #endif int ofs = offset; int count = ReadUInt16(buffer, ref ofs); - if (ofs + count > buffer.Length) + if (ofs + count > buffer.Length) throw new BadLwesDataException(String.Concat("Cannot deserialize incoming string at offset ", ofs)); char[] result = new char[count]; int bytesUsed; diff --git a/Org.Lwes/Org.Lwes.csproj b/Org.Lwes/Org.Lwes.csproj index c2aba21..3702557 100644 --- a/Org.Lwes/Org.Lwes.csproj +++ b/Org.Lwes/Org.Lwes.csproj @@ -80,10 +80,12 @@ + + diff --git a/Org.Lwes/Sample Configuration.config b/Org.Lwes/Sample Configuration.config index 5970eb6..842a061 100644 --- a/Org.Lwes/Sample Configuration.config +++ b/Org.Lwes/Sample Configuration.config @@ -96,7 +96,8 @@ multicast="true" parallel="true" address="224.0.0.69" - port="9191"> + port="9191" + garbageHandling="FailSilently"/> You may override the default listener by adding your own listener named "default". @@ -123,6 +124,25 @@ "port" = { integer-string } - indicates the port for listening for LWES events. The default value is "9191". + + "garbageHandling" = { Default | FailSilently | IgnoreEndpointsThatSendGarbage + | AskEventSinksToVoteOnStrategy | 0 | 1 | 2 } - indicates the garbage handling + strategy used by the listener. Possible values follow... + + Default | FailSilenty | 0 - Indicates that the listener should silently handle + garbage data by discarding it and continue processing all incomming data. + + IgnoreEndpointsThatSendGarbage | 1 - indicates that the listener should track + endpoints that send garbage data and stop processing data from endpoints if + garbage data is received from an endpoint. + + AskEventSinksToVoteOnStrategy | 2 - indicates that the event sinks should be + asked to vote on the strategy to use on a per-endpoint basis. Each active + event sink's HandleGarbageData method will be invoked and the most + restrictive vote will dictate how future traffic from the endpoint is + handled. Possiblities are { None | TreatTrafficFromEndpointAsGarbage + | IgnoreAllTrafficFromEndpoint }. See the documentation for the IEventSink + interface for more info. --> + /// Toggles between the toggle state and the desired state - with + /// a spin-wait if necessary. + /// + /// desired state + /// state from which the desired state can toggle + /// true if the state transitions to the desired state from the toggle state; otherwise false + public bool SpinToggleState(E desired, E toggle) + { + int d = Convert.ToInt32(desired); + int t = Convert.ToInt32(toggle); + + spin: + int r = Interlocked.CompareExchange(ref _status, d, t); + // If the state was the toggle state then we're successful and done... + if (r == t) return true; + // otherwise if the result is anything but the desired state we're + // unsuccessful and done... + if (r != d) return false; + // otherwise we spin + goto spin; + } + + /// /// Perfroms a spinwait until the current state equals the target state. /// /// the target state @@ -153,7 +176,7 @@ /// the target state /// comparand state must match current state /// true if the current state matches and the state is transitioned to ; otherwise false - public bool TrySetState(E value, E comparand) + public bool TryTransition(E value, E comparand) { int c = Convert.ToInt32(comparand); return Interlocked.CompareExchange(ref _status, Convert.ToInt32(value), c) == c; @@ -166,10 +189,10 @@ /// comparand state must match current state /// action to perform if the state transition is successful /// true if the current state matches and the state is transitioned to ; otherwise false - public bool TrySetState(E value, E comparand, Action actionOnSuccess) + public bool TryTransition(E value, E comparand, Action actionOnSuccess) { if (actionOnSuccess == null) throw new ArgumentNullException("actionOnSuccess"); - if (TrySetState(value, comparand)) + if (TryTransition(value, comparand)) { actionOnSuccess(); return true; @@ -179,17 +202,11 @@ #endregion Methods - public bool SpinToggleState(E value, E toggle) + internal E CompareExchange(E value, E comparand) { - int c = Convert.ToInt32(toggle); - int v = Convert.ToInt32(value); - while (true) - { - int r = Interlocked.CompareExchange(ref _status, c, v); - if (r == v) return true; - if (r != c) return false; - Thread.SpinWait(1000); - } + return + (E)Enum.ToObject(typeof(E), Interlocked.CompareExchange(ref _status, Convert.ToInt32(value), Convert.ToInt32(comparand))); + ; } } } \ No newline at end of file diff --git a/narrange.xml b/narrange.xml index 16cb240..8c6ad99 100644 --- a/narrange.xml +++ b/narrange.xml @@ -84,7 +84,7 @@ - + @@ -106,7 +106,7 @@ - + @@ -128,7 +128,7 @@ - + @@ -150,7 +150,7 @@ - + @@ -172,7 +172,7 @@ - + -- 2.11.4.GIT