2 // ChannelDispatcher.cs
5 // Atsushi Enomoto <atsushi@ximian.com>
7 // Copyright (C) 2005,2009 Novell, Inc. http://www.novell.com
9 // Permission is hereby granted, free of charge, to any person obtaining
10 // a copy of this software and associated documentation files (the
11 // "Software"), to deal in the Software without restriction, including
12 // without limitation the rights to use, copy, modify, merge, publish,
13 // distribute, sublicense, and/or sell copies of the Software, and to
14 // permit persons to whom the Software is furnished to do so, subject to
15 // the following conditions:
17 // The above copyright notice and this permission notice shall be
18 // included in all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 using System
.Collections
.Generic
;
30 using System
.Collections
.ObjectModel
;
31 using System
.Reflection
;
32 using System
.ServiceModel
.Channels
;
33 using System
.Threading
;
34 using System
.Transactions
;
35 using System
.ServiceModel
;
36 using System
.ServiceModel
.Description
;
38 namespace System
.ServiceModel
.Dispatcher
// Dispatcher type derived from ChannelDispatcherBase. The members visible in this file hold
// an IChannelListener, a collection of EndpointDispatcher objects and a ListenerLoopManager
// that accepts channels from the listener and routes received messages to the endpoints.
40 public class ChannelDispatcher
: ChannelDispatcherBase
// SynchronizedCollection<EndpointDispatcher> subclass that maintains each item's
// ChannelDispatcher back-reference as items are inserted, replaced, removed or cleared.
42 class EndpointDispatcherCollection
: SynchronizedCollection
<EndpointDispatcher
>
// Takes the ChannelDispatcher that items will be pointed at.
// NOTE(review): the constructor body is not visible in this view.
44 public EndpointDispatcherCollection (ChannelDispatcher owner
)
// The dispatcher assigned to items in InsertItem/SetItem.
49 ChannelDispatcher owner
;
// Nulls the ChannelDispatcher back-reference of every contained item.
// NOTE(review): no call to base.ClearItems() is visible here — confirm against the full file.
51 protected override void ClearItems ()
53 foreach (var ed
in this)
54 ed
.ChannelDispatcher
= null;
// Points the new item at the owner, then lets the base class insert it at `index`.
58 protected override void InsertItem (int index
, EndpointDispatcher item
)
60 item
.ChannelDispatcher
= owner
;
61 base.InsertItem (index
, item
);
// Clears the back-reference of the item currently at `index`, then removes it via the base class.
// NOTE(review): any bounds guard around the indexer access is not visible in this view.
64 protected override void RemoveItem (int index
)
67 this [index
].ChannelDispatcher
= null;
68 base.RemoveItem (index
);
// Points the replacement item at the owner, then lets the base class store it at `index`.
// NOTE(review): the visible lines do not clear ChannelDispatcher on the item being replaced — confirm intent.
71 protected override void SetItem (int index
, EndpointDispatcher item
)
73 item
.ChannelDispatcher
= owner
;
74 base.SetItem (index
, item
);
// Instance state of ChannelDispatcher. Most of these are plain backing fields for the
// public properties defined further down.
// Returned by the ErrorHandlers property.
81 Collection
<IErrorHandler
> error_handlers
82 = new Collection
<IErrorHandler
> ();
// Assigned in Init(); returned by the Listener property.
83 IChannelListener listener
;
// Assigned in Init(); read by DefaultOpenTimeout/DefaultCloseTimeout and by ListenerLoopManager (ReceiveTimeout).
84 internal IDefaultCommunicationTimeouts timeouts
; // FIXME: remove internal
// Backing field for the MessageVersion property.
85 MessageVersion message_version
;
// Backing fields for ReceiveSynchronously, IncludeExceptionDetailInFaults,
// ManualAddressing and IsTransactedReceive respectively.
86 bool receive_sync
, include_exception_detail_in_faults
,
87 manual_addressing
, is_tx_receive
;
// Backing field for MaxTransactedBatchSize.
88 int max_tx_batch_size
;
// Returned by the ChannelInitializers property.
89 SynchronizedCollection
<IChannelInitializer
> initializers
90 = new SynchronizedCollection
<IChannelInitializer
> ();
// Backing field for TransactionIsolationLevel.
91 IsolationLevel tx_isolation_level
;
// Backing field for the ServiceThrottle property.
93 ServiceThrottle throttle
;
// Fresh GUID per instance. NOTE(review): no reader of this field is visible in this view.
95 Guid identifier
= Guid
.NewGuid ();
// Created unsignaled. NOTE(review): no use of this event is visible in this view.
96 ManualResetEvent async_event
= new ManualResetEvent (false);
// Created in OnOpening(); driven by OnOpen/OnOpened/OnClose/OnAbort/CloseInput.
98 ListenerLoopManager loop_manager
;
// Assigned an EndpointDispatcherCollection in Init(); returned by the Endpoints property.
99 SynchronizedCollection
<EndpointDispatcher
> endpoints
;
// Convenience constructor: forwards to the two-argument overload with a null binding name.
101 [MonoTODO ("get binding info from config")]
102 public ChannelDispatcher (IChannelListener listener
)
103 : this (listener
, null)
// Convenience constructor: forwards to the three-argument overload with null timeouts,
// which makes Init() fall back to the listener's (or the default) timeouts.
107 public ChannelDispatcher (
108 IChannelListener listener
, string bindingName
)
109 : this (listener
, bindingName
, null)
// Primary constructor.
//   listener    - required; a null value raises ArgumentNullException("listener").
//   bindingName - stored by Init() and exposed through BindingName; may be null.
//   timeouts    - optional; when null Init() picks a fallback.
113 public ChannelDispatcher (
114 IChannelListener listener
, string bindingName
,
115 IDefaultCommunicationTimeouts timeouts
)
117 if (listener
== null)
118 throw new ArgumentNullException ("listener");
119 Init (listener
, bindingName
, timeouts
);
// Shared constructor logic: stores the listener and binding name, resolves the timeouts
// object and creates the endpoint collection owned by this dispatcher.
122 private void Init (IChannelListener listener
, string bindingName
,
123 IDefaultCommunicationTimeouts timeouts
)
125 this.listener
= listener
;
126 this.binding_name
= bindingName
;
// Timeout resolution order: explicit argument, then the listener itself if it implements
// IDefaultCommunicationTimeouts, then DefaultCommunicationTimeouts.Instance.
127 // IChannelListener is often a ChannelListenerBase
128 // which implements IDefaultCommunicationTimeouts.
129 this.timeouts
= timeouts
?? listener
as IDefaultCommunicationTimeouts
?? DefaultCommunicationTimeouts
.Instance
;
// The collection is given `this` so that added endpoints get their ChannelDispatcher set.
130 endpoints
= new EndpointDispatcherCollection (this);
// Configures this dispatcher from a ServiceEndpoint: adopts the binding's MessageVersion
// (falling back to MessageVersion.Default), creates an EndpointDispatcher for the endpoint's
// address and contract name/namespace, adds it to Endpoints and initializes it.
// NOTE(review): the declared return type is EndpointDispatcher but the return statement is
// not visible in this view — presumably `ed` is returned; confirm.
133 internal EndpointDispatcher
InitializeServiceEndpoint (Type serviceType
, ServiceEndpoint se
)
135 this.MessageVersion
= se
.Binding
.MessageVersion
;
136 if (this.MessageVersion
== null)
137 this.MessageVersion
= MessageVersion
.Default
;
139 //Attach one EndpointDispatcher to the ChannelDispatcher
140 EndpointDispatcher ed
= new EndpointDispatcher (se
.Address
, se
.Contract
.Name
, se
.Contract
.Namespace
);
// Adding to Endpoints also sets ed.ChannelDispatcher (see EndpointDispatcherCollection.InsertItem).
141 this.Endpoints
.Add (ed
);
142 ed
.InitializeServiceEndpoint (false, serviceType
, se
);
// Read-only: the binding name stored by Init() (may be null).
146 public string BindingName
{
147 get { return binding_name; }
// Read-only: the IChannelInitializer collection created with this instance.
150 public SynchronizedCollection
<IChannelInitializer
> ChannelInitializers
{
151 get { return initializers; }
// Close timeout taken from the timeouts object resolved in Init().
154 protected internal override TimeSpan DefaultCloseTimeout
{
155 get { return timeouts.CloseTimeout; }
// Open timeout taken from the timeouts object resolved in Init().
158 protected internal override TimeSpan DefaultOpenTimeout
{
159 get { return timeouts.OpenTimeout; }
// Read-only: the IErrorHandler collection created with this instance.
162 public Collection
<IErrorHandler
> ErrorHandlers
{
163 get { return error_handlers; }
// Read-only: the endpoint collection created in Init(). Items added here get their
// ChannelDispatcher property set to this dispatcher by the collection.
166 public SynchronizedCollection
<EndpointDispatcher
> Endpoints
{
167 get { return endpoints; }
// Not implemented: reading this property always throws NotImplementedException.
171 public bool IsTransactedAccept
{
172 get { throw new NotImplementedException (); }
// Plain get/set over is_tx_receive; no validation is performed in the visible code.
175 public bool IsTransactedReceive
{
176 get { return is_tx_receive; }
177 set { is_tx_receive = value; }
// Plain get/set over manual_addressing; no validation is performed in the visible code.
180 public bool ManualAddressing
{
181 get { return manual_addressing; }
182 set { manual_addressing = value; }
// Plain get/set over max_tx_batch_size; the value is not range-checked in the visible code.
185 public int MaxTransactedBatchSize
{
186 get { return max_tx_batch_size; }
187 set { max_tx_batch_size = value; }
// The ServiceHostBase associated with this dispatcher.
// NOTE(review): the accessor body is not visible in this view; presumably it returns the
// `host` field that OnClosed() uses — confirm against the full file.
190 public override ServiceHostBase Host
{
// Read-only: the channel listener supplied to the constructor.
194 public override IChannelListener Listener
{
195 get { return listener; }
// Plain get/set over message_version. InitializeServiceEndpoint() assigns it from the
// endpoint binding, and OnOpen() refuses to proceed while it is null.
198 public MessageVersion MessageVersion
{
199 get { return message_version; }
200 set { message_version = value; }
// Selects synchronous (TryReceive*) versus asynchronous (BeginTryReceive*) message
// reception in ListenerLoopManager.ProcessRequestOrInput.
// The visible setter statements call ThrowIfDisposedOrImmutable() before storing the value.
203 public bool ReceiveSynchronously
{
204 get { return receive_sync; }
206 ThrowIfDisposedOrImmutable ();
207 receive_sync
= value;
// Plain get/set over include_exception_detail_in_faults.
211 public bool IncludeExceptionDetailInFaults
{
212 get { return include_exception_detail_in_faults; }
213 set { include_exception_detail_in_faults = value; }
// Plain get/set over throttle. OnOpened() fills in a default instance when this is still null.
216 public ServiceThrottle ServiceThrottle
{
217 get { return throttle; }
218 set { throttle = value; }
// Plain get/set over tx_isolation_level.
221 public IsolationLevel TransactionIsolationLevel
{
222 get { return tx_isolation_level; }
223 set { tx_isolation_level = value; }
// Plain get/set over tx_timeout.
// NOTE(review): the declaration of tx_timeout is not visible in this view.
226 public TimeSpan TransactionTimeout
{
227 get { return tx_timeout; }
228 set { tx_timeout = value; }
// Override invoked with the ServiceHostBase this dispatcher is being attached to.
// NOTE(review): the method body is not visible in this view.
231 protected internal override void Attach (ServiceHostBase host
)
// Forwards to the loop manager's CloseInput(); a no-op when the loop manager has not been
// created yet (it is created in OnOpening()).
236 public override void CloseInput ()
238 if (loop_manager
!= null)
239 loop_manager
.CloseInput ();
// Override invoked with the ServiceHostBase this dispatcher is being detached from.
// NOTE(review): the method body is not visible in this view.
242 protected internal override void Detach (ServiceHostBase host
)
// Abort path: stops the loop manager (if one exists) with a one-tick timeout, i.e. without
// giving it meaningful time to finish.
247 protected override void OnAbort ()
249 if (loop_manager
!= null)
250 loop_manager
.Stop (TimeSpan
.FromTicks (1));
// Delegates wrapping OnOpen/OnClose. They are created lazily by OnBeginOpen/OnBeginClose
// and completed by OnEndOpen/OnEndClose.
253 Action
<TimeSpan
> open_delegate
;
254 Action
<TimeSpan
> close_delegate
;
// Asynchronous close: lazily wraps OnClose in a delegate and starts it with BeginInvoke,
// passing the caller's callback and state through. Returns the delegate's IAsyncResult,
// which must later be handed to OnEndClose.
256 protected override IAsyncResult
OnBeginClose (TimeSpan timeout
,
257 AsyncCallback callback
, object state
)
259 if (close_delegate
== null)
260 close_delegate
= new Action
<TimeSpan
> (OnClose
);
261 return close_delegate
.BeginInvoke (timeout
, callback
, state
);
// Asynchronous open: lazily wraps OnOpen in a delegate and starts it with BeginInvoke,
// passing the caller's callback and state through. Returns the delegate's IAsyncResult,
// which must later be handed to OnEndOpen.
264 protected override IAsyncResult
OnBeginOpen (TimeSpan timeout
,
265 AsyncCallback callback
, object state
)
267 if (open_delegate
== null)
268 open_delegate
= new Action
<TimeSpan
> (OnOpen
);
269 return open_delegate
.BeginInvoke (timeout
, callback
, state
);
// Synchronous close: asks the loop manager (if one exists) to stop within `timeout`.
272 protected override void OnClose (TimeSpan timeout
)
274 if (loop_manager
!= null)
275 loop_manager
.Stop (timeout
);
// After closing, removes this dispatcher from its host's ChannelDispatchers collection.
// NOTE(review): any null check on `host` or call to the base implementation is not visible
// in this view — confirm against the full file.
278 protected override void OnClosed ()
281 host
.ChannelDispatchers
.Remove (this);
// Completes the asynchronous close started by OnBeginClose. close_delegate is only assigned
// in OnBeginClose in the visible code, so `result` is expected to come from that call.
285 protected override void OnEndClose (IAsyncResult result
)
287 close_delegate
.EndInvoke (result
);
// Completes the asynchronous open started by OnBeginOpen. open_delegate is only assigned
// in OnBeginOpen in the visible code, so `result` is expected to come from that call.
290 protected override void OnEndOpen (IAsyncResult result
)
292 open_delegate
.EndInvoke (result
);
// Synchronous open: requires both a host and a message version, then lets the loop manager
// (created in OnOpening) prepare the listener within `timeout`.
// Throws InvalidOperationException when Host or MessageVersion is null; note that the
// message text mentions only the host even when MessageVersion is the missing piece.
295 protected override void OnOpen (TimeSpan timeout
)
297 if (Host
== null || MessageVersion
== null)
298 throw new InvalidOperationException ("Service host is not attached to this ChannelDispatcher.");
300 loop_manager
.Setup (timeout
);
// Creates the ListenerLoopManager for this dispatcher before OnOpen() runs.
// NOTE(review): other statements of this method (e.g. a base call) are not visible in this view.
303 protected override void OnOpening ()
306 loop_manager
= new ListenerLoopManager (this);
// Post-open step: supplies a default ServiceThrottle when none was configured and starts
// the loop manager.
// NOTE(review): the first statements of this method are not visible in this view.
309 protected override void OnOpened ()
317 // FIXME: not sure if it should be filled here.
318 if (ServiceThrottle
== null)
319 ServiceThrottle
= new ServiceThrottle ();
321 loop_manager
.Start ();
// Helper that accepts channels from the owner's listener and pumps received messages to
// the owner's endpoint dispatchers.
325 // isolated from ChannelDispatcher
326 class ListenerLoopManager
// The dispatcher whose Listener, Endpoints, Host and timeouts are used throughout.
328 ChannelDispatcher owner
;
// Signaled when a channel faults or closes (see ChannelAccepted) and by Stop().
329 AutoResetEvent throttle_wait_handle
= new AutoResetEvent (false);
// Signaled when a channel acceptance finishes (see ChannelAccepted/CreateAcceptor) and by Stop().
330 AutoResetEvent creator_handle
= new AutoResetEvent (false);
// Stop() waits on this handle and then closes it.
331 ManualResetEvent stop_handle
= new ManualResetEvent (false);
// Recorded by Stop(); CloseInput() uses both to compute the time left for closing each channel.
334 DateTime close_started
;
335 TimeSpan close_timeout
;
// Checked for null in SetupChannelAcceptor(). NOTE(review): its assignment is not visible in this view.
336 Func
<IAsyncResult
> channel_acceptor
;
// Channels currently tracked; entries are removed when a channel faults or closes.
337 List
<IChannel
> channels
= new List
<IChannel
> ();
// Copied from the host's ServiceBehaviorAttribute in the constructor.
338 AddressFilterMode address_filter_mode
;
// Looks up the ServiceBehaviorAttribute on the owner's host description (null when the owner
// has no host) and copies its AddressFilterMode.
// NOTE(review): `sba` is null when owner.Host is null; the visible lines dereference it
// without a guard. A guard (and the `this.owner = owner` assignment) may exist on lines not
// visible in this view — confirm against the full file.
340 public ListenerLoopManager (ChannelDispatcher owner
)
343 var sba
= owner
.Host
!= null ? owner
.Host
.Description
.Behaviors
.Find
<ServiceBehaviorAttribute
> () : null;
345 address_filter_mode
= sba
.AddressFilterMode
;
// Prepares for dispatching: opens the listener if it is not already Opened, validates that
// every endpoint can produce service instances, then selects the channel acceptor.
// Throws InvalidOperationException when an endpoint has no InstanceContextProvider and its
// DispatchRuntime.Type is null or has no constructor matching an empty parameter list.
348 public void Setup (TimeSpan openTimeout
)
350 if (owner
.Listener
.State
!= CommunicationState
.Opened
)
351 owner
.Listener
.Open (openTimeout
);
353 // It is tested at Open(), but strangely it is not instantiated at this point.
354 foreach (var ed
in owner
.Endpoints
)
355 if (ed
.DispatchRuntime
.InstanceContextProvider
== null && (ed
.DispatchRuntime
.Type
== null || ed
.DispatchRuntime
.Type
.GetConstructor (Type
.EmptyTypes
) == null))
356 throw new InvalidOperationException ("There is no default constructor for the service Type in the DispatchRuntime");
357 SetupChannelAcceptor ();
// NOTE(review): the signature line of this member is not visible in this view; presumably
// this is the body of Start(), which ChannelDispatcher.OnOpened() calls — confirm.
// Creates the loop thread on first use (running Loop) and starts it.
362 if (loop_thread
== null)
363 loop_thread
= new Thread (new ThreadStart (Loop
));
364 loop_thread
.Start ();
// Builds an acceptor function for listeners of channel shape TChannel. The listener is cast
// with `as`, so `r` is null when `l` is not an IChannelListener<TChannel>.
// NOTE(review): several lines (null check on `r`, the `try` keywords, the lambda wrapper and
// the failure return values) are not visible in this view; the comments below describe only
// the visible statements.
367 Func
<IAsyncResult
> CreateAcceptor
<TChannel
> (IChannelListener l
) where TChannel
: class, IChannel
369 IChannelListener
<TChannel
> r
= l
as IChannelListener
<TChannel
>;
// Completion callback: ends the accept and hands the channel to ChannelAccepted().
// Failures are written to the console and creator_handle is signaled.
372 AsyncCallback callback
= delegate (IAsyncResult result
) {
374 ChannelAccepted (r
.EndAcceptChannel (result
));
375 } catch (Exception ex
) {
376 Console
.WriteLine ("Exception during finishing channel acceptance.");
377 Console
.WriteLine (ex
);
378 creator_handle
.Set ();
// Starts the asynchronous accept; exceptions thrown here are written to the console.
383 return r
.BeginAcceptChannel (callback
, null);
384 } catch (Exception ex
) {
385 Console
.WriteLine ("Exception during accepting channel.");
386 Console
.WriteLine (ex
);
// Picks the first acceptor that matches the listener's channel shape, probing in this order:
// reply, reply-session, input, input-session, duplex, duplex-session.
// Throws InvalidOperationException naming the listener type when none matches.
// NOTE(review): the left-hand side of the `??` chain (presumably `channel_acceptor =`) is
// not visible in this view.
392 void SetupChannelAcceptor ()
394 var l
= owner
.Listener
;
396 CreateAcceptor
<IReplyChannel
> (l
) ??
397 CreateAcceptor
<IReplySessionChannel
> (l
) ??
398 CreateAcceptor
<IInputChannel
> (l
) ??
399 CreateAcceptor
<IInputSessionChannel
> (l
) ??
400 CreateAcceptor
<IDuplexChannel
> (l
) ??
401 CreateAcceptor
<IDuplexSessionChannel
> (l
);
402 if (channel_acceptor
== null)
403 throw new InvalidOperationException (String
.Format ("Unrecognized channel listener type: {0}", l
.GetType ()));
// Stops dispatching within `timeout`: records when closing began, wakes any waits on the
// two auto-reset events, waits for stop_handle, and then forcibly aborts whatever is still
// running (listener and loop thread).
// NOTE(review): the statement guarded by the first `if` and some other lines are not
// visible in this view.
406 public void Stop (TimeSpan timeout
)
408 if (loop_thread
== null)
// Remembered so that CloseInput() can compute the remaining close time per channel.
411 close_started
= DateTime
.Now
;
412 close_timeout
= timeout
;
414 creator_handle
.Set ();
415 throttle_wait_handle
.Set (); // break primary loop
416 if (stop_handle
!= null) {
// A non-positive timeout is replaced by one tick for the wait.
417 stop_handle
.WaitOne (timeout
> TimeSpan
.Zero
? timeout
: TimeSpan
.FromTicks (1));
418 stop_handle
.Close ();
// Listener still not closed after the wait: report it and abort.
421 if (owner
.Listener
.State
!= CommunicationState
.Closed
) {
423 Console
.WriteLine ("Channel listener '{0}' is not closed. Aborting.", owner
.Listener
.GetType ());
424 owner
.Listener
.Abort ();
// Last resort: abort the loop thread if it is still alive.
426 if (loop_thread
!= null && loop_thread
.IsAlive
)
427 loop_thread
.Abort ();
// Walks a snapshot of the tracked channels (ToArray, so the list can be modified while
// iterating): already-closed channels are dropped from the list, the others are closed with
// the time remaining from the timeout recorded by Stop(). Exceptions are written to the console.
// NOTE(review): close_started/close_timeout are only assigned in Stop() in the visible code;
// the `try`/`continue` lines of this loop are not visible in this view.
431 public void CloseInput ()
433 foreach (var ch
in channels
.ToArray ()) {
434 if (ch
.State
== CommunicationState
.Closed
)
435 channels
.Remove (ch
); // zombie, if exists
438 ch
.Close (close_timeout
- (DateTime
.Now
- close_started
));
439 } catch (Exception ex
) {
441 Console
.WriteLine (ex
);
// NOTE(review): the signature lines of the member(s) these statements belong to are not
// visible in this view; presumably this is the tail of Loop() followed by the body of the
// core accept loop — confirm against the full file.
// Exceptions escaping the dispatcher loop are written to the console with the listener.
452 } catch (Exception ex
) {
454 Console
.WriteLine ("ListenerLoopManager caught an exception inside dispatcher loop, which is likely thrown by the channel listener {0}", owner
.Listener
);
455 Console
.WriteLine (ex
);
457 if (stop_handle
!= null)
466 // FIXME: use WaitForChannel() for (*only* for) transacted channel listeners.
467 // http://social.msdn.microsoft.com/Forums/en-US/wcf/thread/3faa4a5e-8602-4dbe-a181-73b3f581835e
470 // FIXME: enable throttling and allow more than one connection to process at a time.
// The `< 1` bound means this inner loop only runs while no channel is being tracked.
471 while (loop
&& channels
.Count
< 1) {
472 // while (loop && channels.Count < owner.ServiceThrottle.MaxConcurrentSessions) {
474 creator_handle
.WaitOne (); // released by ChannelAccepted()
478 throttle_wait_handle
.WaitOne (); // released by IChannel.Close()
481 owner
.Listener
.Close ();
483 // make sure to close both listener and channels.
// Called with a freshly accepted channel: wires up Opened/Faulted/Closed handlers, signals
// creator_handle and starts receiving from the channel.
// NOTE(review): several lines (the null-channel early exit, the Opened handler body, the
// code that opens the channel and adds it to `channels`) are not visible in this view.
488 void ChannelAccepted (IChannel ch
)
491 if (ch
== null) // could happen when it was aborted
494 var dis
= ch
as IDisposable
;
502 ch
.Opened
+= delegate {
// On fault: stop tracking the channel and wake the loop.
503 ch
.Faulted
+= delegate {
505 if (channels
.Contains (ch
))
506 channels
.Remove (ch
);
507 throttle_wait_handle
.Set (); // release loop wait lock.
// On close: same clean-up as for a fault.
509 ch
.Closed
+= delegate {
511 if (channels
.Contains (ch
))
512 channels
.Remove (ch
);
513 throttle_wait_handle
.Set (); // release loop wait lock.
518 creator_handle
.Set ();
521 ProcessRequestOrInput (ch
);
// Starts one receive on the channel. Reply channels use TryReceiveRequest /
// BeginTryReceiveRequest, input channels use TryReceive / BeginTryReceive; the synchronous
// form is chosen when owner.ReceiveSynchronously is set. All receives use
// owner.timeouts.ReceiveTimeout.
// NOTE(review): the outer condition of the first branch (presumably `reply != null`), the
// `else` lines and the declarations of `rc`/`msg` are not visible in this view.
524 void ProcessRequestOrInput (IChannel ch
)
526 var reply
= ch
as IReplyChannel
;
527 var input
= ch
as IInputChannel
;
530 if (owner
.ReceiveSynchronously
) {
532 if (reply
.TryReceiveRequest (owner
.timeouts
.ReceiveTimeout
, out rc
))
533 ProcessRequest (reply
, rc
);
// Asynchronous path: the channel itself is passed as the async state for the callback.
535 reply
.BeginTryReceiveRequest (owner
.timeouts
.ReceiveTimeout
, TryReceiveRequestDone
, reply
);
537 } else if (input
!= null) {
538 if (owner
.ReceiveSynchronously
) {
540 if (input
.TryReceive (owner
.timeouts
.ReceiveTimeout
, out msg
))
541 ProcessInput (input
, msg
);
// Asynchronous path: the channel itself is passed as the async state for the callback.
543 input
.BeginTryReceive (owner
.timeouts
.ReceiveTimeout
, TryReceiveDone
, input
);
// Callback for BeginTryReceiveRequest: recovers the reply channel from AsyncState, ends the
// receive and, when a request was obtained, passes it to ProcessRequest().
// NOTE(review): the declaration of `rc` and any handling of the "nothing received" case are
// not visible in this view.
548 void TryReceiveRequestDone (IAsyncResult result
)
551 var reply
= (IReplyChannel
) result
.AsyncState
;
552 if (reply
.EndTryReceiveRequest (result
, out rc
))
553 ProcessRequest (reply
, rc
);
// Callback for BeginTryReceive: recovers the input channel from AsyncState, ends the receive
// and, when a message was obtained, passes it to ProcessInput().
// NOTE(review): the declaration of `msg` and any handling of the "nothing received" case are
// not visible in this view.
558 void TryReceiveDone (IAsyncResult result
)
561 var input
= (IInputChannel
) result
.AsyncState
;
562 if (input
.EndTryReceive (result
, out msg
))
563 ProcessInput (input
, msg
);
// Handles one request on a reply channel: finds the matching endpoint dispatcher and runs
// the request through an InputOrReplyRequestProcessor. On failure the exception is written
// to the console and converted into a fault message. Afterwards the channel is put back
// into the receive cycle unless the loop has stopped or the channel is closed.
// NOTE(review): the `try` line, the declaration of `res` and the code that sends `res`
// back to the client are not visible in this view.
568 void ProcessRequest (IReplyChannel reply
, RequestContext rc
)
570 var req
= rc
.RequestMessage
;
572 var ed
= FindEndpointDispatcher (req
);
573 new InputOrReplyRequestProcessor (ed
.DispatchRuntime
, reply
).ProcessReply (rc
);
574 } catch (Exception ex
) {
576 Console
.WriteLine (ex
);
// Prefer a FaultConverter exposed by the channel; otherwise use the default one for the
// request's message version.
578 var conv
= reply
.GetProperty
<FaultConverter
> () ?? FaultConverter
.GetDefaultFaultConverter (rc
.RequestMessage
.Version
);
// When the converter cannot build a fault, create a generic "Receiver" fault whose reason
// is the exception message.
580 if (!conv
.TryCreateFaultMessage (ex
, out res
))
581 res
= Message
.CreateMessage (req
.Version
, new FaultCode ("Receiver"), ex
.Message
, req
.Version
.Addressing
.FaultNamespace
);
586 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
587 if (loop
&& reply
.State
!= CommunicationState
.Closed
)
588 ProcessRequestOrInput (reply
);
// Handles one message on an input channel: finds the matching endpoint dispatcher and runs
// the message through an InputOrReplyRequestProcessor. Exceptions are written to the
// console. Afterwards the channel is put back into the receive cycle unless the loop has
// stopped or the channel is closed.
// NOTE(review): the `try` line and any further error handling are not visible in this view.
592 void ProcessInput (IInputChannel input
, Message message
)
595 EndpointDispatcher candidate
= null;
596 candidate
= FindEndpointDispatcher (message
);
597 new InputOrReplyRequestProcessor (candidate
.DispatchRuntime
, input
).
598 ProcessInput (message
);
600 catch (Exception ex
) {
602 Console
.WriteLine (ex
);
604 // unless it is closed by session/call manager, move it back to the loop to receive the next message.
605 if (loop
&& input
.State
!= CommunicationState
.Closed
)
606 ProcessRequestOrInput (input
);
// Selects the endpoint dispatcher for `message` by testing each endpoint's AddressFilter and
// then its ContractFilter, comparing FilterPriority between matching endpoints.
// Throws:
//   MultipleFilterMatchesException - two matching endpoints have equal FilterPriority.
//   EndpointNotFoundException      - no endpoint's address filter matched (the host, if any,
//                                    is notified through OnUnknownMessageReceived first).
//   ActionNotSupportedException    - an address matched but no contract filter did.
// NOTE(review): the `continue` after a contract-filter miss, the assignment of `candidate`
// and the final return statement are not visible in this view.
610 EndpointDispatcher
FindEndpointDispatcher (Message message
) {
611 EndpointDispatcher candidate
= null;
// Tracks whether any address filter matched, to tell "unknown address" from "unknown action".
612 bool hasEndpointMatch
= false;
613 foreach (var endpoint
in owner
.Endpoints
) {
614 if (endpoint
.AddressFilter
.Match (message
)) {
615 hasEndpointMatch
= true;
616 if (!endpoint
.ContractFilter
.Match (message
))
618 var newdis
= endpoint
;
619 if (candidate
== null || candidate
.FilterPriority
< newdis
.FilterPriority
)
621 else if (candidate
.FilterPriority
== newdis
.FilterPriority
)
622 throw new MultipleFilterMatchesException ();
625 if (candidate
== null && !hasEndpointMatch
) {
626 if (owner
.Host
!= null)
627 owner
.Host
.OnUnknownMessageReceived (message
);
628 // we have to return a fault to the client anyways...
629 throw new EndpointNotFoundException ();
631 else if (candidate
== null)
632 // FIXME: It is not a good place to check, but anyways detach this error from EndpointNotFoundException.
633 throw new ActionNotSupportedException (String
.Format ("Action '{0}' did not match any operations in the target contract", message
.Headers
.Action
));