Updates referencesource to .NET 4.7
[mono-project.git] / mcs / class / referencesource / System.ServiceModel / System / ServiceModel / Channels / SingletonConnectionReader.cs
blob4cb52f5798015d2dd3d4f86e922a609febcbb0e7
1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace System.ServiceModel.Channels
7 using System;
8 using System.Diagnostics;
9 using System.IO;
10 using System.Net;
11 using System.Runtime;
12 using System.Runtime.CompilerServices;
13 using System.Security.Authentication.ExtendedProtection;
14 using System.ServiceModel;
15 using System.ServiceModel.Activation;
16 using System.ServiceModel.Description;
17 using System.ServiceModel.Diagnostics;
18 using System.ServiceModel.Dispatcher;
19 using System.ServiceModel.Security;
20 using System.Threading;
21 using System.Xml;
22 using System.ServiceModel.Diagnostics.Application;
24 delegate void ServerSingletonPreambleCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
25 delegate ISingletonChannelListener SingletonPreambleDemuxCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
26 interface ISingletonChannelListener
28 TimeSpan ReceiveTimeout { get; }
29 void ReceiveRequest(RequestContext requestContext, Action callback, bool canDispatchOnThisThread);
32 class ServerSingletonPreambleConnectionReader : InitialServerConnectionReader
34 ServerSingletonDecoder decoder;
35 ServerSingletonPreambleCallback callback;
36 WaitCallback onAsyncReadComplete;
37 IConnectionOrientedTransportFactorySettings transportSettings;
38 TransportSettingsCallback transportSettingsCallback;
39 SecurityMessageProperty security;
40 Uri via;
41 IConnection rawConnection;
42 byte[] connectionBuffer;
43 bool isReadPending;
44 int offset;
45 int size;
46 TimeoutHelper receiveTimeoutHelper;
47 Action<Uri> viaDelegate;
48 ChannelBinding channelBindingToken;
49 static AsyncCallback onValidate;
51 public ServerSingletonPreambleConnectionReader(IConnection connection, Action connectionDequeuedCallback,
52 long streamPosition, int offset, int size, TransportSettingsCallback transportSettingsCallback,
53 ConnectionClosedCallback closedCallback, ServerSingletonPreambleCallback callback)
54 : base(connection, closedCallback)
56 this.decoder = new ServerSingletonDecoder(streamPosition, MaxViaSize, MaxContentTypeSize);
57 this.offset = offset;
58 this.size = size;
59 this.callback = callback;
60 this.transportSettingsCallback = transportSettingsCallback;
61 this.rawConnection = connection;
62 this.ConnectionDequeuedCallback = connectionDequeuedCallback;
66 public ChannelBinding ChannelBinding
68 get
70 return this.channelBindingToken;
74 public int BufferOffset
76 get { return this.offset; }
79 public int BufferSize
81 get { return this.size; }
84 public ServerSingletonDecoder Decoder
86 get { return this.decoder; }
89 public IConnection RawConnection
91 get { return this.rawConnection; }
94 public Uri Via
96 get { return this.via; }
99 public IConnectionOrientedTransportFactorySettings TransportSettings
101 get { return this.transportSettings; }
104 public SecurityMessageProperty Security
106 get { return this.security; }
109 TimeSpan GetRemainingTimeout()
111 return this.receiveTimeoutHelper.RemainingTime();
114 void ReadAndDispatch()
116 bool success = false;
119 while ((size > 0 || !isReadPending) && !IsClosed)
121 if (size == 0)
123 isReadPending = true;
124 if (onAsyncReadComplete == null)
126 onAsyncReadComplete = new WaitCallback(OnAsyncReadComplete);
129 if (Connection.BeginRead(0, connectionBuffer.Length, GetRemainingTimeout(),
130 onAsyncReadComplete, null) == AsyncCompletionResult.Queued)
132 break;
134 HandleReadComplete();
137 int bytesRead = decoder.Decode(connectionBuffer, offset, size);
138 if (bytesRead > 0)
140 offset += bytesRead;
141 size -= bytesRead;
144 if (decoder.CurrentState == ServerSingletonDecoder.State.PreUpgradeStart)
146 if (onValidate == null)
148 onValidate = Fx.ThunkCallback(new AsyncCallback(OnValidate));
151 this.via = decoder.Via;
152 IAsyncResult result = this.Connection.BeginValidate(this.via, onValidate, this);
154 if (result.CompletedSynchronously)
156 if (!VerifyValidationResult(result))
158 // This goes through the failure path (Abort) even though it doesn't throw.
159 return;
162 break; //exit loop, set success=true;
165 success = true;
167 catch (CommunicationException exception)
169 DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
171 catch (TimeoutException exception)
173 if (TD.ReceiveTimeoutIsEnabled())
175 TD.ReceiveTimeout(exception.Message);
177 DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
179 catch (Exception e)
181 if (Fx.IsFatal(e))
183 throw;
185 if (!ExceptionHandler.HandleTransportExceptionHelper(e))
187 throw;
190 // containment -- we abort ourselves for any error, no extra containment needed
192 finally
194 if (!success)
196 Abort();
201 //returns true if validation was successful
202 bool VerifyValidationResult(IAsyncResult result)
204 return this.Connection.EndValidate(result) && this.ContinuePostValidationProcessing();
207 static void OnValidate(IAsyncResult result)
209 bool success = false;
210 ServerSingletonPreambleConnectionReader thisPtr = (ServerSingletonPreambleConnectionReader)result.AsyncState;
213 if (!result.CompletedSynchronously)
215 if (!thisPtr.VerifyValidationResult(result))
217 // This goes through the failure path (Abort) even though it doesn't throw.
218 return;
221 success = true;
223 catch (CommunicationException exception)
225 DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
227 catch (TimeoutException exception)
229 if (TD.ReceiveTimeoutIsEnabled())
231 TD.ReceiveTimeout(exception.Message);
233 DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
235 catch (Exception e)
237 if (Fx.IsFatal(e))
239 throw;
242 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
244 finally
246 if (!success)
248 thisPtr.Abort();
253 //returns false if the connection should be aborted
254 bool ContinuePostValidationProcessing()
256 if (viaDelegate != null)
260 viaDelegate(via);
262 catch (ServiceActivationException e)
264 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
265 // return fault and close connection
266 SendFault(FramingEncodingString.ServiceActivationFailedFault);
267 return true;
272 this.transportSettings = transportSettingsCallback(via);
274 if (transportSettings == null)
276 EndpointNotFoundException e = new EndpointNotFoundException(SR.GetString(SR.EndpointNotFound, decoder.Via));
277 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
278 // return fault and close connection
279 SendFault(FramingEncodingString.EndpointNotFoundFault);
280 return false;
283 // we have enough information to hand off to a channel. Our job is done
284 callback(this);
285 return true;
288 public void SendFault(string faultString)
290 SendFault(faultString, ref this.receiveTimeoutHelper);
293 void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
295 InitialServerConnectionReader.SendFault(Connection, faultString,
296 connectionBuffer, timeoutHelper.RemainingTime(), TransportDefaults.MaxDrainSize);
299 public IAsyncResult BeginCompletePreamble(TimeSpan timeout, AsyncCallback callback, object state)
301 return new CompletePreambleAsyncResult(timeout, this, callback, state);
304 public IConnection EndCompletePreamble(IAsyncResult result)
306 return CompletePreambleAsyncResult.End(result);
309 class CompletePreambleAsyncResult : TypedAsyncResult<IConnection>
311 static WaitCallback onReadCompleted = new WaitCallback(OnReadCompleted);
312 static WaitCallback onWriteCompleted = new WaitCallback(OnWriteCompleted);
313 static AsyncCallback onUpgradeComplete = Fx.ThunkCallback(OnUpgradeComplete);
314 TimeoutHelper timeoutHelper;
315 ServerSingletonPreambleConnectionReader parent;
316 StreamUpgradeAcceptor upgradeAcceptor;
317 StreamUpgradeProvider upgrade;
318 IStreamUpgradeChannelBindingProvider channelBindingProvider;
319 IConnection currentConnection;
320 UpgradeState upgradeState = UpgradeState.None;
322 public CompletePreambleAsyncResult(TimeSpan timeout, ServerSingletonPreambleConnectionReader parent, AsyncCallback callback, object state)
323 : base(callback, state)
325 this.timeoutHelper = new TimeoutHelper(timeout);
326 this.parent = parent;
328 Initialize();
330 if (ContinueWork(null))
332 Complete(this.currentConnection, true);
336 byte[] ConnectionBuffer
340 return this.parent.connectionBuffer;
344 this.parent.connectionBuffer = value;
348 int Offset
352 return this.parent.offset;
356 this.parent.offset = value;
360 int Size
364 return this.parent.size;
368 this.parent.size = value;
372 bool CanReadAndDecode
376 //ok to read/decode before we start the upgrade
377 //and between UpgradeComplete/WritingPreambleAck
378 return this.upgradeState == UpgradeState.None
379 || this.upgradeState == UpgradeState.UpgradeComplete;
383 ServerSingletonDecoder Decoder
387 return this.parent.decoder;
391 void Initialize()
393 if (!this.parent.transportSettings.MessageEncoderFactory.Encoder.IsContentTypeSupported(Decoder.ContentType))
395 SendFault(FramingEncodingString.ContentTypeInvalidFault, ref timeoutHelper);
396 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(
397 SR.ContentTypeMismatch, Decoder.ContentType, parent.transportSettings.MessageEncoderFactory.Encoder.ContentType)));
400 upgrade = this.parent.transportSettings.Upgrade;
401 if (upgrade != null)
403 channelBindingProvider = upgrade.GetProperty<IStreamUpgradeChannelBindingProvider>();
404 upgradeAcceptor = upgrade.CreateUpgradeAcceptor();
407 this.currentConnection = this.parent.Connection;
410 void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
412 this.parent.SendFault(faultString, ref timeoutHelper);
415 bool BeginRead()
417 this.Offset = 0;
418 return this.currentConnection.BeginRead(0, this.ConnectionBuffer.Length, timeoutHelper.RemainingTime(), onReadCompleted, this) == AsyncCompletionResult.Completed;
421 void EndRead()
423 this.Size = currentConnection.EndRead();
424 if (this.Size == 0)
426 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.Decoder.CreatePrematureEOFException());
430 bool ContinueWork(IAsyncResult upgradeAsyncResult)
432 if (upgradeAsyncResult != null)
434 Fx.AssertAndThrow(this.upgradeState == UpgradeState.EndUpgrade, "upgradeAsyncResult should only be passed in from OnUpgradeComplete callback");
437 for (;;)
439 if (Size == 0 && this.CanReadAndDecode)
441 if (BeginRead())
443 EndRead();
445 else
447 //when read completes, we will re-enter this loop.
448 break;
452 for (;;)
454 if (this.CanReadAndDecode)
456 int bytesRead = Decoder.Decode(ConnectionBuffer, Offset, Size);
457 if (bytesRead > 0)
459 Offset += bytesRead;
460 Size -= bytesRead;
464 switch (Decoder.CurrentState)
466 case ServerSingletonDecoder.State.UpgradeRequest:
467 switch (this.upgradeState)
469 case UpgradeState.None:
470 //change the state so that we don't read/decode until it is safe
471 ChangeUpgradeState(UpgradeState.VerifyingUpgradeRequest);
472 break;
473 case UpgradeState.VerifyingUpgradeRequest:
474 if (this.upgradeAcceptor == null)
476 SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
477 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
478 new ProtocolException(SR.GetString(SR.UpgradeRequestToNonupgradableService, Decoder.Upgrade)));
481 if (!this.upgradeAcceptor.CanUpgrade(Decoder.Upgrade))
483 SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
484 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(SR.UpgradeProtocolNotSupported, Decoder.Upgrade)));
487 ChangeUpgradeState(UpgradeState.WritingUpgradeAck);
488 // accept upgrade
489 if (this.currentConnection.BeginWrite(ServerSingletonEncoder.UpgradeResponseBytes, 0, ServerSingletonEncoder.UpgradeResponseBytes.Length,
490 true, timeoutHelper.RemainingTime(), onWriteCompleted, this) == AsyncCompletionResult.Queued)
492 //OnWriteCompleted will:
493 // 1) set upgradeState to UpgradeAckSent
494 // 2) call EndWrite
495 return false;
497 else
499 this.currentConnection.EndWrite();
502 ChangeUpgradeState(UpgradeState.UpgradeAckSent);
503 break;
504 case UpgradeState.UpgradeAckSent:
505 IConnection connectionToUpgrade = this.currentConnection;
506 if (Size > 0)
508 connectionToUpgrade = new PreReadConnection(connectionToUpgrade, ConnectionBuffer, Offset, Size);
510 ChangeUpgradeState(UpgradeState.BeginUpgrade);
511 break;
512 case UpgradeState.BeginUpgrade:
515 if (!BeginUpgrade(out upgradeAsyncResult))
517 //OnUpgradeComplete will set upgradeState to EndUpgrade
518 return false;
521 ChangeUpgradeState(UpgradeState.EndUpgrade);
523 catch (Exception exception)
525 if (Fx.IsFatal(exception))
526 throw;
528 this.parent.WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
529 throw;
531 break;
532 case UpgradeState.EndUpgrade://Must be a different state here than UpgradeComplete so that we don't try to read from the connection
535 EndUpgrade(upgradeAsyncResult);
536 ChangeUpgradeState(UpgradeState.UpgradeComplete);
538 catch (Exception exception)
540 if (Fx.IsFatal(exception))
541 throw;
543 this.parent.WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
544 throw;
546 break;
547 case UpgradeState.UpgradeComplete:
548 //Client is doing more than one upgrade, reset the state
549 ChangeUpgradeState(UpgradeState.VerifyingUpgradeRequest);
550 break;
552 break;
553 case ServerSingletonDecoder.State.Start:
554 this.parent.SetupSecurityIfNecessary(upgradeAcceptor);
556 if (this.upgradeState == UpgradeState.UpgradeComplete //We have done at least one upgrade, but we are now done.
557 || this.upgradeState == UpgradeState.None)//no upgrade, just send the preample end bytes
559 ChangeUpgradeState(UpgradeState.WritingPreambleEnd);
560 // we've finished the preamble. Ack and return.
561 if (this.currentConnection.BeginWrite(ServerSessionEncoder.AckResponseBytes, 0, ServerSessionEncoder.AckResponseBytes.Length,
562 true, timeoutHelper.RemainingTime(), onWriteCompleted, this) == AsyncCompletionResult.Queued)
564 //OnWriteCompleted will:
565 // 1) set upgradeState to PreambleEndSent
566 // 2) call EndWrite
567 return false;
569 else
571 this.currentConnection.EndWrite();
574 //terminal state
575 ChangeUpgradeState(UpgradeState.PreambleEndSent);
578 //we are done, this.currentConnection is the upgraded connection
579 return true;
582 if (Size == 0)
584 break;
589 return false;
592 bool BeginUpgrade(out IAsyncResult upgradeAsyncResult)
594 upgradeAsyncResult = InitialServerConnectionReader.BeginUpgradeConnection(this.currentConnection, upgradeAcceptor, this.parent.transportSettings, onUpgradeComplete, this);
596 if (!upgradeAsyncResult.CompletedSynchronously)
598 upgradeAsyncResult = null; //caller shouldn't use this out param unless completed sync.
599 return false;
602 return true;
605 void EndUpgrade(IAsyncResult upgradeAsyncResult)
607 this.currentConnection = InitialServerConnectionReader.EndUpgradeConnection(upgradeAsyncResult);
609 this.ConnectionBuffer = this.currentConnection.AsyncReadBuffer;
611 if (this.channelBindingProvider != null
612 && this.channelBindingProvider.IsChannelBindingSupportEnabled
613 && this.parent.channelBindingToken == null)//first one wins in the case of multiple upgrades.
615 this.parent.channelBindingToken = channelBindingProvider.GetChannelBinding(this.upgradeAcceptor, ChannelBindingKind.Endpoint);
619 void ChangeUpgradeState(UpgradeState newState)
621 switch (newState)
623 case UpgradeState.None:
624 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
625 case UpgradeState.VerifyingUpgradeRequest:
626 if (this.upgradeState != UpgradeState.None //starting first upgrade
627 && this.upgradeState != UpgradeState.UpgradeComplete)//completing one upgrade and starting another
629 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
631 break;
632 case UpgradeState.WritingUpgradeAck:
633 if (this.upgradeState != UpgradeState.VerifyingUpgradeRequest)
635 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
637 break;
638 case UpgradeState.UpgradeAckSent:
639 if (this.upgradeState != UpgradeState.WritingUpgradeAck)
641 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
643 break;
644 case UpgradeState.BeginUpgrade:
645 if (this.upgradeState != UpgradeState.UpgradeAckSent)
647 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
649 break;
650 case UpgradeState.EndUpgrade:
651 if (this.upgradeState != UpgradeState.BeginUpgrade)
653 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
655 break;
656 case UpgradeState.UpgradeComplete:
657 if (this.upgradeState != UpgradeState.EndUpgrade)
659 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
661 break;
662 case UpgradeState.WritingPreambleEnd:
663 if (this.upgradeState != UpgradeState.None //no upgrade being used
664 && this.upgradeState != UpgradeState.UpgradeComplete)//upgrades are now complete, end the preamble handshake.
666 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
668 break;
669 case UpgradeState.PreambleEndSent:
670 if (this.upgradeState != UpgradeState.WritingPreambleEnd)
672 throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
674 break;
675 default:
676 throw Fx.AssertAndThrow("Unexpected Upgrade State: " + newState);
678 this.upgradeState = newState;
681 static void OnReadCompleted(object state)
683 CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)state;
684 Exception completionException = null;
685 bool completeSelf = false;
689 thisPtr.EndRead();
690 completeSelf = thisPtr.ContinueWork(null);
692 catch (Exception ex)
694 if (Fx.IsFatal(ex))
696 throw;
698 completionException = ex;
699 completeSelf = true;
702 if (completeSelf)
704 if (completionException != null)
706 thisPtr.Complete(false, completionException);
708 else
710 thisPtr.Complete(thisPtr.currentConnection, false);
715 static void OnWriteCompleted(object state)
717 CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)state;
718 Exception completionException = null;
719 bool completeSelf = false;
723 thisPtr.currentConnection.EndWrite();
725 switch (thisPtr.upgradeState)
727 case UpgradeState.WritingUpgradeAck:
728 thisPtr.ChangeUpgradeState(UpgradeState.UpgradeAckSent);
729 break;
730 case UpgradeState.WritingPreambleEnd:
731 thisPtr.ChangeUpgradeState(UpgradeState.PreambleEndSent);
732 break;
734 completeSelf = thisPtr.ContinueWork(null);
736 catch (Exception ex)
738 if (Fx.IsFatal(ex))
740 throw;
742 completionException = ex;
743 completeSelf = true;
746 if (completeSelf)
748 if (completionException != null)
750 thisPtr.Complete(false, completionException);
752 else
754 thisPtr.Complete(thisPtr.currentConnection, false);
759 static void OnUpgradeComplete(IAsyncResult result)
761 if (result.CompletedSynchronously)
763 return;
766 CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)result.AsyncState;
767 Exception completionException = null;
768 bool completeSelf = false;
772 thisPtr.ChangeUpgradeState(UpgradeState.EndUpgrade);
773 completeSelf = thisPtr.ContinueWork(result);
775 catch (Exception ex)
777 if (Fx.IsFatal(ex))
779 throw;
781 completionException = ex;
782 completeSelf = true;
785 if (completeSelf)
787 if (completionException != null)
789 thisPtr.Complete(false, completionException);
791 else
793 thisPtr.Complete(thisPtr.currentConnection, false);
798 enum UpgradeState
800 None,
801 VerifyingUpgradeRequest,
802 WritingUpgradeAck,
803 UpgradeAckSent,
804 BeginUpgrade,
805 EndUpgrade,
806 UpgradeComplete,
807 WritingPreambleEnd,
808 PreambleEndSent,
812 void SetupSecurityIfNecessary(StreamUpgradeAcceptor upgradeAcceptor)
814 StreamSecurityUpgradeAcceptor securityUpgradeAcceptor = upgradeAcceptor as StreamSecurityUpgradeAcceptor;
815 if (securityUpgradeAcceptor != null)
817 this.security = securityUpgradeAcceptor.GetRemoteSecurity();
818 if (this.security == null)
820 Exception securityFailedException = new ProtocolException(
821 SR.GetString(SR.RemoteSecurityNotNegotiatedOnStreamUpgrade, this.Via));
822 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(securityFailedException);
824 // Audit Authentication Success
825 WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Success, null);
829 #region Transport Security Auditing
830 void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, Exception exception)
834 WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Failure, exception);
836 #pragma warning suppress 56500 // covered by FxCop
837 catch (Exception auditException)
839 if (Fx.IsFatal(auditException))
841 throw;
844 DiagnosticUtility.TraceHandledException(auditException, TraceEventType.Error);
848 void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, AuditLevel auditLevel, Exception exception)
850 if ((this.transportSettings.AuditBehavior.MessageAuthenticationAuditLevel & auditLevel) != auditLevel)
852 return;
855 if (securityUpgradeAcceptor == null)
857 return;
859 String primaryIdentity = String.Empty;
860 SecurityMessageProperty clientSecurity = securityUpgradeAcceptor.GetRemoteSecurity();
861 if (clientSecurity != null)
863 primaryIdentity = GetIdentityNameFromContext(clientSecurity);
866 ServiceSecurityAuditBehavior auditBehavior = this.transportSettings.AuditBehavior;
868 if (auditLevel == AuditLevel.Success)
870 SecurityAuditHelper.WriteTransportAuthenticationSuccessEvent(auditBehavior.AuditLogLocation,
871 auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity);
873 else
875 SecurityAuditHelper.WriteTransportAuthenticationFailureEvent(auditBehavior.AuditLogLocation,
876 auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity, exception);
880 [MethodImpl(MethodImplOptions.NoInlining)]
881 static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity)
883 return SecurityUtils.GetIdentityNamesFromContext(
884 clientSecurity.ServiceSecurityContext.AuthorizationContext);
886 #endregion
888 void HandleReadComplete()
890 offset = 0;
891 size = Connection.EndRead();
892 isReadPending = false;
893 if (size == 0)
895 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
899 void OnAsyncReadComplete(object state)
901 bool success = false;
904 HandleReadComplete();
905 ReadAndDispatch();
906 success = true;
908 catch (CommunicationException exception)
910 DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
912 catch (TimeoutException exception)
914 if (TD.ReceiveTimeoutIsEnabled())
916 TD.ReceiveTimeout(exception.Message);
918 DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
920 catch (Exception e)
922 if (Fx.IsFatal(e))
924 throw;
926 if (!ExceptionHandler.HandleTransportExceptionHelper(e))
928 throw;
931 // containment -- we abort ourselves for any error, no extra containment needed
933 finally
935 if (!success)
937 Abort();
942 public void StartReading(Action<Uri> viaDelegate, TimeSpan timeout)
944 this.viaDelegate = viaDelegate;
945 this.receiveTimeoutHelper = new TimeoutHelper(timeout);
946 this.connectionBuffer = Connection.AsyncReadBuffer;
947 ReadAndDispatch();
951 class ServerSingletonConnectionReader : SingletonConnectionReader
953 ConnectionDemuxer connectionDemuxer;
954 ServerSingletonDecoder decoder;
955 IConnection rawConnection;
956 string contentType;
957 ChannelBinding channelBindingToken;
959 public ServerSingletonConnectionReader(ServerSingletonPreambleConnectionReader preambleReader,
960 IConnection upgradedConnection, ConnectionDemuxer connectionDemuxer)
961 : base(upgradedConnection, preambleReader.BufferOffset, preambleReader.BufferSize,
962 preambleReader.Security, preambleReader.TransportSettings, preambleReader.Via)
964 this.decoder = preambleReader.Decoder;
965 this.contentType = this.decoder.ContentType;
966 this.connectionDemuxer = connectionDemuxer;
967 this.rawConnection = preambleReader.RawConnection;
968 this.channelBindingToken = preambleReader.ChannelBinding;
971 protected override string ContentType
973 get { return this.contentType; }
976 protected override long StreamPosition
978 get { return this.decoder.StreamPosition; }
981 protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof)
983 while (size > 0)
985 int bytesRead = decoder.Decode(buffer, offset, size);
986 if (bytesRead > 0)
988 offset += bytesRead;
989 size -= bytesRead;
992 switch (decoder.CurrentState)
994 case ServerSingletonDecoder.State.EnvelopeStart:
995 // we're at the envelope
996 return true;
998 case ServerSingletonDecoder.State.End:
999 isAtEof = true;
1000 return false;
1004 return false;
1007 protected override void OnClose(TimeSpan timeout)
1009 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1010 // send back EOF and then recycle the connection
1011 this.Connection.Write(SingletonEncoder.EndBytes, 0, SingletonEncoder.EndBytes.Length, true, timeoutHelper.RemainingTime());
1012 this.connectionDemuxer.ReuseConnection(this.rawConnection, timeoutHelper.RemainingTime());
1014 ChannelBindingUtility.Dispose(ref this.channelBindingToken);
1017 protected override void PrepareMessage(Message message)
1019 base.PrepareMessage(message);
1020 IPEndPoint remoteEndPoint = this.rawConnection.RemoteIPEndPoint;
1022 // pipes will return null
1023 if (remoteEndPoint != null)
1025 RemoteEndpointMessageProperty remoteEndpointProperty = new RemoteEndpointMessageProperty(remoteEndPoint);
1026 message.Properties.Add(RemoteEndpointMessageProperty.Name, remoteEndpointProperty);
1029 if (this.channelBindingToken != null)
1031 ChannelBindingMessageProperty property = new ChannelBindingMessageProperty(this.channelBindingToken, false);
1032 property.AddTo(message);
1033 property.Dispose(); //message.Properties.Add() creates a copy...
1038 abstract class SingletonConnectionReader
1040 IConnection connection;
1041 bool doneReceiving;
1042 bool doneSending;
1043 bool isAtEof;
1044 bool isClosed;
1045 SecurityMessageProperty security;
1046 object thisLock = new object();
1047 int offset;
1048 int size;
1049 IConnectionOrientedTransportFactorySettings transportSettings;
1050 Uri via;
1051 Stream inputStream;
1053 protected SingletonConnectionReader(IConnection connection, int offset, int size, SecurityMessageProperty security,
1054 IConnectionOrientedTransportFactorySettings transportSettings, Uri via)
1056 this.connection = connection;
1057 this.offset = offset;
1058 this.size = size;
1059 this.security = security;
1060 this.transportSettings = transportSettings;
1061 this.via = via;
1064 protected IConnection Connection
1068 return this.connection;
1072 protected object ThisLock
1076 return this.thisLock;
1080 protected virtual string ContentType
1082 get { return null; }
1085 protected abstract long StreamPosition { get; }
1087 public void Abort()
1089 this.connection.Abort();
1092 public void DoneReceiving(bool atEof)
1094 DoneReceiving(atEof, this.transportSettings.CloseTimeout);
1097 void DoneReceiving(bool atEof, TimeSpan timeout)
1099 if (!this.doneReceiving)
1101 this.isAtEof = atEof;
1102 this.doneReceiving = true;
1104 if (this.doneSending)
1106 this.Close(timeout);
1111 public void Close(TimeSpan timeout)
1113 lock (ThisLock)
1115 if (this.isClosed)
1117 return;
1120 this.isClosed = true;
1123 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1124 bool success = false;
1127 // first drain our stream if necessary
1128 if (this.inputStream != null)
1130 byte[] dummy = DiagnosticUtility.Utility.AllocateByteArray(transportSettings.ConnectionBufferSize);
1131 while (!this.isAtEof)
1133 this.inputStream.ReadTimeout = TimeoutHelper.ToMilliseconds(timeoutHelper.RemainingTime());
1134 int bytesRead = this.inputStream.Read(dummy, 0, dummy.Length);
1135 if (bytesRead == 0)
1137 this.isAtEof = true;
1141 OnClose(timeoutHelper.RemainingTime());
1142 success = true;
1144 finally
1146 if (!success)
1148 this.Abort();
1153 protected abstract void OnClose(TimeSpan timeout);
1155 public void DoneSending(TimeSpan timeout)
1157 this.doneSending = true;
1158 if (this.doneReceiving)
1160 this.Close(timeout);
1164 protected abstract bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof);
1166 protected virtual void PrepareMessage(Message message)
1168 message.Properties.Via = this.via;
1169 message.Properties.Security = (this.security != null) ? (SecurityMessageProperty)this.security.CreateCopy() : null;
1172 public RequestContext ReceiveRequest(TimeSpan timeout)
1174 Message requestMessage = Receive(timeout);
1175 return new StreamedFramingRequestContext(this, requestMessage);
1178 public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
1180 return new ReceiveAsyncResult(this, timeout, callback, state);
1183 public virtual Message EndReceive(IAsyncResult result)
1185 return ReceiveAsyncResult.End(result);
1188 public Message Receive(TimeSpan timeout)
1190 byte[] buffer = DiagnosticUtility.Utility.AllocateByteArray(connection.AsyncReadBufferSize);
1192 if (size > 0)
1194 Buffer.BlockCopy(connection.AsyncReadBuffer, offset, buffer, offset, size);
1197 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1198 for (;;)
1200 if (DecodeBytes(buffer, ref offset, ref size, ref isAtEof))
1202 break;
1205 if (this.isAtEof)
1207 DoneReceiving(true, timeoutHelper.RemainingTime());
1208 return null;
1211 if (size == 0)
1213 offset = 0;
1214 size = connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());
1215 if (size == 0)
1217 DoneReceiving(true, timeoutHelper.RemainingTime());
1218 return null;
1223 // we're ready to read a message
1224 IConnection singletonConnection = this.connection;
1225 if (size > 0)
1227 byte[] initialData = DiagnosticUtility.Utility.AllocateByteArray(size);
1228 Buffer.BlockCopy(buffer, offset, initialData, 0, size);
1229 singletonConnection = new PreReadConnection(singletonConnection, initialData);
1232 Stream connectionStream = new SingletonInputConnectionStream(this, singletonConnection, this.transportSettings);
1233 this.inputStream = new MaxMessageSizeStream(connectionStream, transportSettings.MaxReceivedMessageSize);
1234 using (ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? ServiceModelActivity.CreateBoundedActivity(true) : null)
1236 if (DiagnosticUtility.ShouldUseActivity)
1238 ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage);
1241 Message message = null;
1244 message = transportSettings.MessageEncoderFactory.Encoder.ReadMessage(
1245 this.inputStream, transportSettings.MaxBufferSize, this.ContentType);
1247 catch (XmlException xmlException)
1249 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
1250 new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
1253 if (DiagnosticUtility.ShouldUseActivity)
1255 TraceUtility.TransferFromTransport(message);
1258 PrepareMessage(message);
1260 return message;
1264 class ReceiveAsyncResult : AsyncResult
1266 static Action<object> onReceiveScheduled = new Action<object>(OnReceiveScheduled);
1268 Message message;
1269 SingletonConnectionReader parent;
1270 TimeSpan timeout;
1272 public ReceiveAsyncResult(SingletonConnectionReader parent, TimeSpan timeout, AsyncCallback callback,
1273 object state)
1274 : base(callback, state)
1276 this.parent = parent;
1277 this.timeout = timeout;
1280 ActionItem.Schedule(onReceiveScheduled, this);
1283 public static Message End(IAsyncResult result)
1285 ReceiveAsyncResult receiveAsyncResult = AsyncResult.End<ReceiveAsyncResult>(result);
1286 return receiveAsyncResult.message;
1289 static void OnReceiveScheduled(object state)
1291 ReceiveAsyncResult thisPtr = (ReceiveAsyncResult)state;
1293 Exception completionException = null;
1296 thisPtr.message = thisPtr.parent.Receive(thisPtr.timeout);
1298 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1299 catch (Exception exception)
1301 if (Fx.IsFatal(exception))
1303 throw;
1305 completionException = exception;
1307 thisPtr.Complete(false, completionException);
1311 class StreamedFramingRequestContext : RequestContextBase
1313 IConnection connection;
1314 SingletonConnectionReader parent;
1315 IConnectionOrientedTransportFactorySettings settings;
1316 TimeoutHelper timeoutHelper;
1318 public StreamedFramingRequestContext(SingletonConnectionReader parent, Message requestMessage)
1319 : base(requestMessage, parent.transportSettings.CloseTimeout, parent.transportSettings.SendTimeout)
1321 this.parent = parent;
1322 this.connection = parent.connection;
1323 this.settings = parent.transportSettings;
1326 protected override void OnAbort()
1328 this.parent.Abort();
1331 protected override void OnClose(TimeSpan timeout)
1333 this.parent.Close(timeout);
1336 protected override void OnReply(Message message, TimeSpan timeout)
1338 ICompressedMessageEncoder compressedMessageEncoder = this.settings.MessageEncoderFactory.Encoder as ICompressedMessageEncoder;
1339 if (compressedMessageEncoder != null && compressedMessageEncoder.CompressionEnabled)
1341 compressedMessageEncoder.AddCompressedMessageProperties(message, this.parent.ContentType);
1344 timeoutHelper = new TimeoutHelper(timeout);
1345 StreamingConnectionHelper.WriteMessage(message, this.connection, false, this.settings, ref timeoutHelper);
1346 parent.DoneSending(timeoutHelper.RemainingTime());
1349 protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
1351 ICompressedMessageEncoder compressedMessageEncoder = this.settings.MessageEncoderFactory.Encoder as ICompressedMessageEncoder;
1352 if (compressedMessageEncoder != null && compressedMessageEncoder.CompressionEnabled)
1354 compressedMessageEncoder.AddCompressedMessageProperties(message, this.parent.ContentType);
1357 timeoutHelper = new TimeoutHelper(timeout);
1358 return StreamingConnectionHelper.BeginWriteMessage(message, this.connection, false, this.settings,
1359 ref timeoutHelper, callback, state);
1362 protected override void OnEndReply(IAsyncResult result)
1364 StreamingConnectionHelper.EndWriteMessage(result);
1365 parent.DoneSending(timeoutHelper.RemainingTime());
1369 // ensures that the reader is notified at end-of-stream, and takes care of the framing chunk headers
1370 class SingletonInputConnectionStream : ConnectionStream
1372 SingletonMessageDecoder decoder;
1373 SingletonConnectionReader reader;
1374 bool atEof;
1375 byte[] chunkBuffer; // used for when we have overflow
1376 int chunkBufferOffset;
1377 int chunkBufferSize;
1378 int chunkBytesRemaining;
1380 public SingletonInputConnectionStream(SingletonConnectionReader reader, IConnection connection,
1381 IDefaultCommunicationTimeouts defaultTimeouts)
1382 : base(connection, defaultTimeouts)
1384 this.reader = reader;
1385 this.decoder = new SingletonMessageDecoder(reader.StreamPosition);
1386 this.chunkBytesRemaining = 0;
1387 this.chunkBuffer = new byte[IntEncoder.MaxEncodedSize];
1390 void AbortReader()
1392 this.reader.Abort();
1395 public override void Close()
1397 this.reader.DoneReceiving(this.atEof);
1400 // run chunk data through the decoder
1401 void DecodeData(byte[] buffer, int offset, int size)
1403 while (size > 0)
1405 int bytesRead = decoder.Decode(buffer, offset, size);
1406 offset += bytesRead;
1407 size -= bytesRead;
1408 Fx.Assert(decoder.CurrentState == SingletonMessageDecoder.State.ReadingEnvelopeBytes || decoder.CurrentState == SingletonMessageDecoder.State.ChunkEnd, "");
1412 // run the current data through the decoder to get valid message bytes
1413 void DecodeSize(byte[] buffer, ref int offset, ref int size)
1415 while (size > 0)
1417 int bytesRead = decoder.Decode(buffer, offset, size);
1419 if (bytesRead > 0)
1421 offset += bytesRead;
1422 size -= bytesRead;
1425 switch (decoder.CurrentState)
1427 case SingletonMessageDecoder.State.ChunkStart:
1428 this.chunkBytesRemaining = decoder.ChunkSize;
1430 // if we have overflow and we're not decoding out of our buffer, copy over
1431 if (size > 0 && !object.ReferenceEquals(buffer, this.chunkBuffer))
1433 Fx.Assert(size <= this.chunkBuffer.Length, "");
1434 Buffer.BlockCopy(buffer, offset, this.chunkBuffer, 0, size);
1435 this.chunkBufferOffset = 0;
1436 this.chunkBufferSize = size;
1438 return;
1440 case SingletonMessageDecoder.State.End:
1441 ProcessEof();
1442 return;
1447 int ReadCore(byte[] buffer, int offset, int count)
1449 int bytesRead = -1;
1452 bytesRead = base.Read(buffer, offset, count);
1453 if (bytesRead == 0)
1455 ProcessEof();
1458 finally
1460 if (bytesRead == -1) // there was an exception
1462 AbortReader();
1466 return bytesRead;
1469 public override int Read(byte[] buffer, int offset, int count)
1471 int result = 0;
1472 while (true)
1474 if (count == 0)
1476 return result;
1479 if (this.atEof)
1481 return result;
1484 // first deal with any residual carryover
1485 if (this.chunkBufferSize > 0)
1487 int bytesToCopy = Math.Min(chunkBytesRemaining,
1488 Math.Min(this.chunkBufferSize, count));
1490 Buffer.BlockCopy(this.chunkBuffer, this.chunkBufferOffset, buffer, offset, bytesToCopy);
1491 // keep decoder up to date
1492 DecodeData(this.chunkBuffer, this.chunkBufferOffset, bytesToCopy);
1494 this.chunkBufferOffset += bytesToCopy;
1495 this.chunkBufferSize -= bytesToCopy;
1496 this.chunkBytesRemaining -= bytesToCopy;
1497 if (this.chunkBytesRemaining == 0 && this.chunkBufferSize > 0)
1499 DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
1502 result += bytesToCopy;
1503 offset += bytesToCopy;
1504 count -= bytesToCopy;
1506 else if (chunkBytesRemaining > 0)
1508 // We're in the middle of a chunk. Try and include the next chunk size as well
1510 int bytesToRead = count;
1511 if (int.MaxValue - chunkBytesRemaining >= IntEncoder.MaxEncodedSize)
1513 bytesToRead = Math.Min(count, chunkBytesRemaining + IntEncoder.MaxEncodedSize);
1516 int bytesRead = ReadCore(buffer, offset, bytesToRead);
1518 // keep decoder up to date
1519 DecodeData(buffer, offset, Math.Min(bytesRead, this.chunkBytesRemaining));
1521 if (bytesRead > chunkBytesRemaining)
1523 result += this.chunkBytesRemaining;
1524 int overflowCount = bytesRead - chunkBytesRemaining;
1525 int overflowOffset = offset + chunkBytesRemaining;
1526 this.chunkBytesRemaining = 0;
1527 // read at least part of the next chunk, and put any overflow in this.chunkBuffer
1528 DecodeSize(buffer, ref overflowOffset, ref overflowCount);
1530 else
1532 result += bytesRead;
1533 this.chunkBytesRemaining -= bytesRead;
1536 return result;
1538 else
1540 // Final case: we have a new chunk. Read the size, and loop around again
1541 if (count < IntEncoder.MaxEncodedSize)
1543 // we don't have space for MaxEncodedSize, so it's worth the copy cost to read into a temp buffer
1544 this.chunkBufferOffset = 0;
1545 this.chunkBufferSize = ReadCore(this.chunkBuffer, 0, this.chunkBuffer.Length);
1546 DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
1548 else
1550 int bytesRead = ReadCore(buffer, offset, IntEncoder.MaxEncodedSize);
1551 int sizeOffset = offset;
1552 DecodeSize(buffer, ref sizeOffset, ref bytesRead);
1558 void ProcessEof()
1560 if (!this.atEof)
1562 this.atEof = true;
1563 if (this.chunkBufferSize > 0 || this.chunkBytesRemaining > 0
1564 || decoder.CurrentState != SingletonMessageDecoder.State.End)
1566 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
1569 this.reader.DoneReceiving(true);
1573 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
1575 return new ReadAsyncResult(this, buffer, offset, count, callback, state);
1578 public override int EndRead(IAsyncResult result)
1580 return ReadAsyncResult.End(result);
1583 public class ReadAsyncResult : AsyncResult
1585 SingletonInputConnectionStream parent;
1586 int result;
1588 public ReadAsyncResult(SingletonInputConnectionStream parent,
1589 byte[] buffer, int offset, int count, AsyncCallback callback, object state)
1590 : base(callback, state)
1592 this.parent = parent;
1595 this.result = this.parent.Read(buffer, offset, count);
1596 base.Complete(true);
1599 public static int End(IAsyncResult result)
1601 ReadAsyncResult thisPtr = AsyncResult.End<ReadAsyncResult>(result);
1602 return thisPtr.result;
1608 static class StreamingConnectionHelper
1610 public static void WriteMessage(Message message, IConnection connection, bool isRequest,
1611 IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper)
1613 byte[] endBytes = null;
1614 if (message != null)
1616 MessageEncoder messageEncoder = settings.MessageEncoderFactory.Encoder;
1617 byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
1619 bool writeStreamed;
1620 if (isRequest)
1622 endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
1623 writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
1625 else
1627 endBytes = SingletonEncoder.EnvelopeEndBytes;
1628 writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
1631 if (writeStreamed)
1633 connection.Write(envelopeStartBytes, 0, envelopeStartBytes.Length, false, timeoutHelper.RemainingTime());
1634 Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
1635 Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
1636 messageEncoder.WriteMessage(message, writeTimeoutStream);
1638 else
1640 ArraySegment<byte> messageData = messageEncoder.WriteMessage(message,
1641 int.MaxValue, settings.BufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
1642 messageData = SingletonEncoder.EncodeMessageFrame(messageData);
1643 Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
1644 envelopeStartBytes.Length);
1645 connection.Write(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
1646 messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(), settings.BufferManager);
1649 else if (isRequest) // context handles response end bytes
1651 endBytes = SingletonEncoder.EndBytes;
1654 if (endBytes != null)
1656 connection.Write(endBytes, 0, endBytes.Length,
1657 true, timeoutHelper.RemainingTime());
1661 public static IAsyncResult BeginWriteMessage(Message message, IConnection connection, bool isRequest,
1662 IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
1663 AsyncCallback callback, object state)
1665 return new WriteMessageAsyncResult(message, connection, isRequest, settings, ref timeoutHelper, callback, state);
1668 public static void EndWriteMessage(IAsyncResult result)
1670 WriteMessageAsyncResult.End(result);
1673 // overrides ConnectionStream to add a Framing int at the beginning of each record
1674 class StreamingOutputConnectionStream : ConnectionStream
1676 byte[] encodedSize;
1678 public StreamingOutputConnectionStream(IConnection connection, IDefaultCommunicationTimeouts timeouts)
1679 : base(connection, timeouts)
1681 this.encodedSize = new byte[IntEncoder.MaxEncodedSize];
1683 void WriteChunkSize(int size)
1685 if (size > 0)
1687 int bytesEncoded = IntEncoder.Encode(size, encodedSize, 0);
1688 base.Connection.Write(encodedSize, 0, bytesEncoded, false, TimeSpan.FromMilliseconds(this.WriteTimeout));
1692 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
1694 WriteChunkSize(count);
1695 return base.BeginWrite(buffer, offset, count, callback, state);
1698 public override void WriteByte(byte value)
1700 WriteChunkSize(1);
1701 base.WriteByte(value);
1704 public override void Write(byte[] buffer, int offset, int count)
1706 WriteChunkSize(count);
1707 base.Write(buffer, offset, count);
1711 class WriteMessageAsyncResult : AsyncResult
1713 IConnection connection;
1714 MessageEncoder encoder;
1715 BufferManager bufferManager;
1716 Message message;
1717 static WaitCallback onWriteBufferedMessage;
1718 static WaitCallback onWriteStartBytes;
1719 static Action<object> onWriteStartBytesScheduled;
1720 static WaitCallback onWriteEndBytes =
1721 Fx.ThunkCallback(new WaitCallback(OnWriteEndBytes));
1722 byte[] bufferToFree;
1723 IConnectionOrientedTransportFactorySettings settings;
1724 TimeoutHelper timeoutHelper;
1725 byte[] endBytes;
1727 public WriteMessageAsyncResult(Message message, IConnection connection, bool isRequest,
1728 IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
1729 AsyncCallback callback, object state)
1730 : base(callback, state)
1732 this.connection = connection;
1733 this.encoder = settings.MessageEncoderFactory.Encoder;
1734 this.bufferManager = settings.BufferManager;
1735 this.timeoutHelper = timeoutHelper;
1736 this.message = message;
1737 this.settings = settings;
1739 bool throwing = true;
1740 bool completeSelf = false;
1741 if (message == null)
1743 if (isRequest) // context takes care of the end bytes on Close/reader.EOF
1745 this.endBytes = SingletonEncoder.EndBytes;
1747 completeSelf = WriteEndBytes();
1749 else
1753 byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
1754 bool writeStreamed;
1755 if (isRequest)
1757 this.endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
1758 writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
1760 else
1762 this.endBytes = SingletonEncoder.EnvelopeEndBytes;
1763 writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
1766 if (writeStreamed)
1768 if (onWriteStartBytes == null)
1770 onWriteStartBytes = Fx.ThunkCallback(new WaitCallback(OnWriteStartBytes));
1773 AsyncCompletionResult writeStartBytesResult = connection.BeginWrite(envelopeStartBytes, 0, envelopeStartBytes.Length, true,
1774 timeoutHelper.RemainingTime(), onWriteStartBytes, this);
1776 if (writeStartBytesResult == AsyncCompletionResult.Completed)
1778 if (onWriteStartBytesScheduled == null)
1780 onWriteStartBytesScheduled = new Action<object>(OnWriteStartBytes);
1782 ActionItem.Schedule(onWriteStartBytesScheduled, this);
1785 else
1787 ArraySegment<byte> messageData = settings.MessageEncoderFactory.Encoder.WriteMessage(message,
1788 int.MaxValue, this.bufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
1789 messageData = SingletonEncoder.EncodeMessageFrame(messageData);
1790 this.bufferToFree = messageData.Array;
1791 Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
1792 envelopeStartBytes.Length);
1794 if (onWriteBufferedMessage == null)
1796 onWriteBufferedMessage = Fx.ThunkCallback(new WaitCallback(OnWriteBufferedMessage));
1798 AsyncCompletionResult writeBufferedResult =
1799 connection.BeginWrite(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
1800 messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(),
1801 onWriteBufferedMessage, this);
1803 if (writeBufferedResult == AsyncCompletionResult.Completed)
1805 completeSelf = HandleWriteBufferedMessage();
1808 throwing = false;
1810 finally
1812 if (throwing)
1814 Cleanup();
1819 if (completeSelf)
1821 base.Complete(true);
1825 public static void End(IAsyncResult result)
1827 AsyncResult.End<WriteMessageAsyncResult>(result);
1830 void Cleanup()
1832 if (bufferToFree != null)
1834 this.bufferManager.ReturnBuffer(bufferToFree);
1838 bool HandleWriteStartBytes()
1840 connection.EndWrite();
1841 Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
1842 Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
1843 this.encoder.WriteMessage(message, writeTimeoutStream);
1844 return WriteEndBytes();
1847 bool HandleWriteBufferedMessage()
1849 this.connection.EndWrite();
1850 return WriteEndBytes();
1853 bool WriteEndBytes()
1855 if (this.endBytes == null)
1857 Cleanup();
1858 return true;
1861 AsyncCompletionResult result = connection.BeginWrite(endBytes, 0,
1862 endBytes.Length, true, timeoutHelper.RemainingTime(), onWriteEndBytes, this);
1864 if (result == AsyncCompletionResult.Queued)
1866 return false;
1869 return HandleWriteEndBytes();
1872 bool HandleWriteEndBytes()
1874 this.connection.EndWrite();
1875 Cleanup();
1876 return true;
1879 static void OnWriteStartBytes(object asyncState)
1881 OnWriteStartBytesCallbackHelper(asyncState);
1884 static void OnWriteStartBytesCallbackHelper(object asyncState)
1886 WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
1887 Exception completionException = null;
1888 bool completeSelf = false;
1889 bool throwing = true;
1892 completeSelf = thisPtr.HandleWriteStartBytes();
1893 throwing = false;
1895 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1896 catch (Exception e)
1898 if (Fx.IsFatal(e))
1900 throw;
1902 completeSelf = true;
1903 completionException = e;
1905 finally
1907 if (throwing)
1909 thisPtr.Cleanup();
1913 if (completeSelf)
1915 thisPtr.Complete(false, completionException);
1919 static void OnWriteBufferedMessage(object asyncState)
1921 WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
1923 Exception completionException = null;
1924 bool completeSelf = false;
1925 bool throwing = true;
1928 completeSelf = thisPtr.HandleWriteBufferedMessage();
1929 throwing = false;
1931 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1932 catch (Exception e)
1934 if (Fx.IsFatal(e))
1936 throw;
1938 completeSelf = true;
1939 completionException = e;
1941 finally
1943 if (throwing)
1945 thisPtr.Cleanup();
1948 if (completeSelf)
1950 thisPtr.Complete(false, completionException);
1954 static void OnWriteEndBytes(object asyncState)
1956 WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
1958 Exception completionException = null;
1959 bool completeSelf = false;
1960 bool success = false;
1963 completeSelf = thisPtr.HandleWriteEndBytes();
1964 success = true;
1966 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
1967 catch (Exception e)
1969 if (Fx.IsFatal(e))
1971 throw;
1973 completeSelf = true;
1974 completionException = e;
1976 finally
1978 if (!success)
1980 thisPtr.Cleanup();
1984 if (completeSelf)
1986 thisPtr.Complete(false, completionException);