2010-04-06 Jb Evain <jbevain@novell.com>
[mcs.git] / class / System.ServiceModel / System.ServiceModel.Dispatcher / ChannelDispatcher.cs
blob157c08f6c8e5b26d15cd7fc9ab3022a36ea26eaa
1 //
2 // ChannelDispatcher.cs
3 //
4 // Author:
5 // Atsushi Enomoto <atsushi@ximian.com>
6 //
7 // Copyright (C) 2005,2009 Novell, Inc. http://www.novell.com
8 //
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
16 //
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
19 //
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 using System;
29 using System.Collections.Generic;
30 using System.Collections.ObjectModel;
31 using System.Reflection;
32 using System.ServiceModel.Channels;
33 using System.Threading;
34 using System.Transactions;
35 using System.ServiceModel;
36 using System.ServiceModel.Description;
38 namespace System.ServiceModel.Dispatcher
40 public class ChannelDispatcher : ChannelDispatcherBase
42 class EndpointDispatcherCollection : SynchronizedCollection<EndpointDispatcher>
44 public EndpointDispatcherCollection (ChannelDispatcher owner)
46 this.owner = owner;
49 ChannelDispatcher owner;
51 protected override void ClearItems ()
53 foreach (var ed in this)
54 ed.ChannelDispatcher = null;
55 base.ClearItems ();
58 protected override void InsertItem (int index, EndpointDispatcher item)
60 item.ChannelDispatcher = owner;
61 base.InsertItem (index, item);
64 protected override void RemoveItem (int index)
66 if (index < Count)
67 this [index].ChannelDispatcher = null;
68 base.RemoveItem (index);
71 protected override void SetItem (int index, EndpointDispatcher item)
73 item.ChannelDispatcher = owner;
74 base.SetItem (index, item);
78 ServiceHostBase host;
80 string binding_name;
81 Collection<IErrorHandler> error_handlers
82 = new Collection<IErrorHandler> ();
83 IChannelListener listener;
84 internal IDefaultCommunicationTimeouts timeouts; // FIXME: remove internal
85 MessageVersion message_version;
86 bool receive_sync, include_exception_detail_in_faults,
87 manual_addressing, is_tx_receive;
88 int max_tx_batch_size;
89 SynchronizedCollection<IChannelInitializer> initializers
90 = new SynchronizedCollection<IChannelInitializer> ();
91 IsolationLevel tx_isolation_level;
92 TimeSpan tx_timeout;
93 ServiceThrottle throttle;
95 Guid identifier = Guid.NewGuid ();
96 ManualResetEvent async_event = new ManualResetEvent (false);
98 ListenerLoopManager loop_manager;
99 SynchronizedCollection<EndpointDispatcher> endpoints;
101 [MonoTODO ("get binding info from config")]
102 public ChannelDispatcher (IChannelListener listener)
103 : this (listener, null)
107 public ChannelDispatcher (
108 IChannelListener listener, string bindingName)
109 : this (listener, bindingName, null)
113 public ChannelDispatcher (
114 IChannelListener listener, string bindingName,
115 IDefaultCommunicationTimeouts timeouts)
117 if (listener == null)
118 throw new ArgumentNullException ("listener");
119 Init (listener, bindingName, timeouts);
122 private void Init (IChannelListener listener, string bindingName,
123 IDefaultCommunicationTimeouts timeouts)
125 this.listener = listener;
126 this.binding_name = bindingName;
127 // IChannelListener is often a ChannelListenerBase
128 // which implements IDefaultCommunicationTimeouts.
129 this.timeouts = timeouts ?? listener as IDefaultCommunicationTimeouts ?? DefaultCommunicationTimeouts.Instance;
130 endpoints = new EndpointDispatcherCollection (this);
133 internal EndpointDispatcher InitializeServiceEndpoint (Type serviceType, ServiceEndpoint se)
135 this.MessageVersion = se.Binding.MessageVersion;
136 if (this.MessageVersion == null)
137 this.MessageVersion = MessageVersion.Default;
139 //Attach one EndpointDispacher to the ChannelDispatcher
140 EndpointDispatcher ed = new EndpointDispatcher (se.Address, se.Contract.Name, se.Contract.Namespace);
141 this.Endpoints.Add (ed);
142 ed.InitializeServiceEndpoint (false, serviceType, se);
143 return ed;
146 public string BindingName {
147 get { return binding_name; }
150 public SynchronizedCollection<IChannelInitializer> ChannelInitializers {
151 get { return initializers; }
154 protected internal override TimeSpan DefaultCloseTimeout {
155 get { return timeouts.CloseTimeout; }
158 protected internal override TimeSpan DefaultOpenTimeout {
159 get { return timeouts.OpenTimeout; }
162 public Collection<IErrorHandler> ErrorHandlers {
163 get { return error_handlers; }
166 public SynchronizedCollection<EndpointDispatcher> Endpoints {
167 get { return endpoints; }
170 [MonoTODO]
171 public bool IsTransactedAccept {
172 get { throw new NotImplementedException (); }
175 public bool IsTransactedReceive {
176 get { return is_tx_receive; }
177 set { is_tx_receive = value; }
180 public bool ManualAddressing {
181 get { return manual_addressing; }
182 set { manual_addressing = value; }
185 public int MaxTransactedBatchSize {
186 get { return max_tx_batch_size; }
187 set { max_tx_batch_size = value; }
190 public override ServiceHostBase Host {
191 get { return host; }
194 public override IChannelListener Listener {
195 get { return listener; }
198 public MessageVersion MessageVersion {
199 get { return message_version; }
200 set { message_version = value; }
203 public bool ReceiveSynchronously {
204 get { return receive_sync; }
205 set {
206 ThrowIfDisposedOrImmutable ();
207 receive_sync = value;
211 public bool IncludeExceptionDetailInFaults {
212 get { return include_exception_detail_in_faults; }
213 set { include_exception_detail_in_faults = value; }
216 public ServiceThrottle ServiceThrottle {
217 get { return throttle; }
218 set { throttle = value; }
221 public IsolationLevel TransactionIsolationLevel {
222 get { return tx_isolation_level; }
223 set { tx_isolation_level = value; }
226 public TimeSpan TransactionTimeout {
227 get { return tx_timeout; }
228 set { tx_timeout = value; }
231 protected internal override void Attach (ServiceHostBase host)
233 this.host = host;
236 public override void CloseInput ()
238 if (loop_manager != null)
239 loop_manager.CloseInput ();
242 protected internal override void Detach (ServiceHostBase host)
244 this.host = null;
247 protected override void OnAbort ()
249 if (loop_manager != null)
250 loop_manager.Stop (TimeSpan.FromTicks (1));
253 Action<TimeSpan> open_delegate;
254 Action<TimeSpan> close_delegate;
256 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
257 AsyncCallback callback, object state)
259 if (close_delegate == null)
260 close_delegate = new Action<TimeSpan> (OnClose);
261 return close_delegate.BeginInvoke (timeout, callback, state);
264 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
265 AsyncCallback callback, object state)
267 if (open_delegate == null)
268 open_delegate = new Action<TimeSpan> (OnOpen);
269 return open_delegate.BeginInvoke (timeout, callback, state);
272 protected override void OnClose (TimeSpan timeout)
274 if (loop_manager != null)
275 loop_manager.Stop (timeout);
278 protected override void OnClosed ()
280 if (host != null)
281 host.ChannelDispatchers.Remove (this);
282 base.OnClosed ();
285 protected override void OnEndClose (IAsyncResult result)
287 close_delegate.EndInvoke (result);
290 protected override void OnEndOpen (IAsyncResult result)
292 open_delegate.EndInvoke (result);
295 protected override void OnOpen (TimeSpan timeout)
297 if (Host == null || MessageVersion == null)
298 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
300 loop_manager.Setup (timeout);
303 protected override void OnOpening ()
305 base.OnOpening ();
306 loop_manager = new ListenerLoopManager (this);
309 protected override void OnOpened ()
311 base.OnOpened ();
312 StartLoop ();
315 void StartLoop ()
317 // FIXME: not sure if it should be filled here.
318 if (ServiceThrottle == null)
319 ServiceThrottle = new ServiceThrottle ();
321 loop_manager.Start ();
325 // isolated from ChannelDispatcher
326 class ListenerLoopManager
328 ChannelDispatcher owner;
329 AutoResetEvent throttle_wait_handle = new AutoResetEvent (false);
330 AutoResetEvent creator_handle = new AutoResetEvent (false);
331 ManualResetEvent stop_handle = new ManualResetEvent (false);
332 bool loop;
333 Thread loop_thread;
334 DateTime close_started;
335 TimeSpan close_timeout;
336 Func<IAsyncResult> channel_acceptor;
337 List<IChannel> channels = new List<IChannel> ();
338 AddressFilterMode address_filter_mode;
340 public ListenerLoopManager (ChannelDispatcher owner)
342 this.owner = owner;
343 var sba = owner.Host != null ? owner.Host.Description.Behaviors.Find<ServiceBehaviorAttribute> () : null;
344 if (sba != null)
345 address_filter_mode = sba.AddressFilterMode;
348 public void Setup (TimeSpan openTimeout)
350 if (owner.Listener.State != CommunicationState.Opened)
351 owner.Listener.Open (openTimeout);
353 // It is tested at Open(), but strangely it is not instantiated at this point.
354 foreach (var ed in owner.Endpoints)
355 if (ed.DispatchRuntime.InstanceContextProvider == null && (ed.DispatchRuntime.Type == null || ed.DispatchRuntime.Type.GetConstructor (Type.EmptyTypes) == null))
356 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
357 SetupChannelAcceptor ();
360 public void Start ()
362 if (loop_thread == null)
363 loop_thread = new Thread (new ThreadStart (Loop));
364 loop_thread.Start ();
367 Func<IAsyncResult> CreateAcceptor<TChannel> (IChannelListener l) where TChannel : class, IChannel
369 IChannelListener<TChannel> r = l as IChannelListener<TChannel>;
370 if (r == null)
371 return null;
372 AsyncCallback callback = delegate (IAsyncResult result) {
373 try {
374 ChannelAccepted (r.EndAcceptChannel (result));
375 } catch (Exception ex) {
376 Console.WriteLine ("Exception during finishing channel acceptance.");
377 Console.WriteLine (ex);
378 creator_handle.Set ();
381 return delegate {
382 try {
383 return r.BeginAcceptChannel (callback, null);
384 } catch (Exception ex) {
385 Console.WriteLine ("Exception during accepting channel.");
386 Console.WriteLine (ex);
387 throw;
392 void SetupChannelAcceptor ()
394 var l = owner.Listener;
395 channel_acceptor =
396 CreateAcceptor<IReplyChannel> (l) ??
397 CreateAcceptor<IReplySessionChannel> (l) ??
398 CreateAcceptor<IInputChannel> (l) ??
399 CreateAcceptor<IInputSessionChannel> (l) ??
400 CreateAcceptor<IDuplexChannel> (l) ??
401 CreateAcceptor<IDuplexSessionChannel> (l);
402 if (channel_acceptor == null)
403 throw new InvalidOperationException (String.Format ("Unrecognized channel listener type: {0}", l.GetType ()));
406 public void Stop (TimeSpan timeout)
408 if (loop_thread == null)
409 return;
411 close_started = DateTime.Now;
412 close_timeout = timeout;
413 loop = false;
414 creator_handle.Set ();
415 throttle_wait_handle.Set (); // break primary loop
416 if (stop_handle != null) {
417 stop_handle.WaitOne (timeout > TimeSpan.Zero ? timeout : TimeSpan.FromTicks (1));
418 stop_handle.Close ();
419 stop_handle = null;
421 if (owner.Listener.State != CommunicationState.Closed) {
422 // FIXME: log it
423 Console.WriteLine ("Channel listener '{0}' is not closed. Aborting.", owner.Listener.GetType ());
424 owner.Listener.Abort ();
426 if (loop_thread != null && loop_thread.IsAlive)
427 loop_thread.Abort ();
428 loop_thread = null;
431 public void CloseInput ()
433 foreach (var ch in channels.ToArray ()) {
434 if (ch.State == CommunicationState.Closed)
435 channels.Remove (ch); // zonbie, if exists
436 else {
437 try {
438 ch.Close (close_timeout - (DateTime.Now - close_started));
439 } catch (Exception ex) {
440 // FIXME: log it.
441 Console.WriteLine (ex);
442 ch.Abort ();
448 void Loop ()
450 try {
451 LoopCore ();
452 } catch (Exception ex) {
453 // FIXME: log it
454 Console.WriteLine ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner.Listener);
455 Console.WriteLine (ex);
456 } finally {
457 if (stop_handle != null)
458 stop_handle.Set ();
462 void LoopCore ()
464 loop = true;
466 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
467 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
469 while (loop) {
470 // FIXME: enable throttling and allow more than one connection to process at a time.
471 while (loop && channels.Count < 1) {
472 // while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
473 channel_acceptor ();
474 creator_handle.WaitOne (); // released by ChannelAccepted()
476 if (!loop)
477 break;
478 throttle_wait_handle.WaitOne (); // released by IChannel.Close()
480 try {
481 owner.Listener.Close ();
482 } finally {
483 // make sure to close both listener and channels.
484 owner.CloseInput ();
488 void ChannelAccepted (IChannel ch)
490 try {
491 if (ch == null) // could happen when it was aborted
492 return;
493 if (!loop) {
494 var dis = ch as IDisposable;
495 if (dis != null)
496 dis.Dispose ();
497 return;
500 lock (channels)
501 channels.Add (ch);
502 ch.Opened += delegate {
503 ch.Faulted += delegate {
504 lock (channels)
505 if (channels.Contains (ch))
506 channels.Remove (ch);
507 throttle_wait_handle.Set (); // release loop wait lock.
509 ch.Closed += delegate {
510 lock (channels)
511 if (channels.Contains (ch))
512 channels.Remove (ch);
513 throttle_wait_handle.Set (); // release loop wait lock.
516 ch.Open ();
517 } finally {
518 creator_handle.Set ();
521 ProcessRequestOrInput (ch);
524 void ProcessRequestOrInput (IChannel ch)
526 var reply = ch as IReplyChannel;
527 var input = ch as IInputChannel;
529 if (reply != null) {
530 if (owner.ReceiveSynchronously) {
531 RequestContext rc;
532 if (reply.TryReceiveRequest (owner.timeouts.ReceiveTimeout, out rc))
533 ProcessRequest (reply, rc);
534 } else {
535 reply.BeginTryReceiveRequest (owner.timeouts.ReceiveTimeout, TryReceiveRequestDone, reply);
537 } else if (input != null) {
538 if (owner.ReceiveSynchronously) {
539 Message msg;
540 if (input.TryReceive (owner.timeouts.ReceiveTimeout, out msg))
541 ProcessInput (input, msg);
542 } else {
543 input.BeginTryReceive (owner.timeouts.ReceiveTimeout, TryReceiveDone, input);
548 void TryReceiveRequestDone (IAsyncResult result)
550 RequestContext rc;
551 var reply = (IReplyChannel) result.AsyncState;
552 if (reply.EndTryReceiveRequest (result, out rc))
553 ProcessRequest (reply, rc);
554 else
555 reply.Close ();
558 void TryReceiveDone (IAsyncResult result)
560 Message msg;
561 var input = (IInputChannel) result.AsyncState;
562 if (input.EndTryReceive (result, out msg))
563 ProcessInput (input, msg);
564 else
565 input.Close ();
568 void ProcessRequest (IReplyChannel reply, RequestContext rc)
570 var req = rc.RequestMessage;
571 try {
572 var ed = FindEndpointDispatcher (req);
573 new InputOrReplyRequestProcessor (ed.DispatchRuntime, reply).ProcessReply (rc);
574 } catch (Exception ex) {
575 // FIXME: log it.
576 Console.WriteLine (ex);
578 var conv = reply.GetProperty<FaultConverter> () ?? FaultConverter.GetDefaultFaultConverter (rc.RequestMessage.Version);
579 Message res;
580 if (!conv.TryCreateFaultMessage (ex, out res))
581 res = Message.CreateMessage (req.Version, new FaultCode ("Receiver"), ex.Message, req.Version.Addressing.FaultNamespace);
582 rc.Reply (res);
583 } finally {
584 if (rc != null)
585 rc.Close ();
586 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
587 if (loop && reply.State != CommunicationState.Closed)
588 ProcessRequestOrInput (reply);
592 void ProcessInput (IInputChannel input, Message message)
594 try {
595 EndpointDispatcher candidate = null;
596 candidate = FindEndpointDispatcher (message);
597 new InputOrReplyRequestProcessor (candidate.DispatchRuntime, input).
598 ProcessInput (message);
600 catch (Exception ex) {
601 // FIXME: log it.
602 Console.WriteLine (ex);
603 } finally {
604 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
605 if (loop && input.State != CommunicationState.Closed)
606 ProcessRequestOrInput (input);
610 EndpointDispatcher FindEndpointDispatcher (Message message) {
611 EndpointDispatcher candidate = null;
612 bool hasEndpointMatch = false;
613 foreach (var endpoint in owner.Endpoints) {
614 if (endpoint.AddressFilter.Match (message)) {
615 hasEndpointMatch = true;
616 if (!endpoint.ContractFilter.Match (message))
617 continue;
618 var newdis = endpoint;
619 if (candidate == null || candidate.FilterPriority < newdis.FilterPriority)
620 candidate = newdis;
621 else if (candidate.FilterPriority == newdis.FilterPriority)
622 throw new MultipleFilterMatchesException ();
625 if (candidate == null && !hasEndpointMatch) {
626 if (owner.Host != null)
627 owner.Host.OnUnknownMessageReceived (message);
628 // we have to return a fault to the client anyways...
629 throw new EndpointNotFoundException ();
631 else if (candidate == null)
632 // FIXME: It is not a good place to check, but anyways detach this error from EndpointNotFoundException.
633 throw new ActionNotSupportedException (String.Format ("Action '{0}' did not match any operations in the target contract", message.Headers.Action));
635 return candidate;