1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace System
.ServiceModel
.Channels
8 using System
.Diagnostics
;
12 using System
.Runtime
.CompilerServices
;
13 using System
.Security
.Authentication
.ExtendedProtection
;
14 using System
.ServiceModel
;
15 using System
.ServiceModel
.Activation
;
16 using System
.ServiceModel
.Description
;
17 using System
.ServiceModel
.Diagnostics
;
18 using System
.ServiceModel
.Dispatcher
;
19 using System
.ServiceModel
.Security
;
20 using System
.Threading
;
22 using System
.ServiceModel
.Diagnostics
.Application
;
// Callback signature that receives a ServerSingletonPreambleConnectionReader and returns nothing.
// The ServerSingletonPreambleConnectionReader constructor accepts one of these and keeps it in its callback field.
24 delegate void ServerSingletonPreambleCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader
);
// Callback signature that takes a preamble reader and returns an ISingletonChannelListener.
// NOTE(review): presumably used to select the listener that should service the connection - confirm at the call sites.
25 delegate ISingletonChannelListener
SingletonPreambleDemuxCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader
);
// Listener contract returned by SingletonPreambleDemuxCallback.
// Members visible here: a read-only ReceiveTimeout and a ReceiveRequest method that takes the
// request context, a completion callback, and a flag named canDispatchOnThisThread.
// NOTE(review): the flag presumably tells the listener whether it may dispatch inline - confirm with implementers.
26 interface ISingletonChannelListener
28 TimeSpan ReceiveTimeout { get; }
29 void ReceiveRequest(RequestContext requestContext
, Action callback
, bool canDispatchOnThisThread
);
32 class ServerSingletonPreambleConnectionReader
: InitialServerConnectionReader
34 ServerSingletonDecoder decoder
;
35 ServerSingletonPreambleCallback callback
;
36 WaitCallback onAsyncReadComplete
;
37 IConnectionOrientedTransportFactorySettings transportSettings
;
38 TransportSettingsCallback transportSettingsCallback
;
39 SecurityMessageProperty security
;
41 IConnection rawConnection
;
42 byte[] connectionBuffer
;
46 TimeoutHelper receiveTimeoutHelper
;
47 Action
<Uri
> viaDelegate
;
48 ChannelBinding channelBindingToken
;
49 static AsyncCallback onValidate
;
// Creates a preamble reader over the given connection.
//   connection / closedCallback  - forwarded to the InitialServerConnectionReader base constructor;
//                                  connection is also kept as rawConnection.
//   connectionDequeuedCallback   - stored in ConnectionDequeuedCallback.
//   streamPosition               - starting position handed to the new ServerSingletonDecoder.
//   transportSettingsCallback    - stored; later called with the Via to obtain transport settings.
//   callback                     - stored in the callback field.
// NOTE(review): the statements that consume the offset and size parameters are not visible in this view.
51 public ServerSingletonPreambleConnectionReader(IConnection connection
, Action connectionDequeuedCallback
,
52 long streamPosition
, int offset
, int size
, TransportSettingsCallback transportSettingsCallback
,
53 ConnectionClosedCallback closedCallback
, ServerSingletonPreambleCallback callback
)
54 : base(connection
, closedCallback
)
// The decoder is bounded by MaxViaSize and MaxContentTypeSize.
56 this.decoder
= new ServerSingletonDecoder(streamPosition
, MaxViaSize
, MaxContentTypeSize
);
59 this.callback
= callback
;
60 this.transportSettingsCallback
= transportSettingsCallback
;
61 this.rawConnection
= connection
;
62 this.ConnectionDequeuedCallback
= connectionDequeuedCallback
;
66 public ChannelBinding ChannelBinding
70 return this.channelBindingToken
;
74 public int BufferOffset
76 get { return this.offset; }
81 get { return this.size; }
84 public ServerSingletonDecoder Decoder
86 get { return this.decoder; }
89 public IConnection RawConnection
91 get { return this.rawConnection; }
96 get { return this.via; }
99 public IConnectionOrientedTransportFactorySettings TransportSettings
101 get { return this.transportSettings; }
104 public SecurityMessageProperty Security
106 get { return this.security; }
// Returns the time left on receiveTimeoutHelper, which StartReading creates from its timeout argument.
109 TimeSpan
GetRemainingTimeout()
111 return this.receiveTimeoutHelper
.RemainingTime();
// Read/decode loop for the start of the preamble.
// While there are buffered bytes (size > 0) or no read is outstanding, and the reader is not closed,
// it issues reads into connectionBuffer, feeds the bytes to the decoder, and once the decoder reaches
// PreUpgradeStart captures the Via and starts validation of it on the connection.
// CommunicationException and TimeoutException are traced at Information level; the timeout case also
// emits the TD.ReceiveTimeout event when that event is enabled.
// NOTE(review): the try/finally scaffolding, the guard around the read, and the use of 'success'
// are not visible in this view.
114 void ReadAndDispatch()
116 bool success
= false;
119 while ((size
> 0 || !isReadPending
) && !IsClosed
)
123 isReadPending
= true;
// The read-completion delegate is created once, on first use.
124 if (onAsyncReadComplete
== null)
126 onAsyncReadComplete
= new WaitCallback(OnAsyncReadComplete
);
// Queued means the read will finish later and onAsyncReadComplete will be invoked.
129 if (Connection
.BeginRead(0, connectionBuffer
.Length
, GetRemainingTimeout(),
130 onAsyncReadComplete
, null) == AsyncCompletionResult
.Queued
)
134 HandleReadComplete();
137 int bytesRead
= decoder
.Decode(connectionBuffer
, offset
, size
);
144 if (decoder
.CurrentState
== ServerSingletonDecoder
.State
.PreUpgradeStart
)
// onValidate is a static field shared by all readers and is created lazily here.
146 if (onValidate
== null)
148 onValidate
= Fx
.ThunkCallback(new AsyncCallback(OnValidate
));
151 this.via
= decoder
.Via
;
152 IAsyncResult result
= this.Connection
.BeginValidate(this.via
, onValidate
, this);
// Only the synchronous completion is finished here; OnValidate handles the asynchronous one.
154 if (result
.CompletedSynchronously
)
156 if (!VerifyValidationResult(result
))
158 // This goes through the failure path (Abort) even though it doesn't throw.
162 break; //exit loop, set success=true;
167 catch (CommunicationException exception
)
169 DiagnosticUtility
.TraceHandledException(exception
, TraceEventType
.Information
);
171 catch (TimeoutException exception
)
173 if (TD
.ReceiveTimeoutIsEnabled())
175 TD
.ReceiveTimeout(exception
.Message
);
177 DiagnosticUtility
.TraceHandledException(exception
, TraceEventType
.Information
);
185 if (!ExceptionHandler
.HandleTransportExceptionHelper(e
))
190 // containment -- we abort ourselves for any error, no extra containment needed
201 //returns true if validation was successful
// Completes the pending validation with Connection.EndValidate and, only when that returns true
// (short-circuit &&), runs ContinuePostValidationProcessing. The result is true only if both are true.
202 bool VerifyValidationResult(IAsyncResult result
)
204 return this.Connection
.EndValidate(result
) && this.ContinuePostValidationProcessing();
// Static completion callback for Connection.BeginValidate; the reader instance arrives in AsyncState.
// Synchronous completions are skipped here because ReadAndDispatch verifies them inline.
// CommunicationException and TimeoutException are traced at Information level; the timeout case also
// emits the TD.ReceiveTimeout event when enabled.
// NOTE(review): the try/catch/finally scaffolding and the use of 'success' are not visible in this view.
207 static void OnValidate(IAsyncResult result
)
209 bool success
= false;
210 ServerSingletonPreambleConnectionReader thisPtr
= (ServerSingletonPreambleConnectionReader
)result
.AsyncState
;
213 if (!result
.CompletedSynchronously
)
215 if (!thisPtr
.VerifyValidationResult(result
))
217 // This goes through the failure path (Abort) even though it doesn't throw.
223 catch (CommunicationException exception
)
225 DiagnosticUtility
.TraceHandledException(exception
, TraceEventType
.Information
);
227 catch (TimeoutException exception
)
229 if (TD
.ReceiveTimeoutIsEnabled())
231 TD
.ReceiveTimeout(exception
.Message
);
233 DiagnosticUtility
.TraceHandledException(exception
, TraceEventType
.Information
);
242 DiagnosticUtility
.TraceHandledException(e
, TraceEventType
.Information
);
253 //returns false if the connection should be aborted
// Runs after the Via has been validated. It resolves transport settings for the Via through
// transportSettingsCallback, and sends a framing fault when activation fails
// (ServiceActivationFailedFault) or when no settings are found (EndpointNotFoundFault).
// NOTE(review): the viaDelegate invocation, the return statements and the hand-off at the end
// are not visible in this view.
254 bool ContinuePostValidationProcessing()
256 if (viaDelegate
!= null)
262 catch (ServiceActivationException e
)
264 DiagnosticUtility
.TraceHandledException(e
, TraceEventType
.Information
);
265 // return fault and close connection
266 SendFault(FramingEncodingString
.ServiceActivationFailedFault
);
272 this.transportSettings
= transportSettingsCallback(via
);
// A null result is treated as "no endpoint for this Via": the exception is created only to be traced.
274 if (transportSettings
== null)
276 EndpointNotFoundException e
= new EndpointNotFoundException(SR
.GetString(SR
.EndpointNotFound
, decoder
.Via
));
277 DiagnosticUtility
.TraceHandledException(e
, TraceEventType
.Information
);
278 // return fault and close connection
279 SendFault(FramingEncodingString
.EndpointNotFoundFault
);
283 // we have enough information to hand off to a channel. Our job is done
// Sends the given framing fault string using this reader's own receive timeout budget
// (receiveTimeoutHelper is passed by reference to the private overload).
288 public void SendFault(string faultString
)
290 SendFault(faultString
, ref this.receiveTimeoutHelper
);
// Sends the fault on Connection via InitialServerConnectionReader.SendFault, passing connectionBuffer,
// the time remaining on the caller-supplied timeoutHelper, and TransportDefaults.MaxDrainSize.
293 void SendFault(string faultString
, ref TimeoutHelper timeoutHelper
)
295 InitialServerConnectionReader
.SendFault(Connection
, faultString
,
296 connectionBuffer
, timeoutHelper
.RemainingTime(), TransportDefaults
.MaxDrainSize
);
// Starts the asynchronous remainder of the preamble by creating a CompletePreambleAsyncResult
// bound to this reader. Pair with EndCompletePreamble.
299 public IAsyncResult
BeginCompletePreamble(TimeSpan timeout
, AsyncCallback callback
, object state
)
301 return new CompletePreambleAsyncResult(timeout
, this, callback
, state
);
// Completes BeginCompletePreamble and returns the IConnection produced by CompletePreambleAsyncResult.End.
// The async result completes itself with its currentConnection.
304 public IConnection
EndCompletePreamble(IAsyncResult result
)
306 return CompletePreambleAsyncResult
.End(result
);
309 class CompletePreambleAsyncResult
: TypedAsyncResult
<IConnection
>
311 static WaitCallback onReadCompleted
= new WaitCallback(OnReadCompleted
);
312 static WaitCallback onWriteCompleted
= new WaitCallback(OnWriteCompleted
);
313 static AsyncCallback onUpgradeComplete
= Fx
.ThunkCallback(OnUpgradeComplete
);
314 TimeoutHelper timeoutHelper
;
315 ServerSingletonPreambleConnectionReader parent
;
316 StreamUpgradeAcceptor upgradeAcceptor
;
317 StreamUpgradeProvider upgrade
;
318 IStreamUpgradeChannelBindingProvider channelBindingProvider
;
319 IConnection currentConnection
;
320 UpgradeState upgradeState
= UpgradeState
.None
;
// Captures the timeout budget and the owning reader, then starts the state machine.
// If ContinueWork(null) returns true, the result is completed with currentConnection and the
// 'true' argument to Complete (the async callbacks in this class pass false instead).
// NOTE(review): lines between the field assignments and the ContinueWork call are not visible in this view.
322 public CompletePreambleAsyncResult(TimeSpan timeout
, ServerSingletonPreambleConnectionReader parent
, AsyncCallback callback
, object state
)
323 : base(callback
, state
)
325 this.timeoutHelper
= new TimeoutHelper(timeout
);
326 this.parent
= parent
;
330 if (ContinueWork(null))
332 Complete(this.currentConnection
, true);
336 byte[] ConnectionBuffer
340 return this.parent
.connectionBuffer
;
344 this.parent
.connectionBuffer
= value;
352 return this.parent
.offset
;
356 this.parent
.offset
= value;
364 return this.parent
.size
;
368 this.parent
.size
= value;
372 bool CanReadAndDecode
376 //ok to read/decode before we start the upgrade
377 //and between UpgradeComplete/WritingPreambleAck
378 return this.upgradeState
== UpgradeState
.None
379 || this.upgradeState
== UpgradeState
.UpgradeComplete
;
383 ServerSingletonDecoder Decoder
387 return this.parent
.decoder
;
393 if (!this.parent
.transportSettings
.MessageEncoderFactory
.Encoder
.IsContentTypeSupported(Decoder
.ContentType
))
395 SendFault(FramingEncodingString
.ContentTypeInvalidFault
, ref timeoutHelper
);
396 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(new ProtocolException(SR
.GetString(
397 SR
.ContentTypeMismatch
, Decoder
.ContentType
, parent
.transportSettings
.MessageEncoderFactory
.Encoder
.ContentType
)));
400 upgrade
= this.parent
.transportSettings
.Upgrade
;
403 channelBindingProvider
= upgrade
.GetProperty
<IStreamUpgradeChannelBindingProvider
>();
404 upgradeAcceptor
= upgrade
.CreateUpgradeAcceptor();
407 this.currentConnection
= this.parent
.Connection
;
// Forwards to the parent reader's SendFault so the fault is charged against the supplied timeoutHelper.
410 void SendFault(string faultString
, ref TimeoutHelper timeoutHelper
)
412 this.parent
.SendFault(faultString
, ref timeoutHelper
);
418 return this.currentConnection
.BeginRead(0, this.ConnectionBuffer
.Length
, timeoutHelper
.RemainingTime(), onReadCompleted
, this) == AsyncCompletionResult
.Completed
;
423 this.Size
= currentConnection
.EndRead();
426 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(this.Decoder
.CreatePrematureEOFException());
// Drives the preamble/upgrade state machine. It is entered from the constructor and from the read,
// write and upgrade completion callbacks; only OnUpgradeComplete passes a non-null upgradeAsyncResult.
// When CanReadAndDecode is true it decodes buffered bytes, then acts on the decoder state:
//   UpgradeRequest - steps upgradeState through VerifyingUpgradeRequest -> WritingUpgradeAck ->
//                    UpgradeAckSent -> BeginUpgrade -> EndUpgrade -> UpgradeComplete, faulting with
//                    UpgradeInvalidFault when the service cannot perform the requested upgrade.
//   Start          - sets up security, writes the ack bytes and moves to PreambleEndSent.
// Upgrade failures are reported through parent.WriteAuditFailure.
// NOTE(review): the enclosing loop, the return/break statements and the try blocks are not visible
// in this view, so the exact return contract cannot be stated from here; callers treat a true
// result as "complete the async result now".
430 bool ContinueWork(IAsyncResult upgradeAsyncResult
)
432 if (upgradeAsyncResult
!= null)
434 Fx
.AssertAndThrow(this.upgradeState
== UpgradeState
.EndUpgrade
, "upgradeAsyncResult should only be passed in from OnUpgradeComplete callback");
439 if (Size
== 0 && this.CanReadAndDecode
)
447 //when read completes, we will re-enter this loop.
454 if (this.CanReadAndDecode
)
456 int bytesRead
= Decoder
.Decode(ConnectionBuffer
, Offset
, Size
);
464 switch (Decoder
.CurrentState
)
466 case ServerSingletonDecoder
.State
.UpgradeRequest
:
467 switch (this.upgradeState
)
469 case UpgradeState
.None
:
470 //change the state so that we don't read/decode until it is safe
471 ChangeUpgradeState(UpgradeState
.VerifyingUpgradeRequest
);
473 case UpgradeState
.VerifyingUpgradeRequest
:
// No acceptor means the service was not configured with an upgrade at all.
474 if (this.upgradeAcceptor
== null)
476 SendFault(FramingEncodingString
.UpgradeInvalidFault
, ref timeoutHelper
);
477 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(
478 new ProtocolException(SR
.GetString(SR
.UpgradeRequestToNonupgradableService
, Decoder
.Upgrade
)));
481 if (!this.upgradeAcceptor
.CanUpgrade(Decoder
.Upgrade
))
483 SendFault(FramingEncodingString
.UpgradeInvalidFault
, ref timeoutHelper
);
484 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(new ProtocolException(SR
.GetString(SR
.UpgradeProtocolNotSupported
, Decoder
.Upgrade
)));
487 ChangeUpgradeState(UpgradeState
.WritingUpgradeAck
);
489 if (this.currentConnection
.BeginWrite(ServerSingletonEncoder
.UpgradeResponseBytes
, 0, ServerSingletonEncoder
.UpgradeResponseBytes
.Length
,
490 true, timeoutHelper
.RemainingTime(), onWriteCompleted
, this) == AsyncCompletionResult
.Queued
)
492 //OnWriteCompleted will:
493 // 1) set upgradeState to UpgradeAckSent
499 this.currentConnection
.EndWrite();
502 ChangeUpgradeState(UpgradeState
.UpgradeAckSent
);
504 case UpgradeState
.UpgradeAckSent
:
// NOTE(review): connectionToUpgrade is not referenced again in the lines visible here, and
// BeginUpgrade passes this.currentConnection - confirm whether the PreReadConnection wrapper
// (which carries the already-buffered bytes) is meant to be the connection that gets upgraded.
505 IConnection connectionToUpgrade
= this.currentConnection
;
508 connectionToUpgrade
= new PreReadConnection(connectionToUpgrade
, ConnectionBuffer
, Offset
, Size
);
510 ChangeUpgradeState(UpgradeState
.BeginUpgrade
);
512 case UpgradeState
.BeginUpgrade
:
515 if (!BeginUpgrade(out upgradeAsyncResult
))
517 //OnUpgradeComplete will set upgradeState to EndUpgrade
521 ChangeUpgradeState(UpgradeState
.EndUpgrade
);
523 catch (Exception exception
)
525 if (Fx
.IsFatal(exception
))
528 this.parent
.WriteAuditFailure(upgradeAcceptor
as StreamSecurityUpgradeAcceptor
, exception
);
532 case UpgradeState
.EndUpgrade
://Must be a different state here than UpgradeComplete so that we don't try to read from the connection
535 EndUpgrade(upgradeAsyncResult
);
536 ChangeUpgradeState(UpgradeState
.UpgradeComplete
);
538 catch (Exception exception
)
540 if (Fx
.IsFatal(exception
))
543 this.parent
.WriteAuditFailure(upgradeAcceptor
as StreamSecurityUpgradeAcceptor
, exception
);
547 case UpgradeState
.UpgradeComplete
:
548 //Client is doing more than one upgrade, reset the state
549 ChangeUpgradeState(UpgradeState
.VerifyingUpgradeRequest
);
553 case ServerSingletonDecoder
.State
.Start
:
554 this.parent
.SetupSecurityIfNecessary(upgradeAcceptor
);
556 if (this.upgradeState
== UpgradeState
.UpgradeComplete
//We have done at least one upgrade, but we are now done.
557 || this.upgradeState
== UpgradeState
.None
)//no upgrade, just send the preamble end bytes
559 ChangeUpgradeState(UpgradeState
.WritingPreambleEnd
);
560 // we've finished the preamble. Ack and return.
// The ack bytes are taken from ServerSessionEncoder even though this is the singleton reader.
561 if (this.currentConnection
.BeginWrite(ServerSessionEncoder
.AckResponseBytes
, 0, ServerSessionEncoder
.AckResponseBytes
.Length
,
562 true, timeoutHelper
.RemainingTime(), onWriteCompleted
, this) == AsyncCompletionResult
.Queued
)
564 //OnWriteCompleted will:
565 // 1) set upgradeState to PreambleEndSent
571 this.currentConnection
.EndWrite();
575 ChangeUpgradeState(UpgradeState
.PreambleEndSent
);
578 //we are done, this.currentConnection is the upgraded connection
// Starts the stream upgrade on currentConnection through
// InitialServerConnectionReader.BeginUpgradeConnection, with onUpgradeComplete as the callback.
// When the operation did not complete synchronously the out parameter is reset to null.
// NOTE(review): the return statements are not visible in this view; ContinueWork treats a false
// result as "OnUpgradeComplete will continue the work".
592 bool BeginUpgrade(out IAsyncResult upgradeAsyncResult
)
594 upgradeAsyncResult
= InitialServerConnectionReader
.BeginUpgradeConnection(this.currentConnection
, upgradeAcceptor
, this.parent
.transportSettings
, onUpgradeComplete
, this);
596 if (!upgradeAsyncResult
.CompletedSynchronously
)
598 upgradeAsyncResult
= null; //caller shouldn't use this out param unless completed sync.
// Finishes the upgrade: replaces currentConnection with the upgraded connection and switches the
// shared ConnectionBuffer to that connection's AsyncReadBuffer.
// If a channel-binding provider is present and enabled, and the parent has no token yet, it captures
// an Endpoint-kind channel binding from the upgrade acceptor.
605 void EndUpgrade(IAsyncResult upgradeAsyncResult
)
607 this.currentConnection
= InitialServerConnectionReader
.EndUpgradeConnection(upgradeAsyncResult
);
609 this.ConnectionBuffer
= this.currentConnection
.AsyncReadBuffer
;
611 if (this.channelBindingProvider
!= null
612 && this.channelBindingProvider
.IsChannelBindingSupportEnabled
613 && this.parent
.channelBindingToken
== null)//first one wins in the case of multiple upgrades.
615 this.parent
.channelBindingToken
= channelBindingProvider
.GetChannelBinding(this.upgradeAcceptor
, ChannelBindingKind
.Endpoint
);
// Validates a transition of upgradeState to newState and then stores newState.
// Allowed predecessors, as checked by the visible cases:
//   VerifyingUpgradeRequest <- None | UpgradeComplete
//   WritingUpgradeAck       <- VerifyingUpgradeRequest
//   UpgradeAckSent          <- WritingUpgradeAck
//   BeginUpgrade            <- UpgradeAckSent
//   EndUpgrade              <- BeginUpgrade
//   UpgradeComplete         <- EndUpgrade
//   WritingPreambleEnd      <- None | UpgradeComplete
//   PreambleEndSent         <- WritingPreambleEnd
// A transition to None is always rejected. Violations are raised through Fx.AssertAndThrow with an
// "Invalid State Transition" message naming both states.
// NOTE(review): the switch header, break statements and default label are not visible in this view;
// the final "Unexpected Upgrade State" throw is presumably the default branch.
619 void ChangeUpgradeState(UpgradeState newState
)
623 case UpgradeState
.None
:
624 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
625 case UpgradeState
.VerifyingUpgradeRequest
:
626 if (this.upgradeState
!= UpgradeState
.None
//starting first upgrade
627 && this.upgradeState
!= UpgradeState
.UpgradeComplete
)//completing one upgrade and starting another
629 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
632 case UpgradeState
.WritingUpgradeAck
:
633 if (this.upgradeState
!= UpgradeState
.VerifyingUpgradeRequest
)
635 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
638 case UpgradeState
.UpgradeAckSent
:
639 if (this.upgradeState
!= UpgradeState
.WritingUpgradeAck
)
641 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
644 case UpgradeState
.BeginUpgrade
:
645 if (this.upgradeState
!= UpgradeState
.UpgradeAckSent
)
647 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
650 case UpgradeState
.EndUpgrade
:
651 if (this.upgradeState
!= UpgradeState
.BeginUpgrade
)
653 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
656 case UpgradeState
.UpgradeComplete
:
657 if (this.upgradeState
!= UpgradeState
.EndUpgrade
)
659 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
662 case UpgradeState
.WritingPreambleEnd
:
663 if (this.upgradeState
!= UpgradeState
.None
//no upgrade being used
664 && this.upgradeState
!= UpgradeState
.UpgradeComplete
)//upgrades are now complete, end the preamble handshake.
666 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
669 case UpgradeState
.PreambleEndSent
:
670 if (this.upgradeState
!= UpgradeState
.WritingPreambleEnd
)
672 throw Fx
.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState
+ ", newState=" + newState
);
676 throw Fx
.AssertAndThrow("Unexpected Upgrade State: " + newState
);
678 this.upgradeState
= newState
;
// Static completion callback for reads issued by CompletePreambleAsyncResult; 'state' is the result.
// It re-enters ContinueWork and then completes the async result: with the captured exception if one
// was recorded, otherwise with currentConnection. Both Complete calls pass false.
// NOTE(review): the try/catch, the read-completion handling and the completeSelf check are not
// visible in this view.
681 static void OnReadCompleted(object state
)
683 CompletePreambleAsyncResult thisPtr
= (CompletePreambleAsyncResult
)state
;
684 Exception completionException
= null;
685 bool completeSelf
= false;
690 completeSelf
= thisPtr
.ContinueWork(null);
698 completionException
= ex
;
704 if (completionException
!= null)
706 thisPtr
.Complete(false, completionException
);
710 thisPtr
.Complete(thisPtr
.currentConnection
, false);
// Static completion callback for the queued ack writes; 'state' is the CompletePreambleAsyncResult.
// It ends the write, advances the state that was waiting on it (WritingUpgradeAck -> UpgradeAckSent,
// WritingPreambleEnd -> PreambleEndSent), re-enters ContinueWork, and then completes the async result
// with either the captured exception or currentConnection. Both Complete calls pass false.
// NOTE(review): the try/catch, break statements and the completeSelf check are not visible in this view.
715 static void OnWriteCompleted(object state
)
717 CompletePreambleAsyncResult thisPtr
= (CompletePreambleAsyncResult
)state
;
718 Exception completionException
= null;
719 bool completeSelf
= false;
723 thisPtr
.currentConnection
.EndWrite();
725 switch (thisPtr
.upgradeState
)
727 case UpgradeState
.WritingUpgradeAck
:
728 thisPtr
.ChangeUpgradeState(UpgradeState
.UpgradeAckSent
);
730 case UpgradeState
.WritingPreambleEnd
:
731 thisPtr
.ChangeUpgradeState(UpgradeState
.PreambleEndSent
);
734 completeSelf
= thisPtr
.ContinueWork(null);
742 completionException
= ex
;
748 if (completionException
!= null)
750 thisPtr
.Complete(false, completionException
);
754 thisPtr
.Complete(thisPtr
.currentConnection
, false);
// Static completion callback for BeginUpgradeConnection; the CompletePreambleAsyncResult is in AsyncState.
// It moves the state machine to EndUpgrade and hands the IAsyncResult to ContinueWork, which is the
// only path allowed to pass a non-null upgrade result. It then completes the async result with either
// the captured exception or currentConnection. Both Complete calls pass false.
// NOTE(review): the body of the CompletedSynchronously check is not visible; presumably it returns
// early because the synchronous path is finished by the caller - confirm.
759 static void OnUpgradeComplete(IAsyncResult result
)
761 if (result
.CompletedSynchronously
)
766 CompletePreambleAsyncResult thisPtr
= (CompletePreambleAsyncResult
)result
.AsyncState
;
767 Exception completionException
= null;
768 bool completeSelf
= false;
772 thisPtr
.ChangeUpgradeState(UpgradeState
.EndUpgrade
);
773 completeSelf
= thisPtr
.ContinueWork(result
);
781 completionException
= ex
;
787 if (completionException
!= null)
789 thisPtr
.Complete(false, completionException
);
793 thisPtr
.Complete(thisPtr
.currentConnection
, false);
801 VerifyingUpgradeRequest
,
// If the upgrade acceptor is a StreamSecurityUpgradeAcceptor, captures the remote security property
// into this.security. A null property is a protocol failure (ProtocolException built from
// RemoteSecurityNotNegotiatedOnStreamUpgrade with the Via); otherwise a Success audit event is written.
// Does nothing for other acceptor types or a null acceptor ('as' yields null).
812 void SetupSecurityIfNecessary(StreamUpgradeAcceptor upgradeAcceptor
)
814 StreamSecurityUpgradeAcceptor securityUpgradeAcceptor
= upgradeAcceptor
as StreamSecurityUpgradeAcceptor
;
815 if (securityUpgradeAcceptor
!= null)
817 this.security
= securityUpgradeAcceptor
.GetRemoteSecurity();
818 if (this.security
== null)
820 Exception securityFailedException
= new ProtocolException(
821 SR
.GetString(SR
.RemoteSecurityNotNegotiatedOnStreamUpgrade
, this.Via
));
822 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(securityFailedException
);
824 // Audit Authentication Success
825 WriteAuditEvent(securityUpgradeAcceptor
, AuditLevel
.Success
, null);
829 #region Transport Security Auditing
// Writes a Failure audit event for the given upgrade exception.
// An exception raised while auditing is caught and traced at Error level.
// NOTE(review): the body of the Fx.IsFatal check is not visible; presumably fatal exceptions are
// rethrown - confirm.
830 void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor
, Exception exception
)
834 WriteAuditEvent(securityUpgradeAcceptor
, AuditLevel
.Failure
, exception
);
836 #pragma warning suppress 56500 // covered by FxCop
837 catch (Exception auditException
)
839 if (Fx
.IsFatal(auditException
))
844 DiagnosticUtility
.TraceHandledException(auditException
, TraceEventType
.Error
);
// Writes a transport-authentication audit record (success or failure) for this connection's Via.
// The first check tests whether the configured MessageAuthenticationAuditLevel includes auditLevel,
// and the second tests for a null acceptor.
// The identity name is taken from the acceptor's remote security when available, else String.Empty.
// NOTE(review): the bodies of the two guard checks are not visible; presumably both return early - confirm.
848 void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor
, AuditLevel auditLevel
, Exception exception
)
850 if ((this.transportSettings
.AuditBehavior
.MessageAuthenticationAuditLevel
& auditLevel
) != auditLevel
)
855 if (securityUpgradeAcceptor
== null)
859 String primaryIdentity
= String
.Empty
;
860 SecurityMessageProperty clientSecurity
= securityUpgradeAcceptor
.GetRemoteSecurity();
861 if (clientSecurity
!= null)
863 primaryIdentity
= GetIdentityNameFromContext(clientSecurity
);
866 ServiceSecurityAuditBehavior auditBehavior
= this.transportSettings
.AuditBehavior
;
// Success and failure go to different SecurityAuditHelper methods; only the failure one takes the exception.
868 if (auditLevel
== AuditLevel
.Success
)
870 SecurityAuditHelper
.WriteTransportAuthenticationSuccessEvent(auditBehavior
.AuditLogLocation
,
871 auditBehavior
.SuppressAuditFailure
, null, this.Via
, primaryIdentity
);
875 SecurityAuditHelper
.WriteTransportAuthenticationFailureEvent(auditBehavior
.AuditLogLocation
,
876 auditBehavior
.SuppressAuditFailure
, null, this.Via
, primaryIdentity
, exception
);
// Returns the identity name(s) for the client's authorization context via
// SecurityUtils.GetIdentityNamesFromContext.
// NOTE(review): the reason for NoInlining is not stated in the code - presumably to keep the
// referenced types from loading until this call is actually made; confirm.
880 [MethodImpl(MethodImplOptions
.NoInlining
)]
881 static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity
)
883 return SecurityUtils
.GetIdentityNamesFromContext(
884 clientSecurity
.ServiceSecurityContext
.AuthorizationContext
);
// Completes the outstanding read: stores the byte count from Connection.EndRead in size and clears
// isReadPending. Also contains a throw of the decoder's premature-EOF exception.
// NOTE(review): the condition guarding the throw is not visible; presumably it fires when the read
// returned zero bytes - confirm.
888 void HandleReadComplete()
891 size
= Connection
.EndRead();
892 isReadPending
= false;
895 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(decoder
.CreatePrematureEOFException());
// WaitCallback passed to Connection.BeginRead by ReadAndDispatch. It finishes the read via
// HandleReadComplete and uses the same exception handling as ReadAndDispatch:
// CommunicationException and TimeoutException are traced at Information level, and the timeout case
// also emits TD.ReceiveTimeout when that event is enabled.
// NOTE(review): the try/finally scaffolding, the step that follows HandleReadComplete and the use
// of 'success' are not visible in this view.
899 void OnAsyncReadComplete(object state
)
901 bool success
= false;
904 HandleReadComplete();
908 catch (CommunicationException exception
)
910 DiagnosticUtility
.TraceHandledException(exception
, TraceEventType
.Information
);
912 catch (TimeoutException exception
)
914 if (TD
.ReceiveTimeoutIsEnabled())
916 TD
.ReceiveTimeout(exception
.Message
);
918 DiagnosticUtility
.TraceHandledException(exception
, TraceEventType
.Information
);
926 if (!ExceptionHandler
.HandleTransportExceptionHelper(e
))
931 // containment -- we abort ourselves for any error, no extra containment needed
// Entry point for preamble processing: records the Via callback, starts the receive timeout budget,
// and adopts the connection's AsyncReadBuffer as the working buffer.
// NOTE(review): any statement that follows these assignments is not visible in this view.
942 public void StartReading(Action
<Uri
> viaDelegate
, TimeSpan timeout
)
944 this.viaDelegate
= viaDelegate
;
945 this.receiveTimeoutHelper
= new TimeoutHelper(timeout
);
946 this.connectionBuffer
= Connection
.AsyncReadBuffer
;
951 class ServerSingletonConnectionReader
: SingletonConnectionReader
953 ConnectionDemuxer connectionDemuxer
;
954 ServerSingletonDecoder decoder
;
955 IConnection rawConnection
;
957 ChannelBinding channelBindingToken
;
// Builds the message-phase reader from a finished preamble reader.
// The base constructor receives the upgraded connection plus the preamble reader's buffer
// offset and size, security, transport settings and Via. The decoder, content type, raw connection and
// channel binding token are carried over from the preamble reader as well.
959 public ServerSingletonConnectionReader(ServerSingletonPreambleConnectionReader preambleReader
,
960 IConnection upgradedConnection
, ConnectionDemuxer connectionDemuxer
)
961 : base(upgradedConnection
, preambleReader
.BufferOffset
, preambleReader
.BufferSize
,
962 preambleReader
.Security
, preambleReader
.TransportSettings
, preambleReader
.Via
)
964 this.decoder
= preambleReader
.Decoder
;
965 this.contentType
= this.decoder
.ContentType
;
966 this.connectionDemuxer
= connectionDemuxer
;
// rawConnection is the preamble reader's RawConnection; it is what OnClose hands back to the demuxer.
967 this.rawConnection
= preambleReader
.RawConnection
;
968 this.channelBindingToken
= preambleReader
.ChannelBinding
;
971 protected override string ContentType
973 get { return this.contentType; }
976 protected override long StreamPosition
978 get { return this.decoder.StreamPosition; }
981 protected override bool DecodeBytes(byte[] buffer
, ref int offset
, ref int size
, ref bool isAtEof
)
985 int bytesRead
= decoder
.Decode(buffer
, offset
, size
);
992 switch (decoder
.CurrentState
)
994 case ServerSingletonDecoder
.State
.EnvelopeStart
:
995 // we're at the envelope
998 case ServerSingletonDecoder
.State
.End
:
// Close hook for the server singleton reader. Within a single timeout budget it writes
// SingletonEncoder.EndBytes on the (possibly upgraded) Connection, then gives rawConnection back to
// the connection demuxer through ReuseConnection, and finally disposes the channel binding token.
1007 protected override void OnClose(TimeSpan timeout
)
1009 TimeoutHelper timeoutHelper
= new TimeoutHelper(timeout
);
1010 // send back EOF and then recycle the connection
1011 this.Connection
.Write(SingletonEncoder
.EndBytes
, 0, SingletonEncoder
.EndBytes
.Length
, true, timeoutHelper
.RemainingTime());
1012 this.connectionDemuxer
.ReuseConnection(this.rawConnection
, timeoutHelper
.RemainingTime());
1014 ChannelBindingUtility
.Dispose(ref this.channelBindingToken
);
// Adds server-side properties to a received message after the base class has set its own:
// a RemoteEndpointMessageProperty when the raw connection reports a remote IP endpoint, and a
// ChannelBindingMessageProperty when a channel binding token was captured during the upgrade.
1017 protected override void PrepareMessage(Message message
)
1019 base.PrepareMessage(message
);
1020 IPEndPoint remoteEndPoint
= this.rawConnection
.RemoteIPEndPoint
;
1022 // pipes will return null
1023 if (remoteEndPoint
!= null)
1025 RemoteEndpointMessageProperty remoteEndpointProperty
= new RemoteEndpointMessageProperty(remoteEndPoint
);
1026 message
.Properties
.Add(RemoteEndpointMessageProperty
.Name
, remoteEndpointProperty
);
1029 if (this.channelBindingToken
!= null)
// The local property object is disposed right after AddTo; per the existing comment the message keeps its own copy.
1031 ChannelBindingMessageProperty property
= new ChannelBindingMessageProperty(this.channelBindingToken
, false);
1032 property
.AddTo(message
);
1033 property
.Dispose(); //message.Properties.Add() creates a copy...
1038 abstract class SingletonConnectionReader
1040 IConnection connection
;
1045 SecurityMessageProperty security
;
1046 object thisLock
= new object();
1049 IConnectionOrientedTransportFactorySettings transportSettings
;
// Base constructor: stores the connection, the starting offset into its read buffer, the negotiated
// security property and the transport settings.
// NOTE(review): the assignments for the size and via parameters are not visible in this view.
1053 protected SingletonConnectionReader(IConnection connection
, int offset
, int size
, SecurityMessageProperty security
,
1054 IConnectionOrientedTransportFactorySettings transportSettings
, Uri via
)
1056 this.connection
= connection
;
1057 this.offset
= offset
;
1059 this.security
= security
;
1060 this.transportSettings
= transportSettings
;
1064 protected IConnection Connection
1068 return this.connection
;
1072 protected object ThisLock
1076 return this.thisLock
;
1080 protected virtual string ContentType
1082 get { return null; }
1085 protected abstract long StreamPosition { get; }
1089 this.connection
.Abort();
// Signals that the receive side is finished, using the transport's CloseTimeout for any close
// that results.
1092 public void DoneReceiving(bool atEof
)
1094 DoneReceiving(atEof
, this.transportSettings
.CloseTimeout
);
// Marks the receive side as done (first call only) and records whether the stream reached EOF.
// Close(timeout) is called when the send side has already finished.
// NOTE(review): nesting is not visible here; the doneSending check is presumably inside the
// first-call branch - confirm.
1097 void DoneReceiving(bool atEof
, TimeSpan timeout
)
1099 if (!this.doneReceiving
)
1101 this.isAtEof
= atEof
;
1102 this.doneReceiving
= true;
1104 if (this.doneSending
)
1106 this.Close(timeout
);
// Closes the reader within the given timeout: marks it closed, drains any unread input by reading
// into a scratch buffer until EOF, then calls the subclass OnClose with the remaining time.
// NOTE(review): the guard/locking before isClosed is set, the condition under which isAtEof is set
// inside the drain loop (presumably a zero-byte read), and the try/finally around 'success' are not
// visible in this view.
1111 public void Close(TimeSpan timeout
)
1120 this.isClosed
= true;
1123 TimeoutHelper timeoutHelper
= new TimeoutHelper(timeout
);
1124 bool success
= false;
1127 // first drain our stream if necessary
1128 if (this.inputStream
!= null)
// The scratch buffer is sized by the transport's ConnectionBufferSize; its contents are discarded.
1130 byte[] dummy
= DiagnosticUtility
.Utility
.AllocateByteArray(transportSettings
.ConnectionBufferSize
);
1131 while (!this.isAtEof
)
// The stream read timeout is refreshed on every iteration from the remaining close budget.
1133 this.inputStream
.ReadTimeout
= TimeoutHelper
.ToMilliseconds(timeoutHelper
.RemainingTime());
1134 int bytesRead
= this.inputStream
.Read(dummy
, 0, dummy
.Length
);
1137 this.isAtEof
= true;
1141 OnClose(timeoutHelper
.RemainingTime());
1153 protected abstract void OnClose(TimeSpan timeout
);
// Marks the send side as done and calls Close(timeout) if the receive side has already finished.
// Counterpart of DoneReceiving: whichever side finishes last triggers the close.
1155 public void DoneSending(TimeSpan timeout
)
1157 this.doneSending
= true;
1158 if (this.doneReceiving
)
1160 this.Close(timeout
);
1164 protected abstract bool DecodeBytes(byte[] buffer
, ref int offset
, ref int size
, ref bool isAtEof
);
// Stamps a received message with this reader's Via and, when security was negotiated, a copy of the
// security property (null otherwise). Subclasses may override to add more properties.
1166 protected virtual void PrepareMessage(Message message
)
1168 message
.Properties
.Via
= this.via
;
1169 message
.Properties
.Security
= (this.security
!= null) ? (SecurityMessageProperty
)this.security
.CreateCopy() : null;
// Synchronously receives one message and wraps it in a StreamedFramingRequestContext bound to this reader.
1172 public RequestContext
ReceiveRequest(TimeSpan timeout
)
1174 Message requestMessage
= Receive(timeout
);
1175 return new StreamedFramingRequestContext(this, requestMessage
);
// Starts an asynchronous receive by creating a ReceiveAsyncResult, which schedules the work
// through ActionItem.Schedule. Pair with EndReceive.
1178 public IAsyncResult
BeginReceive(TimeSpan timeout
, AsyncCallback callback
, object state
)
1180 return new ReceiveAsyncResult(this, timeout
, callback
, state
);
// Completes BeginReceive and returns the received message from ReceiveAsyncResult.End.
1183 public virtual Message
EndReceive(IAsyncResult result
)
1185 return ReceiveAsyncResult
.End(result
);
// Synchronously receives one message from the connection.
// Steps visible here: allocate a working buffer sized to the connection's AsyncReadBufferSize and
// copy in bytes already sitting in AsyncReadBuffer; run DecodeBytes, reading more from the
// connection as needed; wrap any leftover bytes in a PreReadConnection so the stream sees them
// first; build a size-limited input stream; read the message with the configured encoder; then call
// PrepareMessage. XmlException from the encoder is rethrown as a ProtocolException.
// NOTE(review): the loop structure, the conditions around the two DoneReceiving(true, ...) calls
// (presumably EOF cases), the return statements and the try block are not visible in this view.
1188 public Message
Receive(TimeSpan timeout
)
1190 byte[] buffer
= DiagnosticUtility
.Utility
.AllocateByteArray(connection
.AsyncReadBufferSize
);
// The copy keeps the same offset in source and destination, so 'offset' stays valid for 'buffer'.
1194 Buffer
.BlockCopy(connection
.AsyncReadBuffer
, offset
, buffer
, offset
, size
);
1197 TimeoutHelper timeoutHelper
= new TimeoutHelper(timeout
);
1200 if (DecodeBytes(buffer
, ref offset
, ref size
, ref isAtEof
))
1207 DoneReceiving(true, timeoutHelper
.RemainingTime());
1214 size
= connection
.Read(buffer
, 0, buffer
.Length
, timeoutHelper
.RemainingTime());
1217 DoneReceiving(true, timeoutHelper
.RemainingTime());
1223 // we're ready to read a message
1224 IConnection singletonConnection
= this.connection
;
// Leftover bytes are copied into a right-sized array before being handed to PreReadConnection.
1227 byte[] initialData
= DiagnosticUtility
.Utility
.AllocateByteArray(size
);
1228 Buffer
.BlockCopy(buffer
, offset
, initialData
, 0, size
);
1229 singletonConnection
= new PreReadConnection(singletonConnection
, initialData
);
1232 Stream connectionStream
= new SingletonInputConnectionStream(this, singletonConnection
, this.transportSettings
);
// MaxMessageSizeStream is constructed with the transport's MaxReceivedMessageSize.
1233 this.inputStream
= new MaxMessageSizeStream(connectionStream
, transportSettings
.MaxReceivedMessageSize
);
1234 using (ServiceModelActivity activity
= DiagnosticUtility
.ShouldUseActivity
? ServiceModelActivity
.CreateBoundedActivity(true) : null)
1236 if (DiagnosticUtility
.ShouldUseActivity
)
1238 ServiceModelActivity
.Start(activity
, SR
.GetString(SR
.ActivityProcessingMessage
, TraceUtility
.RetrieveMessageNumber()), ActivityType
.ProcessMessage
);
1241 Message message
= null;
1244 message
= transportSettings
.MessageEncoderFactory
.Encoder
.ReadMessage(
1245 this.inputStream
, transportSettings
.MaxBufferSize
, this.ContentType
);
1247 catch (XmlException xmlException
)
1249 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(
1250 new ProtocolException(SR
.GetString(SR
.MessageXmlProtocolError
), xmlException
));
1253 if (DiagnosticUtility
.ShouldUseActivity
)
1255 TraceUtility
.TransferFromTransport(message
);
1258 PrepareMessage(message
);
1264 class ReceiveAsyncResult
: AsyncResult
1266 static Action
<object> onReceiveScheduled
= new Action
<object>(OnReceiveScheduled
);
1269 SingletonConnectionReader parent
;
1272 public ReceiveAsyncResult(SingletonConnectionReader parent
, TimeSpan timeout
, AsyncCallback callback
,
1274 : base(callback
, state
)
1276 this.parent
= parent
;
1277 this.timeout
= timeout
;
1280 ActionItem
.Schedule(onReceiveScheduled
, this);
// Completes the async result through AsyncResult.End<ReceiveAsyncResult> and returns the message
// stored by OnReceiveScheduled.
1283 public static Message
End(IAsyncResult result
)
1285 ReceiveAsyncResult receiveAsyncResult
= AsyncResult
.End
<ReceiveAsyncResult
>(result
);
1286 return receiveAsyncResult
.message
;
// Work item scheduled by the ReceiveAsyncResult constructor; 'state' is the async result.
// It runs the parent's synchronous Receive with the stored timeout, keeps the message, and completes
// the result with Complete(false, completionException), carrying any non-fatal exception to the
// End caller.
// NOTE(review): the try block and the body of the Fx.IsFatal check (presumably a rethrow) are not
// visible in this view.
1289 static void OnReceiveScheduled(object state
)
1291 ReceiveAsyncResult thisPtr
= (ReceiveAsyncResult
)state
;
1293 Exception completionException
= null;
1296 thisPtr
.message
= thisPtr
.parent
.Receive(thisPtr
.timeout
);
1298 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1299 catch (Exception exception
)
1301 if (Fx
.IsFatal(exception
))
1305 completionException
= exception
;
1307 thisPtr
.Complete(false, completionException
);
1311 class StreamedFramingRequestContext
: RequestContextBase
1313 IConnection connection
;
1314 SingletonConnectionReader parent
;
1315 IConnectionOrientedTransportFactorySettings settings
;
1316 TimeoutHelper timeoutHelper
;
// Request context for a streamed singleton request. The base class receives the request message plus
// the parent's CloseTimeout and SendTimeout; the parent reader, its connection and its transport
// settings are cached for the reply path.
1318 public StreamedFramingRequestContext(SingletonConnectionReader parent
, Message requestMessage
)
1319 : base(requestMessage
, parent
.transportSettings
.CloseTimeout
, parent
.transportSettings
.SendTimeout
)
1321 this.parent
= parent
;
1322 this.connection
= parent
.connection
;
1323 this.settings
= parent
.transportSettings
;
1326 protected override void OnAbort()
1328 this.parent
.Abort();
1331 protected override void OnClose(TimeSpan timeout
)
1333 this.parent
.Close(timeout
);
// Synchronously writes the reply on the connection and then tells the parent reader that sending is
// finished, all against one timeout budget. When the encoder supports compression and has it enabled,
// the compressed-message properties are added first using the parent's content type.
1336 protected override void OnReply(Message message
, TimeSpan timeout
)
1338 ICompressedMessageEncoder compressedMessageEncoder
= this.settings
.MessageEncoderFactory
.Encoder
as ICompressedMessageEncoder
;
1339 if (compressedMessageEncoder
!= null && compressedMessageEncoder
.CompressionEnabled
)
1341 compressedMessageEncoder
.AddCompressedMessageProperties(message
, this.parent
.ContentType
);
// timeoutHelper is a field (not a local) and is passed to WriteMessage by reference.
1344 timeoutHelper
= new TimeoutHelper(timeout
);
1345 StreamingConnectionHelper
.WriteMessage(message
, this.connection
, false, this.settings
, ref timeoutHelper
);
1346 parent
.DoneSending(timeoutHelper
.RemainingTime());
// Asynchronous counterpart of OnReply: adds compression properties when applicable, starts the
// timeout budget in the timeoutHelper field (read again by OnEndReply), and begins writing the
// message on the connection.
1349 protected override IAsyncResult
OnBeginReply(Message message
, TimeSpan timeout
, AsyncCallback callback
, object state
)
1351 ICompressedMessageEncoder compressedMessageEncoder
= this.settings
.MessageEncoderFactory
.Encoder
as ICompressedMessageEncoder
;
1352 if (compressedMessageEncoder
!= null && compressedMessageEncoder
.CompressionEnabled
)
1354 compressedMessageEncoder
.AddCompressedMessageProperties(message
, this.parent
.ContentType
);
1357 timeoutHelper
= new TimeoutHelper(timeout
);
1358 return StreamingConnectionHelper
.BeginWriteMessage(message
, this.connection
, false, this.settings
,
1359 ref timeoutHelper
, callback
, state
);
1362 protected override void OnEndReply(IAsyncResult result
)
1364 StreamingConnectionHelper
.EndWriteMessage(result
);
1365 parent
.DoneSending(timeoutHelper
.RemainingTime());
1369 // ensures that the reader is notified at end-of-stream, and takes care of the framing chunk headers
// State kept by this stream: the framing decoder, the owning reader, a small carry-over
// buffer (chunkBuffer with its offset/size cursor) and the number of payload bytes still
// outstanding in the current chunk (chunkBytesRemaining).
// NOTE(review): Close() reads an atEof field whose declaration is not among the visible lines.
1370 class SingletonInputConnectionStream
: ConnectionStream
1372 SingletonMessageDecoder decoder
;
1373 SingletonConnectionReader reader
;
1375 byte[] chunkBuffer
; // used for when we have overflow
// read cursor into chunkBuffer
1376 int chunkBufferOffset
;
// number of not-yet-consumed bytes held in chunkBuffer
1377 int chunkBufferSize
;
// payload bytes of the current chunk that have not been handed to the caller yet
1378 int chunkBytesRemaining
;
// Wraps the connection for the given reader. The decoder starts at the reader's current
// stream position, no chunk is in progress, and the carry-over buffer is sized to hold one
// encoded int (IntEncoder.MaxEncodedSize bytes).
public SingletonInputConnectionStream(SingletonConnectionReader reader, IConnection connection,
    IDefaultCommunicationTimeouts defaultTimeouts)
    : base(connection, defaultTimeouts)
{
    this.reader = reader;

    long startPosition = reader.StreamPosition;
    this.decoder = new SingletonMessageDecoder(startPosition);

    this.chunkBytesRemaining = 0;
    this.chunkBuffer = new byte[IntEncoder.MaxEncodedSize];
}
// NOTE(review): the header of the method containing this statement is not visible in this
// excerpt; the statement itself aborts the owning reader.
1392 this.reader
.Abort();
// Closing the stream notifies the owning reader that receiving has finished, handing it
// the stream's current atEof flag.
public override void Close()
{
    bool endOfStreamSeen = this.atEof;
    this.reader.DoneReceiving(endOfStreamSeen);
}
1400 // run chunk data through the decoder
// Passes payload bytes (buffer[offset .. offset+size)) to the framing decoder so its state
// tracks what the caller has consumed; offset advances by the byte count Decode reports.
// The trailing assert expects the decoder to be either still inside the envelope bytes or
// exactly at a chunk end.
// NOTE(review): the looping/size bookkeeping lines around Decode are not visible here.
1401 void DecodeData(byte[] buffer
, int offset
, int size
)
1405 int bytesRead
= decoder
.Decode(buffer
, offset
, size
);
1406 offset
+= bytesRead
;
1408 Fx
.Assert(decoder
.CurrentState
== SingletonMessageDecoder
.State
.ReadingEnvelopeBytes
|| decoder
.CurrentState
== SingletonMessageDecoder
.State
.ChunkEnd
, "");
1412 // run the current data through the decoder to get valid message bytes
// Feeds framing bytes to the decoder and reacts to the state it lands in. offset and size
// are passed by ref, so the caller sees how far decoding got.
// NOTE(review): several lines (loop, size bookkeeping, the body of the End case) are not
// visible in this excerpt.
1413 void DecodeSize(byte[] buffer
, ref int offset
, ref int size
)
1417 int bytesRead
= decoder
.Decode(buffer
, offset
, size
);
1421 offset
+= bytesRead
;
1425 switch (decoder
.CurrentState
)
// A chunk header has been decoded: remember how many payload bytes the chunk carries.
1427 case SingletonMessageDecoder
.State
.ChunkStart
:
1428 this.chunkBytesRemaining
= decoder
.ChunkSize
;
1430 // if we have overflow and we're not decoding out of our buffer, copy over
1431 if (size
> 0 && !object.ReferenceEquals(buffer
, this.chunkBuffer
))
// the leftover must fit in the carry-over buffer
1433 Fx
.Assert(size
<= this.chunkBuffer
.Length
, "");
1434 Buffer
.BlockCopy(buffer
, offset
, this.chunkBuffer
, 0, size
);
// carry-over now starts at the beginning of chunkBuffer
1435 this.chunkBufferOffset
= 0;
1436 this.chunkBufferSize
= size
;
1440 case SingletonMessageDecoder
.State
.End
:
// Reads from the underlying ConnectionStream (base.Read) into buffer[offset .. offset+count).
// NOTE(review): the declaration of bytesRead, the error handling around base.Read and the
// return path are not visible in this excerpt; per the inline comment, -1 marks a read
// that failed with an exception.
1447 int ReadCore(byte[] buffer
, int offset
, int count
)
1452 bytesRead
= base.Read(buffer
, offset
, count
);
1460 if (bytesRead
== -1) // there was an exception
// Copies de-framed message bytes into the caller's buffer. The visible branches are:
//   1. carry-over bytes are waiting in chunkBuffer: hand those out first;
//   2. a chunk is in progress: read from the connection, asking for a little extra so the
//      next chunk's size can arrive in the same read;
//   3. otherwise a new chunk begins: read and decode its size.
// NOTE(review): the loop, early returns, else-branches and the declaration of `result` are
// not visible in this excerpt.
1469 public override int Read(byte[] buffer
, int offset
, int count
)
1484 // first deal with any residual carryover
1485 if (this.chunkBufferSize
> 0)
// limited by the current chunk, by what is buffered, and by the caller's space
1487 int bytesToCopy
= Math
.Min(chunkBytesRemaining
,
1488 Math
.Min(this.chunkBufferSize
, count
));
1490 Buffer
.BlockCopy(this.chunkBuffer
, this.chunkBufferOffset
, buffer
, offset
, bytesToCopy
);
1491 // keep decoder up to date
1492 DecodeData(this.chunkBuffer
, this.chunkBufferOffset
, bytesToCopy
);
1494 this.chunkBufferOffset
+= bytesToCopy
;
1495 this.chunkBufferSize
-= bytesToCopy
;
1496 this.chunkBytesRemaining
-= bytesToCopy
;
// chunk finished but bytes remain buffered: they belong to the next chunk's framing
1497 if (this.chunkBytesRemaining
== 0 && this.chunkBufferSize
> 0)
1499 DecodeSize(this.chunkBuffer
, ref this.chunkBufferOffset
, ref this.chunkBufferSize
);
1502 result
+= bytesToCopy
;
1503 offset
+= bytesToCopy
;
1504 count
-= bytesToCopy
;
1506 else if (chunkBytesRemaining
> 0)
1508 // We're in the middle of a chunk. Try and include the next chunk size as well
1510 int bytesToRead
= count
;
// only add MaxEncodedSize when the sum cannot overflow int
1511 if (int.MaxValue
- chunkBytesRemaining
>= IntEncoder
.MaxEncodedSize
)
1513 bytesToRead
= Math
.Min(count
, chunkBytesRemaining
+ IntEncoder
.MaxEncodedSize
);
1516 int bytesRead
= ReadCore(buffer
, offset
, bytesToRead
);
1518 // keep decoder up to date
1519 DecodeData(buffer
, offset
, Math
.Min(bytesRead
, this.chunkBytesRemaining
));
// read past the end of the chunk: only the chunk's bytes count as payload, the rest is
// framing for the next chunk
1521 if (bytesRead
> chunkBytesRemaining
)
1523 result
+= this.chunkBytesRemaining
;
1524 int overflowCount
= bytesRead
- chunkBytesRemaining
;
1525 int overflowOffset
= offset
+ chunkBytesRemaining
;
1526 this.chunkBytesRemaining
= 0;
1527 // read at least part of the next chunk, and put any overflow in this.chunkBuffer
1528 DecodeSize(buffer
, ref overflowOffset
, ref overflowCount
);
// NOTE(review): presumably the else-branch (read stayed within the chunk) — the `else`
// line itself is not visible
1532 result
+= bytesRead
;
1533 this.chunkBytesRemaining
-= bytesRead
;
1540 // Final case: we have a new chunk. Read the size, and loop around again
1541 if (count
< IntEncoder
.MaxEncodedSize
)
1543 // we don't have space for MaxEncodedSize, so it's worth the copy cost to read into a temp buffer
1544 this.chunkBufferOffset
= 0;
1545 this.chunkBufferSize
= ReadCore(this.chunkBuffer
, 0, this.chunkBuffer
.Length
);
1546 DecodeSize(this.chunkBuffer
, ref this.chunkBufferOffset
, ref this.chunkBufferSize
);
// caller's buffer is large enough: read the size bytes straight into it
1550 int bytesRead
= ReadCore(buffer
, offset
, IntEncoder
.MaxEncodedSize
);
1551 int sizeOffset
= offset
;
1552 DecodeSize(buffer
, ref sizeOffset
, ref bytesRead
);
// NOTE(review): presumably the end-of-stream check (its guarding condition is not visible):
// leftover carry-over, an unfinished chunk, or a decoder that has not reached End is
// reported as a premature EOF; otherwise the reader is told receiving completed.
1563 if (this.chunkBufferSize
> 0 || this.chunkBytesRemaining
> 0
1564 || decoder
.CurrentState
!= SingletonMessageDecoder
.State
.End
)
1566 throw DiagnosticUtility
.ExceptionUtility
.ThrowHelperError(decoder
.CreatePrematureEOFException());
1569 this.reader
.DoneReceiving(true);
// Starts a read using the async pattern by creating a ReadAsyncResult bound to this stream
// and the caller's buffer window, callback and state.
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
    ReadAsyncResult pendingRead = new ReadAsyncResult(this, buffer, offset, count, callback, state);
    return pendingRead;
}
// Completes a read started by BeginRead and returns the byte count reported by
// ReadAsyncResult.End.
public override int EndRead(IAsyncResult result)
{
    int bytesRead = ReadAsyncResult.End(result);
    return bytesRead;
}
// Async-pattern result used by BeginRead/EndRead. The constructor performs the read by
// calling parent.Read directly and then calls Complete(true), so the operation is already
// finished when the constructor returns.
// NOTE(review): the declaration of the `result` field is not visible in this excerpt.
1583 public class ReadAsyncResult
: AsyncResult
1585 SingletonInputConnectionStream parent
;
1588 public ReadAsyncResult(SingletonInputConnectionStream parent
,
1589 byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object state
)
1590 : base(callback
, state
)
1592 this.parent
= parent
;
// blocking read on the calling thread; the byte count is stored for End
1595 this.result
= this.parent
.Read(buffer
, offset
, count
);
1596 base.Complete(true);
// Waits for/validates the result via AsyncResult.End<ReadAsyncResult> and returns the
// stored byte count.
1599 public static int End(IAsyncResult result
)
1601 ReadAsyncResult thisPtr
= AsyncResult
.End
<ReadAsyncResult
>(result
);
1602 return thisPtr
.result
;
// Static helpers that write a Message onto an IConnection using singleton framing:
// a synchronous WriteMessage and a BeginWriteMessage/EndWriteMessage pair.
1608 static class StreamingConnectionHelper
// Synchronously writes `message` (may be null) to `connection` with singleton framing.
// For a non-null message two sets of end bytes / streaming checks are visible:
//   EnvelopeEndFramingEndBytes with IsRequestStreamed, and
//   EnvelopeEndBytes with IsResponseStreamed.
// NOTE(review): the if/else lines choosing between them (and between the streamed and
// buffered write paths) and the declaration of writeStreamed are not visible here;
// presumably the choice follows isRequest and writeStreamed respectively.
// timeoutHelper is passed by ref and supplies the remaining time for every write.
1610 public static void WriteMessage(Message message
, IConnection connection
, bool isRequest
,
1611 IConnectionOrientedTransportFactorySettings settings
, ref TimeoutHelper timeoutHelper
)
1613 byte[] endBytes
= null;
1614 if (message
!= null)
1616 MessageEncoder messageEncoder
= settings
.MessageEncoderFactory
.Encoder
;
1617 byte[] envelopeStartBytes
= SingletonEncoder
.EnvelopeStartBytes
;
1622 endBytes
= SingletonEncoder
.EnvelopeEndFramingEndBytes
;
1623 writeStreamed
= TransferModeHelper
.IsRequestStreamed(settings
.TransferMode
);
1627 endBytes
= SingletonEncoder
.EnvelopeEndBytes
;
1628 writeStreamed
= TransferModeHelper
.IsResponseStreamed(settings
.TransferMode
);
// Streamed path: send the envelope start bytes, then let the encoder write the message
// through a chunk-framing stream wrapped in a TimeoutStream.
1633 connection
.Write(envelopeStartBytes
, 0, envelopeStartBytes
.Length
, false, timeoutHelper
.RemainingTime());
1634 Stream connectionStream
= new StreamingOutputConnectionStream(connection
, settings
);
1635 Stream writeTimeoutStream
= new TimeoutStream(connectionStream
, ref timeoutHelper
);
1636 messageEncoder
.WriteMessage(message
, writeTimeoutStream
);
// Buffered path: encode the whole message into a pooled buffer, leaving room in front of
// it for the envelope start bytes plus one encoded int.
1640 ArraySegment
<byte> messageData
= messageEncoder
.WriteMessage(message
,
1641 int.MaxValue
, settings
.BufferManager
, envelopeStartBytes
.Length
+ IntEncoder
.MaxEncodedSize
);
1642 messageData
= SingletonEncoder
.EncodeMessageFrame(messageData
);
// place the envelope start bytes immediately before the framed message
1643 Buffer
.BlockCopy(envelopeStartBytes
, 0, messageData
.Array
, messageData
.Offset
- envelopeStartBytes
.Length
,
1644 envelopeStartBytes
.Length
);
// single write covering start bytes + framed message; the BufferManager is handed to the
// connection along with the buffer
1645 connection
.Write(messageData
.Array
, messageData
.Offset
- envelopeStartBytes
.Length
,
1646 messageData
.Count
+ envelopeStartBytes
.Length
, true, timeoutHelper
.RemainingTime(), settings
.BufferManager
);
1649 else if (isRequest
) // context handles response end bytes
1651 endBytes
= SingletonEncoder
.EndBytes
;
// finish with the selected end bytes, if any
1654 if (endBytes
!= null)
1656 connection
.Write(endBytes
, 0, endBytes
.Length
,
1657 true, timeoutHelper
.RemainingTime());
// Asynchronous counterpart of WriteMessage: the work is carried out by a
// WriteMessageAsyncResult constructed from the same arguments. Pair with EndWriteMessage.
public static IAsyncResult BeginWriteMessage(Message message, IConnection connection, bool isRequest,
    IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
    AsyncCallback callback, object state)
{
    WriteMessageAsyncResult pendingWrite = new WriteMessageAsyncResult(
        message, connection, isRequest, settings, ref timeoutHelper, callback, state);
    return pendingWrite;
}
// Completes a write started with BeginWriteMessage by delegating to
// WriteMessageAsyncResult.End.
public static void EndWriteMessage(IAsyncResult result)
{
    WriteMessageAsyncResult.End(result);
}
1673 // overrides ConnectionStream to add a Framing int at the beginning of each record
// Each Write/BeginWrite first emits the record length (encoded with IntEncoder into the
// reusable encodedSize scratch buffer) and then forwards the payload to the base stream.
// NOTE(review): the encodedSize field declaration and some statements are not visible in
// this excerpt.
1674 class StreamingOutputConnectionStream
: ConnectionStream
1678 public StreamingOutputConnectionStream(IConnection connection
, IDefaultCommunicationTimeouts timeouts
)
1679 : base(connection
, timeouts
)
// scratch buffer large enough for one encoded int
1681 this.encodedSize
= new byte[IntEncoder
.MaxEncodedSize
];
// Encodes `size` and writes the encoded bytes directly to the underlying connection using
// the stream's WriteTimeout (interpreted as milliseconds).
// NOTE(review): a guarding condition may precede these statements — not visible here.
1683 void WriteChunkSize(int size
)
1687 int bytesEncoded
= IntEncoder
.Encode(size
, encodedSize
, 0);
1688 base.Connection
.Write(encodedSize
, 0, bytesEncoded
, false, TimeSpan
.FromMilliseconds(this.WriteTimeout
));
// size prefix is written synchronously, then the payload write is started asynchronously
1692 public override IAsyncResult
BeginWrite(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object state
)
1694 WriteChunkSize(count
);
1695 return base.BeginWrite(buffer
, offset
, count
, callback
, state
);
// NOTE(review): no size-prefix call is visible for this override in the excerpt — confirm
// against the full file.
1698 public override void WriteByte(byte value)
1701 base.WriteByte(value);
// size prefix followed by the payload, both synchronous
1704 public override void Write(byte[] buffer
, int offset
, int count
)
1706 WriteChunkSize(count
);
1707 base.Write(buffer
, offset
, count
);
// AsyncResult that performs the asynchronous form of WriteMessage: optional start bytes,
// the message itself (streamed or buffered), then the end bytes.
// NOTE(review): the constructor also assigns this.message and this.endBytes; their
// declarations are not visible in this excerpt.
1711 class WriteMessageAsyncResult
: AsyncResult
1713 IConnection connection
;
1714 MessageEncoder encoder
;
1715 BufferManager bufferManager
;
// Completion callbacks are static and shared by all instances; the first three are created
// on first use by the constructor, onWriteEndBytes is created eagerly.
1717 static WaitCallback onWriteBufferedMessage
;
1718 static WaitCallback onWriteStartBytes
;
// used with ActionItem.Schedule when the start-bytes write completes inline
1719 static Action
<object> onWriteStartBytesScheduled
;
1720 static WaitCallback onWriteEndBytes
=
1721 Fx
.ThunkCallback(new WaitCallback(OnWriteEndBytes
));
// pooled buffer holding the encoded message on the buffered path; returned to
// bufferManager once no longer needed
1722 byte[] bufferToFree
;
1723 IConnectionOrientedTransportFactorySettings settings
;
// private copy of the caller's TimeoutHelper taken in the constructor
1724 TimeoutHelper timeoutHelper
;
// Captures the arguments and immediately starts the write.
//   - message == null: only end bytes are written (EndBytes when isRequest).
//   - otherwise end bytes and the streamed/buffered decision are selected from the two
//     visible pairs (EnvelopeEndFramingEndBytes + IsRequestStreamed,
//     EnvelopeEndBytes + IsResponseStreamed), then either the start bytes are written
//     asynchronously (streamed) or the fully encoded message is (buffered).
// NOTE(review): try/finally, else-branches, the declaration of writeStreamed and the
// conditions guarding the final Complete are not visible in this excerpt.
1727 public WriteMessageAsyncResult(Message message
, IConnection connection
, bool isRequest
,
1728 IConnectionOrientedTransportFactorySettings settings
, ref TimeoutHelper timeoutHelper
,
1729 AsyncCallback callback
, object state
)
1730 : base(callback
, state
)
1732 this.connection
= connection
;
1733 this.encoder
= settings
.MessageEncoderFactory
.Encoder
;
1734 this.bufferManager
= settings
.BufferManager
;
// struct copy: later RemainingTime() calls use this instance's own helper
1735 this.timeoutHelper
= timeoutHelper
;
1736 this.message
= message
;
1737 this.settings
= settings
;
1739 bool throwing
= true;
1740 bool completeSelf
= false;
1741 if (message
== null)
1743 if (isRequest
) // context takes care of the end bytes on Close/reader.EOF
1745 this.endBytes
= SingletonEncoder
.EndBytes
;
1747 completeSelf
= WriteEndBytes();
1753 byte[] envelopeStartBytes
= SingletonEncoder
.EnvelopeStartBytes
;
1757 this.endBytes
= SingletonEncoder
.EnvelopeEndFramingEndBytes
;
1758 writeStreamed
= TransferModeHelper
.IsRequestStreamed(settings
.TransferMode
);
1762 this.endBytes
= SingletonEncoder
.EnvelopeEndBytes
;
1763 writeStreamed
= TransferModeHelper
.IsResponseStreamed(settings
.TransferMode
);
// Streamed path: lazily create the shared callback, then start writing the start bytes.
1768 if (onWriteStartBytes
== null)
1770 onWriteStartBytes
= Fx
.ThunkCallback(new WaitCallback(OnWriteStartBytes
));
1773 AsyncCompletionResult writeStartBytesResult
= connection
.BeginWrite(envelopeStartBytes
, 0, envelopeStartBytes
.Length
, true,
1774 timeoutHelper
.RemainingTime(), onWriteStartBytes
, this);
// Write finished inline: the continuation (which writes the message body) is scheduled
// through ActionItem rather than invoked directly from the constructor.
1776 if (writeStartBytesResult
== AsyncCompletionResult
.Completed
)
1778 if (onWriteStartBytesScheduled
== null)
1780 onWriteStartBytesScheduled
= new Action
<object>(OnWriteStartBytes
);
1782 ActionItem
.Schedule(onWriteStartBytesScheduled
, this);
// Buffered path: encode the message into a pooled buffer with room reserved in front for
// the envelope start bytes plus one encoded int.
1787 ArraySegment
<byte> messageData
= settings
.MessageEncoderFactory
.Encoder
.WriteMessage(message
,
1788 int.MaxValue
, this.bufferManager
, envelopeStartBytes
.Length
+ IntEncoder
.MaxEncodedSize
);
1789 messageData
= SingletonEncoder
.EncodeMessageFrame(messageData
);
// remember the pooled array so it can be returned to the BufferManager later
1790 this.bufferToFree
= messageData
.Array
;
// place the envelope start bytes immediately before the framed message
1791 Buffer
.BlockCopy(envelopeStartBytes
, 0, messageData
.Array
, messageData
.Offset
- envelopeStartBytes
.Length
,
1792 envelopeStartBytes
.Length
);
1794 if (onWriteBufferedMessage
== null)
1796 onWriteBufferedMessage
= Fx
.ThunkCallback(new WaitCallback(OnWriteBufferedMessage
));
1798 AsyncCompletionResult writeBufferedResult
=
1799 connection
.BeginWrite(messageData
.Array
, messageData
.Offset
- envelopeStartBytes
.Length
,
1800 messageData
.Count
+ envelopeStartBytes
.Length
, true, timeoutHelper
.RemainingTime(),
1801 onWriteBufferedMessage
, this);
// completed inline: continue with the end bytes on this thread
1803 if (writeBufferedResult
== AsyncCompletionResult
.Completed
)
1805 completeSelf
= HandleWriteBufferedMessage();
// NOTE(review): presumably guarded by completeSelf — the guard is not visible here
1821 base.Complete(true);
// Completes the asynchronous write by delegating to AsyncResult.End for this result type;
// the value it returns is not used.
public static void End(IAsyncResult result)
{
    AsyncResult.End<WriteMessageAsyncResult>(result);
}
// NOTE(review): the header of the method containing these statements is not visible in
// this excerpt. The statements return the pooled message buffer (if one was taken) to the
// BufferManager.
1832 if (bufferToFree
!= null)
1834 this.bufferManager
.ReturnBuffer(bufferToFree
);
// Continuation after the envelope start bytes were written (streamed path): finishes that
// write, streams the message through a chunk-framing stream guarded by the timeout, and
// then moves on to the end bytes. Returns the value produced by WriteEndBytes.
bool HandleWriteStartBytes()
{
    this.connection.EndWrite();

    Stream chunkedStream = new StreamingOutputConnectionStream(this.connection, this.settings);
    Stream timedStream = new TimeoutStream(chunkedStream, ref this.timeoutHelper);
    this.encoder.WriteMessage(this.message, timedStream);

    bool finished = WriteEndBytes();
    return finished;
}
// Continuation after the buffered message write: finishes that write and proceeds to the
// end bytes. Returns the value produced by WriteEndBytes.
bool HandleWriteBufferedMessage()
{
    this.connection.EndWrite();

    bool finished = WriteEndBytes();
    return finished;
}
// Starts the asynchronous write of the end bytes. When the write does not queue, the
// continuation (HandleWriteEndBytes) runs inline and its result is returned.
// NOTE(review): the bodies of the `endBytes == null` and `Queued` branches are not visible
// in this excerpt.
1853 bool WriteEndBytes()
1855 if (this.endBytes
== null)
1861 AsyncCompletionResult result
= connection
.BeginWrite(endBytes
, 0,
1862 endBytes
.Length
, true, timeoutHelper
.RemainingTime(), onWriteEndBytes
, this);
1864 if (result
== AsyncCompletionResult
.Queued
)
1869 return HandleWriteEndBytes();
// Continuation after the end bytes were written: finishes the pending connection write.
// NOTE(review): the statements that follow (including the return) are not visible in this
// excerpt.
1872 bool HandleWriteEndBytes()
1874 this.connection
.EndWrite();
// Callback for the start-bytes write. Used both as the connection's WaitCallback and as
// the Action<object> handed to ActionItem.Schedule; it simply forwards the async state to
// OnWriteStartBytesCallbackHelper.
static void OnWriteStartBytes(object asyncState)
{
    OnWriteStartBytesCallbackHelper(asyncState);
}
// Runs the start-bytes continuation for the WriteMessageAsyncResult passed as asyncState.
// Visible flow: call HandleWriteStartBytes; if an exception `e` is captured, mark the
// operation for completion and record the exception; finally complete the result
// asynchronously (Complete(false, ...)) with the recorded exception, if any.
// NOTE(review): the try/catch/finally lines, the fatal-exception check and the guard around
// Complete are not visible in this excerpt.
1884 static void OnWriteStartBytesCallbackHelper(object asyncState
)
1886 WriteMessageAsyncResult thisPtr
= (WriteMessageAsyncResult
)asyncState
;
1887 Exception completionException
= null;
1888 bool completeSelf
= false;
1889 bool throwing
= true;
1892 completeSelf
= thisPtr
.HandleWriteStartBytes();
1895 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1902 completeSelf
= true;
1903 completionException
= e
;
1915 thisPtr
.Complete(false, completionException
);
// Callback for the buffered-message write. Visible flow: call HandleWriteBufferedMessage;
// if an exception `e` is captured, mark the operation for completion and record it; then
// complete the result asynchronously (Complete(false, ...)) with the recorded exception.
// NOTE(review): the try/catch/finally lines, the fatal-exception check and the guard around
// Complete are not visible in this excerpt.
1919 static void OnWriteBufferedMessage(object asyncState
)
1921 WriteMessageAsyncResult thisPtr
= (WriteMessageAsyncResult
)asyncState
;
1923 Exception completionException
= null;
1924 bool completeSelf
= false;
1925 bool throwing
= true;
1928 completeSelf
= thisPtr
.HandleWriteBufferedMessage();
1931 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1938 completeSelf
= true;
1939 completionException
= e
;
1950 thisPtr
.Complete(false, completionException
);
// Callback for the end-bytes write. Visible flow: call HandleWriteEndBytes; if an exception
// `e` is captured, mark the operation for completion and record it; then complete the
// result asynchronously (Complete(false, ...)) with the recorded exception.
// NOTE(review): the try/catch/finally lines, the use of `success`, and the guard around
// Complete are not visible in this excerpt.
1954 static void OnWriteEndBytes(object asyncState
)
1956 WriteMessageAsyncResult thisPtr
= (WriteMessageAsyncResult
)asyncState
;
1958 Exception completionException
= null;
1959 bool completeSelf
= false;
1960 bool success
= false;
1963 completeSelf
= thisPtr
.HandleWriteEndBytes();
1966 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1973 completeSelf
= true;
1974 completionException
= e
;
1986 thisPtr
.Complete(false, completionException
);