Added IDataReceiverSink and refactored to separate the EventSink from the GarbageData...
[lwes-dotnet/github-mirror.git] / Org.Lwes / Emitter / EventEmitterBase.cs
blob6d4c1e9395752744508786bb0a802c0d6f844a02
1 //
2 // This file is part of the LWES .NET Binding (LWES.net)
3 //
4 // COPYRIGHT© 2009, Phillip Clark (phillip[at*flitbit[dot*org)
5 // original .NET implementation
6 //
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the Lesser GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // Lesser GNU General Public License for more details.
17 // You should have received a copy of the Lesser GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org.Lwes.Emitter
22 using System;
23 using System.Net;
24 using System.Net.Sockets;
25 using System.Text;
26 using System.Threading;
28 using Org.Lwes.DB;
29 using Org.Lwes.ESF;
30 using Org.Lwes.Properties;
32 /// <summary>
33 /// Base class for event emitters.
34 /// </summary>
35 public abstract class EventEmitterBase : IEventEmitter
37 #region Fields
39 const int CDisposeBackgroundThreadWaitTimeMS = 200;
41 IPAddress _address;
42 IEventTemplateDB _db;
43 IEmitter _emitter;
44 SupportedEncoding _enc;
45 Encoding _encoding;
46 int _port;
47 Status<EmitterState> _status;
48 bool _validate;
50 #endregion Fields
52 #region Constructors
54 /// <summary>
55 /// Creates a new instance.
56 /// </summary>
57 protected EventEmitterBase()
61 /// <summary>
62 /// Destroys the instance; completes the IDisposable pattern.
63 /// </summary>
64 ~EventEmitterBase()
66 Dispose(false);
69 #endregion Constructors
71 #region Enumerations
73 enum EmitterState
75 Unknown = 0,
76 Initializing = 1,
77 Active = 2,
78 Suspending = 3,
79 Suspended = 4,
80 StopSignaled = 5,
81 Stopping = 6,
82 Stopped = 7,
85 #endregion Enumerations
87 #region Nested Interfaces
89 interface IEmitter : IDisposable
91 void Emit(Event ev);
93 void Start(IEventTemplateDB db
94 , IPEndPoint sendToEP
95 , Action<Socket, IPEndPoint> finishSocket);
98 #endregion Nested Interfaces
100 #region Properties
102 /// <summary>
103 /// The ip address to which events are emitted.
104 /// </summary>
105 public IPAddress Address
109 return _address;
113 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
114 _address = value;
118 /// <summary>
119 /// The character encoding used when performing event IO.
120 /// </summary>
121 public SupportedEncoding Encoding
123 get { return _enc; }
126 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
127 _enc = value;
128 _encoding = Constants.GetEncoding((short)value);
132 /// <summary>
133 /// Indicates whether the factory has been initialized.
134 /// </summary>
135 public virtual bool IsInitialized
137 get { return _status.CurrentState == EmitterState.Active; }
140 /// <summary>
141 /// The ip port to which events are emitted.
142 /// </summary>
143 public int Port
147 return _port;
151 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
152 _port = value;
156 /// <summary>
157 /// The event template database used when creating events.
158 /// </summary>
159 public IEventTemplateDB TemplateDB
161 get { return _db; }
164 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
165 _db = value;
169 /// <summary>
170 /// Indicates whether events issued from the factory will validate
171 /// when they are written to.
172 /// </summary>
173 public bool Validate
175 get { return _validate; }
178 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
179 _validate = value;
183 /// <summary>
184 /// Indicates whether the emitter is using a parallel emit strategy.
185 /// </summary>
186 protected bool IsParallel
188 get; set;
191 #endregion Properties
193 #region Methods
195 /// <summary>
196 /// Creates an event type identified by the event name.
197 /// </summary>
198 /// <param name="eventName">the event type's name</param>
199 /// <returns>a new LWES event instance</returns>
200 public Event CreateEvent(string eventName)
202 if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
203 if (eventName == null) throw new ArgumentNullException("eventName");
204 if (eventName.Length == 0) throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
206 Event result;
207 if (!_db.TryCreateEvent(eventName, out result, _validate, _enc))
209 result = new Event(new EventTemplate(false, eventName), false, _enc);
211 return result;
214 /// <summary>
215 /// Creates an event type identified by the event name.
216 /// </summary>
217 /// <param name="eventName">the event type's name</param>
218 /// <param name="enc">encoding used when performing IO on the event</param>
219 /// <returns>a new LWES event instance</returns>
220 public Event CreateEvent(string eventName, SupportedEncoding enc)
222 if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
223 if (eventName == null) throw new ArgumentNullException("eventName");
224 if (eventName.Length == 0) throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
226 Event result;
227 if (!_db.TryCreateEvent(eventName, out result, _validate, enc))
229 result = new Event(new EventTemplate(false, eventName), false, enc);
231 return result;
234 /// <summary>
235 /// Creates an event type identified by the event name.
236 /// </summary>
237 /// <param name="eventName">the event type's name</param>
238 /// <param name="validate">whether the event is validated</param>
239 /// <returns>a new LWES event instance</returns>
240 public Event CreateEvent(string eventName, bool validate)
242 if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
243 if (eventName == null) throw new ArgumentNullException("eventName");
244 if (eventName.Length == 0) throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
246 Event result;
247 if (!_db.TryCreateEvent(eventName, out result, validate, _enc))
249 result = new Event(new EventTemplate(false, eventName), validate, _enc);
251 return result;
254 /// <summary>
255 /// Creates an event type identified by the event name.
256 /// </summary>
257 /// <param name="eventName">the event type's name</param>
258 /// <param name="validate">whether the event is validated</param>
259 /// <param name="enc">encoding used when performing IO on the event</param>
260 /// <returns>a new LWES event instance</returns>
261 public Event CreateEvent(string eventName, bool validate, SupportedEncoding enc)
263 if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
264 if (eventName == null) throw new ArgumentNullException("eventName");
265 if (eventName.Length == 0) throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
267 Event result;
268 if (!_db.TryCreateEvent(eventName, out result, validate, enc))
270 result = new Event(new EventTemplate(false, eventName), validate, enc);
272 return result;
275 /// <summary>
276 /// Disposes of the emitter and frees any resources held.
277 /// </summary>
278 public void Dispose()
280 Dispose(true);
281 GC.SuppressFinalize(this);
284 /// <summary>
285 /// Emits an event to the event system.
286 /// </summary>
287 /// <param name="evt">the event being emitted</param>
288 public void Emit(Event evt)
290 if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
292 _emitter.Emit(evt);
295 /// <summary>
296 /// Initializes the emitter.
297 /// </summary>
298 public void Initialize()
300 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
302 if (_status.SetStateIfLessThan(EmitterState.Initializing, EmitterState.Initializing))
306 PerformInitialization();
308 finally
310 _status.TryTransition(EmitterState.Active, EmitterState.Initializing);
315 /// <summary>
316 /// Disposes of the emitter.
317 /// </summary>
318 /// <param name="disposing">Indicates whether the object is being disposed</param>
319 protected virtual void Dispose(bool disposing)
321 Util.Dispose(ref _emitter);
324 /// <summary>
325 /// Finishes initialization of the emitter.
326 /// </summary>
327 /// <param name="endpoint">An IP endpoint where events will be emitted</param>
328 /// <param name="finishSocket">callback method used to finish setup of the socket</param>
329 protected void FinishInitialize(IPEndPoint endpoint, Action<Socket, IPEndPoint> finishSocket)
331 if (endpoint == null) throw new ArgumentNullException("endpoint");
332 if (finishSocket == null) throw new ArgumentNullException("finishSocket");
334 if (_status.CurrentState != EmitterState.Initializing)
335 throw new InvalidOperationException("only valid while initialzing");
337 if (_db == null) throw new InvalidOperationException("TemplateDB must be set before initialization");
338 if (_encoding == null) throw new InvalidOperationException("Encoding must be set before initialization");
340 IEmitter emitter = (IsParallel)
341 ? (IEmitter)new ParallelEmitter()
342 : (IEmitter)new DirectEmitter();
344 emitter.Start(_db, endpoint, finishSocket);
346 _emitter = emitter;
349 /// <summary>
350 /// Performs initialization of the emitter. Derived classes must implement this method
351 /// and subsequently call the <em>FinishInitialize</em> method of the base class.
352 /// </summary>
353 protected abstract void PerformInitialization();
355 #endregion Methods
357 #region Nested Types
359 class DirectEmitter : IEmitter
361 #region Fields
363 byte[] _buffer;
364 IEventTemplateDB _db;
365 UdpEndpoint _emitEP;
366 EndPoint _sendToEP;
367 Status<EmitterState> _senderState;
369 #endregion Fields
371 #region Constructors
373 ~DirectEmitter()
375 Dispose(false);
378 #endregion Constructors
380 #region Methods
382 public void Dispose()
384 this.Dispose(true);
385 GC.SuppressFinalize(this);
388 public void Emit(Event ev)
390 if (_senderState.IsGreaterThan(EmitterState.Active))
391 throw new InvalidOperationException(Resources.Error_EmitterHasEnteredShutdownState);
393 _emitEP.SendTo(_sendToEP, LwesSerializer.Serialize(ev));
396 public void Start(IEventTemplateDB db, IPEndPoint sendToEP, Action<Socket, IPEndPoint> finishSocket)
398 _db = db;
399 _sendToEP = sendToEP;
400 _buffer = BufferManager.AcquireBuffer(null);
401 _emitEP = new UdpEndpoint(sendToEP).Initialize(finishSocket);
402 _senderState.SetState(EmitterState.Active);
405 private void Dispose(bool p)
407 // Signal background threads...
408 _senderState.TryTransition(EmitterState.StopSignaled, EmitterState.Active, () =>
410 Util.Dispose(ref _emitEP);
411 BufferManager.ReleaseBuffer(_buffer);
412 _buffer = null;
416 #endregion Methods
419 class ParallelEmitter : IEmitter
421 #region Fields
423 SimpleLockFreeQueue<byte[]> _dataQueue = new SimpleLockFreeQueue<byte[]>();
424 IEventTemplateDB _db;
425 UdpEndpoint _emitEP;
426 Status<EmitterState> _emitterState;
427 EndPoint _sendToEP;
428 int _senders;
430 #endregion Fields
432 #region Constructors
434 ~ParallelEmitter()
436 Dispose(false);
439 #endregion Constructors
441 #region Methods
443 public void Dispose()
445 this.Dispose(true);
446 GC.SuppressFinalize(this);
449 public void Emit(Event ev)
451 _dataQueue.Enqueue(LwesSerializer.SerializeToMemoryBuffer(ev));
452 EnsureSenderIsActive();
455 public void Start(IEventTemplateDB db, IPEndPoint sendToEP, Action<Socket, IPEndPoint> finishSocket)
457 _db = db;
458 _sendToEP = sendToEP;
459 _emitEP = new UdpEndpoint(sendToEP).Initialize(finishSocket);
460 _emitterState.SetState(EmitterState.Active);
463 void Background_Sender(object unused_state)
466 // Drains the event queue and performs notification
470 byte[] data;
471 while (_emitterState.IsLessThan(EmitterState.StopSignaled) && _dataQueue.TryDequeue(out data))
473 _emitEP.SendTo(_sendToEP, data);
474 BufferManager.ReleaseBuffer(data);
477 finally
479 int z = Interlocked.Decrement(ref _senders);
480 if (z == 0 && !_dataQueue.IsEmpty)
481 EnsureSenderIsActive();
485 private void Dispose(bool p)
487 // Signal background threads...
488 _emitterState.TryTransition(EmitterState.StopSignaled, EmitterState.Active, () =>
490 while (Thread.VolatileRead(ref _senders) > 0)
492 Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
494 byte[] b;
495 while (_dataQueue.TryDequeue(out b))
497 BufferManager.ReleaseBuffer(b);
499 Util.Dispose(ref _emitEP);
503 private void EnsureSenderIsActive()
505 int current = -1, value = Thread.VolatileRead(ref _senders);
506 if (value < 1)
508 WaitCallback cb = new WaitCallback(Background_Sender);
509 while (true)
511 current = value;
512 value = Interlocked.CompareExchange(ref _senders, value + 1, current);
513 if (value == current)
515 ThreadPool.QueueUserWorkItem(cb);
516 break;
522 #endregion Methods
525 #endregion Nested Types