Added LGPL license information to code files.
[lwes-dotnet/github-mirror.git] / Org.Lwes / Listener / EventListenerBase.cs
blob055bf94272df0ce698effb1a79178d9578dee1fd
1 //
2 // This file is part of the LWES .NET Binding (LWES.net)
3 //
4 // COPYRIGHT (C) 2009, Phillip Clark (cerebralkungfu[at*g mail[dot*com)
5 // original .NET implementation
6 //
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
17 // You should have received a copy of the GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org.Lwes.Listener
22 using System;
23 using System.Collections.Generic;
24 using System.Linq;
25 using System.Net;
26 using System.Net.Sockets;
27 using System.Threading;
29 using Org.Lwes.DB;
30 using Org.Lwes.Properties;
32 /// <summary>
33 /// Base class for event listeners.
34 /// </summary>
35 public class EventListenerBase : IEventListener
37 #region Fields
39 const int CDisposeBackgroundThreadWaitTimeMS = 200;
40 const int Leader = 1;
42 List<RegistrationKey> _additions = new List<RegistrationKey>();
43 Action<RegistrationKey, Exception> _cacheHandleErrorsDelegate;
44 int _consolidationVotes = 0;
45 IEventTemplateDB _db;
46 IPEndPoint _endpoint;
47 ListenerGarbageHandling _garbageHandling;
48 Dictionary<TrafficTrackingKey, TrafficTrackingRec> _garbageTracking;
49 Object _garbageTrackingLock;
50 IListener _listener;
51 ReaderWriterLockSlim _notifications = new ReaderWriterLockSlim();
52 int _notifiers = 0;
53 RegistrationKey[] _registrations = new RegistrationKey[0];
55 #endregion Fields
57 #region Constructors
/// <summary>
/// Creates a new instance and caches the delegate that is handed to
/// registration keys so they can report exceptions raised by event sinks.
/// </summary>
protected EventListenerBase()
{
    // Method-group conversion; cached once so notification loops do not allocate a delegate per call.
    _cacheHandleErrorsDelegate = HandleErrorsOnEventSink;
}

/// <summary>
/// Finalizer; forwards to <see cref="Dispose(bool)"/> with <c>disposing</c> set to false.
/// </summary>
~EventListenerBase()
{
    Dispose(false);
}
75 #endregion Constructors
77 #region Enumerations
/// <summary>
/// Lifecycle states used by the nested listener implementations.
/// The numeric order is significant: the listeners compare states with
/// less-than / greater-than (for example "less than StopSignaled").
/// </summary>
enum ListenerState
{
    /// <summary>Initial state, before a listener thread has started.</summary>
    Unknown = 0,
    /// <summary>The listener is running.</summary>
    Active = 1,
    /// <summary>Set by the background notifier just before it re-checks its queue.</summary>
    Suspending = 2,
    /// <summary>Set by the background notifier while it waits for more events.</summary>
    Suspended = 3,
    /// <summary>A stop was requested.</summary>
    StopSignaled = 4,
    /// <summary>The stop request is being cascaded to the other workers.</summary>
    Stopping = 5,
    /// <summary>The listener has stopped.</summary>
    Stopped = 6,
}
90 #endregion Enumerations
92 #region Nested Interfaces
/// <summary>
/// Contract implemented by the nested listener strategies
/// (<c>BackgroundThreadListener</c> and <c>ParallelListener</c>).
/// </summary>
interface IListener : IDisposable
{
    /// <summary>
    /// Starts listening.
    /// </summary>
    /// <param name="db">event template DB used when deserializing events</param>
    /// <param name="listenEP">the IP endpoint to listen on</param>
    /// <param name="finishSocket">callback used to finish setting up the listening socket</param>
    /// <param name="listener">the owning listener that receives events and garbage data</param>
    void Start(IEventTemplateDB db,
        IPEndPoint listenEP,
        Action<Socket, IPEndPoint> finishSocket,
        EventListenerBase listener);
}
102 #endregion Nested Interfaces
104 #region Properties
/// <summary>
/// Indicates whether the listener has been initialized, i.e. whether an
/// underlying listener implementation has been assigned.
/// </summary>
public virtual bool IsInitialized
{
    get
    {
        bool hasListener = (_listener != null);
        return hasListener;
    }
}
114 #endregion Properties
116 #region Methods
/// <summary>
/// Disposes of the listener and frees any resources held. Finalization is
/// suppressed afterwards because cleanup has already run.
/// </summary>
public void Dispose()
{
    this.Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Registers an event sink, attaches the handback object and activates the sink.
/// </summary>
/// <param name="sink">the event sink to register</param>
/// <param name="handback">a handback object - opaque to the listener; it is
/// attached to the registration key before the key is activated</param>
/// <returns>A registration key for the event sink.</returns>
/// <exception cref="ArgumentNullException">thrown (by <see cref="RegisterEventSink"/>) if <paramref name="sink"/> is null.</exception>
public IEventSinkRegistrationKey RegisterAndActivateEventSink(IEventSink sink, object handback)
{
    IEventSinkRegistrationKey registration = RegisterEventSink(sink);

    // The handback must be in place before activation so the sink never sees a key without it.
    registration.Handback = handback;
    registration.Activate();

    return registration;
}
/// <summary>
/// Registers an event sink with the listener without activating it.
/// </summary>
/// <param name="sink">the event sink to register</param>
/// <returns>A registration key for the event sink</returns>
/// <exception cref="ArgumentNullException">thrown if <paramref name="sink"/> is null.</exception>
public IEventSinkRegistrationKey RegisterEventSink(IEventSink sink)
{
    if (sink == null)
    {
        throw new ArgumentNullException("sink");
    }

    RegistrationKey registration = new RegistrationKey(this, sink);
    AddRegistration(registration);
    return registration;
}
/// <summary>
/// Delivers an event to every current registration. A registration that
/// reports itself as canceled adds a consolidation vote; the leading
/// notifier consolidates the registration list when votes are pending.
/// </summary>
/// <param name="ev">the event to deliver</param>
internal void PerformEventArrival(Event ev)
{
    // The caller that raises the notifier count to Leader takes the upgradeable
    // lock; it is the only one allowed to upgrade to a write lock below.
    bool isLeader = Interlocked.Increment(ref _notifiers) == Leader;
    try
    {
        if (isLeader)
        {
            _notifications.EnterUpgradeableReadLock();
        }
        else
        {
            _notifications.EnterReadLock();
        }

        try
        {
            foreach (RegistrationKey registration in _registrations)
            {
                bool wantsConsolidation = registration.PerformEventArrival(ev, _cacheHandleErrorsDelegate);
                if (wantsConsolidation)
                {
                    Interlocked.Increment(ref _consolidationVotes);
                }
            }

            if (isLeader && Thread.VolatileRead(ref _consolidationVotes) > 0)
            {
                SafeConsolidateRegistrations();
            }
        }
        finally
        {
            if (isLeader)
            {
                _notifications.ExitUpgradeableReadLock();
            }
            else
            {
                _notifications.ExitReadLock();
            }
        }
    }
    finally
    {
        Interlocked.Decrement(ref _notifiers);
    }
}
/// <summary>
/// Reports garbage data to every current registration and returns the
/// highest-valued vote cast by the sinks.
/// </summary>
/// <param name="remoteEndPoint">endpoint the garbage data came from</param>
/// <param name="priorGarbageCountForEndpoint">garbage count passed through to the sinks</param>
/// <param name="garbage">the garbage bytes</param>
/// <returns>the greatest <c>GarbageHandlingVote</c> returned by any registration,
/// or <c>GarbageHandlingVote.None</c> if none voted higher.</returns>
internal GarbageHandlingVote PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage)
{
    GarbageHandlingVote winningVote = GarbageHandlingVote.None;

    // Same leader/follower locking scheme as PerformEventArrival.
    bool isLeader = Interlocked.Increment(ref _notifiers) == Leader;
    try
    {
        if (isLeader)
        {
            _notifications.EnterUpgradeableReadLock();
        }
        else
        {
            _notifications.EnterReadLock();
        }

        try
        {
            foreach (RegistrationKey registration in _registrations)
            {
                GarbageHandlingVote vote = registration.PerformGarbageArrival(
                    remoteEndPoint,
                    priorGarbageCountForEndpoint,
                    garbage,
                    _cacheHandleErrorsDelegate);

                if (vote > winningVote)
                {
                    winningVote = vote;
                }
            }

            if (isLeader && Thread.VolatileRead(ref _consolidationVotes) > 0)
            {
                SafeConsolidateRegistrations();
            }
        }
        finally
        {
            if (isLeader)
            {
                _notifications.ExitUpgradeableReadLock();
            }
            else
            {
                _notifications.ExitReadLock();
            }
        }
    }
    finally
    {
        Interlocked.Decrement(ref _notifiers);
    }
    return winningVote;
}
/// <summary>
/// Ensures the listener has been initialized.
/// </summary>
/// <exception cref="InvalidOperationException">thrown if the listener has not yet been initialized.</exception>
protected void CheckInitialized()
{
    // The original test was inverted: it threw the "not yet initialized" error
    // precisely when the listener WAS initialized.
    if (!IsInitialized)
    {
        throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    }
}
/// <summary>
/// Disposes of the listener by disposing the underlying listener implementation.
/// </summary>
/// <param name="disposing">Indicates whether the object is being disposed
/// (true) or finalized (false); this implementation does not consult it.</param>
protected virtual void Dispose(bool disposing)
{
    Util.Dispose(ref this._listener);
}
/// <summary>
/// Initializes the base class: records the configuration, creates the
/// listener implementation and starts it.
/// </summary>
/// <param name="db">template database used when creating events</param>
/// <param name="endpoint">an IP endpoint where listening will occur</param>
/// <param name="parallel">whether the listener will listen and dispatch events in parallel</param>
/// <param name="garbageHandling">indicates the garbage handling strategy the listener will use</param>
/// <param name="finishSocket">callback method used to complete the setup of the socket
/// connected to the given <paramref name="endpoint"/></param>
/// <exception cref="ArgumentNullException">thrown if <paramref name="db"/>,
/// <paramref name="endpoint"/> or <paramref name="finishSocket"/> is null.</exception>
protected void Initialize(IEventTemplateDB db,
    IPEndPoint endpoint,
    bool parallel,
    ListenerGarbageHandling garbageHandling,
    Action<Socket, IPEndPoint> finishSocket)
{
    if (db == null)
    {
        throw new ArgumentNullException("db");
    }
    if (endpoint == null)
    {
        throw new ArgumentNullException("endpoint");
    }
    if (finishSocket == null)
    {
        throw new ArgumentNullException("finishSocket");
    }

    _db = db;
    _endpoint = endpoint;

    IListener implementation;
    if (parallel)
    {
        implementation = new ParallelListener();
    }
    else
    {
        implementation = new BackgroundThreadListener();
    }

    _garbageHandling = garbageHandling;
    bool tracksGarbage = _garbageHandling > ListenerGarbageHandling.FailSilently;
    if (tracksGarbage)
    {
        // Per-endpoint tracking is only allocated when garbage is not silently dropped.
        _garbageTracking = new Dictionary<TrafficTrackingKey, TrafficTrackingRec>();
        _garbageTrackingLock = new Object();
    }

    // Publish the listener only after it started successfully, so
    // IsInitialized never reports a listener that failed to start.
    implementation.Start(db, endpoint, finishSocket, this);
    _listener = implementation;
}
/// <summary>
/// Adds a registration key. When the caller is the only active notifier and
/// the write lock can be taken within 20ms, the key is consolidated into the
/// registration array immediately; otherwise it is parked in the additions
/// list and a consolidation vote is cast so a later notifier picks it up.
/// </summary>
/// <param name="key">the registration key to add</param>
private void AddRegistration(RegistrationKey key)
{
    int ticket = Interlocked.Increment(ref _notifiers);
    try
    {
        bool consolidatedNow = false;

        if (ticket == Leader && _notifications.TryEnterWriteLock(20))
        {
            try
            {
                lock (_additions)
                {
                    _additions.Add(key);
                    UnsafeConsolidateRegistrations();
                }
                consolidatedNow = true;
            }
            finally
            {
                _notifications.ExitWriteLock();
            }
        }

        if (!consolidatedNow)
        {
            // We couldn't get the write lock, so schedule the key to be added later.
            lock (_additions)
            {
                _additions.Add(key);
                Interlocked.Increment(ref _consolidationVotes);
            }
        }
    }
    finally
    {
        Interlocked.Decrement(ref _notifiers);
    }
}
/// <summary>
/// Looks up the handling strategy currently recorded for a remote endpoint.
/// </summary>
/// <param name="ep">the remote endpoint; must be an <see cref="IPEndPoint"/>
/// (the tracking key casts it)</param>
/// <returns><c>GarbageHandlingVote.None</c> when the listener fails silently;
/// otherwise the strategy recorded for the endpoint, or
/// <c>GarbageHandlingVote.Default</c> when the endpoint is not tracked.</returns>
private GarbageHandlingVote GetTrafficStrategyForEndpoint(EndPoint ep)
{
    if (_garbageHandling == ListenerGarbageHandling.FailSilently)
    {
        return GarbageHandlingVote.None;
    }

    // The unused IPEndPoint local was removed; the key constructor performs the same cast.
    TrafficTrackingKey key = new TrafficTrackingKey(ep);
    lock (_garbageTrackingLock)
    {
        TrafficTrackingRec tracking;
        if (_garbageTracking.TryGetValue(key, out tracking))
        {
            return tracking.Strategy;
        }
    }
    return GarbageHandlingVote.Default;
}
/// <summary>
/// Callback handed to registration keys for exceptions thrown by event sinks.
/// Currently a no-op, so such exceptions are dropped.
/// </summary>
/// <param name="key">the registration whose sink threw</param>
/// <param name="e">the exception that was thrown</param>
private void HandleErrorsOnEventSink(RegistrationKey key, Exception e)
{
    // TODO: Strategies for event sinks that cause exceptions.
}
/// <summary>
/// Records that garbage data arrived from an endpoint and, when the listener
/// is configured to ask the event sinks, notifies them so they can vote on
/// how to treat further traffic from that endpoint.
/// </summary>
/// <param name="ep">the remote endpoint; must be an <see cref="IPEndPoint"/></param>
/// <param name="buffer">buffer containing the garbage data</param>
/// <param name="offset">offset of the data within <paramref name="buffer"/></param>
/// <param name="bytesTransferred">number of bytes received</param>
private void HandleGarbageData(EndPoint ep, byte[] buffer, int offset, int bytesTransferred)
{
    if (_garbageHandling <= ListenerGarbageHandling.FailSilently)
    {
        // Tracking structures are only allocated above FailSilently.
        return;
    }

    // The unused IPEndPoint local was removed; the key constructor performs the same cast.
    TrafficTrackingKey key = new TrafficTrackingKey(ep);
    TrafficTrackingRec tracking;
    lock (_garbageTrackingLock)
    {
        if (!_garbageTracking.TryGetValue(key, out tracking))
        {
            tracking = new TrafficTrackingRec(ep);
            _garbageTracking.Add(key, tracking);
        }
    }

    // NOTE(review): one record is kept per remote address/port and never removed here.
    if (_garbageHandling == ListenerGarbageHandling.AskEventSinksToVoteOnStrategy
        && tracking.Strategy != GarbageHandlingVote.IgnoreAllTrafficFromEndpoint)
    {
        PerformGarbageDataNotification(tracking, ep, buffer, offset, bytesTransferred);
    }
}
/// <summary>
/// Copies the garbage bytes out of the receive buffer, notifies the event
/// sinks and stores the resulting vote as the endpoint's strategy.
/// </summary>
/// <param name="tracking">tracking record for the endpoint</param>
/// <param name="rcep">the remote endpoint the data came from</param>
/// <param name="buffer">buffer containing the garbage data</param>
/// <param name="offset">offset of the data within <paramref name="buffer"/></param>
/// <param name="bytesTransferred">number of garbage bytes</param>
private void PerformGarbageDataNotification(TrafficTrackingRec tracking, EndPoint rcep, byte[] buffer, int offset, int bytesTransferred)
{
    // Sinks get a private copy so the caller's buffer can be reused afterwards.
    byte[] copy = new byte[bytesTransferred];

    // Honor the offset: the original always copied from index 0 of the buffer.
    Array.Copy(buffer, offset, copy, 0, bytesTransferred);

    // The count passed on is the value after incrementing for this arrival.
    tracking.Strategy = PerformGarbageArrival(rcep, tracking.IncrementGarbageCount(), copy);
}
/// <summary>
/// Consolidates the registrations while holding the write lock and the
/// additions lock.
/// </summary>
private void SafeConsolidateRegistrations()
{
    _notifications.EnterWriteLock();
    try
    {
        lock (_additions)
        {
            this.UnsafeConsolidateRegistrations();
        }
    }
    finally
    {
        _notifications.ExitWriteLock();
    }
}
/// <summary>
/// Rebuilds the registration array from the existing registrations plus the
/// pending additions, dropping canceled keys, then clears the pending list
/// and resets the consolidation votes. Takes no locks itself; callers in this
/// class hold the write lock and the additions lock.
/// </summary>
private void UnsafeConsolidateRegistrations()
{
    IEnumerable<RegistrationKey> surviving =
        _registrations.Where(existing => existing.Status != EventSinkStatus.Canceled);
    IEnumerable<RegistrationKey> incoming =
        _additions.Where(added => added.Status != EventSinkStatus.Canceled);

    // ToArray materializes both queries before the additions list is cleared.
    _registrations = surviving.Concat(incoming).ToArray();
    _additions.Clear();

    Thread.VolatileWrite(ref _consolidationVotes, 0);
}
409 #endregion Methods
411 #region Nested Types
/// <summary>
/// Dictionary key identifying a remote endpoint by address, address family
/// and port.
/// </summary>
/// <remarks>
/// <see cref="Address"/> holds only the first four address bytes, which is
/// not unique for IPv6. Equality and hashing therefore also use the complete
/// address so distinct IPv6 endpoints with a common prefix do not collide.
/// </remarks>
struct TrafficTrackingKey : IEquatable<TrafficTrackingKey>
{
    #region Fields

    public uint Address;
    public int AddressFamily;
    public int Port;

    // Complete address; null only for a default-initialized key.
    readonly IPAddress _fullAddress;

    #endregion Fields

    #region Constructors

    /// <summary>
    /// Creates a key for the given endpoint.
    /// </summary>
    /// <param name="ep">the endpoint; must be an <see cref="IPEndPoint"/></param>
    public TrafficTrackingKey(EndPoint ep)
    {
        IPEndPoint ipep = (IPEndPoint)ep;
        _fullAddress = ipep.Address;
        Address = BitConverter.ToUInt32(ipep.Address.GetAddressBytes(), 0);
        Port = ipep.Port;
        AddressFamily = (int)ipep.AddressFamily;
    }

    #endregion Constructors

    #region Methods

    /// <summary>Compares all fields, including the complete address.</summary>
    public bool Equals(TrafficTrackingKey other)
    {
        return Address == other.Address
            && Port == other.Port
            && AddressFamily == other.AddressFamily
            && object.Equals(_fullAddress, other._fullAddress);
    }

    /// <summary>Compares against another boxed key.</summary>
    public override bool Equals(object obj)
    {
        return obj is TrafficTrackingKey && Equals((TrafficTrackingKey)obj);
    }

    /// <summary>Hash built from the complete address, the port and the address family.</summary>
    public override int GetHashCode()
    {
        unchecked
        {
            int hash = (_fullAddress != null) ? _fullAddress.GetHashCode() : 0;
            hash = (hash * 397) ^ Port;
            hash = (hash * 397) ^ AddressFamily;
            return hash;
        }
    }

    #endregion Methods
}
436 /// <remarks>
437 /// Uses background threads to receive events from LWES. This class uses two
438 /// threads, one to listen and deserialize the events and another to perform
439 /// the notifications.
440 /// </remarks>
441 class BackgroundThreadListener : IListener
443 #region Fields
445 EndPoint _anyEP;
446 byte[] _buffer;
447 IEventTemplateDB _db;
448 SimpleLockFreeQueue<Event> _eventQueue = new SimpleLockFreeQueue<Event>();
449 UdpEndpoint _listenEP;
450 EventListenerBase _listener;
451 Thread _notifier;
452 Status<ListenerState> _notifierState;
453 Object _notifierWaitObject;
454 Thread _reciever;
455 Status<ListenerState> _recieverState;
457 #endregion Fields
459 #region Constructors
/// <summary>
/// Finalizer; forwards to <c>Dispose(false)</c>.
/// </summary>
~BackgroundThreadListener()
{
    this.Dispose(false);
}
466 #endregion Constructors
468 #region Methods
/// <summary>
/// Disposes the listener and suppresses finalization.
/// </summary>
public void Dispose()
{
    this.Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Starts the listener in multi-threaded mode. Two dedicated background
/// threads are created: one receives bytes and deserializes LWES events, the
/// other performs event notification.
/// </summary>
/// <param name="db">event template DB used during deserialization</param>
/// <param name="listenEP">a IP endpoint where listening should occur</param>
/// <param name="finishSocket">callback used to finish setting up the listening socket</param>
/// <param name="listener">the owning listener that receives the notifications</param>
public void Start(IEventTemplateDB db
    , IPEndPoint listenEP
    , Action<Socket, IPEndPoint> finishSocket
    , EventListenerBase listener)
{
    _db = db;
    _listener = listener;
    _anyEP = (listenEP.AddressFamily == AddressFamily.InterNetworkV6)
        ? new IPEndPoint(IPAddress.IPv6Any, listenEP.Port)
        : new IPEndPoint(IPAddress.Any, listenEP.Port);
    _buffer = BufferManager.AcquireBuffer(null);
    _listenEP = new UdpEndpoint(listenEP).Initialize(finishSocket);

    // Create everything both threads touch BEFORE starting either of them.
    // The receiver locks _notifierWaitObject and joins _notifier, so starting it
    // first (as the original did) raced with the assignments below.
    _notifierWaitObject = new Object();

    // Dedicated background thread to perform event notification...
    _notifier = new Thread(Background_Notifier);
    _notifier.IsBackground = true;

    // Dedicated background thread to handle the receiving...
    _reciever = new Thread(Background_Receiver);
    _reciever.IsBackground = true;

    _notifier.Start();
    _reciever.Start();
}
/// <summary>
/// Signals the receiver to stop and waits for the receiver thread to exit.
/// Does nothing unless the receiver is currently active.
/// </summary>
internal void Stop()
{
    bool signaled = _recieverState.TryTransition(ListenerState.StopSignaled, ListenerState.Active);
    if (!signaled)
    {
        return;
    }

    // Close the listener, this will cause the receiver thread to wake up
    // if it is blocked waiting for IO on the socket.
    Util.Dispose(ref _listenEP);
    _reciever.Join();
}
/// <summary>
/// Thread body of the notifier: drains the event queue and delivers each
/// event to the owning listener, sleeping on the wait object while the queue
/// is empty. Exits once a stop has been signaled.
/// </summary>
/// <param name="unused_state">not used</param>
private void Background_Notifier(object unused_state)
{
    // Only start from the initial state, so a stop signal raised before this
    // thread got to run is not overwritten.
    if (!_notifierState.TryTransition(ListenerState.Active, ListenerState.Unknown))
    {
        return;
    }

    while (_notifierState.CurrentState < ListenerState.StopSignaled)
    {
        Event ev;
        if (!_eventQueue.Dequeue(out ev))
        {
            lock (_notifierWaitObject)
            {
                // The receiver raises the stop signal while holding this lock, so
                // checking here guarantees the state changes below cannot
                // overwrite a stop signal.
                if (_notifierState.CurrentState >= ListenerState.StopSignaled)
                {
                    break;
                }

                // Double-check that the queue is empty; this catches the race
                // where the receiver queues an event while we acquire the lock.
                _notifierState.SetState(ListenerState.Suspending);
                if (!_eventQueue.Dequeue(out ev))
                {
                    _notifierState.SetState(ListenerState.Suspended);
                    // Woken by the receiver, either for a new event or for a stop.
                    Monitor.Wait(_notifierWaitObject);
                    continue;
                }

                // An event arrived in the meantime; we're active again.
                _notifierState.SetState(ListenerState.Active);
            }
        }
        _listener.PerformEventArrival(ev);
    }

    _notifierState.SetState(ListenerState.Stopped);
}

/// <summary>
/// Thread body of the receiver: performs blocking receives on the listening
/// endpoint and routes each datagram to deserialization or garbage handling
/// according to the strategy recorded for its sender. When signaled to stop,
/// cascades the stop to the notifier thread and waits for it to exit.
/// </summary>
/// <param name="unused_state">not used</param>
private void Background_Receiver(object unused_state)
{
    if (!_recieverState.TryTransition(ListenerState.Active, ListenerState.Unknown))
    {
        return;
    }

    try
    {
        // Continue until signaled to stop...
        while (_recieverState.CurrentState == ListenerState.Active)
        {
            EndPoint rcep = _anyEP;
            // Perform a blocking receive...
            int bytesTransferred = _listenEP.ReceiveFrom(ref rcep, _buffer, 0, _buffer.Length);
            if (bytesTransferred > 0)
            {
                GarbageHandlingVote handling = _listener.GetTrafficStrategyForEndpoint(rcep);
                if (handling == GarbageHandlingVote.None)
                {
                    PerformEventDeserializationAndQueueForNotification(rcep, _buffer, 0, bytesTransferred);
                }
                else if (handling == GarbageHandlingVote.TreatTrafficFromEndpointAsGarbage)
                {
                    _listener.HandleGarbageData(rcep, _buffer, 0, bytesTransferred);
                }
                // Otherwise the traffic from this endpoint is ignored altogether.
            }
        }
    }
    catch (SocketException se)
    {
        // An interrupted blocking call (WSAEINTR, 10004) is expected when the
        // socket is closed to stop the receiver; anything else is rethrown
        // with its original stack trace.
        if (se.SocketErrorCode != SocketError.Interrupted)
        {
            throw;
        }
    }

    if (_recieverState.TryTransition(ListenerState.Stopping, ListenerState.StopSignaled))
    {
        // Cascade the stop signal to the notifier. The state is changed and the
        // pulse sent under the wait lock so a notifier that is suspended (or
        // about to suspend) reliably observes the signal; without the pulse a
        // waiting notifier never woke up and the Join below blocked forever.
        lock (_notifierWaitObject)
        {
            _notifierState.SetState(ListenerState.StopSignaled);
            Monitor.PulseAll(_notifierWaitObject);
        }
        _notifier.Join();
    }
}
/// <summary>
/// Signals the receiver to stop (only if it is active), closes the listening
/// endpoint, waits a bounded time for the receiver thread and returns the
/// receive buffer to the buffer manager.
/// </summary>
/// <param name="disposing">true when called from <c>Dispose()</c>, false from
/// the finalizer; this implementation does not consult it.</param>
private void Dispose(bool disposing)
{
    // Signal background threads...
    _recieverState.TryTransition(ListenerState.StopSignaled, ListenerState.Active, () =>
        {
            // Closing the endpoint unblocks a receiver waiting on the socket.
            Util.Dispose(ref _listenEP);

            // Bounded wait: do not hang disposal on a receiver that is slow to exit.
            _reciever.Join(CDisposeBackgroundThreadWaitTimeMS);

            BufferManager.ReleaseBuffer(_buffer);
            _buffer = null;
        });
}
/// <summary>
/// Decodes an event from the received bytes, stamps it with receipt time and
/// sender information and queues it for notification; undecodable data is
/// handed to the owner's garbage handling. Wakes the notifier afterwards if
/// it is not in the active state.
/// </summary>
/// <param name="rcep">remote endpoint the data was received from</param>
/// <param name="buffer">buffer holding the received bytes</param>
/// <param name="offset">offset of the data within <paramref name="buffer"/></param>
/// <param name="bytesTransferred">number of bytes received</param>
private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
    , byte[] buffer
    , int offset, int bytesTransferred)
{
    IPEndPoint sender = (IPEndPoint)rcep;
    try
    {
        // For received events, set MetaEventInfo.ReceiptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
        Event decoded = Event.BinaryDecode(_db, buffer, offset, bytesTransferred)
            .SetValue(Constants.MetaEventInfoAttributes.ReceiptTime.Name, Constants.DateTimeToLwesTimeTicks(DateTime.UtcNow))
            .SetValue(Constants.MetaEventInfoAttributes.SenderIP.Name, sender.Address)
            .SetValue(Constants.MetaEventInfoAttributes.SenderPort.Name, sender.Port);
        _eventQueue.Enqueue(decoded);
    }
    catch (BadLwesDataException)
    {
        _listener.HandleGarbageData(rcep, buffer, offset, bytesTransferred);
    }

    bool notifierIsActive = _notifierState.CurrentState <= ListenerState.Active;
    if (notifierIsActive)
    {
        return;
    }

    // The notifier thread is suspended (or suspending); wake it up...
    lock (_notifierWaitObject)
    {
        Monitor.Pulse(_notifierWaitObject);
    }
}
639 #endregion Methods
642 /// <remarks>
643 /// Uses the threadpool and overlapped IO on the receiving socket. This listener
644 /// will consume between 0 and 3 threads from the threadpool, depending on which
645 /// jobs are active. The jobs may consist of the following:
646 /// <ul>
647 /// <li>Receiver - invoked by the socket on a threadpool thread when input is received</li>
648 /// <li>Deserializer - scheduled for a threadpool thread and runs as long as buffers are in the receive queue</li>
649 /// <li>Notifier - scheduled for a threadpool thread and runs as long as Events are in the notification queue</li>
650 /// </ul>
651 /// </remarks>
652 class ParallelListener : IListener
654 #region Fields
656 EndPoint _anyEP;
657 IEventTemplateDB _db;
658 int _deserializers;
659 SimpleLockFreeQueue<Event> _eventQueue = new SimpleLockFreeQueue<Event>();
660 UdpEndpoint _listenEP;
661 EventListenerBase _listener;
662 Status<ListenerState> _listenerState;
663 int _notifiers;
664 SimpleLockFreeQueue<ReceiveCapture> _receiveQueue;
666 #endregion Fields
668 #region Constructors
/// <summary>
/// Finalizer; forwards to <c>Dispose(false)</c>.
/// </summary>
~ParallelListener()
{
    this.Dispose(false);
}
675 #endregion Constructors
677 #region Methods
/// <summary>
/// Disposes the listener and suppresses finalization.
/// </summary>
public void Dispose()
{
    this.Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Starts the listener: records its collaborators, creates the receive queue
/// and the listening endpoint, then kicks off the first asynchronous receive.
/// </summary>
/// <param name="db">an event template DB</param>
/// <param name="listenEP">the listening endpoint</param>
/// <param name="finishSocket">a callback method that is called upon to finish the listening socket</param>
/// <param name="owner">the owner</param>
public void Start(IEventTemplateDB db
    , IPEndPoint listenEP
    , Action<Socket, IPEndPoint> finishSocket
    , EventListenerBase owner)
{
    _db = db;
    _listener = owner;

    // Wildcard endpoint (port 0) of the same address family as the listening endpoint.
    bool isIPv6 = listenEP.AddressFamily == AddressFamily.InterNetworkV6;
    if (isIPv6)
    {
        _anyEP = new IPEndPoint(IPAddress.IPv6Any, 0);
    }
    else
    {
        _anyEP = new IPEndPoint(IPAddress.Any, 0);
    }

    _receiveQueue = new SimpleLockFreeQueue<ReceiveCapture>();

    _listenEP = new UdpEndpoint(listenEP).Initialize(finishSocket);
    ParallelReceiver();
}
/// <summary>
/// Signals the listener to stop (only if it is active), waits for the active
/// deserializer and notifier jobs to finish, closes the listening endpoint
/// and then waits until the listener reports the Stopped state.
/// </summary>
internal void Stop()
{
    _listenerState.TryTransition(ListenerState.StopSignaled, ListenerState.Active, () =>
        {
            // Poll until no deserializer job is running...
            while (Thread.VolatileRead(ref _deserializers) > 0)
            {
                Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
            }

            // ...and until no notifier job is running.
            while (Thread.VolatileRead(ref _notifiers) > 0)
            {
                Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
            }

            Util.Dispose(ref _listenEP);

            _listenerState.SpinWaitForState(
                ListenerState.Stopped,
                () => Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS));
        });
}
/// <summary>
/// Threadpool job: drains the receive queue of capture records and turns
/// them into events by deserialization, or routes them to garbage handling,
/// according to the strategy recorded for the sending endpoint.
/// </summary>
/// <param name="unused_state">not used</param>
private void Background_Deserializer(object unused_state)
{
    try
    {
        ReceiveCapture input;
        while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _receiveQueue.Dequeue(out input))
        {
            GarbageHandlingVote handling = _listener.GetTrafficStrategyForEndpoint(input.RemoteEndPoint);
            if (handling == GarbageHandlingVote.None)
            {
                // This path releases the buffer itself once the event has been decoded.
                PerformEventDeserializationAndQueueForNotification(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred);
            }
            else
            {
                try
                {
                    if (handling == GarbageHandlingVote.TreatTrafficFromEndpointAsGarbage)
                    {
                        _listener.HandleGarbageData(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred);
                    }
                    // Otherwise the traffic from this endpoint is ignored altogether.
                }
                finally
                {
                    // The original never returned these buffers to the buffer
                    // manager, so garbage or ignored traffic drained the pool.
                    BufferManager.ReleaseBuffer(input.Buffer);
                }
            }
        }
    }
    finally
    {
        int remaining = Interlocked.Decrement(ref _deserializers);

        // Re-arm if input arrived while this job was winding down. Not after a
        // stop signal: the loop above exits immediately then, so re-arming would
        // respawn jobs endlessly while the queue is non-empty.
        if (remaining == 0
            && !_receiveQueue.IsEmpty
            && _listenerState.IsLessThan(ListenerState.StopSignaled))
        {
            EnsureDeserializerIsActive();
        }
    }
}
/// <summary>
/// Threadpool job: drains the event queue and performs notification on the
/// owning listener.
/// </summary>
/// <param name="unused_state">not used</param>
private void Background_Notifier(object unused_state)
{
    try
    {
        Event ev;
        while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _eventQueue.Dequeue(out ev))
        {
            _listener.PerformEventArrival(ev);
        }
    }
    finally
    {
        int remaining = Interlocked.Decrement(ref _notifiers);

        // Re-arm if events arrived while this job was winding down. The original
        // inspected the receive queue here, which is the wrong queue for the
        // notifier. Not after a stop signal: the loop above exits immediately
        // then, so re-arming would respawn jobs endlessly.
        if (remaining == 0
            && !_eventQueue.IsEmpty
            && _listenerState.IsLessThan(ListenerState.StopSignaled))
        {
            EnsureNotifierIsActive();
        }
    }
}
/// <summary>
/// Threadpool job: acquires a buffer and issues one asynchronous receive.
/// On completion the job re-queues itself and hands any received bytes to
/// the receive queue. Cascades the stop signal when the listener is stopping.
/// </summary>
/// <param name="unused_state">not used</param>
private void Background_ParallelReceiver(object unused_state)
{
    // Continue until signalled to stop...
    if (_listenerState.IsLessThan(ListenerState.StopSignaled))
    {
        // Acquiring a buffer may block until a buffer becomes available.
        byte[] buffer = BufferManager.AcquireBuffer(() => _listenerState.IsGreaterThan(ListenerState.Active));

        // If the buffer is null then the stop-signal was received while acquiring a buffer.
        if (buffer != null)
        {
            _listenEP.ReceiveFromAsync(_anyEP, buffer, 0, buffer.Length, (op) =>
                {
                    bool bufferHandedOff = false;
                    if (op.SocketError == SocketError.Success)
                    {
                        // Reschedule the receiver before pulling the buffer out. We want to
                        // catch receives in the tightest loop possible without holding a
                        // threadpool thread forever and starving other jobs, so the job is
                        // continually put back in the queue.
                        ThreadPool.QueueUserWorkItem(new WaitCallback(Background_ParallelReceiver));
                        if (op.BytesTransferred > 0)
                        {
                            _receiveQueue.Enqueue(new ReceiveCapture(op.RemoteEndPoint, op.Buffer, op.BytesTransferred));
                            // From here on the deserializer owns (and releases) the buffer.
                            bufferHandedOff = true;

                            EnsureDeserializerIsActive();
                        }
                    }
                    else if (op.SocketError == SocketError.OperationAborted)
                    {
                        // This is the dispose or stop call.
                        CascadeStopSignal();
                    }

                    if (!bufferHandedOff)
                    {
                        // Empty datagrams and failed receives never reach the
                        // deserializer, so the buffer must be returned here; the
                        // original leaked it.
                        BufferManager.ReleaseBuffer(buffer);
                    }

                    return false;
                }, null);

            return;
        }
    }

    // We get here if the receiver is signaled to stop.
    CascadeStopSignal();
}
/// <summary>
/// Moves the listener from StopSignaled to Stopping, waits for the running
/// deserializer and notifier jobs to finish and then marks it Stopped.
/// Does nothing if no stop has been signaled.
/// </summary>
private void CascadeStopSignal()
{
    _listenerState.TryTransition(ListenerState.Stopping, ListenerState.StopSignaled, () =>
        {
            // Poll until no deserializer job is running...
            while (Thread.VolatileRead(ref _deserializers) > 0)
            {
                Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
            }

            // ...and until no notifier job is running.
            while (Thread.VolatileRead(ref _notifiers) > 0)
            {
                Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
            }

            _listenerState.SetState(ListenerState.Stopped);
        });
}
/// <summary>
/// Stops the listener if it is currently active.
/// </summary>
/// <param name="disposing">true when called from <c>Dispose()</c>, false from
/// the finalizer; this implementation does not consult it.</param>
private void Dispose(bool disposing)
{
    bool isActive = (_listenerState.CurrentState == ListenerState.Active);
    if (isActive)
    {
        Stop();
    }
}
/// <summary>
/// Queues a deserializer job on the threadpool unless one is already counted
/// as active.
/// </summary>
private void EnsureDeserializerIsActive()
{
    int observed = Thread.VolatileRead(ref _deserializers);

    // Retry only while no job is counted. The original kept retrying after
    // losing the race to another caller and queued a second job even though
    // the count was already 1.
    while (observed < 1)
    {
        int previous = Interlocked.CompareExchange(ref _deserializers, observed + 1, observed);
        if (previous == observed)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(Background_Deserializer));
            return;
        }
        observed = previous;
    }
}
/// <summary>
/// Queues a notifier job on the threadpool unless one is already counted as
/// active.
/// </summary>
private void EnsureNotifierIsActive()
{
    int observed = Thread.VolatileRead(ref _notifiers);

    // Retry only while no job is counted. The original kept retrying after
    // losing the race to another caller and queued a second job even though
    // the count was already 1.
    while (observed < 1)
    {
        int previous = Interlocked.CompareExchange(ref _notifiers, observed + 1, observed);
        if (previous == observed)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(Background_Notifier));
            return;
        }
        observed = previous;
    }
}
/// <summary>
/// Issues the first receive. Only the caller that moves the listener from
/// Unknown to Active proceeds, so the receiver starts once.
/// </summary>
private void ParallelReceiver()
{
    bool startedNow = _listenerState.TryTransition(ListenerState.Active, ListenerState.Unknown);
    if (!startedNow)
    {
        return;
    }

    Background_ParallelReceiver(null);
}
/// <summary>
/// Decodes an event from the received bytes, stamps it with receipt time and
/// sender information and queues it for notification; undecodable data is
/// handed to the owner's garbage handling. Afterwards the buffer is released
/// and a notifier job is ensured.
/// </summary>
/// <param name="rcep">remote endpoint the data was received from</param>
/// <param name="buffer">buffer holding the received bytes</param>
/// <param name="offset">offset of the data within <paramref name="buffer"/></param>
/// <param name="bytesTransferred">number of bytes received</param>
private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
    , byte[] buffer
    , int offset, int bytesTransferred)
{
    IPEndPoint sender = (IPEndPoint)rcep;
    try
    {
        // For received events, set MetaEventInfo.ReceiptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
        Event decoded = Event.BinaryDecode(_db, buffer, offset, bytesTransferred)
            .SetValue(Constants.MetaEventInfoAttributes.ReceiptTime.Name, Constants.DateTimeToLwesTimeTicks(DateTime.UtcNow))
            .SetValue(Constants.MetaEventInfoAttributes.SenderIP.Name, sender.Address)
            .SetValue(Constants.MetaEventInfoAttributes.SenderPort.Name, sender.Port);
        _eventQueue.Enqueue(decoded);
    }
    catch (BadLwesDataException)
    {
        _listener.HandleGarbageData(rcep, buffer, offset, bytesTransferred);
    }

    // The bytes have been consumed (decoded or reported), so the buffer can go back.
    BufferManager.ReleaseBuffer(buffer);
    EnsureNotifierIsActive();
}
922 #endregion Methods
924 #region Nested Types
/// <summary>
/// Record of one completed receive: the buffer, how many bytes it holds and
/// the endpoint the data came from.
/// </summary>
struct ReceiveCapture
{
    #region Fields

    public byte[] Buffer;
    public int BytesTransferred;
    public EndPoint RemoteEndPoint;

    #endregion Fields

    #region Constructors

    /// <summary>
    /// Creates a capture record.
    /// </summary>
    /// <param name="ep">remote endpoint the data was received from</param>
    /// <param name="data">buffer holding the received bytes</param>
    /// <param name="transferred">number of bytes received</param>
    public ReceiveCapture(EndPoint ep, byte[] data, int transferred)
    {
        Buffer = data;
        BytesTransferred = transferred;
        RemoteEndPoint = ep;
    }

    #endregion Constructors
}
948 #endregion Nested Types
951 class RegistrationKey : IEventSinkRegistrationKey
953 #region Fields
955 bool _disableGarbageNotification;
956 Status<EventSinkStatus> _status = new Status<EventSinkStatus>(EventSinkStatus.Suspended);
957 bool _threadSafe;
959 #endregion Fields
961 #region Constructors
/// <summary>
/// Creates a registration key tying an event sink to its listener. The
/// sink's thread-safety flag is read once and cached.
/// </summary>
/// <param name="listener">the listener the sink is registered with</param>
/// <param name="sink">the registered event sink</param>
public RegistrationKey(EventListenerBase listener, IEventSink sink)
{
    this.Listener = listener;
    this.Sink = sink;
    this._threadSafe = sink.IsThreadSafe;
}
970 #endregion Constructors
972 #region Properties
/// <summary>
/// Opaque object attached to the registration by the caller.
/// </summary>
public object Handback { get; set; }

/// <summary>
/// The listener this key is registered with.
/// </summary>
public IEventListener Listener { get; private set; }

/// <summary>
/// The registered event sink.
/// </summary>
public IEventSink Sink { get; private set; }

/// <summary>
/// The current status of the registration.
/// </summary>
public EventSinkStatus Status
{
    get
    {
        EventSinkStatus current = _status.CurrentState;
        return current;
    }
}
997 #endregion Properties
999 #region Methods
/// <summary>
/// Requests the Active status via <c>SetStateIfLessThan</c> with Canceled as the bound.
/// </summary>
/// <returns>the result of the underlying state change attempt</returns>
public bool Activate()
{
    bool changed = _status.SetStateIfLessThan(EventSinkStatus.Active, EventSinkStatus.Canceled);
    return changed;
}

/// <summary>
/// Marks the registration as canceled.
/// </summary>
public void Cancel()
{
    _status.SetState(EventSinkStatus.Canceled);
}

/// <summary>
/// Turns off garbage notification for this registration. The write is fenced
/// with memory barriers so notifier threads observe it.
/// </summary>
public void DisableGarbageNotification()
{
    Thread.MemoryBarrier();
    _disableGarbageNotification = true;
    Thread.MemoryBarrier();
}

/// <summary>
/// Requests the Suspended status via <c>SetStateIfLessThan</c> with Canceled as the bound.
/// </summary>
/// <returns>the result of the underlying state change attempt</returns>
public bool Suspend()
{
    bool changed = _status.SetStateIfLessThan(EventSinkStatus.Suspended, EventSinkStatus.Canceled);
    return changed;
}
/// <summary>
/// Delivers an event to the sink if the registration is active. Sinks that
/// are not thread-safe are entered by one notifier at a time; thread-safe
/// sinks may be notified concurrently.
/// </summary>
/// <param name="ev">the event to deliver</param>
/// <param name="errorHandler">invoked with this key and the exception if the sink throws</param>
/// <returns>true if the registration is canceled after the notification
/// (the caller uses this as a vote to consolidate registrations).</returns>
internal bool PerformEventArrival(Event ev, Action<RegistrationKey, Exception> errorHandler)
{
    bool notifying;
    if (!_threadSafe)
    {
        // Exclusive entry: Active -> Notifying.
        notifying = _status.SpinToggleState(EventSinkStatus.Notifying, EventSinkStatus.Active);
    }
    else
    {
        // Concurrent entry: proceed if we moved Active -> Notifying or another
        // notifier already did.
        EventSinkStatus prior = _status.CompareExchange(EventSinkStatus.Notifying, EventSinkStatus.Active);
        notifying = (prior == EventSinkStatus.Active || prior == EventSinkStatus.Notifying);
    }

    if (notifying)
    {
        try
        {
            Sink.HandleEventArrival(this, ev);
        }
        catch (Exception e)
        {
            errorHandler(this, e);
        }
        finally
        {
            // Always return to Active. The original skipped this when the sink
            // threw, leaving the key stuck in Notifying.
            _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying);
        }
    }
    return _status.CurrentState == EventSinkStatus.Canceled;
}
/// <summary>
/// Reports garbage data to the sink, unless garbage notification was
/// disabled for this registration or the registration is not active.
/// </summary>
/// <param name="remoteEndPoint">endpoint the garbage data came from</param>
/// <param name="priorGarbageCountForEndpoint">garbage count passed through to the sink</param>
/// <param name="garbage">the garbage bytes</param>
/// <param name="errorHandler">invoked with this key and the exception if the sink throws</param>
/// <returns>the sink's vote, or <c>GarbageHandlingVote.None</c> if the sink
/// was not notified or threw.</returns>
internal GarbageHandlingVote PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage,
    Action<RegistrationKey, Exception> errorHandler)
{
    Thread.MemoryBarrier();
    if (_disableGarbageNotification)
    {
        return GarbageHandlingVote.None;
    }

    // Entry mirrors PerformEventArrival. The original passed the states to
    // SpinToggleState in the opposite order from that method, and its
    // thread-safe path skipped the sink whenever another notifier was already
    // inside it.
    // NOTE(review): argument order assumed to be (target, expected), as in
    // PerformEventArrival and the TryTransition calls — confirm against Status<T>.
    bool notifying;
    if (!_threadSafe)
    {
        notifying = _status.SpinToggleState(EventSinkStatus.Notifying, EventSinkStatus.Active);
    }
    else
    {
        EventSinkStatus prior = _status.CompareExchange(EventSinkStatus.Notifying, EventSinkStatus.Active);
        notifying = (prior == EventSinkStatus.Active || prior == EventSinkStatus.Notifying);
    }

    GarbageHandlingVote strategy = GarbageHandlingVote.None;
    if (notifying)
    {
        try
        {
            strategy = Sink.HandleGarbageData(this, remoteEndPoint, priorGarbageCountForEndpoint, garbage);
        }
        catch (Exception e)
        {
            errorHandler(this, e);
        }
        finally
        {
            // Always return to Active, also when the sink (or the handler) throws.
            _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying);
        }
    }
    return strategy;
}
1102 #endregion Methods
/// <summary>
/// Per-endpoint record of garbage traffic: the endpoint, how many times
/// garbage was counted for it and the handling strategy currently in force.
/// </summary>
class TrafficTrackingRec
{
    #region Fields

    int _garbageCount = 0;

    #endregion Fields

    #region Constructors

    /// <summary>
    /// Creates a tracking record for the given endpoint.
    /// </summary>
    /// <param name="ep">the remote endpoint being tracked</param>
    public TrafficTrackingRec(EndPoint ep)
    {
        this.RemoteEndPoint = ep;
    }

    #endregion Constructors

    #region Properties

    /// <summary>True when the record has no remote endpoint.</summary>
    public bool IsEmpty
    {
        get { return null == RemoteEndPoint; }
    }

    /// <summary>The current value of the garbage counter.</summary>
    public int PreviousGargageDataCount
    {
        get { return _garbageCount; }
    }

    /// <summary>The remote endpoint being tracked.</summary>
    public EndPoint RemoteEndPoint { get; private set; }

    /// <summary>The handling strategy currently recorded for the endpoint.</summary>
    public GarbageHandlingVote Strategy { get; set; }

    #endregion Properties

    #region Methods

    /// <summary>
    /// Atomically increments the garbage counter.
    /// </summary>
    /// <returns>the counter value after the increment</returns>
    public int IncrementGarbageCount()
    {
        int updated = Interlocked.Increment(ref _garbageCount);
        return updated;
    }

    #endregion Methods
}
1158 #endregion Nested Types