2 // Copyright (c) Microsoft Corporation. All rights reserved.
5 namespace System
.ServiceModel
.Channels
8 using System
.Collections
.Generic
;
9 using System
.Diagnostics
.CodeAnalysis
;
10 using System
.Globalization
;
12 using System
.Net
.Sockets
;
14 using System
.Runtime
.Diagnostics
;
15 using System
.ServiceModel
.Diagnostics
;
16 using System
.Threading
;
19 internal abstract class UdpChannelBase
<QueueItemType
> : InputQueueChannel
<QueueItemType
>, IUdpReceiveHandler
20 where QueueItemType
: class, IDisposable
// Set to true by the last visible statement of Cleanup.
22 private bool cleanedUp
;
// Running total of the buffer sizes of messages that have been enqueued but not yet dequeued;
// increased in EnqueueMessage and decreased by the dequeue callback it creates.
23 private long pendingMessagesTotalSize
;
// Upper bound that EnqueueMessage compares the running total against.
24 private long maxPendingMessagesTotalSize
;
// Value reported through IUdpReceiveHandler.MaxReceivedMessageSize.
25 private int maxReceivedMessageSize
;
// Retransmission settings; AddHeadersTo reads its Enabled flag.
26 private UdpRetransmissionSettings retransmitSettings
;
// Initializes the state shared by UDP channels: checks the arguments with debug assertions
// and then stores them in the matching fields and properties.
// NOTE(review): `sockets`, `via` and `isMulticast` are used below, but their parameter
// declarations are not visible in this view of the file - confirm against the full source.
29 protected UdpChannelBase(
30 ChannelManagerBase channelManager
,
31 MessageEncoder encoder
,
32 BufferManager bufferManager
,
34 UdpRetransmissionSettings retransmissionSettings
,
35 long maxPendingMessagesTotalSize
,
36 EndpointAddress localAddress
,
39 int maxReceivedMessageSize
)
40 : base(channelManager
)
// Argument validation (debug assertions only).
42 Fx
.Assert(encoder
!= null, "encoder shouldn't be null");
43 Fx
.Assert(bufferManager
!= null, "buffer manager shouldn't be null");
44 Fx
.Assert(sockets
!= null, "sendSockets can't be null");
45 Fx
.Assert(sockets
.Length
> 0, "sendSockets can't be empty");
46 Fx
.Assert(retransmissionSettings
!= null, "retransmissionSettings can't be null");
47 Fx
.Assert(maxPendingMessagesTotalSize
>= 0, "maxPendingMessagesTotalSize must be >= 0");
48 Fx
.Assert(maxReceivedMessageSize
> 0, "maxReceivedMessageSize must be > 0");
49 Fx
.Assert(localAddress
!= null, "localAddress can't be null");
50 Fx
.Assert(via
!= null, "via can't be null");
// When the caller passes UdpConstants.Defaults.DefaultMaxPendingMessagesTotalSize, the value
// UdpConstants.Defaults.MaxPendingMessagesTotalSize is stored instead; any other value is kept as given.
52 this.maxPendingMessagesTotalSize
= maxPendingMessagesTotalSize
== UdpConstants
.Defaults
.DefaultMaxPendingMessagesTotalSize
? UdpConstants
.Defaults
.MaxPendingMessagesTotalSize
: maxPendingMessagesTotalSize
;
53 this.Encoder
= encoder
;
54 this.Sockets
= sockets
;
55 this.BufferManager
= bufferManager
;
56 this.retransmitSettings
= retransmissionSettings
;
57 this.IsMulticast
= isMulticast
;
// The duplicate detector, the receive manager and buffer-manager ownership start out unset;
// TransferReceiveManagerOwnership assigns them later.
58 this.DuplicateDetector
= null;
59 this.ReceiveManager
= null;
60 this.OwnsBufferManager
= false;
61 this.maxReceivedMessageSize
= maxReceivedMessageSize
;
62 this.LocalAddress
= localAddress
;
// Local endpoint address of this channel; the constructor assigns it from its localAddress argument.
// NOTE(review): the accessor bodies of this property are not visible in this view.
66 public EndpointAddress LocalAddress
// NOTE(review): this getter returns this.via, so it presumably belongs to a separate `Via`
// property whose declaration is not visible here - confirm against the full source.
74 get { return this.via; }
// Explicit IUdpReceiveHandler member: exposes the maximum received message size
// that was stored by the constructor.
int IUdpReceiveHandler.MaxReceivedMessageSize
{
    get
    {
        return this.maxReceivedMessageSize;
    }
}
// Supplied by derived classes; HandleDataReceived passes it to UdpUtility.DecodeMessage.
82 protected abstract bool IgnoreSerializationException { get; }
// When true, CleanupBufferManager clears the BufferManager. Starts as false in the constructor
// and is set to true by TransferReceiveManagerOwnership.
84 protected bool OwnsBufferManager { get; set; }
// Duplicate-message detector; null until TransferReceiveManagerOwnership supplies one, disposed by Cleanup.
86 protected DuplicateMessageDetector DuplicateDetector { get; set; }
// Socket receive manager; null until TransferReceiveManagerOwnership supplies one, closed by Cleanup.
88 protected UdpSocketReceiveManager ReceiveManager { get; set; }
// NOTE(review): the accessor bodies of the properties below that lack an inline `{ get; ... }`
// are not visible in this view of the file.
// Buffer manager assigned in the constructor; HandleDataReceived returns receive buffers to it.
90 protected BufferManager BufferManager
// Message encoder assigned in the constructor; GetProperty<T> also queries it.
96 protected MessageEncoder Encoder
// Multicast flag assigned in the constructor.
102 protected bool IsMulticast
// Output channel; assigned through SetOutputChannel, which asserts it was previously null.
108 protected UdpOutputChannel UdpOutputChannel { get; private set; }
// Sockets assigned in the constructor.
110 protected UdpSocket
[] Sockets
// Resolves a typed property for this channel. The lookup order is:
//   1. this channel itself, when T is IDuplexChannel;
//   2. the UDP output channel;
//   3. the message encoder;
//   4. the base class.
// The first non-null answer wins.
[SuppressMessage("Microsoft.StyleCop.CSharp.ReadabilityRules", "SA1100:DoNotPrefixCallsWithBaseUnlessLocalImplementationExists", Justification = "StyleCop 4.5 does not validate this rule properly.")]
public override T GetProperty<T>()
{
    if (typeof(T) == typeof(IDuplexChannel))
    {
        return (T)(object)this;
    }

    T found = this.UdpOutputChannel.GetProperty<T>();

    if (found == null)
    {
        // The output channel had nothing to offer; ask the encoder next.
        found = this.Encoder.GetProperty<T>();
    }

    if (found == null)
    {
        // Last resort: defer to the base implementation.
        found = base.GetProperty<T>();
    }

    return found;
}
// Receive callback for incoming datagrams: decodes `data` into a Message through
// UdpUtility.DecodeMessage and hands it to EnqueueMessage.
// returns false if the message was dropped because the max pending message count was hit.
// NOTE(review): several lines of this method are not visible in this view (the remaining
// DecodeMessage arguments, the construct that introduces `e`, and the conditions guarding the
// cleanup statements). The comments below describe only the visible statements.
140 bool IUdpReceiveHandler
.HandleDataReceived(ArraySegment
<byte> data
, EndPoint remoteEndpoint
, int interfaceIndex
, Action onMessageDequeuedCallback
)
// returnBuffer starts as true and is cleared once the message has been queued successfully.
142 bool returnBuffer
= true;
143 string messageHash
= null;
144 Message message
= null;
// This is the value ultimately returned to the caller.
145 bool continueReceiving
= true;
149 IPEndPoint remoteIPEndPoint
= (IPEndPoint
)remoteEndpoint
;
151 message
= UdpUtility
.DecodeMessage(
152 this.DuplicateDetector
,
158 this.IgnoreSerializationException
,
163 // We pass in the length of the message buffer instead of the length of the message to keep track of the amount of memory that's been allocated
164 continueReceiving
= this.EnqueueMessage(message
, data
.Array
.Length
, onMessageDequeuedCallback
);
// If the message was queued (EnqueueMessage returned true) the buffer is not returned here.
165 returnBuffer
= !continueReceiving
;
// NOTE(review): the condition under which this assignment runs is not visible in this view.
172 returnBuffer
= false;
// Forwards the exception `e` to HandleReceiveException.
176 this.HandleReceiveException(e
);
// Removes this message's hash entry from the duplicate detector, when a detector is in use.
184 if (this.DuplicateDetector
!= null)
186 Fx
.Assert(messageHash
!= null, "message hash should always be available if duplicate detector is enabled");
187 this.DuplicateDetector
.RemoveEntry(messageHash
);
190 message
.Close(); // implicitly returns the buffer
// Hands the receive buffer back to the buffer manager directly.
194 this.BufferManager
.ReturnBuffer(data
.Array
);
199 return continueReceiving
;
// Explicit IUdpReceiveHandler member: routes an exception reported by the receive path
// to HandleReceiveException, which derived classes may override.
void IUdpReceiveHandler.HandleAsyncException(Exception ex)
{
    this.HandleReceiveException(ex);
}
// Default handling for a receive-side exception: wraps it with UdpUtility.WrapAsyncException
// and places the wrapped exception on the input queue (no dequeue callback, and dispatching
// on the calling thread is not allowed).
internal virtual void HandleReceiveException(Exception ex)
{
    Exception wrapped = UdpUtility.WrapAsyncException(ex);
    this.EnqueueAndDispatch(wrapped, null, false);
}
212 // Since ChannelListener and channel lifetimes can be different, we need a
213 // way to transfer the socketReceiveManager and DuplicateMessageDetection
214 // objects to the channel if the listener gets closed. If this method succeeds, then
215 // this also indicates that the bufferManager is no longer owned by the channel listener,
216 // so we have to clean that up also.
// NOTE(review): the State check appears twice. The lines between the two checks, and the lines
// that set and return `success`, are not visible in this view - presumably a lock is taken
// between the checks; confirm against the full source.
217 internal bool TransferReceiveManagerOwnership(UdpSocketReceiveManager socketReceiveManager
, DuplicateMessageDetector duplicateDetector
)
219 bool success
= false;
// Ownership is only accepted while the channel is in the Opened state.
220 if (this.State
== CommunicationState
.Opened
)
224 if (this.State
== CommunicationState
.Opened
)
// Both properties are expected to still hold the null values assigned by the constructor.
226 Fx
.Assert(this.ReceiveManager
== null, "ReceiveManager is already set to a non-null value");
227 Fx
.Assert(this.DuplicateDetector
== null, "DuplicateDetector is already set to a non-null value");
229 this.ReceiveManager
= socketReceiveManager
;
// From now on CleanupBufferManager will clear the buffer manager during Cleanup.
230 this.OwnsBufferManager
= true;
// Register this channel as the receive handler of the receive manager.
231 this.ReceiveManager
.SetReceiveHandler(this);
232 this.DuplicateDetector
= duplicateDetector
;
// Counts the message's buffer size against the pending-size budget and, if it fits, passes the
// message to FinishEnqueueMessage.
// returns false if the max pending messages total size was hit.
// NOTE(review): the constructs guarding several statements below (and the return statement)
// are not visible in this view; the comments describe only the visible statements.
242 internal bool EnqueueMessage(Message message
, int messageBufferSize
, Action messageDequeuedCallback
)
// Wraps the caller's callback so that the pending total shrinks again when the message is dequeued.
244 Action onMessageDequeuedCallback
= () =>
248 this.pendingMessagesTotalSize
-= messageBufferSize
;
249 Fx
.Assert(this.pendingMessagesTotalSize
>= 0, "pendingMessagesTotalSize should not be negative.");
252 messageDequeuedCallback();
255 bool success
= false;
// Accept the message only if its buffer still fits under the configured maximum.
258 if (this.pendingMessagesTotalSize
+ messageBufferSize
<= this.maxPendingMessagesTotalSize
)
260 message
.Properties
.Via
= this.Via
;
261 this.pendingMessagesTotalSize
+= messageBufferSize
;
264 this.FinishEnqueueMessage(message
, onMessageDequeuedCallback
, false);
// Reverses the increment made above.
// NOTE(review): the condition under which this rollback runs is not visible in this view.
271 this.pendingMessagesTotalSize
-= messageBufferSize
;
// Emits the MaxPendingMessagesTotalSizeReached trace when that trace is enabled, including the
// quoted message id when the message has one.
277 if (TD
.MaxPendingMessagesTotalSizeReachedIsEnabled())
279 string messageIdString
= string.Empty
;
280 if (message
.Headers
.MessageId
!= null)
282 messageIdString
= string.Format(CultureInfo
.CurrentCulture
, "'{0}' ", message
.Headers
.MessageId
.ToString());
285 EventTraceActivity eventTraceActivity
= EventTraceActivityHelper
.TryExtractActivity(message
);
286 TD
.MaxPendingMessagesTotalSizeReached(eventTraceActivity
, messageIdString
, this.maxPendingMessagesTotalSize
, typeof(TransportBindingElement
).FullName
);
// Implemented by derived classes. EnqueueMessage calls this after it has counted the message
// against the pending-size total, passing a dequeue callback that subtracts the size again and
// `false` for canDispatchOnThisThread.
294 internal abstract void FinishEnqueueMessage(Message message
, Action dequeuedCallback
, bool canDispatchOnThisThread
);
// Prepares the addressing headers of a message: when the message version uses addressing and no
// MessageId is present, a new UniqueId is assigned. A ProtocolException is thrown from the
// retransmission check further down.
// NOTE(review): the lines between the two checks are not visible in this view; the retransmission
// check presumably sits in the `else` branch of the addressing test (i.e. it applies to messages
// whose addressing version is None) - confirm against the full source.
296 protected virtual void AddHeadersTo(Message message
)
298 Fx
.Assert(message
!= null, "Message can't be null");
300 if (message
.Version
.Addressing
!= AddressingVersion
.None
)
// Only assign an id when the message does not already have one.
302 if (message
.Headers
.MessageId
== null)
304 message
.Headers
.MessageId
= new UniqueId();
309 if (this.retransmitSettings
.Enabled
== true)
311 // we should only get here if some channel above us starts producing messages that don't match the encoder's message version.
312 throw FxTrace
.Exception
.AsError(new ProtocolException(SR
.RetransmissionRequiresAddressingOnMessage(message
.Version
.Addressing
.ToString())));
// Ungraceful shutdown used for error conditions: runs Cleanup in aborting mode
// with a zero timeout.
protected override void OnAbort()
{
    TimeSpan noWait = TimeSpan.Zero;
    this.Cleanup(true, noWait);
}
// Begin half of the asynchronous open. The open itself is performed synchronously via OnOpen;
// an already-completed async result is then returned for the caller's callback and state.
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
{
    this.OnOpen(timeout);

    IAsyncResult alreadyCompleted = new CompletedAsyncResult(callback, state);
    return alreadyCompleted;
}
// End half of the asynchronous open: finishes the CompletedAsyncResult that
// OnBeginOpen returned.
protected override void OnEndOpen(IAsyncResult result)
{
    CompletedAsyncResult.End(result);
}
// Opens the channel by opening the inner UDP output channel.
// The timeout argument is not used by this implementation.
protected override void OnOpen(TimeSpan timeout)
{
    UdpOutputChannel outputChannel = this.UdpOutputChannel;
    outputChannel.Open();
}
// Begin half of the asynchronous close: creates a CloseAsyncResult that is given the base
// class's OnBeginClose/OnEndClose as chained handlers.
// NOTE(review): the remaining constructor arguments are not visible in this view of the file.
339 protected override IAsyncResult
OnBeginClose(TimeSpan timeout
, AsyncCallback callback
, object state
)
341 return new CloseAsyncResult
<QueueItemType
>(
343 new ChainedBeginHandler(base.OnBeginClose
),
344 new ChainedEndHandler(base.OnEndClose
),
// End half of the asynchronous close: finishes the CloseAsyncResult created by OnBeginClose.
protected override void OnEndClose(IAsyncResult result)
{
    CloseAsyncResult<QueueItemType>.End(result);
}
// Graceful, synchronous close used under normal conditions. A single TimeoutHelper spans
// both steps, so the base close only receives the time left after Cleanup.
protected override void OnClose(TimeSpan timeout)
{
    TimeoutHelper closeBudget = new TimeoutHelper(timeout);

    // Step 1: release this channel's own resources (non-aborting).
    TimeSpan cleanupTimeout = closeBudget.RemainingTime();
    this.Cleanup(false, cleanupTimeout);

    // Step 2: let the base class close with the remaining time.
    TimeSpan baseCloseTimeout = closeBudget.RemainingTime();
    base.OnClose(baseCloseTimeout);
}
// Installs the UDP output channel used by this channel. Debug assertions check that no
// output channel has been installed yet and that the supplied channel is not null.
protected void SetOutputChannel(UdpOutputChannel udpOutputChannel)
{
    Fx.Assert(
        this.UdpOutputChannel == null,
        "this.UdpOutputChannel must be null");
    Fx.Assert(
        udpOutputChannel != null,
        "udpOutputChannel can't be null, since SetOutputChannel should be called only once");

    this.UdpOutputChannel = udpOutputChannel;
}
// Shared teardown used by OnAbort, OnClose and the asynchronous close path. The visible statements
// abort or close the output channel, dispose the duplicate detector, close the receive manager,
// clear the buffer manager if it is owned, and finally set cleanedUp.
371 // We're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once.
// NOTE(review): the lines that choose between Abort and Close (presumably based on `aborting`)
// and any guard on cleanedUp are not visible in this view - confirm against the full source.
372 protected void Cleanup(bool aborting
, TimeSpan timeout
)
388 this.UdpOutputChannel
.Abort();
392 this.UdpOutputChannel
.Close(timeout
);
// Both of these stay null unless TransferReceiveManagerOwnership assigned them.
395 if (this.DuplicateDetector
!= null)
397 this.DuplicateDetector
.Dispose();
400 if (this.ReceiveManager
!= null)
402 this.ReceiveManager
.Close();
// Clears the buffer manager only when this channel owns it.
405 this.CleanupBufferManager();
407 this.cleanedUp
= true;
// Clears the buffer manager, but only when this channel owns it
// (OwnsBufferManager is true); otherwise does nothing.
private void CleanupBufferManager()
{
    if (!this.OwnsBufferManager)
    {
        return;
    }

    this.BufferManager.Clear();
}
419 // Control flow for async path
420 // We use this mechanism to avoid initializing two async objects as logically cleanup+close is one operation.
421 // At any point in the Begin* methods, we may go async. The steps are:
422 // - Close inner UdpOutputChannel
425 private class CloseAsyncResult
<T
> : AsyncResult
426 where T
: class, IDisposable
// Completion delegates shared by all instances; each is passed to PrepareAsyncCompletion
// when the corresponding step is started.
428 private static AsyncCompletion completeCloseOutputChannelCallback
= new AsyncCompletion(CompleteCloseOutputChannel
);
429 private static AsyncCompletion completeBaseCloseCallback
= new AsyncCompletion(CompleteBaseClose
);
// The channel being closed.
431 private UdpChannelBase
<T
> channel
;
// Tracks the time remaining across all steps of the close sequence.
432 private TimeoutHelper timeoutHelper
;
// Begin/end handlers for the base class's close, invoked after the output channel has been closed.
433 private ChainedBeginHandler baseBeginClose
;
434 private ChainedEndHandler baseEndClose
;
// Captures the channel, the base-class close handlers and the timeout, then starts the close
// sequence by beginning the close of the output channel.
// NOTE(review): the body of the final `if` is not visible in this view; presumably it completes
// this result when the whole sequence finished synchronously - confirm against the full source.
436 public CloseAsyncResult(UdpChannelBase
<T
> channel
, ChainedBeginHandler baseBeginClose
, ChainedEndHandler baseEndClose
, TimeSpan timeout
, AsyncCallback callback
, object state
)
437 : base(callback
, state
)
439 this.channel
= channel
;
440 this.baseBeginClose
= baseBeginClose
;
441 this.baseEndClose
= baseEndClose
;
442 this.timeoutHelper
= new TimeoutHelper(timeout
);
// First step of the sequence: close the inner output channel.
444 if (this.BeginCloseOutputChannel())
// Finishes the asynchronous close by delegating to AsyncResult.End for this result type;
// the returned instance is not needed and is discarded.
public static void End(IAsyncResult result)
{
    AsyncResult.End<CloseAsyncResult<T>>(result);
}
// Completion step for the base-class close: recovers the CloseAsyncResult from the async state
// and calls the stored base end-close handler.
// NOTE(review): the return statement of this method is not visible in this view.
455 private static bool CompleteBaseClose(IAsyncResult result
)
457 // AsyncResult.AsyncCompletionWrapperCallback takes care of catching exceptions for us.
458 CloseAsyncResult
<T
> thisPtr
= (CloseAsyncResult
<T
>)result
.AsyncState
;
460 // We are completing the base class close operation at this point.
461 thisPtr
.baseEndClose(result
);
// Completion step for the output-channel close. It ends the output channel's close, runs the
// channel's non-aborting Cleanup with the time that is left, and then starts the base-class
// close. The return value is whatever BeginBaseClose reports.
private static bool CompleteCloseOutputChannel(IAsyncResult result)
{
    // AsyncResult.AsyncCompletionWrapperCallback takes care of catching exceptions for us.
    CloseAsyncResult<T> self = (CloseAsyncResult<T>)result.AsyncState;
    UdpChannelBase<T> owner = self.channel;

    // Finish closing the inner output channel (not the base class) at this point.
    owner.UdpOutputChannel.EndClose(result);

    owner.Cleanup(false, self.timeoutHelper.RemainingTime());

    return self.BeginBaseClose();
}
// Starts closing the inner output channel with the remaining time, registering
// CompleteCloseOutputChannel as the completion step. Returns the result of SyncContinue.
private bool BeginCloseOutputChannel()
{
    UdpOutputChannel outputChannel = this.channel.UdpOutputChannel;
    TimeSpan remaining = this.timeoutHelper.RemainingTime();

    // AsyncResult.AsyncCompletionWrapperCallback takes care of catching the exceptions for us.
    IAsyncResult pendingClose = outputChannel.BeginClose(
        remaining,
        this.PrepareAsyncCompletion(completeCloseOutputChannelCallback),
        this);

    // SyncContinue calls CompleteCloseOutputChannel for us in sync case.
    return this.SyncContinue(pendingClose);
}
// Starts the base-class close through the stored begin handler with the remaining time,
// registering CompleteBaseClose as the completion step. Returns the result of SyncContinue.
private bool BeginBaseClose()
{
    TimeSpan remaining = this.timeoutHelper.RemainingTime();

    // AsyncResult.AsyncCompletionWrapperCallback takes care of catching the exceptions for us.
    IAsyncResult pendingClose = this.baseBeginClose(
        remaining,
        this.PrepareAsyncCompletion(completeBaseCloseCallback),
        this);

    // SyncContinue calls CompleteBaseClose for us in sync case.
    return this.SyncContinue(pendingClose);
}