2 // This file is part of the LWES .NET Binding (LWES.net)
4 // COPYRIGHT (C) 2009, Phillip Clark (cerebralkungfu[at*g mail[dot*com)
5 // original .NET implementation
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
17 // You should have received a copy of the GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org
.Lwes
.Listener
23 using System
.Collections
.Generic
;
26 using System
.Net
.Sockets
;
27 using System
.Threading
;
30 using Org
.Lwes
.Properties
;
33 /// Base class for event listeners.
35 public class EventListenerBase
: IEventListener
39 const int CDisposeBackgroundThreadWaitTimeMS
= 200;
42 List
<RegistrationKey
> _additions
= new List
<RegistrationKey
>();
43 Action
<RegistrationKey
, Exception
> _cacheHandleErrorsDelegate
;
44 int _consolidationVotes
= 0;
47 ListenerGarbageHandling _garbageHandling
;
48 Dictionary
<TrafficTrackingKey
, TrafficTrackingRec
> _garbageTracking
;
49 Object _garbageTrackingLock
;
51 ReaderWriterLockSlim _notifications
= new ReaderWriterLockSlim();
53 RegistrationKey
[] _registrations
= new RegistrationKey
[0];
/// <summary>
/// Creates a new instance and caches the error-handling delegate that is
/// later passed to each registration key during notification.
/// </summary>
protected EventListenerBase()
{
    // Method-group conversion: the delegate targets HandleErrorsOnEventSink.
    _cacheHandleErrorsDelegate = HandleErrorsOnEventSink;
}
68 /// Destructor ensuring dispose is called.
75 #endregion Constructors
90 #endregion Enumerations
92 #region Nested Interfaces
94 interface IListener
: IDisposable
96 void Start(IEventTemplateDB db
98 , Action
<Socket
, IPEndPoint
> finishSocket
99 , EventListenerBase listener
);
102 #endregion Nested Interfaces
/// <summary>
/// Indicates whether the listener has been initialized, i.e. whether an
/// underlying listener implementation has been assigned.
/// </summary>
public virtual bool IsInitialized
{
    get
    {
        return !(_listener == null);
    }
}
114 #endregion Properties
/// Disposes of the listener and frees any resources held.
121 public void Dispose()
124 GC
.SuppressFinalize(this);
/// <summary>
/// Registers an event sink and activates it.
/// </summary>
/// <param name="sink">the event sink to register</param>
/// <param name="handback">a handback object - this object is opaque to the listener
/// and will be attached to the registration key prior to activation</param>
/// <returns>A registration key for the event sink.</returns>
public IEventSinkRegistrationKey RegisterAndActivateEventSink(IEventSink sink, object handback)
{
    IEventSinkRegistrationKey registration = RegisterEventSink(sink);

    // The handback is attached first so it is already in place when the
    // registration becomes active.
    registration.Handback = handback;
    registration.Activate();

    return registration;
}
/// <summary>
/// Registers an event sink with the listener without activating the
/// event sink.
/// </summary>
/// <param name="sink">the event sink to register</param>
/// <returns>A registration key for the event sink</returns>
/// <exception cref="ArgumentNullException">thrown if <paramref name="sink"/> is null</exception>
public IEventSinkRegistrationKey RegisterEventSink(IEventSink sink)
{
    if (sink == null)
    {
        throw new ArgumentNullException("sink");
    }

    RegistrationKey registration = new RegistrationKey(this, sink);
    AddRegistration(registration);
    return registration;
}
/// <summary>
/// Delivers an event to every current registration while holding the
/// notification lock, and lets the leading notifier consolidate the
/// registration list when any registration has voted for it.
/// </summary>
/// <param name="ev">the event to deliver</param>
internal void PerformEventArrival(Event ev)
{
    // The caller whose increment yields the Leader value takes the
    // upgradeable lock; every other concurrent caller takes a plain read lock.
    // NOTE(review): Leader is declared outside this view - presumably 1; confirm.
    int n = Interlocked.Increment(ref _notifiers);
    bool isLeader = (n == Leader);
    try
    {
        if (isLeader)
        {
            _notifications.EnterUpgradeableReadLock();
        }
        else
        {
            _notifications.EnterReadLock();
        }

        try
        {
            foreach (var registration in _registrations)
            {
                // A true result is counted as a vote for consolidating the list.
                bool votesForConsolidation = registration.PerformEventArrival(ev, _cacheHandleErrorsDelegate);
                if (votesForConsolidation)
                {
                    Interlocked.Increment(ref _consolidationVotes);
                }
            }

            // Only the leader holds the upgradeable lock, so only it may consolidate.
            if (isLeader && Thread.VolatileRead(ref _consolidationVotes) > 0)
            {
                SafeConsolidateRegistrations();
            }
        }
        finally
        {
            if (isLeader)
            {
                _notifications.ExitUpgradeableReadLock();
            }
            else
            {
                _notifications.ExitReadLock();
            }
        }
    }
    finally
    {
        Interlocked.Decrement(ref _notifiers);
    }
}
/// <summary>
/// Notifies every current registration that garbage data arrived from an
/// endpoint and returns the greatest vote cast by any of them.
/// </summary>
/// <param name="remoteEndPoint">endpoint the garbage data came from</param>
/// <param name="priorGarbageCountForEndpoint">garbage count passed through to each registration</param>
/// <param name="garbage">the garbage bytes</param>
/// <returns>The highest <see cref="GarbageHandlingVote"/> returned by the registrations,
/// or <see cref="GarbageHandlingVote.None"/> when no registration returned a greater one.</returns>
internal GarbageHandlingVote PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage)
{
    GarbageHandlingVote winningVote = GarbageHandlingVote.None;

    // Same locking scheme as event notification: the caller whose increment
    // yields the Leader value takes the upgradeable lock, others a read lock.
    int n = Interlocked.Increment(ref _notifiers);
    bool isLeader = (n == Leader);
    try
    {
        if (isLeader)
        {
            _notifications.EnterUpgradeableReadLock();
        }
        else
        {
            _notifications.EnterReadLock();
        }

        try
        {
            foreach (var registration in _registrations)
            {
                GarbageHandlingVote vote = registration.PerformGarbageArrival(
                    remoteEndPoint,
                    priorGarbageCountForEndpoint,
                    garbage,
                    _cacheHandleErrorsDelegate);

                // Keep the greatest vote seen so far.
                if (vote > winningVote)
                {
                    winningVote = vote;
                }
            }

            if (isLeader && Thread.VolatileRead(ref _consolidationVotes) > 0)
            {
                SafeConsolidateRegistrations();
            }
        }
        finally
        {
            if (isLeader)
            {
                _notifications.ExitUpgradeableReadLock();
            }
            else
            {
                _notifications.ExitReadLock();
            }
        }
    }
    finally
    {
        Interlocked.Decrement(ref _notifiers);
    }

    return winningVote;
}
/// <summary>
/// Ensures the listener has been initialized.
/// </summary>
/// <exception cref="InvalidOperationException">thrown if the listener has not yet been initialized.</exception>
protected void CheckInitialized()
{
    // The "not yet initialized" error must be raised only when initialization
    // has NOT happened; the un-negated test raised it for initialized listeners
    // and let uninitialized ones through.
    if (!IsInitialized)
    {
        throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    }
}
/// Disposes of the listener.
242 /// <param name="disposing">Indicates whether the object is being disposed</param>
243 protected virtual void Dispose(bool disposing
)
245 Util
.Dispose(ref _listener
);
249 /// Initializes the base class.
251 /// <param name="db">template database used when creating events</param>
252 /// <param name="endpoint">an IP endpoint where listening will occur</param>
253 /// <param name="parallel">whether the listener will listen and dispatch events in parallel</param>
254 /// <param name="garbageHandling">indicates the garbage handling strategy the listener will use</param>
255 /// <param name="finishSocket">callback method used to complete the setup of the socket
256 /// connected to the given <paramref name="endpoint"/></param>
257 protected void Initialize(IEventTemplateDB db
258 , IPEndPoint endpoint
260 , ListenerGarbageHandling garbageHandling
261 , Action
<Socket
, IPEndPoint
> finishSocket
)
263 if (db
== null) throw new ArgumentNullException("db");
264 if (endpoint
== null) throw new ArgumentNullException("endpoint");
265 if (finishSocket
== null) throw new ArgumentNullException("finishSocket");
268 _endpoint
= endpoint
;
269 IListener listener
= (parallel
)
270 ? (IListener
)new ParallelListener()
271 : (IListener
)new BackgroundThreadListener();
273 _garbageHandling
= garbageHandling
;
274 if (_garbageHandling
> ListenerGarbageHandling
.FailSilently
)
276 _garbageTracking
= new Dictionary
<TrafficTrackingKey
, TrafficTrackingRec
>();
277 _garbageTrackingLock
= new Object();
280 listener
.Start(db
, endpoint
, finishSocket
, this);
281 _listener
= listener
;
284 private void AddRegistration(RegistrationKey key
)
286 int n
= Interlocked
.Increment(ref _notifiers
);
292 if (_notifications
.TryEnterWriteLock(20))
299 UnsafeConsolidateRegistrations();
305 _notifications
.ExitWriteLock();
310 // We couldn't get the writelock so we're gonna have to schedule
311 // the key to be added later...
315 Interlocked
.Increment(ref _consolidationVotes
);
320 Interlocked
.Decrement(ref _notifiers
);
/// <summary>
/// Looks up the garbage-handling strategy currently recorded for an endpoint.
/// </summary>
/// <param name="ep">the remote endpoint; it is cast to <see cref="IPEndPoint"/> when the tracking key is built</param>
/// <returns>
/// <see cref="GarbageHandlingVote.None"/> when garbage tracking is not enabled,
/// the recorded strategy when the endpoint is being tracked, and
/// <see cref="GarbageHandlingVote.Default"/> otherwise.
/// </returns>
private GarbageHandlingVote GetTrafficStrategyForEndpoint(EndPoint ep)
{
    // The tracking table and its lock are only allocated when the handling mode
    // is greater than FailSilently, so use the same test here; locking on the
    // unallocated lock object would throw.
    if (!(_garbageHandling > ListenerGarbageHandling.FailSilently))
    {
        return GarbageHandlingVote.None;
    }

    TrafficTrackingKey key = new TrafficTrackingKey(ep);
    lock (_garbageTrackingLock)
    {
        TrafficTrackingRec tracking;
        if (_garbageTracking.TryGetValue(key, out tracking))
        {
            return tracking.Strategy;
        }
    }

    return GarbageHandlingVote.Default;
}
346 private void HandleErrorsOnEventSink(RegistrationKey key
, Exception e
)
348 // TODO: Strategies for event sinks that cause exceptions.
/// <summary>
/// Records that garbage data arrived from an endpoint and, when the listener
/// is configured to ask event sinks to vote, notifies them.
/// </summary>
/// <param name="ep">the remote endpoint the data came from</param>
/// <param name="buffer">buffer containing the received bytes</param>
/// <param name="offset">offset into <paramref name="buffer"/> passed through to the notification</param>
/// <param name="bytesTransferred">number of bytes received</param>
private void HandleGarbageData(EndPoint ep, byte[] buffer, int offset, int bytesTransferred)
{
    // Nothing is tracked unless the handling mode is above FailSilently.
    if (!(_garbageHandling > ListenerGarbageHandling.FailSilently))
    {
        return;
    }

    TrafficTrackingKey key = new TrafficTrackingKey(ep);
    TrafficTrackingRec tracking;
    lock (_garbageTrackingLock)
    {
        // Create the tracking record the first time an endpoint is seen.
        if (!_garbageTracking.TryGetValue(key, out tracking))
        {
            tracking = new TrafficTrackingRec(ep);
            _garbageTracking.Add(key, tracking);
        }
    }

    // Sinks are consulted only in voting mode, and not for endpoints whose
    // traffic has already been voted to be ignored entirely.
    if (_garbageHandling == ListenerGarbageHandling.AskEventSinksToVoteOnStrategy
        && tracking.Strategy != GarbageHandlingVote.IgnoreAllTrafficFromEndpoint)
    {
        PerformGarbageDataNotification(tracking, ep, buffer, offset, bytesTransferred);
    }
}
/// <summary>
/// Copies the received bytes out of the receive buffer, asks the registered
/// sinks how to treat the endpoint and stores the resulting strategy on the
/// tracking record.
/// </summary>
/// <param name="tracking">tracking record for the endpoint; its garbage count is incremented and its strategy updated</param>
/// <param name="rcep">the remote endpoint the data came from</param>
/// <param name="buffer">buffer containing the received bytes</param>
/// <param name="offset">index in <paramref name="buffer"/> of the first received byte</param>
/// <param name="bytesTransferred">number of bytes received</param>
private void PerformGarbageDataNotification(TrafficTrackingRec tracking, EndPoint rcep, byte[] buffer, int offset, int bytesTransferred)
{
    // Copy starting at the caller-supplied offset; the three-argument overload
    // always copied from index 0 and silently ignored it.
    byte[] copy = new byte[bytesTransferred];
    Array.Copy(buffer, offset, copy, 0, bytesTransferred);

    tracking.Strategy = PerformGarbageArrival(rcep, tracking.IncrementGarbageCount(), copy);
}
381 private void SafeConsolidateRegistrations()
383 _notifications
.EnterWriteLock();
388 UnsafeConsolidateRegistrations();
393 _notifications
.ExitWriteLock();
397 private void UnsafeConsolidateRegistrations()
399 _registrations
= (from r
in _registrations
400 where r
.Status
!= EventSinkStatus
.Canceled
401 select r
).Concat(from r
in _additions
402 where r
.Status
!= EventSinkStatus
.Canceled
406 Thread
.VolatileWrite(ref _consolidationVotes
, 0);
413 struct TrafficTrackingKey
418 public int AddressFamily
;
425 public TrafficTrackingKey(EndPoint ep
)
427 IPEndPoint ipep
= (IPEndPoint
)ep
;
428 Address
= BitConverter
.ToUInt32(ipep
.Address
.GetAddressBytes(), 0);
430 AddressFamily
= (int)ipep
.AddressFamily
;
433 #endregion Constructors
437 /// Uses background threads to receive events from LWES. This class uses two
438 /// threads, one to listen and deserialize the events and another to perform
439 /// the notifications.
441 class BackgroundThreadListener
: IListener
447 IEventTemplateDB _db
;
448 SimpleLockFreeQueue
<Event
> _eventQueue
= new SimpleLockFreeQueue
<Event
>();
449 UdpEndpoint _listenEP
;
450 EventListenerBase _listener
;
452 Status
<ListenerState
> _notifierState
;
453 Object _notifierWaitObject
;
455 Status
<ListenerState
> _recieverState
;
461 ~
BackgroundThreadListener()
466 #endregion Constructors
470 public void Dispose()
473 GC
.SuppressFinalize(this);
477 /// Starts the listener in multi-threaded mode. In this mode the listener
478 /// consumes from 1 to 2 threads from the threadpool. A thread is used for
479 /// receiving bytes and deserializing LWES events and another thread is
480 /// scheduled to perform event notification only when LWES events have
483 /// <param name="db">event template DB used during deserialization</param>
484 /// <param name="listenEP">a IP endpoint where listening should occur</param>
485 /// <param name="finishSocket"></param>
486 /// <param name="listener"></param>
487 public void Start(IEventTemplateDB db
488 , IPEndPoint listenEP
489 , Action
<Socket
, IPEndPoint
> finishSocket
490 , EventListenerBase listener
)
493 _listener
= listener
;
494 _anyEP
= (listenEP
.AddressFamily
== AddressFamily
.InterNetworkV6
)
495 ? new IPEndPoint(IPAddress
.IPv6Any
, listenEP
.Port
)
496 : new IPEndPoint(IPAddress
.Any
, listenEP
.Port
);
497 _buffer
= BufferManager
.AcquireBuffer(null);
498 _listenEP
= new UdpEndpoint(listenEP
).Initialize(finishSocket
);
499 // Start a dedicated background thread to handle the receiving...
500 _reciever
= new Thread(Background_Receiver
);
501 _reciever
.IsBackground
= true;
504 // Start a dedicated background thread to perform event notification...
505 _notifierWaitObject
= new Object();
506 _notifier
= new Thread(Background_Notifier
);
507 _notifier
.IsBackground
= true;
513 if (_recieverState
.TryTransition(ListenerState
.StopSignaled
, ListenerState
.Active
))
515 // Close the listener, this will cause the receiver thread to wakeup
516 // if it is blocked waiting for IO on the socket.
517 Util
.Dispose(ref _listenEP
);
522 private void Background_Notifier(object unused_state
)
524 _notifierState
.SetState(ListenerState
.Active
);
525 while (_notifierState
.CurrentState
< ListenerState
.StopSignaled
)
528 if (!_eventQueue
.Dequeue(out ev
))
530 lock (_notifierWaitObject
)
531 { // double-check that the queue is empty
532 // this strategy catches the race condition when the
533 // reciever queue's an event while we're acquiring the lock.
534 _notifierState
.SetState(ListenerState
.Suspending
);
535 if (!_eventQueue
.Dequeue(out ev
))
537 _notifierState
.SetState(ListenerState
.Suspended
);
538 Monitor
.Wait(_notifierWaitObject
);
541 // If the stop signal arrived during a wait then bail out...
542 if (_notifierState
.CurrentState
== ListenerState
.StopSignaled
)
544 _notifierState
.SetState(ListenerState
.Stopped
);
547 // otherwise we're active again
548 _notifierState
.SetState(ListenerState
.Active
);
551 _listener
.PerformEventArrival(ev
);
555 private void Background_Receiver(object unused_state
)
557 if (_recieverState
.TryTransition(ListenerState
.Active
, ListenerState
.Unknown
))
561 // Continue until signaled to stop...
562 while (_recieverState
.CurrentState
== ListenerState
.Active
)
564 EndPoint rcep
= _anyEP
;
565 // Perform a blocking receive...
566 int bytesTransferred
= _listenEP
.ReceiveFrom(ref rcep
, _buffer
, 0, _buffer
.Length
);
567 if (bytesTransferred
> 0)
569 GarbageHandlingVote handling
= _listener
.GetTrafficStrategyForEndpoint(rcep
);
570 if (handling
== GarbageHandlingVote
.None
)
572 PerformEventDeserializationAndQueueForNotification(rcep
, _buffer
, 0, bytesTransferred
);
574 else if (handling
== GarbageHandlingVote
.TreatTrafficFromEndpointAsGarbage
)
576 _listener
.HandleGarbageData(rcep
, _buffer
, 0, bytesTransferred
);
578 // Otherwise the handling was GarbageHandlingStrategy.FailfastForTrafficOnEndpoint
579 // and we're going to ignore it altogether.
583 catch (SocketException se
)
585 if (se
.ErrorCode
!= 10004)
588 if (_recieverState
.TryTransition(ListenerState
.Stopping
, ListenerState
.StopSignaled
))
590 // Cascade the stop signal to the notifier and wait for it to exit...
591 _notifierState
.SetState(ListenerState
.StopSignaled
);
597 private void Dispose(bool disposing
)
599 // Signal background threads...
600 _recieverState
.TryTransition(ListenerState
.StopSignaled
, ListenerState
.Active
, () =>
602 Util
.Dispose(ref _listenEP
);
603 _reciever
.Join(CDisposeBackgroundThreadWaitTimeMS
);
604 BufferManager
.ReleaseBuffer(_buffer
);
609 private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
611 , int offset
, int bytesTransferred
)
613 IPEndPoint ep
= (IPEndPoint
)rcep
;
616 // For received events, set MetaEventInfo.ReciptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
617 Event ev
= Event
.BinaryDecode(_db
, buffer
, offset
, bytesTransferred
)
618 .SetValue(Constants
.MetaEventInfoAttributes
.ReceiptTime
.Name
, Constants
.DateTimeToLwesTimeTicks(DateTime
.UtcNow
))
619 .SetValue(Constants
.MetaEventInfoAttributes
.SenderIP
.Name
, ep
.Address
)
620 .SetValue(Constants
.MetaEventInfoAttributes
.SenderPort
.Name
, ep
.Port
);
621 _eventQueue
.Enqueue(ev
);
623 catch (BadLwesDataException
)
625 _listener
.HandleGarbageData(rcep
, buffer
, offset
, bytesTransferred
);
628 if (_notifierState
.CurrentState
> ListenerState
.Active
)
630 // notifier thread is suspended;
632 lock (_notifierWaitObject
)
634 Monitor
.Pulse(_notifierWaitObject
);
643 /// Uses the threadpool and overlapped IO on the recieving socket. This listener
644 /// will consume between 0 and 3 threads from the threadpool, depending on which
645 /// jobs are active. The jobs may consist of the following:
647 /// <li>Receiver - invoked by the socket on a threadpool thread when input is received</li>
648 /// <li>Deserializer - scheduled for a threadpool thread and runs as long as buffers are in the receive queue</li>
649 /// <li>Notifier - scheduled for a threadpool thread and runs as long as Events are in the notification queue</li>
652 class ParallelListener
: IListener
657 IEventTemplateDB _db
;
659 SimpleLockFreeQueue
<Event
> _eventQueue
= new SimpleLockFreeQueue
<Event
>();
660 UdpEndpoint _listenEP
;
661 EventListenerBase _listener
;
662 Status
<ListenerState
> _listenerState
;
664 SimpleLockFreeQueue
<ReceiveCapture
> _receiveQueue
;
675 #endregion Constructors
679 public void Dispose()
682 GC
.SuppressFinalize(this);
686 /// Starts the listener.
688 /// <param name="db">an event template DB</param>
689 /// <param name="listenEP">the listening endpoint</param>
690 /// <param name="finishSocket">a callback method that is called upon to finish the listening socket</param>
691 /// <param name="owner">the owner</param>
692 public void Start(IEventTemplateDB db
693 , IPEndPoint listenEP
694 , Action
<Socket
, IPEndPoint
> finishSocket
695 , EventListenerBase owner
)
699 _anyEP
= (listenEP
.AddressFamily
== AddressFamily
.InterNetworkV6
)
700 ? new IPEndPoint(IPAddress
.IPv6Any
, 0)
701 : new IPEndPoint(IPAddress
.Any
, 0);
703 _receiveQueue
= new SimpleLockFreeQueue
<ReceiveCapture
>();
705 _listenEP
= new UdpEndpoint(listenEP
).Initialize(finishSocket
);
711 _listenerState
.TryTransition(ListenerState
.StopSignaled
, ListenerState
.Active
, () =>
713 while (Thread
.VolatileRead(ref _deserializers
) > 0)
715 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
717 while (Thread
.VolatileRead(ref _notifiers
) > 0)
719 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
722 Util
.Dispose(ref _listenEP
);
724 _listenerState
.SpinWaitForState(ListenerState
.Stopped
, () => Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
));
728 private void Background_Deserializer(object unused_state
)
730 // Called within the thread pool:
732 // Drains the recieve queue of capture records and
733 // transforms those records into Event objects by deserialization.
738 ReceiveCapture input
;
739 while (_listenerState
.IsLessThan(ListenerState
.StopSignaled
) && _receiveQueue
.Dequeue(out input
))
741 GarbageHandlingVote handling
= _listener
.GetTrafficStrategyForEndpoint(input
.RemoteEndPoint
);
742 if (handling
== GarbageHandlingVote
.None
)
744 PerformEventDeserializationAndQueueForNotification(input
.RemoteEndPoint
, input
.Buffer
, 0, input
.BytesTransferred
);
746 else if (handling
== GarbageHandlingVote
.TreatTrafficFromEndpointAsGarbage
)
748 _listener
.HandleGarbageData(input
.RemoteEndPoint
, input
.Buffer
, 0, input
.BytesTransferred
);
750 // Otherwise the handling was GarbageHandlingStrategy.FailfastForTrafficOnEndpoint
751 // and we're going to ignore it altogether.
756 int z
= Interlocked
.Decrement(ref _deserializers
);
757 if (z
== 0 && !_receiveQueue
.IsEmpty
)
758 EnsureDeserializerIsActive();
/// <summary>
/// Thread-pool job that drains the event queue and performs notification
/// for each dequeued event until the queue is empty or a stop is signaled.
/// </summary>
/// <param name="unused_state">not used</param>
private void Background_Notifier(object unused_state)
{
    try
    {
        Event ev;
        while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _eventQueue.Dequeue(out ev))
        {
            _listener.PerformEventArrival(ev);
        }
    }
    finally
    {
        // Give up our notifier slot. If we were the last notifier and an event
        // was queued after the loop saw the queue empty, start another notifier
        // so that event is not left waiting for the next arrival. The queue
        // checked must be the event queue this job drains (not the receive
        // queue), and there is no point re-arming once a stop has been signaled
        // because the drain loop would exit immediately.
        int remaining = Interlocked.Decrement(ref _notifiers);
        if (remaining == 0
            && _listenerState.IsLessThan(ListenerState.StopSignaled)
            && !_eventQueue.IsEmpty)
        {
            EnsureNotifierIsActive();
        }
    }
}
783 private void Background_ParallelReceiver(object unused_state
)
785 // Continue until signalled to stop...
786 if (_listenerState
.IsLessThan(ListenerState
.StopSignaled
))
788 // Acquiring a buffer may block until a buffer
789 // becomes available.
790 byte[] buffer
= BufferManager
.AcquireBuffer(() => _listenerState
.IsGreaterThan(ListenerState
.Active
));
792 // If the buffer is null then the stop-signal was received while acquiring a buffer
795 _listenEP
.ReceiveFromAsync(_anyEP
, buffer
, 0, buffer
.Length
, (op
) =>
797 if (op
.SocketError
== SocketError
.Success
)
799 // Reschedule the receiver before pulling the buffer out, we want to catch receives
800 // in the tightest loop possible, although we don't want to keep a threadpool thread
801 // *forever* and possibly cause thread-starvation in for other jobs so we continually
802 // put the job back in the queue - this way our parallelism plays nicely with other
803 // jobs - now, if only the other jobs were programmed to give up their threads periodically
805 ThreadPool
.QueueUserWorkItem(new WaitCallback(Background_ParallelReceiver
));
806 if (op
.BytesTransferred
> 0)
808 _receiveQueue
.Enqueue(new ReceiveCapture(op
.RemoteEndPoint
, op
.Buffer
, op
.BytesTransferred
));
810 EnsureDeserializerIsActive();
813 else if (op
.SocketError
== SocketError
.OperationAborted
)
815 // This is the dispose or stop call. fall through
826 // We get here if the receiver is signaled to stop.
830 private void CascadeStopSignal()
832 _listenerState
.TryTransition(ListenerState
.Stopping
, ListenerState
.StopSignaled
, () =>
834 while (Thread
.VolatileRead(ref _deserializers
) > 0)
836 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
838 while (Thread
.VolatileRead(ref _notifiers
) > 0)
840 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
842 _listenerState
.SetState(ListenerState
.Stopped
);
846 private void Dispose(bool disposing
)
848 if (_listenerState
.CurrentState
== ListenerState
.Active
)
852 private void EnsureDeserializerIsActive()
854 int current
= -1, value = Thread
.VolatileRead(ref _deserializers
);
857 WaitCallback cb
= new WaitCallback(Background_Deserializer
);
861 value = Interlocked
.CompareExchange(ref _deserializers
, value + 1, current
);
862 if (value == current
)
864 ThreadPool
.QueueUserWorkItem(cb
);
871 private void EnsureNotifierIsActive()
873 int current
= -1, value = Thread
.VolatileRead(ref _notifiers
);
876 WaitCallback cb
= new WaitCallback(Background_Notifier
);
880 value = Interlocked
.CompareExchange(ref _notifiers
, value + 1, current
);
881 if (value == current
)
883 ThreadPool
.QueueUserWorkItem(cb
);
890 private void ParallelReceiver()
892 // Only startup once.
893 if (_listenerState
.TryTransition(ListenerState
.Active
, ListenerState
.Unknown
))
895 Background_ParallelReceiver(null);
899 private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
901 , int offset
, int bytesTransferred
)
903 IPEndPoint ep
= (IPEndPoint
)rcep
;
906 // For received events, set MetaEventInfo.ReciptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
907 Event ev
= Event
.BinaryDecode(_db
, buffer
, offset
, bytesTransferred
)
908 .SetValue(Constants
.MetaEventInfoAttributes
.ReceiptTime
.Name
, Constants
.DateTimeToLwesTimeTicks(DateTime
.UtcNow
))
909 .SetValue(Constants
.MetaEventInfoAttributes
.SenderIP
.Name
, ep
.Address
)
910 .SetValue(Constants
.MetaEventInfoAttributes
.SenderPort
.Name
, ep
.Port
);
911 _eventQueue
.Enqueue(ev
);
913 catch (BadLwesDataException
)
915 _listener
.HandleGarbageData(rcep
, buffer
, offset
, bytesTransferred
);
918 BufferManager
.ReleaseBuffer(buffer
);
919 EnsureNotifierIsActive();
926 struct ReceiveCapture
930 public byte[] Buffer
;
931 public int BytesTransferred
;
932 public EndPoint RemoteEndPoint
;
938 public ReceiveCapture(EndPoint ep
, byte[] data
, int transferred
)
940 this.RemoteEndPoint
= ep
;
942 this.BytesTransferred
= transferred
;
945 #endregion Constructors
948 #endregion Nested Types
951 class RegistrationKey
: IEventSinkRegistrationKey
955 bool _disableGarbageNotification
;
956 Status
<EventSinkStatus
> _status
= new Status
<EventSinkStatus
>(EventSinkStatus
.Suspended
);
963 public RegistrationKey(EventListenerBase listener
, IEventSink sink
)
967 _threadSafe
= sink
.IsThreadSafe
;
970 #endregion Constructors
974 public object Handback
980 public IEventListener Listener
986 public IEventSink Sink
/// <summary>
/// Gets the current state held by the registration's status tracker.
/// </summary>
public EventSinkStatus Status
{
    get
    {
        EventSinkStatus current = _status.CurrentState;
        return current;
    }
}
997 #endregion Properties
/// <summary>
/// Requests that the registration become active.
/// </summary>
/// <returns>The result of the status tracker's SetStateIfLessThan call.</returns>
public bool Activate()
{
    // NOTE(review): presumably the state is only set while the current status
    // is below Canceled, so a canceled registration stays canceled - confirm
    // against Status<T>.SetStateIfLessThan.
    bool result = _status.SetStateIfLessThan(EventSinkStatus.Active, EventSinkStatus.Canceled);
    return result;
}
/// <summary>
/// Marks the registration as canceled.
/// </summary>
public void Cancel()
{
    EventSinkStatus canceled = EventSinkStatus.Canceled;
    _status.SetState(canceled);
}
/// <summary>
/// Sets the flag that garbage-arrival handling reads before notifying the sink.
/// </summary>
public void DisableGarbageNotification()
{
    // Full fences on both sides of the write, mirroring the fence that
    // precedes the read of this flag in PerformGarbageArrival.
    Thread.MemoryBarrier();
    _disableGarbageNotification = true;
    Thread.MemoryBarrier();
}
/// <summary>
/// Requests that the registration become suspended.
/// </summary>
/// <returns>The result of the status tracker's SetStateIfLessThan call.</returns>
public bool Suspend()
{
    // NOTE(review): presumably the state is only set while the current status
    // is below Canceled - confirm against Status<T>.SetStateIfLessThan.
    bool result = _status.SetStateIfLessThan(EventSinkStatus.Suspended, EventSinkStatus.Canceled);
    return result;
}
1023 internal bool PerformEventArrival(Event ev
, Action
<RegistrationKey
, Exception
> errorHandler
)
1027 if (_status
.SpinToggleState(EventSinkStatus
.Notifying
, EventSinkStatus
.Active
))
1031 Sink
.HandleEventArrival(this, ev
);
1032 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1036 errorHandler(this, e
);
1044 EventSinkStatus s
= _status
.CompareExchange(EventSinkStatus
.Notifying
, EventSinkStatus
.Active
);
1045 if (s
== EventSinkStatus
.Active
|| s
== EventSinkStatus
.Notifying
)
1047 Sink
.HandleEventArrival(this, ev
);
1048 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1053 errorHandler(this, e
);
1056 return _status
.CurrentState
== EventSinkStatus
.Canceled
;
1059 internal GarbageHandlingVote
PerformGarbageArrival(EndPoint remoteEndPoint
, int priorGarbageCountForEndpoint
, byte[] garbage
,
1060 Action
<RegistrationKey
, Exception
> errorHandler
)
1062 Thread
.MemoryBarrier();
1063 bool ignoring
= _disableGarbageNotification
;
1065 GarbageHandlingVote strategy
= GarbageHandlingVote
.None
;
1070 if (_status
.SpinToggleState(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
))
1074 strategy
= Sink
.HandleGarbageData(this, remoteEndPoint
, priorGarbageCountForEndpoint
, garbage
);
1078 errorHandler(this, e
);
1080 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1087 if (_status
.TryTransition(EventSinkStatus
.Notifying
, EventSinkStatus
.Active
))
1089 strategy
= Sink
.HandleGarbageData(this, remoteEndPoint
, priorGarbageCountForEndpoint
, garbage
);
1090 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1095 errorHandler(this, e
);
1105 class TrafficTrackingRec
1109 int _garbageCount
= 0;
1113 #region Constructors
1115 public TrafficTrackingRec(EndPoint ep
)
1117 RemoteEndPoint
= ep
;
1120 #endregion Constructors
1126 get { return RemoteEndPoint == null; }
1129 public int PreviousGargageDataCount
1131 get { return _garbageCount; }
1134 public EndPoint RemoteEndPoint
1140 public GarbageHandlingVote Strategy
1146 #endregion Properties
/// <summary>
/// Atomically increments the garbage counter for this endpoint.
/// </summary>
/// <returns>The counter value after the increment.</returns>
public int IncrementGarbageCount()
{
    int updated = Interlocked.Increment(ref _garbageCount);
    return updated;
}
1158 #endregion Nested Types