3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 /*============================================================
10 ** <OWNER>gpaperin</OWNER>
13 ** Purpose: Abstract base class for all Streams. Provides
14 ** default implementations of asynchronous reads & writes, in
15 ** terms of the synchronous reads & writes (and vice versa).
18 ===========================================================*/
20 using System
.Threading
;
22 using System
.Threading
.Tasks
;
26 using System
.Runtime
.InteropServices
;
27 #if NEW_EXPERIMENTAL_ASYNC_IO
28 using System
.Runtime
.CompilerServices
;
30 using System
.Runtime
.ExceptionServices
;
31 using System
.Security
;
32 using System
.Security
.Permissions
;
33 using System
.Diagnostics
.Contracts
;
34 using System
.Reflection
;
40 [ContractClass(typeof(StreamContract
))]
42 #if FEATURE_REMOTING || MONO
43 public abstract partial class Stream
: MarshalByRefObject
, IDisposable
{
44 #else // FEATURE_REMOTING
45 public abstract class Stream
: IDisposable
{
46 #endif // FEATURE_REMOTING
48 public static readonly Stream Null
= new NullStream();
50 //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
51 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
52 // improvement in Copy performance.
53 private const int _DefaultCopyBufferSize
= 81920;
55 #if NEW_EXPERIMENTAL_ASYNC_IO
56 // To implement Async IO operations on streams that don't support async IO
59 private ReadWriteTask _activeReadWriteTask
;
61 private SemaphoreSlim _asyncActiveSemaphore
;
/// <summary>
/// Returns the semaphore stored in <c>_asyncActiveSemaphore</c>, creating it
/// (initial count 1, maximum count 1) the first time it is requested.
/// </summary>
internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
{
    // The SemaphoreSlim's WaitHandle is never accessed, so there is no need
    // to worry about disposing the semaphore.
    Func<SemaphoreSlim> createSemaphore = () => new SemaphoreSlim(1, 1);
    return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, createSemaphore);
}
71 public abstract bool CanRead
{
76 // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
77 public abstract bool CanSeek
{
83 public virtual bool CanTimeout
{
90 public abstract bool CanWrite
{
95 public abstract long Length
{
99 public abstract long Position
{
/// <summary>
/// Read timeout for the stream. The base implementation does not support
/// timeouts: both accessors throw.
/// </summary>
/// <exception cref="InvalidOperationException">Always, in this base implementation.</exception>
public virtual int ReadTimeout
{
    get
    {
        Contract.Ensures(Contract.Result<int>() >= 0);
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
    }
    set
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
    }
}
/// <summary>
/// Write timeout for the stream. The base implementation does not support
/// timeouts: both accessors throw.
/// </summary>
/// <exception cref="InvalidOperationException">Always, in this base implementation.</exception>
public virtual int WriteTimeout
{
    get
    {
        Contract.Ensures(Contract.Result<int>() >= 0);
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
    }
    set
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
    }
}
/// <summary>
/// Asynchronously copies this stream to <paramref name="destination"/> using
/// the default copy buffer size.
/// </summary>
[HostProtection(ExternalThreading = true)]
public Task CopyToAsync(Stream destination)
    => CopyToAsync(destination, _DefaultCopyBufferSize);
/// <summary>
/// Asynchronously copies this stream to <paramref name="destination"/> using
/// a buffer of <paramref name="bufferSize"/> bytes and no cancellation.
/// </summary>
[HostProtection(ExternalThreading = true)]
public Task CopyToAsync(Stream destination, Int32 bufferSize)
    => CopyToAsync(destination, bufferSize, CancellationToken.None);
/// <summary>
/// Validates the arguments and the state of both streams, then starts the
/// asynchronous copy of this stream into <paramref name="destination"/>.
/// </summary>
/// <param name="destination">Stream that receives the data.</param>
/// <param name="bufferSize">Size of the copy buffer in bytes; must be positive.</param>
/// <param name="cancellationToken">Token passed through to the read and write calls.</param>
/// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
/// <exception cref="ObjectDisposedException">Either stream is neither readable nor writable.</exception>
/// <exception cref="NotSupportedException">This stream is unreadable or the destination is unwritable.</exception>
[HostProtection(ExternalThreading = true)]
public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    if (destination == null)
        throw new ArgumentNullException("destination");
    // Only a non-positive size is an error; matches Contract.Requires(bufferSize > 0) in CopyToAsyncInternal.
    if (bufferSize <= 0)
        throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    // A stream that can neither read nor write is treated as closed.
    if (!CanRead && !CanWrite)
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    if (!destination.CanRead && !destination.CanWrite)
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    if (!CanRead)
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    if (!destination.CanWrite)
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    Contract.EndContractBlock();

    return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
}
/// <summary>
/// Copy loop behind CopyToAsync: repeatedly reads into a buffer of
/// <paramref name="bufferSize"/> bytes and writes what was read to
/// <paramref name="destination"/> until a read returns 0.
/// </summary>
private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    Contract.Requires(destination != null);
    Contract.Requires(bufferSize > 0);
    Contract.Requires(CanRead);
    Contract.Requires(destination.CanWrite);

    byte[] buffer = new byte[bufferSize];
    int bytesRead;
    while ((bytesRead = await ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
    }
}
176 #endif // FEATURE_ASYNC_IO
/// <summary>
/// Reads the bytes from the current stream, starting at the current position,
/// and writes them to <paramref name="destination"/> until all bytes are read.
/// Uses the default copy buffer size.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
/// <exception cref="ObjectDisposedException">Either stream is neither readable nor writable.</exception>
/// <exception cref="NotSupportedException">This stream is unreadable or the destination is unwritable.</exception>
public void CopyTo(Stream destination)
{
    if (destination == null)
        throw new ArgumentNullException("destination");
    // A stream that can neither read nor write is treated as closed.
    if (!CanRead && !CanWrite)
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    if (!destination.CanRead && !destination.CanWrite)
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    if (!CanRead)
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    if (!destination.CanWrite)
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    Contract.EndContractBlock();

    InternalCopyTo(destination, _DefaultCopyBufferSize);
}
/// <summary>
/// Copies this stream to <paramref name="destination"/> using a buffer of
/// <paramref name="bufferSize"/> bytes.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
/// <exception cref="ObjectDisposedException">Either stream is neither readable nor writable.</exception>
/// <exception cref="NotSupportedException">This stream is unreadable or the destination is unwritable.</exception>
public void CopyTo(Stream destination, int bufferSize)
{
    if (destination == null)
        throw new ArgumentNullException("destination");
    // Matches Contract.Requires(bufferSize > 0) in InternalCopyTo.
    if (bufferSize <= 0)
        throw new ArgumentOutOfRangeException("bufferSize",
            Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    if (!CanRead && !CanWrite)
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    if (!destination.CanRead && !destination.CanWrite)
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    if (!CanRead)
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    if (!destination.CanWrite)
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    Contract.EndContractBlock();

    InternalCopyTo(destination, bufferSize);
}
/// <summary>
/// Synchronous copy loop behind CopyTo: reads into a buffer of
/// <paramref name="bufferSize"/> bytes and writes what was read to
/// <paramref name="destination"/> until Read returns 0.
/// </summary>
private void InternalCopyTo(Stream destination, int bufferSize)
{
    Contract.Requires(destination != null);
    Contract.Requires(CanRead);
    Contract.Requires(destination.CanWrite);
    Contract.Requires(bufferSize > 0);

    byte[] buffer = new byte[bufferSize];
    int read;
    while ((read = Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, read);
    }
}
// Historically all Stream cleanup lived in Close(), which predates
// IDisposable. Stream now follows the Dispose(bool) pattern so that
// subclasses can be written without inspecting every base class and without
// version brittleness when a base class adopts the pattern. Subclasses
// should put their cleanup in Dispose(bool) starting in V2.
public virtual void Close()
{
    /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
    Contract.Ensures(CanRead == false);
    Contract.Ensures(CanWrite == false);
    Contract.Ensures(CanSeek == false);
    */

    // Run the subclass cleanup, then tell the GC no finalizer run is needed.
    Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// IDisposable entry point. Delegates to <see cref="Close"/> so that cleanup
/// written against either Close() or Dispose(bool) runs.
/// </summary>
public void Dispose()
{
    /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
    Contract.Ensures(CanRead == false);
    Contract.Ensures(CanWrite == false);
    Contract.Ensures(CanSeek == false);
    */

    Close();
}
/// <summary>
/// Cleanup hook for subclasses; the base implementation does nothing.
/// </summary>
protected virtual void Dispose(bool disposing)
{
    // Never call other virtual members of Stream (such as Write) from here:
    // subclass state has already been torn down by the time this runs, and
    // this is the last cleanup code executed for a stream.
}
274 public abstract void Flush();
/// <summary>Asynchronously flushes the stream without cancellation support.</summary>
[HostProtection(ExternalThreading=true)]
public Task FlushAsync()
    => FlushAsync(CancellationToken.None);
/// <summary>
/// Queues a call to <c>Flush()</c> on the default task scheduler and returns
/// the task that represents it.
/// </summary>
/// <param name="cancellationToken">Token handed to the task factory.</param>
[HostProtection(ExternalThreading=true)]
public virtual Task FlushAsync(CancellationToken cancellationToken)
{
    // The stream travels as the state object, so the delegate captures nothing.
    Action<object> flushWork = state => ((Stream)state).Flush();

    return Task.Factory.StartNew(
        flushWork,
        this,
        cancellationToken,
        TaskCreationOptions.DenyChildAttach,
        TaskScheduler.Default);
}
291 #endif // FEATURE_ASYNC_IO
/// <summary>Creates a new, unsignaled <see cref="ManualResetEvent"/>.</summary>
[Obsolete("CreateWaitHandle will be removed eventually.  Please use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle()
{
    Contract.Ensures(Contract.Result<WaitHandle>() != null);

    WaitHandle handle = new ManualResetEvent(false);
    return handle;
}
/// <summary>
/// Begins an asynchronous read by delegating to BeginReadInternal with
/// asynchronous serialization turned off.
/// </summary>
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // Final argument is serializeAsynchronously.
    return BeginReadInternal(buffer, offset, count, callback, state, false);
}
/// <summary>
/// Shared implementation of BeginRead. Without NEW_EXPERIMENTAL_ASYNC_IO (or
/// for apps older than Windows Phone 8) the read is performed synchronously
/// through BlockingBeginRead; otherwise a ReadWriteTask is created and run
/// once the per-stream semaphore has been acquired.
/// </summary>
/// <param name="serializeAsynchronously">
/// True to wait for the semaphore asynchronously; false to block the caller
/// until it is acquired.
/// </param>
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanRead) __Error.ReadNotSupported();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginRead(buffer, offset, count, callback, state);
#else
    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginRead(buffer, offset, count, callback, state);
    }

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread if it does a second IO request until the first one completes.
    var semaphore = EnsureAsyncActiveSemaphoreInitialized();
    Task semaphoreTask = null;
    if (serializeAsynchronously)
    {
        semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
    }
    else
    {
        // EndRead always releases the semaphore, so it must be acquired here too
        // (mirrors BeginWriteInternal).
        semaphore.Wait(); // synchronously wait here
    }

    // Create the task to asynchronously do a Read.  This task serves both
    // as the asynchronous work item and as the IAsyncResult returned to the user.
    var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
    {
        // The ReadWriteTask stores all of the parameters to pass to Read.
        // As we're currently inside of it, we can get the current task
        // and grab the parameters from it.
        var thisTask = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Do the Read and return the number of bytes read
        var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
        thisTask.ClearBeginState(); // just to help alleviate some memory pressure
        return bytesRead;
    }, state, this, buffer, offset, count, callback);

    // Schedule it exactly once: after the asynchronous wait completes, or right away.
    if (semaphoreTask != null)
        RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
    else
        RunReadWriteTask(asyncResult);

    return asyncResult; // return it
#endif
}
/// <summary>
/// Completes a read started by BeginRead and returns the number of bytes read.
/// On the task-based path it validates that <paramref name="asyncResult"/> is
/// the active read task, blocks until it finishes, then clears the active
/// task and releases the semaphore.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
/// <exception cref="ArgumentException">No operation is active, or the active operation is a write.</exception>
/// <exception cref="InvalidOperationException"><paramref name="asyncResult"/> is not the active operation.</exception>
public virtual int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult == null)
        throw new ArgumentNullException("asyncResult");
    Contract.Ensures(Contract.Result<int>() >= 0);
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingEndRead(asyncResult);
#else
    // Mango did not do async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingEndRead(asyncResult);
    }

    var readTask = _activeReadWriteTask;

    if (readTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    else if (readTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    else if (!readTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }

    try
    {
        return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
    }
    finally
    {
        // Must run even when the read throws, otherwise the next Begin* would wait forever.
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}
/// <summary>Asynchronously reads into <paramref name="buffer"/> without cancellation support.</summary>
[HostProtection(ExternalThreading = true)]
public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
    => ReadAsync(buffer, offset, count, CancellationToken.None);
/// <summary>
/// Asynchronously reads into <paramref name="buffer"/>. Returns a canceled
/// task when cancellation has already been requested; otherwise returns a
/// task wrapping the BeginRead/EndRead pair.
/// </summary>
[HostProtection(ExternalThreading = true)]
public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // Bail out early with an already-completed task when cancellation was requested.
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCancellation<int>(cancellationToken);
    }

    return BeginEndReadAsync(buffer, offset, count);
}
/// <summary>
/// Wraps BeginRead/EndRead in a task via <c>TaskFactory&lt;Int32&gt;.FromAsyncTrim</c>.
/// </summary>
private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    // Arguments are carried in a struct so that neither lambda below captures anything.
    var readArgs = new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count };

    return TaskFactory<Int32>.FromAsyncTrim(
        this,
        readArgs,
        (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
        (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}
/// <summary>Arguments for a Read or Write call, bundled for BeginEndReadAsync / BeginEndWriteAsync.</summary>
private struct ReadWriteParameters // struct for arguments to Read and Write calls
{
    internal byte[] Buffer;
    internal int Offset;
    internal int Count;
}
443 #endif //FEATURE_ASYNC_IO
/// <summary>
/// Begins an asynchronous write by delegating to BeginWriteInternal with
/// asynchronous serialization turned off.
/// </summary>
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // Final argument is serializeAsynchronously.
    return BeginWriteInternal(buffer, offset, count, callback, state, false);
}
/// <summary>
/// Shared implementation of BeginWrite. Without NEW_EXPERIMENTAL_ASYNC_IO (or
/// for apps older than Windows Phone 8) the write is performed synchronously
/// through BlockingBeginWrite; otherwise a ReadWriteTask is created and run
/// once the per-stream semaphore has been acquired.
/// </summary>
/// <param name="serializeAsynchronously">
/// True to wait for the semaphore asynchronously; false to block the caller
/// until it is acquired.
/// </param>
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanWrite) __Error.WriteNotSupported();
#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginWrite(buffer, offset, count, callback, state);
#else
    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginWrite(buffer, offset, count, callback, state);
    }

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread if it does a second IO request until the first one completes.
    var semaphore = EnsureAsyncActiveSemaphoreInitialized();
    Task semaphoreTask = null;
    if (serializeAsynchronously)
    {
        semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
    }
    else
    {
        semaphore.Wait(); // synchronously wait here
    }

    // Create the task to asynchronously do a Write.  This task serves both
    // as the asynchronous work item and as the IAsyncResult returned to the user.
    var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
    {
        // The ReadWriteTask stores all of the parameters to pass to Write.
        // As we're currently inside of it, we can get the current task
        // and grab the parameters from it.
        var thisTask = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Do the Write
        thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
        thisTask.ClearBeginState(); // just to help alleviate some memory pressure
        return 0; // not used, but signature requires a value be returned
    }, state, this, buffer, offset, count, callback);

    // Schedule it exactly once: after the asynchronous wait completes, or right away.
    if (semaphoreTask != null)
        RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
    else
        RunReadWriteTask(asyncResult);

    return asyncResult; // return it
#endif
}
511 #if NEW_EXPERIMENTAL_ASYNC_IO
/// <summary>
/// Runs <paramref name="readWriteTask"/> as soon as <paramref name="asyncWaiter"/>
/// (the semaphore wait) has completed: immediately if it already has,
/// otherwise from a continuation.
/// </summary>
private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
{
    Contract.Assert(readWriteTask != null);  // Should be Contract.Requires, but CCRewrite is doing a poor job with
                                             // preconditions in async methods that await.  Mike & Manuel are aware. (10/6/2011, bug 290222)
    Contract.Assert(asyncWaiter != null);    // Ditto

    // Fast path: the wait is already over, so start the task right now.
    if (asyncWaiter.IsCompleted)
    {
        Contract.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
        RunReadWriteTask(readWriteTask);
        return;
    }

    // Slow path: start the task when our turn arrives. The stream and the task
    // travel in the state tuple so the continuation lambda captures nothing.
    asyncWaiter.ContinueWith((t, state) =>
        {
            Contract.Assert(t.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
            var tuple = (Tuple<Stream,ReadWriteTask>)state;
            tuple.Item1.RunReadWriteTask(tuple.Item2); // RunReadWriteTask(readWriteTask);
        },
        Tuple.Create<Stream,ReadWriteTask>(this, readWriteTask),
        default(CancellationToken),
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}
/// <summary>
/// Records <paramref name="readWriteTask"/> as the active operation and
/// schedules it on the default task scheduler.
/// </summary>
private void RunReadWriteTask(ReadWriteTask readWriteTask)
{
    Contract.Requires(readWriteTask != null);
    Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");

    // Store the task first so that EndXx can validate it's given the right one;
    // scheduling must come after this write to avoid a race.
    _activeReadWriteTask = readWriteTask;

    // Internally we can call ScheduleAndStart directly rather than Start, avoiding
    // two interlocked operations.  If ReadWriteTask is ever changed to use a
    // cancellation token, this should be changed to use Start.
    readWriteTask.m_taskScheduler = TaskScheduler.Default;
    readWriteTask.ScheduleAndStart(false); // argument is needsProtection
}
/// <summary>
/// Completes a write started by BeginWrite. On the task-based path it
/// validates that <paramref name="asyncResult"/> is the active write task,
/// blocks until it finishes, then clears the active task and releases the
/// semaphore.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
/// <exception cref="ArgumentException">No operation is active, or the active operation is a read.</exception>
/// <exception cref="InvalidOperationException"><paramref name="asyncResult"/> is not the active operation.</exception>
public virtual void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult==null)
        throw new ArgumentNullException("asyncResult");
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    BlockingEndWrite(asyncResult);
#else
    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        BlockingEndWrite(asyncResult);
        return; // the legacy path is complete; do not fall through to the task-based path
    }

    var writeTask = _activeReadWriteTask;
    if (writeTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    else if (writeTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    else if (writeTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }

    try
    {
        writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
        Contract.Assert(writeTask.Status == TaskStatus.RanToCompletion);
    }
    finally
    {
        // Must run even when the write throws, otherwise the next Begin* would wait forever.
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}
598 #if NEW_EXPERIMENTAL_ASYNC_IO
// Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
// A single instance of this task serves four purposes:
// 1. The work item scheduled to run the Read / Write operation
// 2. The state holding the arguments to be passed to Read / Write
// 3. The IAsyncResult returned from BeginRead / BeginWrite
// 4. The completion action that runs to invoke the user-provided callback.
//    This last item is a bit tricky.  Before the AsyncCallback is invoked, the
//    IAsyncResult must have completed, so we can't just invoke the handler
//    from within the task, since it is the IAsyncResult, and thus it's not
//    yet completed.  Instead, we use AddCompletionAction to install this
//    task as its own completion handler.  That saves the need to allocate
//    a separate completion handler, it guarantees that the task will
//    have completed by the time the handler is invoked, and it allows
//    the handler to be invoked synchronously upon the completion of the
//    task.  This all enables BeginRead / BeginWrite to be implemented
//    with a single allocation.
private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
{
    internal readonly bool _isRead;
    internal Stream _stream;
    internal byte [] _buffer;
    internal int _offset;
    internal int _count;   // read by the Begin* delegates as thisTask._count
    private AsyncCallback _callback;
    private ExecutionContext _context;

    internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
    {
        _stream = null;
        _buffer = null;
    }

    [SecuritySafeCritical] // necessary for EC.Capture
    [MethodImpl(MethodImplOptions.NoInlining)]
    public ReadWriteTask(
        bool isRead,
        Func<object,int> function, object state,
        Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
        base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
    {
        Contract.Requires(function != null);
        Contract.Requires(stream != null);
        Contract.Requires(buffer != null);
        Contract.EndContractBlock();

        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

        // Store the arguments
        _isRead = isRead;
        _stream = stream;
        _buffer = buffer;
        _offset = offset;
        _count = count;

        // If a callback was provided, we need to:
        // - Store the user-provided handler
        // - Capture an ExecutionContext under which to invoke the handler
        // - Add this task as its own completion handler so that the Invoke method
        //   will run the callback when this task completes.
        if (callback != null)
        {
            _callback = callback;
            _context = ExecutionContext.Capture(ref stackMark,
                ExecutionContext.CaptureOptions.OptimizeDefaultCase | ExecutionContext.CaptureOptions.IgnoreSyncCtx);
            base.AddCompletionAction(this);
        }
    }

    [SecurityCritical] // necessary for CoreCLR
    private static void InvokeAsyncCallback(object completedTask)
    {
        var rwc = (ReadWriteTask)completedTask;
        var callback = rwc._callback;
        rwc._callback = null;
        callback(rwc); // the completed task doubles as the IAsyncResult
    }

    [SecurityCritical] // necessary for CoreCLR
    private static ContextCallback s_invokeAsyncCallback;

    [SecuritySafeCritical] // necessary for ExecutionContext.Run
    void ITaskCompletionAction.Invoke(Task completingTask)
    {
        // Get the ExecutionContext.  If there is none, just run the callback
        // directly, passing in the completed task as the IAsyncResult.
        // If there is one, process it with ExecutionContext.Run.
        var context = _context;
        if (context == null)
        {
            var callback = _callback;
            _callback = null;
            callback(completingTask);
        }
        else
        {
            _context = null; // the context is disposed below; do not keep a reference to it

            var invokeAsyncCallback = s_invokeAsyncCallback;
            if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign race

            using(context) ExecutionContext.Run(context, invokeAsyncCallback, this, true);
        }
    }

    public bool InvokeMayRunArbitraryCode => true;
}
/// <summary>Asynchronously writes from <paramref name="buffer"/> without cancellation support.</summary>
[HostProtection(ExternalThreading = true)]
public Task WriteAsync(Byte[] buffer, int offset, int count)
    => WriteAsync(buffer, offset, count, CancellationToken.None);
/// <summary>
/// Asynchronously writes from <paramref name="buffer"/>. Returns a canceled
/// task when cancellation has already been requested; otherwise returns a
/// task wrapping the BeginWrite/EndWrite pair.
/// </summary>
[HostProtection(ExternalThreading = true)]
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // Bail out early with an already-completed task when cancellation was requested.
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCancellation(cancellationToken);
    }

    return BeginEndWriteAsync(buffer, offset, count);
}
/// <summary>
/// Wraps BeginWrite/EndWrite in a task via
/// <c>TaskFactory&lt;VoidTaskResult&gt;.FromAsyncTrim</c>.
/// </summary>
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    // Arguments are carried in a struct so that neither lambda below captures anything.
    var writeArgs = new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count };

    return TaskFactory<VoidTaskResult>.FromAsyncTrim(
        this,
        writeArgs,
        (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
        (stream, asyncResult) => // cached by compiler
        {
            stream.EndWrite(asyncResult);
            return default(VoidTaskResult);
        });
}
738 #endif // FEATURE_ASYNC_IO
740 public abstract long Seek(long offset
, SeekOrigin origin
);
742 public abstract void SetLength(long value);
744 public abstract int Read([In
, Out
] byte[] buffer
, int offset
, int count
);
// Reads one byte from the stream by calling Read(byte[], int, int).
// Will return an unsigned byte cast to an int or -1 on end of stream.
// This implementation does not perform well because it allocates a new
// byte[] each time you call it, and should be overridden by any
// subclass that maintains an internal buffer.  Then, it can help perf
// significantly for people who are reading one byte at a time.
public virtual int ReadByte()
{
    Contract.Ensures(Contract.Result<int>() >= -1);
    Contract.Ensures(Contract.Result<int>() < 256);

    byte[] oneByteArray = new byte[1];
    int r = Read(oneByteArray, 0, 1);
    // Read returning 0 means end of stream; report it as -1 rather than a spurious 0 byte.
    if (r == 0)
        return -1;
    return oneByteArray[0];
}
764 public abstract void Write(byte[] buffer
, int offset
, int count
);
// Writes a single byte to the stream by calling Write(byte[], int, int).
// A new one-element byte[] is allocated on every call, so this performs
// poorly; subclasses that keep an internal buffer should override it, which
// helps significantly for callers writing one byte at a time.
public virtual void WriteByte(byte value)
{
    Write(new byte[] { value }, 0, 1);
}
/// <summary>
/// Returns a <c>SyncStream</c> wrapper around <paramref name="stream"/>, or
/// <paramref name="stream"/> itself when it already is a <c>SyncStream</c>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
[HostProtection(Synchronization=true)]
public static Stream Synchronized(Stream stream)
{
    if (stream == null)
        throw new ArgumentNullException("stream");
    Contract.Ensures(Contract.Result<Stream>() != null);
    Contract.EndContractBlock();
    // Already a SyncStream: wrapping it again would only add a second wrapper.
    if (stream is SyncStream)
        return stream;

    return new SyncStream(stream);
}
791 #if !FEATURE_PAL || MONO // This method shouldn't have been exposed in Dev10 (we revised object invariants after locking down).
792 [Obsolete("Do not call or override this method.")]
793 protected virtual void ObjectInvariant()
/// <summary>
/// Performs the read synchronously and returns an already-completed
/// <c>SynchronousAsyncResult</c>. An <see cref="IOException"/> thrown by Read
/// is stored in the result instead of propagating. The callback, if any, is
/// invoked before returning.
/// </summary>
internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread and do the IO synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult asyncResult;
    try
    {
        int numRead = Read(buffer, offset, count);
        asyncResult = new SynchronousAsyncResult(numRead, state);
    }
    catch (IOException ex)
    {
        asyncResult = new SynchronousAsyncResult(ex, state, isWrite: false);
    }

    if (callback != null)
    {
        callback(asyncResult);
    }

    return asyncResult;
}
/// <summary>Completes a BlockingBeginRead by delegating to <c>SynchronousAsyncResult.EndRead</c>.</summary>
internal static int BlockingEndRead(IAsyncResult asyncResult)
{
    Contract.Ensures(Contract.Result<int>() >= 0);

    int bytesRead = SynchronousAsyncResult.EndRead(asyncResult);
    return bytesRead;
}
/// <summary>
/// Performs the write synchronously and returns an already-completed
/// <c>SynchronousAsyncResult</c>. An <see cref="IOException"/> thrown by Write
/// is stored in the result instead of propagating. The callback, if any, is
/// invoked before returning.
/// </summary>
internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread and do the IO synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult asyncResult;
    try
    {
        Write(buffer, offset, count);
        asyncResult = new SynchronousAsyncResult(state);
    }
    catch (IOException ex)
    {
        asyncResult = new SynchronousAsyncResult(ex, state, isWrite: true);
    }

    if (callback != null)
    {
        callback(asyncResult);
    }

    return asyncResult;
}
/// <summary>Completes a BlockingBeginWrite by delegating to <c>SynchronousAsyncResult.EndWrite</c>.</summary>
internal static void BlockingEndWrite(IAsyncResult asyncResult)
    => SynchronousAsyncResult.EndWrite(asyncResult);
863 private sealed class NullStream
: Stream
865 internal NullStream() {}
867 public override bool CanRead
{
872 public override bool CanWrite
{
877 public override bool CanSeek
{
882 public override long Length
{
886 public override long Position
{
891 protected override void Dispose(bool disposing
)
893 // Do nothing - we don't want NullStream singleton (static) to be closable
896 public override void Flush()
902 public override Task
FlushAsync(CancellationToken cancellationToken
)
904 return cancellationToken
.IsCancellationRequested
?
905 Task
.FromCancellation(cancellationToken
) :
908 #endif // FEATURE_ASYNC_IO
910 [HostProtection(ExternalThreading
= true)]
911 public override IAsyncResult
BeginRead(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, Object state
)
913 if (!CanRead
) __Error
.ReadNotSupported();
915 return BlockingBeginRead(buffer
, offset
, count
, callback
, state
);
918 public override int EndRead(IAsyncResult asyncResult
)
920 if (asyncResult
== null)
921 throw new ArgumentNullException("asyncResult");
922 Contract
.EndContractBlock();
924 return BlockingEndRead(asyncResult
);
927 [HostProtection(ExternalThreading
= true)]
928 public override IAsyncResult
BeginWrite(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, Object state
)
930 if (!CanWrite
) __Error
.WriteNotSupported();
932 return BlockingBeginWrite(buffer
, offset
, count
, callback
, state
);
935 public override void EndWrite(IAsyncResult asyncResult
)
937 if (asyncResult
== null)
938 throw new ArgumentNullException("asyncResult");
939 Contract
.EndContractBlock();
941 BlockingEndWrite(asyncResult
);
944 public override int Read([In
, Out
] byte[] buffer
, int offset
, int count
)
951 public override Task
<int> ReadAsync(Byte
[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
953 var nullReadTask
= s_nullReadTask
;
954 if (nullReadTask
== null)
955 s_nullReadTask
= nullReadTask
= new Task
<int>(false, 0, (TaskCreationOptions
)InternalTaskOptions
.DoNotDispose
, CancellationToken
.None
); // benign ----
958 private static Task
<int> s_nullReadTask
;
959 #endif //FEATURE_ASYNC_IO
961 public override int ReadByte()
966 public override void Write(byte[] buffer
, int offset
, int count
)
972 public override Task
WriteAsync(Byte
[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
974 return cancellationToken
.IsCancellationRequested
?
975 Task
.FromCancellation(cancellationToken
) :
978 #endif // FEATURE_ASYNC_IO
980 public override void WriteByte(byte value)
984 public override long Seek(long offset
, SeekOrigin origin
)
989 public override void SetLength(long length
)
/// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
/// <remarks>
/// Instances are created only after the synchronous IO has finished, so they
/// always report completion. A failed operation stores its exception, which is
/// rethrown from EndRead / EndWrite.
/// </remarks>
internal sealed class SynchronousAsyncResult : IAsyncResult {

    private readonly Object _stateObject;
    private readonly bool _isWrite;
    private ManualResetEvent _waitHandle;
    private ExceptionDispatchInfo _exceptionInfo;

    private bool _endXxxCalled;
    private Int32 _bytesRead;

    /// <summary>Creates the result of a successful read.</summary>
    internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject) {
        _bytesRead = bytesRead;
        _stateObject = asyncStateObject;
        _isWrite = false;
    }

    /// <summary>Creates the result of a successful write.</summary>
    internal SynchronousAsyncResult(Object asyncStateObject) {
        _stateObject = asyncStateObject;
        _isWrite = true;
    }

    /// <summary>Creates the result of a failed read or write.</summary>
    internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite) {
        _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
        _stateObject = asyncStateObject;
        _isWrite = isWrite;
    }

    public bool IsCompleted {
        // We never hand out objects of this type to the user before the synchronous IO completed:
        get { return true; }
    }

    public WaitHandle AsyncWaitHandle {
        get {
            // Created on demand and already signaled.
            return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
        }
    }

    public Object AsyncState {
        get { return _stateObject; }
    }

    public bool CompletedSynchronously {
        get { return true; }
    }

    /// <summary>Rethrows the exception captured at construction, if any.</summary>
    internal void ThrowIfError() {
        if (_exceptionInfo != null)
            _exceptionInfo.Throw();
    }

    /// <summary>
    /// Validates that <paramref name="asyncResult"/> is an unconsumed read
    /// result, marks it consumed, rethrows any stored exception and returns
    /// the number of bytes read.
    /// </summary>
    internal static Int32 EndRead(IAsyncResult asyncResult) {

        SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
        if (ar == null || ar._isWrite)
            __Error.WrongAsyncResult();

        if (ar._endXxxCalled)
            __Error.EndReadCalledTwice();

        ar._endXxxCalled = true;

        ar.ThrowIfError();
        return ar._bytesRead;
    }

    /// <summary>
    /// Validates that <paramref name="asyncResult"/> is an unconsumed write
    /// result, marks it consumed and rethrows any stored exception.
    /// </summary>
    internal static void EndWrite(IAsyncResult asyncResult) {

        SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
        if (ar == null || !ar._isWrite)
            __Error.WrongAsyncResult();

        if (ar._endXxxCalled)
            __Error.EndWriteCalledTwice();

        ar._endXxxCalled = true;

        ar.ThrowIfError();
    }
}   // class SynchronousAsyncResult
1078 // SyncStream is a wrapper around a stream that takes
1079 // a lock for every operation making it thread safe.
// NOTE(review): the lock statements themselves are not visible in this
// extract; the member comments below describe only the forwarding calls that
// are shown.
1081 internal sealed class SyncStream
: Stream
, IDisposable
// The wrapped stream; every visible member forwards to it.
1083 private Stream _stream
;
// Cached answer to "does the wrapped stream override BeginRead?";
// null until BeginRead computes it.
1085 private bool? _overridesBeginRead
;
// Cached answer to "does the wrapped stream override BeginWrite?";
// null until BeginWrite computes it.
1087 private bool? _overridesBeginWrite
;
// Wraps the given stream. An ArgumentNullException("stream") is thrown on the
// visible line; NOTE(review): the guarding condition and the assignment to
// _stream are on lines missing from this extract.
1089 internal SyncStream(Stream stream
)
1092 throw new ArgumentNullException("stream");
1093 Contract
.EndContractBlock();
// Capability and state properties: each one forwards to the same member of
// the wrapped stream.
1097 public override bool CanRead
{
1099 get { return _stream.CanRead; }
1102 public override bool CanWrite
{
1104 get { return _stream.CanWrite; }
1107 public override bool CanSeek
{
1109 get { return _stream.CanSeek; }
1113 public override bool CanTimeout
{
1116 return _stream
.CanTimeout
;
1120 public override long Length
{
1123 return _stream
.Length
;
// Reads and writes the wrapped stream's Position.
1128 public override long Position
{
1131 return _stream
.Position
;
1136 _stream
.Position
= value;
// Reads and writes the wrapped stream's ReadTimeout.
1142 public override int ReadTimeout
{
1144 return _stream
.ReadTimeout
;
1147 _stream
.ReadTimeout
= value;
// Reads and writes the wrapped stream's WriteTimeout.
1152 public override int WriteTimeout
{
1154 return _stream
.WriteTimeout
;
1157 _stream
.WriteTimeout
= value;
1161 // In the off chance that some wrapped stream has different
1162 // semantics for Close vs. Dispose, let's preserve that.
// NOTE(review): the body of Close is not visible in this extract.
1163 public override void Close()
// Disposes the wrapped stream through its IDisposable interface and then
// calls the base implementation with the same disposing flag.
// NOTE(review): any condition or try/finally around these calls is on lines
// missing from this extract.
1175 protected override void Dispose(bool disposing
)
1179 // Explicitly pick up a potentially methodimpl'ed Dispose
1181 ((IDisposable
)_stream
).Dispose();
1184 base.Dispose(disposing
);
// NOTE(review): the body of Flush is not visible in this extract.
1189 public override void Flush()
// Forwards to the wrapped stream's Read and returns its result.
1195 public override int Read([In
, Out
]byte[] bytes
, int offset
, int count
)
1198 return _stream
.Read(bytes
, offset
, count
);
// Forwards to the wrapped stream's ReadByte and returns its result.
1201 public override int ReadByte()
1204 return _stream
.ReadByte();
// Uses reflection to decide whether the runtime type of 'stream' overrides the
// named method ("BeginRead" or "BeginWrite"). It enumerates the public
// instance methods of stream.GetType() and looks for one with the requested
// name whose DeclaringType is the base Stream type.
// NOTE(review): the return statements are not visible in this extract; per
// the original comment below, a match means "not overridden".
1207 private static bool OverridesBeginMethod(Stream stream
, string methodName
)
1209 Contract
.Requires(stream
!= null, "Expected a non-null stream.");
1210 Contract
.Requires(methodName
== "BeginRead" || methodName
== "BeginWrite",
1211 "Expected BeginRead or BeginWrite as the method name to check.");
1213 // Get all of the methods on the underlying stream
1214 var methods
= stream
.GetType().GetMethods(BindingFlags
.Public
| BindingFlags
.Instance
);
1216 // If any of the methods have the desired name and are defined on the base Stream
1217 // Type, then the method was not overridden. If none of them were defined on the
1218 // base Stream, then it must have been overridden.
1219 foreach (var method
in methods
)
1221 if (method
.DeclaringType
== typeof(Stream
) &&
1222 method
.Name
== methodName
)
// Starts an asynchronous read on the wrapped stream. The override check is
// computed once and cached in _overridesBeginRead; the call then goes either
// to the wrapped stream's own BeginRead or to BeginReadInternal with
// serializeAsynchronously set to true.
1230 [HostProtection(ExternalThreading
=true)]
1231 public override IAsyncResult
BeginRead(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, Object state
)
1233 // Lazily-initialize whether the wrapped stream overrides BeginRead
1234 if (_overridesBeginRead
== null)
1236 _overridesBeginRead
= OverridesBeginMethod(_stream
, "BeginRead");
1241 // If the Stream does have its own BeginRead implementation, then we must use that override.
1242 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1243 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1244 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1245 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1246 // _stream due to this call blocked while holding the lock.
1247 return _overridesBeginRead
.Value
?
1248 _stream
.BeginRead(buffer
, offset
, count
, callback
, state
) :
1249 _stream
.BeginReadInternal(buffer
, offset
, count
, callback
, state
, serializeAsynchronously
: true);
// Completes an asynchronous read: rejects a null asyncResult, declares a
// non-negative result as a contract postcondition, and forwards to the wrapped
// stream's EndRead.
1253 public override int EndRead(IAsyncResult asyncResult
)
1255 if (asyncResult
== null)
1256 throw new ArgumentNullException("asyncResult");
1257 Contract
.Ensures(Contract
.Result
<int>() >= 0);
1258 Contract
.EndContractBlock();
1261 return _stream
.EndRead(asyncResult
);
// Forwards to the wrapped stream's Seek and returns its result.
1264 public override long Seek(long offset
, SeekOrigin origin
)
1267 return _stream
.Seek(offset
, origin
);
// Forwards to the wrapped stream's SetLength.
1270 public override void SetLength(long length
)
1273 _stream
.SetLength(length
);
// Forwards to the wrapped stream's Write.
1276 public override void Write(byte[] bytes
, int offset
, int count
)
1279 _stream
.Write(bytes
, offset
, count
);
// Forwards to the wrapped stream's WriteByte.
1282 public override void WriteByte(byte b
)
1285 _stream
.WriteByte(b
);
// Starts an asynchronous write on the wrapped stream. Mirrors BeginRead: the
// override check is cached in _overridesBeginWrite, and the call goes either
// to the wrapped stream's own BeginWrite or to BeginWriteInternal with
// serializeAsynchronously set to true.
1288 [HostProtection(ExternalThreading
=true)]
1289 public override IAsyncResult
BeginWrite(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, Object state
)
1291 // Lazily-initialize whether the wrapped stream overrides BeginWrite
1292 if (_overridesBeginWrite
== null)
1294 _overridesBeginWrite
= OverridesBeginMethod(_stream
, "BeginWrite");
1299 // If the Stream does have its own BeginWrite implementation, then we must use that override.
1300 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1301 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1302 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1303 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1304 // _stream due to this call blocked while holding the lock.
1305 return _overridesBeginWrite
.Value
?
1306 _stream
.BeginWrite(buffer
, offset
, count
, callback
, state
) :
1307 _stream
.BeginWriteInternal(buffer
, offset
, count
, callback
, state
, serializeAsynchronously
: true);
// Completes an asynchronous write: rejects a null asyncResult and forwards to
// the wrapped stream's EndWrite.
1311 public override void EndWrite(IAsyncResult asyncResult
)
1313 if (asyncResult
== null)
1314 throw new ArgumentNullException("asyncResult");
1315 Contract
.EndContractBlock();
1318 _stream
.EndWrite(asyncResult
);
// Code Contracts companion class for Stream (see the ContractClassFor
// attribute). Its members exist only to declare postconditions through
// Contract.Ensures; every visible body then throws NotImplementedException.
1324 [ContractClassFor(typeof(Stream
))]
1325 internal abstract class StreamContract
: Stream
// Postcondition: the returned value is non-negative.
1327 public override long Seek(long offset
, SeekOrigin origin
)
1329 Contract
.Ensures(Contract
.Result
<long>() >= 0);
1330 throw new NotImplementedException();
// No contract is declared on the visible lines.
1333 public override void SetLength(long value)
1335 throw new NotImplementedException();
// Postconditions: the result is non-negative and does not exceed count.
1338 public override int Read(byte[] buffer
, int offset
, int count
)
1340 Contract
.Ensures(Contract
.Result
<int>() >= 0);
1341 Contract
.Ensures(Contract
.Result
<int>() <= count
);
1342 throw new NotImplementedException();
// No contract is declared on the visible lines.
1345 public override void Write(byte[] buffer
, int offset
, int count
)
1347 throw new NotImplementedException();
// Postcondition on the first visible accessor body: the result is non-negative.
// NOTE(review): the get/set keywords are on lines missing from this extract.
1350 public override long Position
{
1352 Contract
.Ensures(Contract
.Result
<long>() >= 0);
1353 throw new NotImplementedException();
1356 throw new NotImplementedException();
// No contract is declared on the visible lines.
1360 public override void Flush()
1362 throw new NotImplementedException();
// The capability properties carry no contracts on the visible lines.
1365 public override bool CanRead
{
1366 get { throw new NotImplementedException(); }
1369 public override bool CanWrite
{
1370 get { throw new NotImplementedException(); }
1373 public override bool CanSeek
{
1374 get { throw new NotImplementedException(); }
// Postcondition: the result is non-negative.
1377 public override long Length
1380 Contract
.Ensures(Contract
.Result
<long>() >= 0);
1381 throw new NotImplementedException();
1385 #endif // CONTRACTS_FULL