2 // This file is part of the LWES .NET Binding (LWES.net)
4 // COPYRIGHT© 2009, Phillip Clark (phillip[at*flitbit[dot*org)
5 // original .NET implementation
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the Lesser GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // Lesser GNU General Public License for more details.
17 // You should have received a copy of the Lesser GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org
.Lwes
.Emitter
24 using System
.Net
.Sockets
;
26 using System
.Threading
;
30 using Org
.Lwes
.Properties
;
33 /// Base class for event emitters.
35 public abstract class EventEmitterBase
: IEventEmitter
/// <summary>
/// Sleep interval, in milliseconds, used by the parallel emitter's dispose
/// logic between checks of its active-sender count.
/// </summary>
const int CDisposeBackgroundThreadWaitTimeMS = 200;

/// <summary>
/// Encoding identifier passed to events created without an explicit encoding.
/// </summary>
SupportedEncoding _enc;

/// <summary>
/// Current lifecycle state of the emitter.
/// </summary>
Status<EmitterState> _status;
55 /// Creates a new instance.
// Protected: the class is abstract, so construction only happens through
// derived emitter types.
57 protected EventEmitterBase()
62 /// Destroys the instance; completes the IDisposable pattern.
69 #endregion Constructors
85 #endregion Enumerations
87 #region Nested Interfaces
/// <summary>
/// Contract for the nested emit strategies (DirectEmitter and ParallelEmitter).
/// </summary>
interface IEmitter : IDisposable
{
    /// <summary>
    /// Starts the emitter.
    /// </summary>
    /// <param name="db">event template database handed to the emitter</param>
    /// <param name="sendToEP">endpoint to which events are sent</param>
    /// <param name="finishSocket">callback method used to finish setup of the socket</param>
    void Start(IEventTemplateDB db
        , IPEndPoint sendToEP
        , Action<Socket, IPEndPoint> finishSocket);
}
98 #endregion Nested Interfaces
103 /// The ip address to which events are emitted.
105 public IPAddress Address
// Guard: once IsInitialized reports true (status is Active) the operation is
// rejected with Resources.Error_AlreadyInitialized.
// NOTE(review): presumably this guard sits in the setter - confirm.
113 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
119 /// The character encoding used when performing event IO.
121 public SupportedEncoding Encoding
// Guard: changes are rejected with Resources.Error_AlreadyInitialized while
// IsInitialized is true.
126 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
// Resolve the encoding object for the supplied value (cast to short) through
// Constants.GetEncoding and keep it in _encoding; FinishInitialize later
// refuses to run while _encoding is null.
128 _encoding
= Constants
.GetEncoding((short)value);
/// <summary>
/// Indicates whether the factory has been initialized.
/// </summary>
/// <value>true only while the emitter's status equals EmitterState.Active</value>
public virtual bool IsInitialized
{
    get
    {
        var current = _status.CurrentState;
        return current == EmitterState.Active;
    }
}
141 /// The ip port to which events are emitted.
// Guard: the operation is rejected with Resources.Error_AlreadyInitialized
// while IsInitialized is true.
// NOTE(review): presumably the setter guard of the port property - confirm.
151 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
157 /// The event template database used when creating events.
159 public IEventTemplateDB TemplateDB
// Guard: the operation is rejected with Resources.Error_AlreadyInitialized
// while IsInitialized is true.
// NOTE(review): presumably backed by _db, which FinishInitialize requires to be
// non-null - confirm.
164 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
170 /// Indicates whether events issued from the factory will validate
171 /// when they are written to.
// Reads return the _validate flag, which the CreateEvent overloads without a
// validate parameter pass to the template database.
175 get { return _validate; }
// Guard: the operation is rejected with Resources.Error_AlreadyInitialized
// while IsInitialized is true.
178 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
184 /// Indicates whether the emitter is using a parallel emit strategy.
// FinishInitialize reads this flag to choose between ParallelEmitter (true)
// and DirectEmitter (false).
186 protected bool IsParallel
191 #endregion Properties
/// <summary>
/// Creates an event type identified by the event name, using the emitter's
/// own validation flag and encoding for the template lookup.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <returns>a new LWES event instance</returns>
/// <exception cref="InvalidOperationException">the emitter is not initialized</exception>
/// <exception cref="ArgumentNullException">eventName is null</exception>
/// <exception cref="ArgumentException">eventName is empty</exception>
public Event CreateEvent(string eventName)
{
    if (!IsInitialized)
    {
        throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    }
    if (eventName == null)
    {
        throw new ArgumentNullException("eventName");
    }
    if (eventName.Length == 0)
    {
        throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
    }

    Event created;
    bool succeeded = _db.TryCreateEvent(eventName, out created, _validate, _enc);
    if (succeeded)
    {
        return created;
    }

    // The template database could not create the event: build one around a
    // fresh template, passing false for the event's validate argument.
    return new Event(new EventTemplate(false, eventName), false, _enc);
}
/// <summary>
/// Creates an event type identified by the event name, using the emitter's
/// validation flag and a caller-supplied encoding.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <param name="enc">encoding used when performing IO on the event</param>
/// <returns>a new LWES event instance</returns>
/// <exception cref="InvalidOperationException">the emitter is not initialized</exception>
/// <exception cref="ArgumentNullException">eventName is null</exception>
/// <exception cref="ArgumentException">eventName is empty</exception>
public Event CreateEvent(string eventName, SupportedEncoding enc)
{
    if (!IsInitialized)
    {
        throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    }
    if (eventName == null)
    {
        throw new ArgumentNullException("eventName");
    }
    if (eventName.Length == 0)
    {
        throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
    }

    Event created;
    bool succeeded = _db.TryCreateEvent(eventName, out created, _validate, enc);
    if (succeeded)
    {
        return created;
    }

    // The template database could not create the event: build one around a
    // fresh template, passing false for the event's validate argument.
    return new Event(new EventTemplate(false, eventName), false, enc);
}
/// <summary>
/// Creates an event type identified by the event name, using a
/// caller-supplied validation flag and the emitter's encoding.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <param name="validate">whether the event is validated</param>
/// <returns>a new LWES event instance</returns>
/// <exception cref="InvalidOperationException">the emitter is not initialized</exception>
/// <exception cref="ArgumentNullException">eventName is null</exception>
/// <exception cref="ArgumentException">eventName is empty</exception>
public Event CreateEvent(string eventName, bool validate)
{
    if (!IsInitialized)
    {
        throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    }
    if (eventName == null)
    {
        throw new ArgumentNullException("eventName");
    }
    if (eventName.Length == 0)
    {
        throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
    }

    Event created;
    bool succeeded = _db.TryCreateEvent(eventName, out created, validate, _enc);
    if (succeeded)
    {
        return created;
    }

    // The template database could not create the event: build one around a
    // fresh template, carrying the caller's validate flag.
    return new Event(new EventTemplate(false, eventName), validate, _enc);
}
/// <summary>
/// Creates an event type identified by the event name, using a
/// caller-supplied validation flag and encoding.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <param name="validate">whether the event is validated</param>
/// <param name="enc">encoding used when performing IO on the event</param>
/// <returns>a new LWES event instance</returns>
/// <exception cref="InvalidOperationException">the emitter is not initialized</exception>
/// <exception cref="ArgumentNullException">eventName is null</exception>
/// <exception cref="ArgumentException">eventName is empty</exception>
public Event CreateEvent(string eventName, bool validate, SupportedEncoding enc)
{
    if (!IsInitialized)
    {
        throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    }
    if (eventName == null)
    {
        throw new ArgumentNullException("eventName");
    }
    if (eventName.Length == 0)
    {
        throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
    }

    Event created;
    bool succeeded = _db.TryCreateEvent(eventName, out created, validate, enc);
    if (succeeded)
    {
        return created;
    }

    // The template database could not create the event: build one around a
    // fresh template, carrying the caller's validate flag.
    return new Event(new EventTemplate(false, eventName), validate, enc);
}
/// <summary>
/// Disposes of the emitter and frees any resources held.
/// </summary>
public void Dispose()
{
    // Release resources through the overridable Dispose(bool) first; only then
    // tell the GC that finalization is no longer required.
    Dispose(true);
    GC.SuppressFinalize(this);
}
285 /// Emits an event to the event system.
287 /// <param name="evt">the event being emitted</param>
288 public void Emit(Event evt
)
// Emitting is only allowed once the emitter is Active; otherwise
// InvalidOperationException(Resources.Error_NotYetInitialized) is thrown.
// NOTE(review): the hand-off of evt to the underlying emitter is not part of
// the lines shown here - confirm it forwards to _emitter.
290 if (!IsInitialized
) throw new InvalidOperationException(Resources
.Error_NotYetInitialized
);
296 /// Initializes the emitter.
298 public void Initialize()
// A second initialization of an Active emitter is rejected.
300 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
// Try to move the status to Initializing; the result of SetStateIfLessThan
// gates the initialization work below.
// NOTE(review): presumably returns false when the state was already at or past
// Initializing - confirm against Status<T>.
302 if (_status
.SetStateIfLessThan(EmitterState
.Initializing
, EmitterState
.Initializing
))
// Derived classes do their setup here and are expected to call
// FinishInitialize, which only succeeds while the state is Initializing.
306 PerformInitialization();
// Attempt the Initializing -> Active transition.
// NOTE(review): argument order looks like (newState, expectedState) - confirm.
310 _status
.TryTransition(EmitterState
.Active
, EmitterState
.Initializing
);
/// <summary>
/// Disposes of the emitter by releasing the underlying emitter instance.
/// </summary>
/// <param name="disposing">Indicates whether the object is being disposed</param>
protected virtual void Dispose(bool disposing)
{
    // The field is handed over by reference to the shared Util.Dispose helper.
    Util.Dispose(ref _emitter);
}
/// <summary>
/// Finishes initialization of the emitter: validates the configuration,
/// creates the emit strategy selected by IsParallel, starts it and keeps it
/// for later disposal.
/// </summary>
/// <param name="endpoint">An IP endpoint where events will be emitted</param>
/// <param name="finishSocket">callback method used to finish setup of the socket</param>
/// <exception cref="ArgumentNullException">endpoint or finishSocket is null</exception>
/// <exception cref="InvalidOperationException">
/// the emitter is not in the Initializing state, or the template database or
/// encoding has not been set
/// </exception>
protected void FinishInitialize(IPEndPoint endpoint, Action<Socket, IPEndPoint> finishSocket)
{
    if (endpoint == null) throw new ArgumentNullException("endpoint");
    if (finishSocket == null) throw new ArgumentNullException("finishSocket");

    if (_status.CurrentState != EmitterState.Initializing)
        throw new InvalidOperationException("only valid while initializing");

    if (_db == null) throw new InvalidOperationException("TemplateDB must be set before initialization");
    if (_encoding == null) throw new InvalidOperationException("Encoding must be set before initialization");

    // Pick the emit strategy requested through IsParallel.
    IEmitter emitter = IsParallel
        ? (IEmitter)new ParallelEmitter()
        : (IEmitter)new DirectEmitter();

    emitter.Start(_db, endpoint, finishSocket);

    // Keep the started emitter; Dispose(bool) releases it through _emitter.
    _emitter = emitter;
}
350 /// Performs initialization of the emitter. Derived classes must implement this method
351 /// and subsequently call the <em>FinishInitialize</em> method of the base class.
// Called from Initialize. FinishInitialize throws unless the status is still
// Initializing, so the call has to happen during this method.
353 protected abstract void PerformInitialization();
359 class DirectEmitter
: IEmitter
/// <summary>
/// Event template database reference held by this emitter.
/// </summary>
IEventTemplateDB _db;

/// <summary>
/// Lifecycle state of this emitter; Emit rejects events once it has moved
/// past EmitterState.Active.
/// </summary>
Status<EmitterState> _senderState;
378 #endregion Constructors
/// <summary>
/// Shuts the emitter down and releases the resources it holds.
/// </summary>
public void Dispose()
{
    // Run the actual cleanup before suppressing finalization.
    Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Serializes the event and sends it to the configured endpoint on the
/// calling thread.
/// </summary>
/// <param name="ev">the event being emitted</param>
/// <exception cref="InvalidOperationException">
/// the emitter state is already past EmitterState.Active
/// </exception>
public void Emit(Event ev)
{
    bool shuttingDown = _senderState.IsGreaterThan(EmitterState.Active);
    if (shuttingDown)
    {
        throw new InvalidOperationException(Resources.Error_EmitterHasEnteredShutdownState);
    }

    var payload = LwesSerializer.Serialize(ev);
    _emitEP.SendTo(_sendToEP, payload);
}
/// <summary>
/// Prepares the emitter for sending: records the template database and the
/// target endpoint, acquires a buffer, creates the UDP endpoint and marks
/// the emitter active.
/// </summary>
/// <param name="db">event template database for this emitter</param>
/// <param name="sendToEP">endpoint to which events are sent</param>
/// <param name="finishSocket">callback passed to the UDP endpoint's Initialize call</param>
public void Start(IEventTemplateDB db, IPEndPoint sendToEP, Action<Socket, IPEndPoint> finishSocket)
{
    // Keep the database handed in by the caller instead of discarding it.
    _db = db;
    _sendToEP = sendToEP;
    _buffer = BufferManager.AcquireBuffer(null);
    _emitEP = new UdpEndpoint(sendToEP).Initialize(finishSocket);

    // Flip to Active last, after all sending resources are in place.
    _senderState.SetState(EmitterState.Active);
}
/// <summary>
/// Attempts the Active to StopSignaled transition and hands the cleanup of
/// the UDP endpoint and the send buffer to that transition.
/// </summary>
/// <param name="p">not used by this implementation</param>
private void Dispose(bool p)
{
    // Signal background threads...
    _senderState.TryTransition(
        EmitterState.StopSignaled,
        EmitterState.Active,
        () =>
        {
            // NOTE(review): presumably invoked only when the transition
            // succeeds - confirm against Status<T>.TryTransition.
            Util.Dispose(ref _emitEP);
            BufferManager.ReleaseBuffer(_buffer);
        });
}
419 class ParallelEmitter
: IEmitter
/// <summary>
/// Queue of serialized events waiting to be sent by a background sender.
/// </summary>
SimpleLockFreeQueue<byte[]> _dataQueue = new SimpleLockFreeQueue<byte[]>();

/// <summary>
/// Event template database reference held by this emitter.
/// </summary>
IEventTemplateDB _db;

/// <summary>
/// Lifecycle state of this emitter; the background sender stops draining
/// once it reaches EmitterState.StopSignaled.
/// </summary>
Status<EmitterState> _emitterState;
439 #endregion Constructors
/// <summary>
/// Shuts the emitter down and releases the resources it holds.
/// </summary>
public void Dispose()
{
    // Run the actual cleanup before suppressing finalization.
    Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Serializes the event into a memory buffer, queues it and makes sure a
/// background sender is running to drain the queue.
/// </summary>
/// <param name="ev">the event being emitted</param>
public void Emit(Event ev)
{
    var payload = LwesSerializer.SerializeToMemoryBuffer(ev);
    _dataQueue.Enqueue(payload);

    EnsureSenderIsActive();
}
/// <summary>
/// Prepares the emitter for sending: records the template database and the
/// target endpoint, creates the UDP endpoint and marks the emitter active.
/// </summary>
/// <param name="db">event template database for this emitter</param>
/// <param name="sendToEP">endpoint to which events are sent</param>
/// <param name="finishSocket">callback passed to the UDP endpoint's Initialize call</param>
public void Start(IEventTemplateDB db, IPEndPoint sendToEP, Action<Socket, IPEndPoint> finishSocket)
{
    // Keep the database handed in by the caller instead of discarding it.
    _db = db;
    _sendToEP = sendToEP;
    _emitEP = new UdpEndpoint(sendToEP).Initialize(finishSocket);

    // Flip to Active last, after the endpoint is in place.
    _emitterState.SetState(EmitterState.Active);
}
// Work item body queued by EnsureSenderIsActive; the state argument is unused.
463 void Background_Sender(object unused_state
)
466 // Drains the event queue and performs notification
// Keep sending while the emitter has not reached StopSignaled and the queue
// still yields a buffer.
471 while (_emitterState
.IsLessThan(EmitterState
.StopSignaled
) && _dataQueue
.TryDequeue(out data
))
473 _emitEP
.SendTo(_sendToEP
, data
);
// Hand the buffer back to the BufferManager once it has been sent.
474 BufferManager
.ReleaseBuffer(data
);
// This sender is finished: decrement the active-sender count.
479 int z
= Interlocked
.Decrement(ref _senders
);
// No sender left but the queue is not empty: request a new sender.
// NOTE(review): presumably covers items enqueued after the drain loop ended - confirm.
480 if (z
== 0 && !_dataQueue
.IsEmpty
)
481 EnsureSenderIsActive();
/// <summary>
/// Attempts the Active to StopSignaled transition and hands the shutdown
/// work to it: wait for background senders, release queued buffers, dispose
/// the UDP endpoint.
/// </summary>
/// <param name="p">not used by this implementation</param>
private void Dispose(bool p)
{
    // Signal background threads...
    _emitterState.TryTransition(
        EmitterState.StopSignaled,
        EmitterState.Active,
        () =>
        {
            // Poll until the active-sender count has dropped to zero.
            while (Thread.VolatileRead(ref _senders) > 0)
            {
                Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
            }

            // Give back every buffer that is still waiting in the queue.
            byte[] leftover;
            while (_dataQueue.TryDequeue(out leftover))
            {
                BufferManager.ReleaseBuffer(leftover);
            }

            Util.Dispose(ref _emitEP);
        });
}
// Called after enqueueing (Emit) and when a finishing sender sees leftover
// queue items (Background_Sender); queues Background_Sender on the thread pool.
503 private void EnsureSenderIsActive()
// current is the comparand for the compare-exchange below; value starts as a
// volatile read of the active-sender count.
505 int current
= -1, value = Thread
.VolatileRead(ref _senders
);
508 WaitCallback cb
= new WaitCallback(Background_Sender
);
// Try to replace _senders with value + 1, provided it still equals current;
// CompareExchange returns the value found in _senders before the call.
// NOTE(review): the statements that update current and bound the retries are
// not shown next to this call - confirm the loop shape.
512 value = Interlocked
.CompareExchange(ref _senders
, value + 1, current
);
// The exchange took effect, so this caller queues the background sender.
513 if (value == current
)
515 ThreadPool
.QueueUserWorkItem(cb
);
525 #endregion Nested Types