[corlib] CoreRT System.Threading.Tasks (#6672)
[mono-project.git] / mcs / class / referencesource / mscorlib / system / io / stream.cs
blobf1c73c6adbb43849a9aeae76e26d016b4118df11
1 // ==++==
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // ==--==
6 /*============================================================
7 **
8 ** Class: Stream
9 **
10 ** <OWNER>gpaperin</OWNER>
13 ** Purpose: Abstract base class for all Streams. Provides
14 ** default implementations of asynchronous reads & writes, in
15 ** terms of the synchronous reads & writes (and vice versa).
18 ===========================================================*/
19 using System;
20 using System.Threading;
21 #if FEATURE_ASYNC_IO
22 using System.Threading.Tasks;
23 #endif
25 using System.Runtime;
26 using System.Runtime.InteropServices;
27 #if NEW_EXPERIMENTAL_ASYNC_IO
28 using System.Runtime.CompilerServices;
29 #endif
30 using System.Runtime.ExceptionServices;
31 using System.Security;
32 using System.Security.Permissions;
33 using System.Diagnostics.Contracts;
34 using System.Reflection;
36 namespace System.IO {
37 [Serializable]
38 [ComVisible(true)]
39 #if CONTRACTS_FULL
40 [ContractClass(typeof(StreamContract))]
41 #endif
42 #if FEATURE_REMOTING || MONO
43 public abstract partial class Stream : MarshalByRefObject, IDisposable {
44 #else // FEATURE_REMOTING
45 public abstract class Stream : IDisposable {
46 #endif // FEATURE_REMOTING
48 public static readonly Stream Null = new NullStream();
50 //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
51 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
52 // improvement in Copy performance.
53 private const int _DefaultCopyBufferSize = 81920;
55 #if NEW_EXPERIMENTAL_ASYNC_IO
56 // To implement Async IO operations on streams that don't support async IO
58 [NonSerialized]
59 private ReadWriteTask _activeReadWriteTask;
60 [NonSerialized]
61 private SemaphoreSlim _asyncActiveSemaphore;
/// <summary>
/// Returns the semaphore held in <c>_asyncActiveSemaphore</c>, creating it
/// (initial count 1, maximum count 1) the first time it is requested.
/// </summary>
internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
{
    // The semaphore's WaitHandle is never accessed, so there is no need to
    // worry about disposing the instance created here.
    Func<SemaphoreSlim> createSemaphore = () => new SemaphoreSlim(1, 1);
    return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, createSemaphore);
}
69 #endif
/// <summary>Indicates whether this stream can be read from.</summary>
/// <remarks>
/// The copy and begin-read helpers of this class refuse to read from a
/// stream that reports false here.
/// </remarks>
public abstract bool CanRead
{
    [Pure] get;
}
/// <summary>Indicates whether this stream supports seeking.</summary>
/// <remarks>
/// When this is false, Position, Seek, Length, and SetLength should throw.
/// </remarks>
public abstract bool CanSeek
{
    [Pure] get;
}
/// <summary>
/// Indicates whether the stream supports timeouts. The base implementation
/// always answers false; subclasses may override.
/// </summary>
[ComVisible(false)]
public virtual bool CanTimeout
{
    [Pure]
    get { return false; }
}
/// <summary>Indicates whether this stream can be written to.</summary>
/// <remarks>
/// The copy and begin-write helpers of this class refuse to write to a
/// stream that reports false here.
/// </remarks>
public abstract bool CanWrite
{
    [Pure] get;
}
/// <summary>Reports the length of the stream.</summary>
/// <remarks>Implementations whose CanSeek is false should throw.</remarks>
public abstract long Length
{
    get;
}
/// <summary>Gets or sets the current position within the stream.</summary>
/// <remarks>Implementations whose CanSeek is false should throw.</remarks>
public abstract long Position
{
    get;
    set;
}
/// <summary>
/// Read timeout for the stream. This base implementation does not support
/// timeouts: both accessors throw <see cref="InvalidOperationException"/>.
/// </summary>
[ComVisible(false)]
public virtual int ReadTimeout
{
    get
    {
        Contract.Ensures(Contract.Result<int>() >= 0);
        string notSupported = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(notSupported);
    }
    set
    {
        string notSupported = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(notSupported);
    }
}
/// <summary>
/// Write timeout for the stream. This base implementation does not support
/// timeouts: both accessors throw <see cref="InvalidOperationException"/>.
/// </summary>
[ComVisible(false)]
public virtual int WriteTimeout
{
    get
    {
        Contract.Ensures(Contract.Result<int>() >= 0);
        string notSupported = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(notSupported);
    }
    set
    {
        string notSupported = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(notSupported);
    }
}
126 #if FEATURE_ASYNC_IO
/// <summary>
/// Asynchronously copies this stream into <paramref name="destination"/>
/// using the default copy buffer size.
/// </summary>
/// <param name="destination">Stream that receives the copied bytes.</param>
/// <returns>A task representing the copy operation.</returns>
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task CopyToAsync(Stream destination)
{
    int bufferSize = _DefaultCopyBufferSize;
    return CopyToAsync(destination, bufferSize);
}
/// <summary>
/// Asynchronously copies this stream into <paramref name="destination"/>
/// with the given buffer size and no cancellation.
/// </summary>
/// <param name="destination">Stream that receives the copied bytes.</param>
/// <param name="bufferSize">Size of the intermediate copy buffer.</param>
/// <returns>A task representing the copy operation.</returns>
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task CopyToAsync(Stream destination, Int32 bufferSize)
{
    CancellationToken noCancellation = CancellationToken.None;
    return CopyToAsync(destination, bufferSize, noCancellation);
}
/// <summary>
/// Validates the arguments and both streams, then starts the asynchronous
/// copy of this stream into <paramref name="destination"/>.
/// </summary>
/// <param name="destination">Stream that receives the copied bytes.</param>
/// <param name="bufferSize">Size of the intermediate copy buffer; must be positive.</param>
/// <param name="cancellationToken">Token passed to every read and write of the copy.</param>
/// <returns>A task representing the copy operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
/// <exception cref="ObjectDisposedException">Either stream can neither read nor write.</exception>
/// <exception cref="NotSupportedException">This stream is unreadable or the destination is unwritable.</exception>
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    if (bufferSize <= 0)
    {
        throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    }
    // A stream that can neither read nor write is reported as closed.
    if (!(CanRead || CanWrite))
    {
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!(destination.CanRead || destination.CanWrite))
    {
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!CanRead)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    }
    if (!destination.CanWrite)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    }
    Contract.EndContractBlock();

    return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
}
/// <summary>
/// Copy loop behind CopyToAsync: reads chunks from this stream and writes
/// each one to <paramref name="destination"/> until a read returns zero bytes.
/// </summary>
/// <param name="destination">Stream that receives the copied bytes.</param>
/// <param name="bufferSize">Size of the buffer allocated for the copy.</param>
/// <param name="cancellationToken">Token passed to every ReadAsync and WriteAsync call.</param>
private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    Contract.Requires(destination != null);
    Contract.Requires(bufferSize > 0);
    Contract.Requires(CanRead);
    Contract.Requires(destination.CanWrite);

    byte[] chunk = new byte[bufferSize];
    while (true)
    {
        int filled = await ReadAsync(chunk, 0, chunk.Length, cancellationToken).ConfigureAwait(false);
        if (filled == 0)
        {
            // A zero-byte read marks the end of the source stream.
            break;
        }

        await destination.WriteAsync(chunk, 0, filled, cancellationToken).ConfigureAwait(false);
    }
}
176 #endif // FEATURE_ASYNC_IO
/// <summary>
/// Reads the bytes from the current stream, starting at the current
/// position, and writes them to <paramref name="destination"/> until all
/// bytes have been read. Uses the default copy buffer size.
/// </summary>
/// <param name="destination">Stream that receives the copied bytes.</param>
/// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
/// <exception cref="ObjectDisposedException">Either stream can neither read nor write.</exception>
/// <exception cref="NotSupportedException">This stream is unreadable or the destination is unwritable.</exception>
public void CopyTo(Stream destination)
{
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    // A stream that can neither read nor write is reported as closed.
    if (!(CanRead || CanWrite))
    {
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!(destination.CanRead || destination.CanWrite))
    {
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!CanRead)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    }
    if (!destination.CanWrite)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    }
    Contract.EndContractBlock();

    InternalCopyTo(destination, _DefaultCopyBufferSize);
}
/// <summary>
/// Copies the remainder of this stream into <paramref name="destination"/>
/// using a buffer of <paramref name="bufferSize"/> bytes.
/// </summary>
/// <param name="destination">Stream that receives the copied bytes.</param>
/// <param name="bufferSize">Size of the intermediate copy buffer; must be positive.</param>
/// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
/// <exception cref="ObjectDisposedException">Either stream can neither read nor write.</exception>
/// <exception cref="NotSupportedException">This stream is unreadable or the destination is unwritable.</exception>
#if MONO
virtual
#endif
public void CopyTo(Stream destination, int bufferSize)
{
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    if (bufferSize <= 0)
    {
        throw new ArgumentOutOfRangeException("bufferSize",
            Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    }
    // A stream that can neither read nor write is reported as closed.
    if (!(CanRead || CanWrite))
    {
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!(destination.CanRead || destination.CanWrite))
    {
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!CanRead)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    }
    if (!destination.CanWrite)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    }
    Contract.EndContractBlock();

    InternalCopyTo(destination, bufferSize);
}
/// <summary>
/// Synchronous copy loop shared by the CopyTo overloads: reads chunks from
/// this stream and writes each one to <paramref name="destination"/> until
/// Read returns zero.
/// </summary>
/// <param name="destination">Stream that receives the copied bytes.</param>
/// <param name="bufferSize">Size of the buffer allocated for the copy.</param>
private void InternalCopyTo(Stream destination, int bufferSize)
{
    Contract.Requires(destination != null);
    Contract.Requires(CanRead);
    Contract.Requires(destination.CanWrite);
    Contract.Requires(bufferSize > 0);

    byte[] chunk = new byte[bufferSize];
    for (int filled = Read(chunk, 0, chunk.Length); filled != 0; filled = Read(chunk, 0, chunk.Length))
    {
        destination.Write(chunk, 0, filled);
    }
}
/// <summary>
/// Closes the stream by running <c>Dispose(true)</c> and suppressing
/// finalization of this instance.
/// </summary>
/// <remarks>
/// Stream originally required all cleanup logic to live in Close(), which
/// predates IDisposable. The class now follows the Dispose(bool) pattern so
/// that subclasses can be written without inspecting every base class and
/// without version brittleness when a base class switches to the Dispose
/// pattern; subclasses should put their cleanup in Dispose(bool) starting
/// in V2.
/// </remarks>
public virtual void Close()
{
    /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
    Contract.Ensures(CanRead == false);
    Contract.Ensures(CanWrite == false);
    Contract.Ensures(CanSeek == false);
    */

    this.Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// IDisposable entry point; simply forwards to the virtual <c>Close</c> method.
/// </summary>
public void Dispose()
{
    /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
    Contract.Ensures(CanRead == false);
    Contract.Ensures(CanWrite == false);
    Contract.Ensures(CanSeek == false);
    */

    this.Close();
}
/// <summary>
/// Cleanup hook of the Dispose(bool) pattern. The base implementation is
/// intentionally empty.
/// </summary>
/// <param name="disposing">Not used by this base implementation; Close passes true.</param>
protected virtual void Dispose(bool disposing)
{
    // Never make this call other virtual members of Stream (such as Write):
    // by the time it runs, subclass state has already been torn down. This
    // is the last code to run when a stream is cleaned up.
}
/// <summary>
/// Flushes the stream; subclasses supply the implementation. The default
/// FlushAsync of this class runs this method on a task.
/// </summary>
public abstract void Flush();
276 #if FEATURE_ASYNC_IO
/// <summary>Flushes the stream asynchronously without cancellation support.</summary>
/// <returns>A task representing the flush.</returns>
[HostProtection(ExternalThreading=true)]
[ComVisible(false)]
public Task FlushAsync()
{
    CancellationToken noCancellation = CancellationToken.None;
    return FlushAsync(noCancellation);
}
/// <summary>
/// Default asynchronous flush: starts a task on the default scheduler that
/// calls the synchronous <c>Flush</c> on this stream.
/// </summary>
/// <param name="cancellationToken">Token handed to the task factory when the task is created.</param>
/// <returns>The task running the flush.</returns>
[HostProtection(ExternalThreading=true)]
[ComVisible(false)]
public virtual Task FlushAsync(CancellationToken cancellationToken)
{
    // The stream travels as the task state so the lambda captures nothing.
    return Task.Factory.StartNew(
        target => ((Stream)target).Flush(),
        this,
        cancellationToken,
        TaskCreationOptions.DenyChildAttach,
        TaskScheduler.Default);
}
291 #endif // FEATURE_ASYNC_IO
/// <summary>
/// Obsolete helper that returns a new, non-signaled <see cref="ManualResetEvent"/>.
/// </summary>
/// <returns>A freshly created wait handle; never null.</returns>
[Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle()
{
    Contract.Ensures(Contract.Result<WaitHandle>() != null);
    WaitHandle unsignaled = new ManualResetEvent(false);
    return unsignaled;
}
/// <summary>
/// Begins a read using the default implementation in BeginReadInternal,
/// with synchronous (blocking) serialization of concurrent requests.
/// </summary>
/// <param name="buffer">Buffer that receives the bytes read.</param>
/// <param name="offset">Offset into <paramref name="buffer"/> passed through to the read.</param>
/// <param name="count">Byte count passed through to the read.</param>
/// <param name="callback">Optional callback invoked when the operation completes.</param>
/// <param name="state">Caller-supplied state object attached to the result.</param>
/// <returns>The IAsyncResult representing the read; never null.</returns>
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    IAsyncResult pendingRead = BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
    return pendingRead;
}
/// <summary>
/// Shared implementation of BeginRead. Without NEW_EXPERIMENTAL_ASYNC_IO
/// (or for apps earlier than Windows Phone 8) the read is performed by
/// BlockingBeginRead; otherwise a ReadWriteTask is created and scheduled
/// once the stream's async semaphore has been acquired.
/// </summary>
/// <param name="buffer">Buffer that receives the bytes read.</param>
/// <param name="offset">Offset into <paramref name="buffer"/> passed through to Read.</param>
/// <param name="count">Byte count passed through to Read.</param>
/// <param name="callback">Optional callback invoked when the operation completes.</param>
/// <param name="state">Caller-supplied state object attached to the result.</param>
/// <param name="serializeAsynchronously">
/// True to wait for the semaphore asynchronously; false to block the calling thread until it is available.
/// </param>
/// <returns>The IAsyncResult representing the read; never null.</returns>
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanRead)
    {
        __Error.ReadNotSupported();
    }

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginRead(buffer, offset, count, callback, state);
#else

    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginRead(buffer, offset, count, callback, state);
    }

    // Streams that do not natively support async IO could otherwise race on
    // their position pointer and internal buffer indexes when several async
    // requests are outstanding. To avoid that, a second IO request waits
    // here until the first one completes.
    SemaphoreSlim gate = EnsureAsyncActiveSemaphoreInitialized();
    Task pendingTurn = null;
    if (!serializeAsynchronously)
    {
        gate.Wait();
    }
    else
    {
        pendingTurn = gate.WaitAsync();
    }

    // This task is both the asynchronous work item performing the Read and
    // the IAsyncResult handed back to the caller.
    var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
    {
        // The ReadWriteTask carries every argument for Read. The delegate
        // runs inside that task, so the current task supplies them.
        var current = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(current != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Perform the Read and hand back the number of bytes read.
        var readCount = current._stream.Read(current._buffer, current._offset, current._count);
        current.ClearBeginState(); // just to help alleviate some memory pressure
        return readCount;
    }, state, this, buffer, offset, count, callback);

    // Schedule it: immediately when the semaphore is already held, otherwise
    // once the asynchronous wait finishes.
    if (pendingTurn == null)
    {
        RunReadWriteTask(asyncResult);
    }
    else
    {
        RunReadWriteTaskWhenReady(pendingTurn, asyncResult);
    }

    return asyncResult;
#endif
}
/// <summary>
/// Completes a read started by BeginRead and returns the number of bytes read.
/// </summary>
/// <param name="asyncResult">The result object returned by the matching BeginRead call.</param>
/// <returns>The number of bytes read; never negative.</returns>
/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
/// <exception cref="ArgumentException">
/// (Task-based path) No operation is active, or the active operation is a write.
/// </exception>
/// <exception cref="InvalidOperationException">
/// (Task-based path) <paramref name="asyncResult"/> is not the active operation.
/// </exception>
public virtual int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult == null)
        throw new ArgumentNullException("asyncResult");
    Contract.Ensures(Contract.Result<int>() >= 0);
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingEndRead(asyncResult);
#else
    // Mango did not do async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingEndRead(asyncResult);
    }

    var activeTask = _activeReadWriteTask;

    // Validate that the caller passed the result of the currently active read.
    if (activeTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    if (activeTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    if (!activeTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }

    try
    {
        // Blocks until completion, then yields the result or propagates any exception.
        return activeTask.GetAwaiter().GetResult();
    }
    finally
    {
        // Whatever the outcome, clear the active operation and let the next one in.
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}
410 #if FEATURE_ASYNC_IO
/// <summary>Reads asynchronously without cancellation support.</summary>
/// <param name="buffer">Buffer that receives the bytes read.</param>
/// <param name="offset">Offset into <paramref name="buffer"/>.</param>
/// <param name="count">Number of bytes requested.</param>
/// <returns>A task whose result is the number of bytes read.</returns>
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
{
    CancellationToken noCancellation = CancellationToken.None;
    return ReadAsync(buffer, offset, count, noCancellation);
}
/// <summary>
/// Default asynchronous read, implemented on top of BeginRead/EndRead.
/// </summary>
/// <param name="buffer">Buffer that receives the bytes read.</param>
/// <param name="offset">Offset into <paramref name="buffer"/>.</param>
/// <param name="count">Number of bytes requested.</param>
/// <param name="cancellationToken">
/// Checked once on entry; it is not passed on to the Begin/End operation.
/// </param>
/// <returns>
/// A canceled task when cancellation was already requested; otherwise a
/// task representing the Begin/End read.
/// </returns>
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // Bail out early with an already completed task if cancellation was requested.
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCancellation<int>(cancellationToken);
    }

    // Otherwise the task wraps the Begin/End method pair.
    return BeginEndReadAsync(buffer, offset, count);
}
/// <summary>
/// Wraps this stream's BeginRead/EndRead pair in a task via
/// <c>TaskFactory&lt;Int32&gt;.FromAsyncTrim</c>.
/// </summary>
/// <param name="buffer">Buffer that receives the bytes read.</param>
/// <param name="offset">Offset into <paramref name="buffer"/>.</param>
/// <param name="count">Number of bytes requested.</param>
/// <returns>A task whose result is the value returned by EndRead.</returns>
private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    // The arguments travel in a struct so that neither lambda captures anything.
    var readArgs = new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count };

    return TaskFactory<Int32>.FromAsyncTrim(
        this,
        readArgs,
        (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
        (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}
/// <summary>
/// Bundles the buffer, offset and count arguments of a Read or Write call so
/// they can be passed as a single value to the Begin/End task helpers.
/// </summary>
private struct ReadWriteParameters
{
    /// <summary>Buffer argument of the call.</summary>
    internal byte[] Buffer;

    /// <summary>Offset argument of the call.</summary>
    internal int Offset;

    /// <summary>Count argument of the call.</summary>
    internal int Count;
}
443 #endif //FEATURE_ASYNC_IO
/// <summary>
/// Begins a write using the default implementation in BeginWriteInternal,
/// with synchronous (blocking) serialization of concurrent requests.
/// </summary>
/// <param name="buffer">Buffer holding the bytes to write.</param>
/// <param name="offset">Offset into <paramref name="buffer"/> passed through to the write.</param>
/// <param name="count">Byte count passed through to the write.</param>
/// <param name="callback">Optional callback invoked when the operation completes.</param>
/// <param name="state">Caller-supplied state object attached to the result.</param>
/// <returns>The IAsyncResult representing the write; never null.</returns>
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    IAsyncResult pendingWrite = BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
    return pendingWrite;
}
/// <summary>
/// Shared implementation of BeginWrite. Without NEW_EXPERIMENTAL_ASYNC_IO
/// (or for apps earlier than Windows Phone 8) the write is performed by
/// BlockingBeginWrite; otherwise a ReadWriteTask is created and scheduled
/// once the stream's async semaphore has been acquired.
/// </summary>
/// <param name="buffer">Buffer holding the bytes to write.</param>
/// <param name="offset">Offset into <paramref name="buffer"/> passed through to Write.</param>
/// <param name="count">Byte count passed through to Write.</param>
/// <param name="callback">Optional callback invoked when the operation completes.</param>
/// <param name="state">Caller-supplied state object attached to the result.</param>
/// <param name="serializeAsynchronously">
/// True to wait for the semaphore asynchronously; false to block the calling thread until it is available.
/// </param>
/// <returns>The IAsyncResult representing the write; never null.</returns>
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanWrite)
    {
        __Error.WriteNotSupported();
    }
#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginWrite(buffer, offset, count, callback, state);
#else

    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginWrite(buffer, offset, count, callback, state);
    }

    // Streams that do not natively support async IO could otherwise race on
    // their position pointer and internal buffer indexes when several async
    // requests are outstanding. To avoid that, a second IO request waits
    // here until the first one completes.
    SemaphoreSlim gate = EnsureAsyncActiveSemaphoreInitialized();
    Task pendingTurn = null;
    if (!serializeAsynchronously)
    {
        gate.Wait(); // synchronously wait here
    }
    else
    {
        pendingTurn = gate.WaitAsync(); // kick off the asynchronous wait, but don't block
    }

    // This task is both the asynchronous work item performing the Write and
    // the IAsyncResult handed back to the caller.
    var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
    {
        // The ReadWriteTask carries every argument for Write. The delegate
        // runs inside that task, so the current task supplies them.
        var current = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(current != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Perform the Write.
        current._stream.Write(current._buffer, current._offset, current._count);
        current.ClearBeginState(); // just to help alleviate some memory pressure
        return 0; // not used, but the delegate signature requires a value
    }, state, this, buffer, offset, count, callback);

    // Schedule it: immediately when the semaphore is already held, otherwise
    // once the asynchronous wait finishes.
    if (pendingTurn == null)
    {
        RunReadWriteTask(asyncResult);
    }
    else
    {
        RunReadWriteTaskWhenReady(pendingTurn, asyncResult);
    }

    return asyncResult;
#endif
}
511 #if NEW_EXPERIMENTAL_ASYNC_IO
/// <summary>
/// Runs <paramref name="readWriteTask"/> as soon as the semaphore wait
/// represented by <paramref name="asyncWaiter"/> has completed.
/// </summary>
/// <param name="asyncWaiter">Task returned by the asynchronous semaphore wait.</param>
/// <param name="readWriteTask">The read/write work item to schedule.</param>
private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
{
    Contract.Assert(readWriteTask != null);  // Should be Contract.Requires, but CCRewrite is doing a poor job with
                                             // preconditions in async methods that await. Mike & Manuel are aware. (10/6/2011, bug 290222)
    Contract.Assert(asyncWaiter != null);    // Ditto

    if (!asyncWaiter.IsCompleted)
    {
        // Not our turn yet: run the task from a continuation of the wait.
        // Stream and task travel as the continuation state so the lambda captures nothing.
        asyncWaiter.ContinueWith((t, state) =>
        {
            Contract.Assert(t.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
            var pair = (Tuple<Stream,ReadWriteTask>)state;
            pair.Item1.RunReadWriteTask(pair.Item2); // RunReadWriteTask(readWriteTask);
        }, Tuple.Create<Stream,ReadWriteTask>(this, readWriteTask),
        default(CancellationToken),
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
        return;
    }

    // The wait has already completed, so the task can run right away.
    Contract.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
    RunReadWriteTask(readWriteTask);
}
/// <summary>
/// Records <paramref name="readWriteTask"/> as the active operation and
/// starts it on the default task scheduler.
/// </summary>
/// <param name="readWriteTask">The read/write work item to start.</param>
private void RunReadWriteTask(ReadWriteTask readWriteTask)
{
    Contract.Requires(readWriteTask != null);
    Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");

    // ScheduleAndStart must happen after the write to _activeReadWriteTask to
    // avoid a race. Calling ScheduleAndStart directly instead of Start avoids
    // two interlocked operations; if ReadWriteTask is ever changed to use a
    // cancellation token, this should be changed to use Start.
    readWriteTask.m_taskScheduler = TaskScheduler.Default;
    _activeReadWriteTask = readWriteTask; // stored so that EndXx can validate it is given the right one
    readWriteTask.ScheduleAndStart(needsProtection: false);
}
551 #endif
/// <summary>Completes a write started by BeginWrite.</summary>
/// <param name="asyncResult">The result object returned by the matching BeginWrite call.</param>
/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
/// <exception cref="ArgumentException">
/// (Task-based path) No operation is active, or the active operation is a read.
/// </exception>
/// <exception cref="InvalidOperationException">
/// (Task-based path) <paramref name="asyncResult"/> is not the active operation.
/// </exception>
public virtual void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult==null)
        throw new ArgumentNullException("asyncResult");
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    BlockingEndWrite(asyncResult);
#else

    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        BlockingEndWrite(asyncResult);
        return;
    }

    var activeTask = _activeReadWriteTask;

    // Validate that the caller passed the result of the currently active write.
    if (activeTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    if (activeTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    if (activeTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }

    try
    {
        // Blocks until completion, then propagates any exception.
        activeTask.GetAwaiter().GetResult();
        Contract.Assert(activeTask.Status == TaskStatus.RanToCompletion);
    }
    finally
    {
        // Whatever the outcome, clear the active operation and let the next one in.
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}
598 #if NEW_EXPERIMENTAL_ASYNC_IO
/// <summary>
/// Task used by BeginRead / BeginWrite to perform Read / Write asynchronously.
/// </summary>
/// <remarks>
/// One instance serves four purposes:
///   1. the work item scheduled to run the Read / Write operation;
///   2. the state holding the arguments passed to Read / Write;
///   3. the IAsyncResult returned from BeginRead / BeginWrite;
///   4. the completion action that invokes the user-provided callback.
/// The last point is subtle. The IAsyncResult must be completed before the
/// AsyncCallback runs, so the callback cannot be invoked from inside the
/// task body (the task is the IAsyncResult and is not yet complete there).
/// Instead the task registers itself as its own completion action through
/// AddCompletionAction. That avoids allocating a separate handler,
/// guarantees the task has completed when the handler runs, and lets the
/// handler run synchronously on completion, so BeginRead / BeginWrite need
/// only a single allocation.
/// </remarks>
private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
{
    internal readonly bool _isRead;   // true for a read operation, false for a write
    internal Stream _stream;          // stream the operation runs against
    internal byte [] _buffer;         // buffer argument for Read / Write
    internal int _offset;             // offset argument for Read / Write
    internal int _count;              // count argument for Read / Write
    private AsyncCallback _callback;  // user callback, if one was supplied
    private ExecutionContext _context; // context captured for running the callback

    /// <summary>Drops the stream and buffer references so they become eligible for GC.</summary>
    internal void ClearBeginState()
    {
        _buffer = null;
        _stream = null;
    }

    /// <summary>
    /// Creates the task, stores the Read / Write arguments and, when a
    /// callback is supplied, arranges for it to run on completion.
    /// </summary>
    [SecuritySafeCritical] // necessary for EC.Capture
    [MethodImpl(MethodImplOptions.NoInlining)]
    public ReadWriteTask(
        bool isRead,
        Func<object,int> function, object state,
        Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
        base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
    {
        Contract.Requires(function != null);
        Contract.Requires(stream != null);
        Contract.Requires(buffer != null);
        Contract.EndContractBlock();

        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

        // Store the arguments
        _isRead = isRead;
        _stream = stream;
        _buffer = buffer;
        _offset = offset;
        _count = count;

        if (callback == null)
        {
            return;
        }

        // A callback was provided, so:
        // - remember the user-provided handler,
        // - capture an ExecutionContext under which to invoke it,
        // - register this task as its own completion action so that Invoke
        //   runs the callback once the task completes.
        _callback = callback;
        _context = ExecutionContext.Capture(ref stackMark,
            ExecutionContext.CaptureOptions.OptimizeDefaultCase | ExecutionContext.CaptureOptions.IgnoreSyncCtx);
        base.AddCompletionAction(this);
    }

    /// <summary>
    /// ContextCallback target: invokes the stored callback once, passing the
    /// completed task as the IAsyncResult.
    /// </summary>
    [SecurityCritical] // necessary for CoreCLR
    private static void InvokeAsyncCallback(object completedTask)
    {
        var task = (ReadWriteTask)completedTask;
        var userCallback = task._callback;
        task._callback = null;
        userCallback(task);
    }

    // Lazily created delegate for InvokeAsyncCallback, shared by all instances.
    [SecurityCritical] // necessary for CoreCLR
    private static ContextCallback s_invokeAsyncCallback;

    /// <summary>
    /// Completion action: runs the user callback, under the captured
    /// ExecutionContext when there is one.
    /// </summary>
    [SecuritySafeCritical] // necessary for ExecutionContext.Run
    void ITaskCompletionAction.Invoke(Task completingTask)
    {
        var capturedContext = _context;
        if (capturedContext != null)
        {
            // A context was captured: run the callback through ExecutionContext.Run.
            _context = null;

            var contextCallback = s_invokeAsyncCallback;
            if (contextCallback == null) s_invokeAsyncCallback = contextCallback = InvokeAsyncCallback; // benign race

            using(capturedContext) ExecutionContext.Run(capturedContext, contextCallback, this, true);
        }
        else
        {
            // No context: run the callback directly, passing the completed
            // task as the IAsyncResult.
            var userCallback = _callback;
            _callback = null;
            userCallback(completingTask);
        }
    }

    public bool InvokeMayRunArbitraryCode => true;
}
705 #endif
707 #if FEATURE_ASYNC_IO
/// <summary>Writes asynchronously without cancellation support.</summary>
/// <param name="buffer">Buffer holding the bytes to write.</param>
/// <param name="offset">Offset into <paramref name="buffer"/>.</param>
/// <param name="count">Number of bytes to write.</param>
/// <returns>A task representing the write.</returns>
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task WriteAsync(Byte[] buffer, int offset, int count)
{
    CancellationToken noCancellation = CancellationToken.None;
    return WriteAsync(buffer, offset, count, noCancellation);
}
/// <summary>
/// Default asynchronous write, implemented on top of BeginWrite/EndWrite.
/// </summary>
/// <param name="buffer">Buffer holding the bytes to write.</param>
/// <param name="offset">Offset into <paramref name="buffer"/>.</param>
/// <param name="count">Number of bytes to write.</param>
/// <param name="cancellationToken">
/// Checked once on entry; it is not passed on to the Begin/End operation.
/// </param>
/// <returns>
/// A canceled task when cancellation was already requested; otherwise a
/// task representing the Begin/End write.
/// </returns>
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // Bail out early with an already completed task if cancellation was requested.
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCancellation(cancellationToken);
    }

    // Otherwise the task wraps the Begin/End method pair.
    return BeginEndWriteAsync(buffer, offset, count);
}
/// <summary>
/// Wraps this stream's BeginWrite/EndWrite pair in a task via
/// <c>TaskFactory&lt;VoidTaskResult&gt;.FromAsyncTrim</c>.
/// </summary>
/// <param name="buffer">Buffer holding the bytes to write.</param>
/// <param name="offset">Offset into <paramref name="buffer"/>.</param>
/// <param name="count">Number of bytes to write.</param>
/// <returns>A task that completes when EndWrite has run.</returns>
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    // The arguments travel in a struct so that neither lambda captures anything.
    var writeArgs = new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count };

    return TaskFactory<VoidTaskResult>.FromAsyncTrim(
        this,
        writeArgs,
        (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
        (stream, asyncResult) => // cached by compiler
        {
            stream.EndWrite(asyncResult);
            return default(VoidTaskResult);
        });
}
738 #endif // FEATURE_ASYNC_IO
/// <summary>Seek operation of the stream; subclasses supply the implementation.</summary>
/// <remarks>Implementations whose CanSeek is false should throw.</remarks>
public abstract long Seek(long offset, SeekOrigin origin);
/// <summary>Sets the length of the stream; subclasses supply the implementation.</summary>
/// <remarks>Implementations whose CanSeek is false should throw.</remarks>
public abstract void SetLength(long value);
/// <summary>
/// Synchronous read primitive; subclasses supply the implementation.
/// </summary>
/// <returns>
/// The number of bytes read. The helpers in this class (ReadByte and the
/// copy loops) treat a return value of zero as the end of the stream.
/// </returns>
public abstract int Read([In, Out] byte[] buffer, int offset, int count);
/// <summary>
/// Reads one byte from the stream by calling Read(byte[], int, int).
/// </summary>
/// <remarks>
/// This implementation does not perform well because it allocates a new
/// byte[] on every call. Any subclass that maintains an internal buffer
/// should override it, which helps performance significantly for callers
/// reading one byte at a time.
/// </remarks>
/// <returns>The byte read, as an int, or -1 at the end of the stream.</returns>
public virtual int ReadByte()
{
    Contract.Ensures(Contract.Result<int>() >= -1);
    Contract.Ensures(Contract.Result<int>() < 256);

    byte[] single = new byte[1];
    // A zero-byte read means the end of the stream was reached.
    return Read(single, 0, 1) == 0 ? -1 : single[0];
}
/// <summary>
/// Synchronous write primitive; subclasses supply the implementation. The
/// default WriteByte and the copy helpers of this class are built on it.
/// </summary>
public abstract void Write(byte[] buffer, int offset, int count);
/// <summary>
/// Writes one byte to the stream by calling Write(byte[], int, int).
/// </summary>
/// <remarks>
/// This implementation does not perform well because it allocates a new
/// byte[] on every call. Any subclass that maintains an internal buffer
/// should override it, which helps performance significantly for callers
/// writing one byte at a time.
/// </remarks>
/// <param name="value">The byte to write.</param>
public virtual void WriteByte(byte value)
{
    byte[] single = new byte[] { value };
    Write(single, 0, 1);
}
/// <summary>
/// Returns a SyncStream wrapper around <paramref name="stream"/>, or the
/// stream itself when it already is a SyncStream.
/// </summary>
/// <param name="stream">The stream to wrap.</param>
/// <returns>A non-null stream.</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
[HostProtection(Synchronization=true)]
public static Stream Synchronized(Stream stream)
{
    if (stream==null)
        throw new ArgumentNullException("stream");
    Contract.Ensures(Contract.Result<Stream>() != null);
    Contract.EndContractBlock();

    // Avoid double wrapping a stream that is already a SyncStream.
    return stream is SyncStream ? stream : new SyncStream(stream);
}
791 #if !FEATURE_PAL || MONO // This method shouldn't have been exposed in Dev10 (we revised object invariants after locking down).
/// <summary>
/// Obsolete hook with an empty body; it should be neither called nor overridden.
/// </summary>
[Obsolete("Do not call or override this method.")]
protected virtual void ObjectInvariant()
{
}
796 #endif
/// <summary>
/// BeginRead implementation that performs the read synchronously on the
/// calling thread and returns an already finished result object.
/// </summary>
/// <param name="buffer">Buffer that receives the bytes read.</param>
/// <param name="offset">Offset into <paramref name="buffer"/> passed through to Read.</param>
/// <param name="count">Byte count passed through to Read.</param>
/// <param name="callback">Optional callback, invoked before this method returns.</param>
/// <param name="state">Caller-supplied state object attached to the result.</param>
/// <returns>
/// A SynchronousAsyncResult holding either the byte count or the
/// IOException thrown by Read; never null.
/// </returns>
internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // Streams that do not natively support async IO could otherwise race on
    // their position pointer and internal buffer indexes when several async
    // requests are outstanding. To avoid that, the calling thread is blocked
    // and the IO is done synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult completed;
    try
    {
        int bytesRead = Read(buffer, offset, count);
        completed = new SynchronousAsyncResult(bytesRead, state);
    }
    catch (IOException ioError)
    {
        // Only IOException is captured into the result; anything else propagates.
        completed = new SynchronousAsyncResult(ioError, state, isWrite: false);
    }

    if (callback != null)
    {
        callback(completed);
    }

    return completed;
}
/// <summary>
/// EndRead counterpart of BlockingBeginRead; forwards to
/// <c>SynchronousAsyncResult.EndRead</c>.
/// </summary>
/// <param name="asyncResult">The result object returned by BlockingBeginRead.</param>
/// <returns>The value produced by <c>SynchronousAsyncResult.EndRead</c>; never negative.</returns>
internal static int BlockingEndRead(IAsyncResult asyncResult)
{
    Contract.Ensures(Contract.Result<int>() >= 0);

    int bytesRead = SynchronousAsyncResult.EndRead(asyncResult);
    return bytesRead;
}
/// <summary>
/// BeginWrite implementation that performs the write synchronously on the
/// calling thread and returns an already finished result object.
/// </summary>
/// <param name="buffer">Buffer holding the bytes to write.</param>
/// <param name="offset">Offset into <paramref name="buffer"/> passed through to Write.</param>
/// <param name="count">Byte count passed through to Write.</param>
/// <param name="callback">Optional callback, invoked before this method returns.</param>
/// <param name="state">Caller-supplied state object attached to the result.</param>
/// <returns>
/// A SynchronousAsyncResult that is either plain or holds the IOException
/// thrown by Write; never null.
/// </returns>
internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // Streams that do not natively support async IO could otherwise race on
    // their position pointer and internal buffer indexes when several async
    // requests are outstanding. To avoid that, the calling thread is blocked
    // and the IO is done synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult completed;
    try
    {
        Write(buffer, offset, count);
        completed = new SynchronousAsyncResult(state);
    }
    catch (IOException ioError)
    {
        // Only IOException is captured into the result; anything else propagates.
        completed = new SynchronousAsyncResult(ioError, state, isWrite: true);
    }

    if (callback != null)
    {
        callback(completed);
    }

    return completed;
}
/// <summary>
/// EndWrite counterpart of BlockingBeginWrite; forwards to
/// <c>SynchronousAsyncResult.EndWrite</c>.
/// </summary>
/// <param name="asyncResult">The result object returned by BlockingBeginWrite.</param>
internal static void BlockingEndWrite(IAsyncResult asyncResult)
{
    SynchronousAsyncResult.EndWrite(asyncResult);
}
/// <summary>
/// Stream with no backing store: reads return no data, writes are discarded,
/// and it reports itself as readable, writable and seekable.
/// </summary>
[Serializable]
private sealed class NullStream : Stream
{
    internal NullStream() {}

    public override bool CanRead {
        [Pure]
        get { return true; }
    }

    public override bool CanWrite {
        [Pure]
        get { return true; }
    }

    public override bool CanSeek {
        [Pure]
        get { return true; }
    }

    // Length and Position are always zero; setting Position is ignored.
    public override long Length {
        get { return 0; }
    }

    public override long Position {
        get { return 0; }
        set {}
    }

    protected override void Dispose(bool disposing)
    {
        // Do nothing - we don't want NullStream singleton (static) to be closable
    }

    public override void Flush()
    {
    }

#if FEATURE_ASYNC_IO
    /// <summary>
    /// Returns a canceled task when cancellation was already requested,
    /// otherwise an already completed task.
    /// </summary>
    [ComVisible(false)]
    public override Task FlushAsync(CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested ?
            Task.FromCancellation(cancellationToken) :
            Task.CompletedTask;
    }
#endif // FEATURE_ASYNC_IO

    // The Begin/End overrides go straight to the blocking helpers of the base class.
    [HostProtection(ExternalThreading = true)]
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    {
        if (!CanRead) __Error.ReadNotSupported();

        return BlockingBeginRead(buffer, offset, count, callback, state);
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException("asyncResult");
        Contract.EndContractBlock();

        return BlockingEndRead(asyncResult);
    }

    [HostProtection(ExternalThreading = true)]
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    {
        if (!CanWrite) __Error.WriteNotSupported();

        return BlockingBeginWrite(buffer, offset, count, callback, state);
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException("asyncResult");
        Contract.EndContractBlock();

        BlockingEndWrite(asyncResult);
    }

    /// <summary>Always reports zero bytes read.</summary>
    public override int Read([In, Out] byte[] buffer, int offset, int count)
    {
        return 0;
    }

#if FEATURE_ASYNC_IO
    /// <summary>
    /// Returns a canceled task when cancellation was already requested,
    /// otherwise a cached, already completed task whose result is zero.
    /// </summary>
    [ComVisible(false)]
    public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        // Honor a pre-cancelled token, as FlushAsync and WriteAsync on this
        // class and the base Stream.ReadAsync already do.
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCancellation<int>(cancellationToken);
        }

        var nullReadTask = s_nullReadTask;
        if (nullReadTask == null)
            s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign race
        return nullReadTask;
    }

    // Lazily created completed task shared by every successful ReadAsync call.
    private static Task<int> s_nullReadTask;
#endif //FEATURE_ASYNC_IO

    /// <summary>Always reports end of stream (-1).</summary>
    public override int ReadByte()
    {
        return -1;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
    }

#if FEATURE_ASYNC_IO
    /// <summary>
    /// Returns a canceled task when cancellation was already requested,
    /// otherwise an already completed task.
    /// </summary>
    [ComVisible(false)]
    public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested ?
            Task.FromCancellation(cancellationToken) :
            Task.CompletedTask;
    }
#endif // FEATURE_ASYNC_IO

    public override void WriteByte(byte value)
    {
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return 0;
    }

    public override void SetLength(long length)
    {
    }
}
/// <summary>
/// IAsyncResult handed out by the base Stream class when an asynchronous IO method is
/// serviced synchronously. Instances are always already completed; they carry either
/// the byte count of a read, nothing (for a write), or a captured exception that is
/// rethrown from the matching End call.
/// </summary>
internal sealed class SynchronousAsyncResult : IAsyncResult {

    private readonly Object _stateObject;
    private readonly bool _isWrite;
    private ManualResetEvent _waitHandle;
    private ExceptionDispatchInfo _exceptionInfo;

    private bool _endXxxCalled;
    private Int32 _bytesRead;

    /// <summary>Creates the result of a successful read.</summary>
    internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject) {
        // _isWrite keeps its default value (false) for reads.
        _stateObject = asyncStateObject;
        _bytesRead = bytesRead;
    }

    /// <summary>Creates the result of a successful write.</summary>
    internal SynchronousAsyncResult(Object asyncStateObject) {
        _isWrite = true;
        _stateObject = asyncStateObject;
    }

    /// <summary>Creates the result of a failed read or write; the exception is rethrown by the End call.</summary>
    internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite) {
        _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
        _stateObject = asyncStateObject;
        _isWrite = isWrite;
    }

    public bool IsCompleted {
        // The IO has already finished by the time a caller can see this object.
        get { return true; }
    }

    public WaitHandle AsyncWaitHandle {
        get {
            // Created on first request only, and created already signalled.
            return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
        }
    }

    public Object AsyncState {
        get { return _stateObject; }
    }

    public bool CompletedSynchronously {
        get { return true; }
    }

    /// <summary>Rethrows the captured exception, if there is one.</summary>
    internal void ThrowIfError() {
        if (_exceptionInfo != null) {
            _exceptionInfo.Throw();
        }
    }

    /// <summary>
    /// Validates that <paramref name="asyncResult"/> is an unconsumed read result,
    /// marks it consumed, rethrows any captured exception and returns the byte count.
    /// </summary>
    internal static Int32 EndRead(IAsyncResult asyncResult) {

        SynchronousAsyncResult readResult = asyncResult as SynchronousAsyncResult;
        if (readResult == null || readResult._isWrite) {
            __Error.WrongAsyncResult();
        }

        if (readResult._endXxxCalled) {
            __Error.EndReadCalledTwice();
        }

        readResult._endXxxCalled = true;

        readResult.ThrowIfError();
        return readResult._bytesRead;
    }

    /// <summary>
    /// Validates that <paramref name="asyncResult"/> is an unconsumed write result,
    /// marks it consumed and rethrows any captured exception.
    /// </summary>
    internal static void EndWrite(IAsyncResult asyncResult) {

        SynchronousAsyncResult writeResult = asyncResult as SynchronousAsyncResult;
        if (writeResult == null || !writeResult._isWrite) {
            __Error.WrongAsyncResult();
        }

        if (writeResult._endXxxCalled) {
            __Error.EndWriteCalledTwice();
        }

        writeResult._endXxxCalled = true;

        writeResult.ThrowIfError();
    }
}   // class SynchronousAsyncResult
// SyncStream wraps another stream and performs the data and position operations
// (Length, Position, Close, Dispose, Flush, Read, Write, Seek, SetLength and the
// Begin/End pairs) while holding a lock on the wrapped stream instance. The
// capability and timeout properties are forwarded without taking the lock.
[Serializable]
internal sealed class SyncStream : Stream, IDisposable
{
    private Stream _stream;

    // Cached answers to "does the wrapped stream's type override BeginRead/BeginWrite?".
    // Null until first needed; not serialized, so recomputed after deserialization.
    [NonSerialized]
    private bool? _overridesBeginRead;
    [NonSerialized]
    private bool? _overridesBeginWrite;

    internal SyncStream(Stream stream)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }
        Contract.EndContractBlock();
        _stream = stream;
    }

    public override bool CanRead
    {
        [Pure]
        get { return _stream.CanRead; }
    }

    public override bool CanWrite
    {
        [Pure]
        get { return _stream.CanWrite; }
    }

    public override bool CanSeek
    {
        [Pure]
        get { return _stream.CanSeek; }
    }

    [ComVisible(false)]
    public override bool CanTimeout
    {
        [Pure]
        get { return _stream.CanTimeout; }
    }

    public override long Length
    {
        get
        {
            lock (_stream)
            {
                return _stream.Length;
            }
        }
    }

    public override long Position
    {
        get
        {
            lock (_stream)
            {
                return _stream.Position;
            }
        }
        set
        {
            lock (_stream)
            {
                _stream.Position = value;
            }
        }
    }

    [ComVisible(false)]
    public override int ReadTimeout
    {
        get { return _stream.ReadTimeout; }
        set { _stream.ReadTimeout = value; }
    }

    [ComVisible(false)]
    public override int WriteTimeout
    {
        get { return _stream.WriteTimeout; }
        set { _stream.WriteTimeout = value; }
    }

    // A wrapped stream might give Close and Dispose different semantics,
    // so Close is forwarded as Close rather than routed through Dispose.
    public override void Close()
    {
        lock (_stream)
        {
            try
            {
                _stream.Close();
            }
            finally
            {
                // Runs even when the wrapped Close throws.
                base.Dispose(true);
            }
        }
    }

    protected override void Dispose(bool disposing)
    {
        lock (_stream)
        {
            try
            {
                // Go through the interface so an explicitly implemented
                // (methodimpl'ed) Dispose on the wrapped stream is the one invoked.
                if (disposing)
                {
                    ((IDisposable)_stream).Dispose();
                }
            }
            finally
            {
                base.Dispose(disposing);
            }
        }
    }

    public override void Flush()
    {
        lock (_stream)
        {
            _stream.Flush();
        }
    }

    public override int Read([In, Out]byte[] bytes, int offset, int count)
    {
        lock (_stream)
        {
            return _stream.Read(bytes, offset, count);
        }
    }

    public override int ReadByte()
    {
        lock (_stream)
        {
            return _stream.ReadByte();
        }
    }

    // Reports whether the runtime type of 'stream' overrides the named Begin method.
    // Among the public instance methods of that type, finding one with the requested
    // name that is still declared on the base Stream type means it was not overridden;
    // finding none means it was.
    private static bool OverridesBeginMethod(Stream stream, string methodName)
    {
        Contract.Requires(stream != null, "Expected a non-null stream.");
        Contract.Requires(methodName == "BeginRead" || methodName == "BeginWrite",
            "Expected BeginRead or BeginWrite as the method name to check.");

        MethodInfo[] candidates = stream.GetType().GetMethods(BindingFlags.Public | BindingFlags.Instance);

        for (int i = 0; i < candidates.Length; i++)
        {
            MethodInfo candidate = candidates[i];
            if (candidate.Name == methodName && candidate.DeclaringType == typeof(Stream))
            {
                return false;
            }
        }

        return true;
    }

    [HostProtection(ExternalThreading=true)]
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    {
        // Work out (once) whether the wrapped stream supplies its own BeginRead.
        if (!_overridesBeginRead.HasValue)
        {
            _overridesBeginRead = OverridesBeginMethod(_stream, "BeginRead");
        }

        lock (_stream)
        {
            // A stream with its own BeginRead must have that override called.
            if (_overridesBeginRead.Value)
            {
                return _stream.BeginRead(buffer, offset, count, callback, state);
            }

            // Otherwise use the base implementation, asking it to serialize outstanding
            // operations with an asynchronous wait. A synchronous wait here would deadlock:
            // this call would block while holding the lock on _stream, and the EndXx call
            // of the outstanding operation could then never acquire that lock.
            return _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
        }
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
        {
            throw new ArgumentNullException("asyncResult");
        }
        Contract.Ensures(Contract.Result<int>() >= 0);
        Contract.EndContractBlock();

        lock (_stream)
        {
            return _stream.EndRead(asyncResult);
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        lock (_stream)
        {
            return _stream.Seek(offset, origin);
        }
    }

    public override void SetLength(long length)
    {
        lock (_stream)
        {
            _stream.SetLength(length);
        }
    }

    public override void Write(byte[] bytes, int offset, int count)
    {
        lock (_stream)
        {
            _stream.Write(bytes, offset, count);
        }
    }

    public override void WriteByte(byte b)
    {
        lock (_stream)
        {
            _stream.WriteByte(b);
        }
    }

    [HostProtection(ExternalThreading=true)]
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    {
        // Work out (once) whether the wrapped stream supplies its own BeginWrite.
        if (!_overridesBeginWrite.HasValue)
        {
            _overridesBeginWrite = OverridesBeginMethod(_stream, "BeginWrite");
        }

        lock (_stream)
        {
            // A stream with its own BeginWrite must have that override called.
            if (_overridesBeginWrite.Value)
            {
                return _stream.BeginWrite(buffer, offset, count, callback, state);
            }

            // Otherwise use the base implementation, asking it to serialize outstanding
            // operations with an asynchronous wait. A synchronous wait here would deadlock:
            // this call would block while holding the lock on _stream, and the EndXx call
            // of the outstanding operation could then never acquire that lock.
            return _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
        }
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
        {
            throw new ArgumentNullException("asyncResult");
        }
        Contract.EndContractBlock();

        lock (_stream)
        {
            _stream.EndWrite(asyncResult);
        }
    }
}
#if CONTRACTS_FULL
/// <summary>
/// Code Contracts class for <see cref="Stream"/>. Only the Contract.Ensures
/// postconditions matter; every member body throws NotImplementedException.
/// </summary>
[ContractClassFor(typeof(Stream))]
internal abstract class StreamContract : Stream
{
    // Postcondition: the returned position is never negative.
    public override long Seek(long offset, SeekOrigin origin)
    {
        Contract.Ensures(Contract.Result<long>() >= 0);
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    // Postcondition: the number of bytes read lies in [0, count].
    public override int Read(byte[] buffer, int offset, int count)
    {
        Contract.Ensures(Contract.Result<int>() >= 0);
        Contract.Ensures(Contract.Result<int>() <= count);
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }

    public override long Position
    {
        get
        {
            // Postcondition: the position is never negative.
            Contract.Ensures(Contract.Result<long>() >= 0);
            throw new NotImplementedException();
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override bool CanRead
    {
        get { throw new NotImplementedException(); }
    }

    public override bool CanWrite
    {
        get { throw new NotImplementedException(); }
    }

    public override bool CanSeek
    {
        get { throw new NotImplementedException(); }
    }

    public override long Length
    {
        get
        {
            // Postcondition: the length is never negative.
            Contract.Ensures(Contract.Result<long>() >= 0);
            throw new NotImplementedException();
        }
    }
}
#endif  // CONTRACTS_FULL