Updates referencesource to .NET 4.7
[mono-project.git] / mcs / class / referencesource / System.ServiceModel.Channels / System / ServiceModel / Channels / UdpChannelBase.cs
blob6e0f45677ef5f0d93367a14548b24bea60880393
1 // <copyright>
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 // </copyright>
5 namespace System.ServiceModel.Channels
7 using System;
8 using System.Collections.Generic;
9 using System.Diagnostics.CodeAnalysis;
10 using System.Globalization;
11 using System.Net;
12 using System.Net.Sockets;
13 using System.Runtime;
14 using System.Runtime.Diagnostics;
15 using System.ServiceModel.Diagnostics;
16 using System.Threading;
17 using System.Xml;
19 internal abstract class UdpChannelBase<QueueItemType> : InputQueueChannel<QueueItemType>, IUdpReceiveHandler
20 where QueueItemType : class, IDisposable
// Set to true, under ThisLock, once Cleanup has run to completion; later Cleanup calls then return immediately.
private bool cleanedUp;

// Running total of the buffer sizes of the messages currently counted as pending.
// Modified only while holding ThisLock (see EnqueueMessage and its dequeue callback).
private long pendingMessagesTotalSize;

// Upper bound for pendingMessagesTotalSize. Assigned only by the constructor.
private readonly long maxPendingMessagesTotalSize;

// Value reported through IUdpReceiveHandler.MaxReceivedMessageSize. Assigned only by the constructor.
private readonly int maxReceivedMessageSize;

// Retransmission settings consulted by AddHeadersTo. Assigned only by the constructor.
private readonly UdpRetransmissionSettings retransmitSettings;

// Backing field of the Via property. Assigned only by the constructor.
private readonly Uri via;
// Captures the collaborators shared by the UDP channel implementations.
// The receive-side objects (DuplicateDetector, ReceiveManager) start out null and the channel
// initially does not own the buffer manager; TransferReceiveManagerOwnership can change that later.
protected UdpChannelBase(
    ChannelManagerBase channelManager,
    MessageEncoder encoder,
    BufferManager bufferManager,
    UdpSocket[] sockets,
    UdpRetransmissionSettings retransmissionSettings,
    long maxPendingMessagesTotalSize,
    EndpointAddress localAddress,
    Uri via,
    bool isMulticast,
    int maxReceivedMessageSize)
    : base(channelManager)
{
    Fx.Assert(encoder != null, "encoder shouldn't be null");
    Fx.Assert(bufferManager != null, "buffer manager shouldn't be null");
    Fx.Assert(sockets != null, "sendSockets can't be null");
    Fx.Assert(sockets.Length > 0, "sendSockets can't be empty");
    Fx.Assert(retransmissionSettings != null, "retransmissionSettings can't be null");
    Fx.Assert(maxPendingMessagesTotalSize >= 0, "maxPendingMessagesTotalSize must be >= 0");
    Fx.Assert(maxReceivedMessageSize > 0, "maxReceivedMessageSize must be > 0");
    Fx.Assert(localAddress != null, "localAddress can't be null");
    Fx.Assert(via != null, "via can't be null");

    // The "default" marker value is replaced by the real default limit.
    if (maxPendingMessagesTotalSize == UdpConstants.Defaults.DefaultMaxPendingMessagesTotalSize)
    {
        this.maxPendingMessagesTotalSize = UdpConstants.Defaults.MaxPendingMessagesTotalSize;
    }
    else
    {
        this.maxPendingMessagesTotalSize = maxPendingMessagesTotalSize;
    }

    // Collaborators handed in by the creator of the channel.
    this.Encoder = encoder;
    this.Sockets = sockets;
    this.BufferManager = bufferManager;
    this.retransmitSettings = retransmissionSettings;
    this.IsMulticast = isMulticast;

    // Receive-side state is not owned by the channel at construction time.
    this.DuplicateDetector = null;
    this.ReceiveManager = null;
    this.OwnsBufferManager = false;

    this.maxReceivedMessageSize = maxReceivedMessageSize;
    this.LocalAddress = localAddress;
    this.via = via;
}
// Local endpoint address handed to the constructor.
public EndpointAddress LocalAddress { get; private set; }

// Via URI handed to the constructor; EnqueueMessage stamps it on every message it accepts.
public Uri Via
{
    get
    {
        return this.via;
    }
}

// Maximum received message size handed to the constructor.
int IUdpReceiveHandler.MaxReceivedMessageSize
{
    get
    {
        return this.maxReceivedMessageSize;
    }
}

// Supplied by derived channels; forwarded to UdpUtility.DecodeMessage when received data is decoded.
protected abstract bool IgnoreSerializationException
{
    get;
}

// When true, Cleanup clears the buffer manager. Becomes true once TransferReceiveManagerOwnership succeeds.
protected bool OwnsBufferManager
{
    get;
    set;
}

// Duplicate detector used while decoding received data; null until one is assigned.
protected DuplicateMessageDetector DuplicateDetector
{
    get;
    set;
}

// Receive manager closed by Cleanup; null until one is assigned.
protected UdpSocketReceiveManager ReceiveManager
{
    get;
    set;
}

// Buffer manager handed to the constructor.
protected BufferManager BufferManager { get; private set; }

// Message encoder handed to the constructor.
protected MessageEncoder Encoder { get; private set; }

// Multicast flag handed to the constructor.
protected bool IsMulticast { get; private set; }

// Output channel installed through SetOutputChannel; opened in OnOpen and closed or aborted in Cleanup.
protected UdpOutputChannel UdpOutputChannel
{
    get;
    private set;
}

// Sockets handed to the constructor.
protected UdpSocket[] Sockets { get; private set; }
// Resolves a channel property of type T.
// IDuplexChannel is answered with this channel itself. Every other request is offered, in order, to the
// output channel, the message encoder and finally the base class; the first non-null answer wins.
[SuppressMessage("Microsoft.StyleCop.CSharp.ReadabilityRules", "SA1100:DoNotPrefixCallsWithBaseUnlessLocalImplementationExists", Justification = "StyleCop 4.5 does not validate this rule properly.")]
public override T GetProperty<T>()
{
    if (typeof(T) == typeof(IDuplexChannel))
    {
        return (T)(object)this;
    }

    T property = this.UdpOutputChannel.GetProperty<T>();

    if (property == null)
    {
        property = this.Encoder.GetProperty<T>();
    }

    if (property == null)
    {
        property = base.GetProperty<T>();
    }

    return property;
}
// Decodes a received datagram and, if it yields a message, enqueues it on this channel.
//
// data:                      buffer segment holding the received bytes.
// remoteEndpoint:            sender of the datagram; cast to IPEndPoint before decoding.
// interfaceIndex:            forwarded unchanged to UdpUtility.DecodeMessage.
// onMessageDequeuedCallback: forwarded to EnqueueMessage, which invokes it once the message is dequeued.
//
// Returns false if the message was dropped because the max pending messages total size was hit
// (i.e. EnqueueMessage returned false); returns true in every other case, including when decoding
// produced no message or a non-fatal exception was routed to HandleReceiveException.
//
// Buffer ownership: unless the message was successfully enqueued (or a fatal exception is propagating),
// the buffer is given back here, either by closing the message or by returning the raw array.
bool IUdpReceiveHandler.HandleDataReceived(ArraySegment<byte> data, EndPoint remoteEndpoint, int interfaceIndex, Action onMessageDequeuedCallback)
{
    bool returnBuffer = true;
    string messageHash = null;
    Message message = null;
    bool continueReceiving = true;

    // Read DuplicateDetector exactly once. TransferReceiveManagerOwnership can assign the property from
    // another thread, and the cleanup in the finally block must talk to the same detector instance that
    // produced messageHash during decoding. Re-reading the property could pair a newly assigned detector
    // with a hash that was never computed.
    DuplicateMessageDetector duplicateDetector = this.DuplicateDetector;

    try
    {
        IPEndPoint remoteIPEndPoint = (IPEndPoint)remoteEndpoint;

        message = UdpUtility.DecodeMessage(
            duplicateDetector,
            this.Encoder,
            this.BufferManager,
            data,
            remoteIPEndPoint,
            interfaceIndex,
            this.IgnoreSerializationException,
            out messageHash);

        if (message != null)
        {
            // We pass in the length of the message buffer instead of the length of the message to keep track of the amount of memory that's been allocated
            continueReceiving = this.EnqueueMessage(message, data.Array.Length, onMessageDequeuedCallback);

            // A successfully enqueued message keeps its buffer; a rejected one is released below.
            returnBuffer = !continueReceiving;
        }
    }
    catch (Exception e)
    {
        if (Fx.IsFatal(e))
        {
            // Do no cleanup work while a fatal exception is propagating.
            returnBuffer = false;
            throw;
        }

        this.HandleReceiveException(e);
    }
    finally
    {
        if (returnBuffer)
        {
            if (message != null)
            {
                if (duplicateDetector != null)
                {
                    // The message is being discarded, so remove its entry from the duplicate detector.
                    Fx.Assert(messageHash != null, "message hash should always be available if duplicate detector is enabled");
                    duplicateDetector.RemoveEntry(messageHash);
                }

                message.Close(); // implicitly returns the buffer
            }
            else
            {
                this.BufferManager.ReturnBuffer(data.Array);
            }
        }
    }

    return continueReceiving;
}
// IUdpReceiveHandler entry point for exceptions raised by asynchronous receives;
// handled exactly like an exception raised while processing received data.
void IUdpReceiveHandler.HandleAsyncException(Exception ex)
{
    this.HandleReceiveException(ex);
}

// Wraps the exception with UdpUtility.WrapAsyncException and places it on the input queue
// (no dequeue callback, dispatch on the calling thread not allowed).
internal virtual void HandleReceiveException(Exception ex)
{
    Exception wrapped = UdpUtility.WrapAsyncException(ex);
    this.EnqueueAndDispatch(wrapped, null, false);
}
// Since ChannelListener and channel lifetimes can be different, we need a
// way to transfer the socketReceiveManager and DuplicateMessageDetection
// objects to the channel if the listener gets closed. If this method succeeds, then
// this also indicates that the bufferManager is no longer owned by the channel listener,
// so we have to clean that up also.
//
// socketReceiveManager: receive manager the channel takes over; the channel registers itself as its handler.
// duplicateDetector:    duplicate detector the channel takes over (may be null).
//
// Returns true if ownership was transferred, which only happens while the channel is in the Opened state;
// returns false (and changes nothing) otherwise.
internal bool TransferReceiveManagerOwnership(UdpSocketReceiveManager socketReceiveManager, DuplicateMessageDetector duplicateDetector)
{
    // Cheap unlocked check first; the state is re-checked under the lock before anything is changed.
    if (this.State != CommunicationState.Opened)
    {
        return false;
    }

    lock (ThisLock)
    {
        if (this.State != CommunicationState.Opened)
        {
            return false;
        }

        Fx.Assert(this.ReceiveManager == null, "ReceiveManager is already set to a non-null value");
        Fx.Assert(this.DuplicateDetector == null, "DuplicateDetector is already set to a non-null value");

        // Put all channel state in place BEFORE registering as the receive handler.
        // NOTE(review): presumably received data can be handed to this channel as soon as
        // SetReceiveHandler returns (or even during the call); HandleDataReceived reads
        // DuplicateDetector without taking ThisLock, so it must already be assigned by then.
        this.DuplicateDetector = duplicateDetector;
        this.ReceiveManager = socketReceiveManager;
        this.OwnsBufferManager = true;

        this.ReceiveManager.SetReceiveHandler(this);
        return true;
    }
}
// Accounts for the message's buffer and hands the message to FinishEnqueueMessage.
//
// message:                 message to enqueue; its Via property is set to this channel's Via.
// messageBufferSize:       number of bytes charged against the pending-messages budget.
// messageDequeuedCallback: invoked after the budget has been released when the message is dequeued.
//
// Returns false if the max pending messages total size was hit (the message is not enqueued and a
// trace event is written when enabled); returns true once FinishEnqueueMessage has completed.
internal bool EnqueueMessage(Message message, int messageBufferSize, Action messageDequeuedCallback)
{
    // Runs when the message leaves the queue: give the bytes back to the budget, then notify the caller.
    Action releaseAndNotify = () =>
    {
        lock (this.ThisLock)
        {
            this.pendingMessagesTotalSize -= messageBufferSize;
            Fx.Assert(this.pendingMessagesTotalSize >= 0, "pendingMessagesTotalSize should not be negative.");
        }

        messageDequeuedCallback();
    };

    bool enqueued = false;

    lock (this.ThisLock)
    {
        bool overBudget = this.pendingMessagesTotalSize + messageBufferSize > this.maxPendingMessagesTotalSize;

        if (overBudget)
        {
            this.TraceMaxPendingMessagesTotalSizeReached(message);
        }
        else
        {
            message.Properties.Via = this.Via;
            this.pendingMessagesTotalSize += messageBufferSize;

            try
            {
                this.FinishEnqueueMessage(message, releaseAndNotify, false);
                enqueued = true;
            }
            finally
            {
                // Undo the reservation if FinishEnqueueMessage threw.
                if (!enqueued)
                {
                    this.pendingMessagesTotalSize -= messageBufferSize;
                }
            }
        }
    }

    return enqueued;
}

// Implemented by derived channels to place the message on their queue.
// dequeuedCallback is the callback built by EnqueueMessage (budget release plus caller notification).
internal abstract void FinishEnqueueMessage(Message message, Action dequeuedCallback, bool canDispatchOnThisThread);

// Writes the MaxPendingMessagesTotalSizeReached trace event for a rejected message, if that event is enabled.
private void TraceMaxPendingMessagesTotalSizeReached(Message message)
{
    if (!TD.MaxPendingMessagesTotalSizeReachedIsEnabled())
    {
        return;
    }

    string messageIdString = string.Empty;
    UniqueId messageId = message.Headers.MessageId;
    if (messageId != null)
    {
        messageIdString = string.Format(CultureInfo.CurrentCulture, "'{0}' ", messageId.ToString());
    }

    EventTraceActivity eventTraceActivity = EventTraceActivityHelper.TryExtractActivity(message);
    TD.MaxPendingMessagesTotalSizeReached(eventTraceActivity, messageIdString, this.maxPendingMessagesTotalSize, typeof(TransportBindingElement).FullName);
}
// Makes sure an outgoing message carries the headers this channel relies on.
// With an addressing version other than None, a missing MessageId is filled in with a fresh UniqueId.
// With AddressingVersion.None nothing is added, and a ProtocolException is thrown if retransmission is enabled.
protected virtual void AddHeadersTo(Message message)
{
    Fx.Assert(message != null, "Message can't be null");

    if (message.Version.Addressing == AddressingVersion.None)
    {
        if (this.retransmitSettings.Enabled == true)
        {
            // we should only get here if some channel above us starts producing messages that don't match the encoder's message version.
            throw FxTrace.Exception.AsError(new ProtocolException(SR.RetransmissionRequiresAddressingOnMessage(message.Version.Addressing.ToString())));
        }

        return;
    }

    if (message.Headers.MessageId == null)
    {
        message.Headers.MessageId = new UniqueId();
    }
}
// Closes the channel ungracefully during error conditions:
// runs Cleanup in aborting mode with a zero timeout.
protected override void OnAbort()
{
    TimeSpan noWait = TimeSpan.Zero;
    this.Cleanup(true, noWait);
}
// The "asynchronous" open performs the synchronous open inline and hands back an already-completed result.
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
{
    this.OnOpen(timeout);

    CompletedAsyncResult completed = new CompletedAsyncResult(callback, state);
    return completed;
}

// Completes the result produced by OnBeginOpen.
protected override void OnEndOpen(IAsyncResult result)
{
    CompletedAsyncResult.End(result);
}

// Opening the channel consists of opening the inner output channel.
protected override void OnOpen(TimeSpan timeout)
{
    UdpOutputChannel outputChannel = this.UdpOutputChannel;
    outputChannel.Open();
}
// Starts the asynchronous close. CloseAsyncResult drives the whole sequence and is given the base
// class's begin/end close methods so it can chain to them as its final step.
protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
{
    ChainedBeginHandler beginBaseClose = new ChainedBeginHandler(base.OnBeginClose);
    ChainedEndHandler endBaseClose = new ChainedEndHandler(base.OnEndClose);

    return new CloseAsyncResult<QueueItemType>(this, beginBaseClose, endBaseClose, timeout, callback, state);
}

// Completes the result produced by OnBeginClose.
protected override void OnEndClose(IAsyncResult result)
{
    CloseAsyncResult<QueueItemType>.End(result);
}

// Closes the channel gracefully during normal conditions.
// Cleanup and the base close share one timeout budget.
protected override void OnClose(TimeSpan timeout)
{
    TimeoutHelper budget = new TimeoutHelper(timeout);

    this.Cleanup(false, budget.RemainingTime());
    base.OnClose(budget.RemainingTime());
}
// Installs the inner output channel used by this channel.
// The asserts require that no output channel is installed yet and that the argument is non-null.
protected void SetOutputChannel(UdpOutputChannel udpOutputChannel)
{
    bool notYetAssigned = this.UdpOutputChannel == null;
    Fx.Assert(notYetAssigned, "this.UdpOutputChannel must be null");

    bool argumentProvided = udpOutputChannel != null;
    Fx.Assert(argumentProvided, "udpOutputChannel can't be null, since SetOutputChannel should be called only once");

    this.UdpOutputChannel = udpOutputChannel;
}
// Releases what the channel holds: the output channel, the duplicate detector, the receive manager
// and (when owned) the buffer manager. Runs its body at most once to completion; later calls are no-ops.
//
// aborting: true to abort the output channel, false to close it using the given timeout.
// timeout:  time allowed for closing the output channel; unused when aborting.
//
// We're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once.
protected void Cleanup(bool aborting, TimeSpan timeout)
{
    // Unlocked check first; the flag is checked again once the lock is held.
    if (!this.cleanedUp)
    {
        lock (ThisLock)
        {
            if (!this.cleanedUp)
            {
                if (aborting)
                {
                    this.UdpOutputChannel.Abort();
                }
                else
                {
                    this.UdpOutputChannel.Close(timeout);
                }

                DuplicateMessageDetector detector = this.DuplicateDetector;
                if (detector != null)
                {
                    detector.Dispose();
                }

                UdpSocketReceiveManager receiveManager = this.ReceiveManager;
                if (receiveManager != null)
                {
                    receiveManager.Close();
                }

                this.CleanupBufferManager();

                // Only reached when none of the steps above threw.
                this.cleanedUp = true;
            }
        }
    }
}

// Clears the buffer manager, but only if this channel owns it.
private void CleanupBufferManager()
{
    if (!this.OwnsBufferManager)
    {
        return;
    }

    this.BufferManager.Clear();
}
// Control flow for async path
// We use this mechanism to avoid initializing two async objects as logically cleanup+close is one operation.
// At any point in the Begin* methods, we may go async. The steps are:
// - Close inner UdpOutputChannel
// - Cleanup channel
// - Close channel (through the base-class begin/end close handlers passed to the constructor)
private class CloseAsyncResult<T> : AsyncResult
    where T : class, IDisposable
{
    private static AsyncCompletion completeCloseOutputChannelCallback = new AsyncCompletion(CompleteCloseOutputChannel);
    private static AsyncCompletion completeBaseCloseCallback = new AsyncCompletion(CompleteBaseClose);

    private UdpChannelBase<T> channel;

    // Always used through this field (never copied to a local) so every step draws on the same timeout budget.
    private TimeoutHelper timeoutHelper;
    private ChainedBeginHandler baseBeginClose;
    private ChainedEndHandler baseEndClose;

    // Stores the collaborators and immediately starts the first step; completes synchronously
    // if the whole chain finished without going async.
    public CloseAsyncResult(UdpChannelBase<T> channel, ChainedBeginHandler baseBeginClose, ChainedEndHandler baseEndClose, TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.channel = channel;
        this.baseBeginClose = baseBeginClose;
        this.baseEndClose = baseEndClose;
        this.timeoutHelper = new TimeoutHelper(timeout);

        bool finishedSynchronously = this.BeginCloseOutputChannel();
        if (finishedSynchronously)
        {
            this.Complete(true);
        }
    }

    // Completes the operation represented by the given result.
    public static void End(IAsyncResult result)
    {
        AsyncResult.End<CloseAsyncResult<T>>(result);
    }

    // Last step: finishes the base class close. Returning true marks the chain as done.
    private static bool CompleteBaseClose(IAsyncResult result)
    {
        // AsyncResult.AsyncCompletionWrapperCallback takes care of catching exceptions for us.
        CloseAsyncResult<T> self = (CloseAsyncResult<T>)result.AsyncState;

        // We are completing the base class close operation at this point.
        self.baseEndClose(result);

        return true;
    }

    // Middle step: finishes the output channel close, cleans up the channel, then starts the base close.
    private static bool CompleteCloseOutputChannel(IAsyncResult result)
    {
        // AsyncResult.AsyncCompletionWrapperCallback takes care of catching exceptions for us.
        CloseAsyncResult<T> self = (CloseAsyncResult<T>)result.AsyncState;
        UdpChannelBase<T> owner = self.channel;

        // We are completing the output channel close operation at this point.
        owner.UdpOutputChannel.EndClose(result);

        owner.Cleanup(false, self.timeoutHelper.RemainingTime());

        return self.BeginBaseClose();
    }

    // First step: starts closing the inner output channel.
    private bool BeginCloseOutputChannel()
    {
        // AsyncResult.AsyncCompletionWrapperCallback takes care of catching the exceptions for us.
        TimeSpan remaining = this.timeoutHelper.RemainingTime();
        AsyncCallback continuation = this.PrepareAsyncCompletion(completeCloseOutputChannelCallback);
        IAsyncResult result = this.channel.UdpOutputChannel.BeginClose(remaining, continuation, this);

        // SyncContinue calls CompleteCloseOutputChannel for us in sync case.
        return this.SyncContinue(result);
    }

    // Starts the base class close through the handler supplied by the owning channel.
    private bool BeginBaseClose()
    {
        // AsyncResult.AsyncCompletionWrapperCallback takes care of catching the exceptions for us.
        TimeSpan remaining = this.timeoutHelper.RemainingTime();
        AsyncCallback continuation = this.PrepareAsyncCompletion(completeBaseCloseCallback);
        IAsyncResult result = this.baseBeginClose(remaining, continuation, this);

        // SyncContinue calls CompleteBaseClose for us in sync case.
        return this.SyncContinue(result);
    }
}