1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 /*============================================================
11 ** Purpose: Abstract base class for all Streams. Provides
12 ** default implementations of asynchronous reads & writes, in
13 ** terms of the synchronous reads & writes (and vice versa).
16 ===========================================================*/
19 using System
.Diagnostics
;
20 using System
.Runtime
.ExceptionServices
;
21 using System
.Runtime
.InteropServices
;
22 using System
.Threading
;
23 using System
.Threading
.Tasks
;
27 public abstract partial class Stream
: MarshalByRefObject
, IDisposable
, IAsyncDisposable
29 public static readonly Stream Null
= new NullStream();
31 // We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
32 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
33 // improvement in Copy performance.
34 private const int DefaultCopyBufferSize
= 81920;
36 // To implement Async IO operations on streams that don't support async IO
38 private ReadWriteTask
? _activeReadWriteTask
;
39 private SemaphoreSlim
? _asyncActiveSemaphore
;
41 internal SemaphoreSlim
EnsureAsyncActiveSemaphoreInitialized()
43 // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
44 // WaitHandle, we don't need to worry about Disposing it.
45 return LazyInitializer
.EnsureInitialized(ref _asyncActiveSemaphore
, () => new SemaphoreSlim(1, 1));
48 public abstract bool CanRead
53 // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
54 public abstract bool CanSeek
59 public virtual bool CanTimeout
=> false;
61 public abstract bool CanWrite
66 public abstract long Length
71 public abstract long Position
77 public virtual int ReadTimeout
81 throw new InvalidOperationException(SR
.InvalidOperation_TimeoutsNotSupported
);
85 throw new InvalidOperationException(SR
.InvalidOperation_TimeoutsNotSupported
);
89 public virtual int WriteTimeout
93 throw new InvalidOperationException(SR
.InvalidOperation_TimeoutsNotSupported
);
97 throw new InvalidOperationException(SR
.InvalidOperation_TimeoutsNotSupported
);
101 public Task
CopyToAsync(Stream destination
)
103 int bufferSize
= GetCopyBufferSize();
105 return CopyToAsync(destination
, bufferSize
);
108 public Task
CopyToAsync(Stream destination
, int bufferSize
)
110 return CopyToAsync(destination
, bufferSize
, CancellationToken
.None
);
113 public Task
CopyToAsync(Stream destination
, CancellationToken cancellationToken
)
115 int bufferSize
= GetCopyBufferSize();
117 return CopyToAsync(destination
, bufferSize
, cancellationToken
);
120 public virtual Task
CopyToAsync(Stream destination
, int bufferSize
, CancellationToken cancellationToken
)
122 StreamHelpers
.ValidateCopyToArgs(this, destination
, bufferSize
);
124 return CopyToAsyncInternal(destination
, bufferSize
, cancellationToken
);
127 private async Task
CopyToAsyncInternal(Stream destination
, int bufferSize
, CancellationToken cancellationToken
)
129 byte[] buffer
= ArrayPool
<byte>.Shared
.Rent(bufferSize
);
134 int bytesRead
= await ReadAsync(new Memory
<byte>(buffer
), cancellationToken
).ConfigureAwait(false);
135 if (bytesRead
== 0) break;
136 await destination
.WriteAsync(new ReadOnlyMemory
<byte>(buffer
, 0, bytesRead
), cancellationToken
).ConfigureAwait(false);
141 ArrayPool
<byte>.Shared
.Return(buffer
);
145 // Reads the bytes from the current stream and writes the bytes to
146 // the destination stream until all bytes are read, starting at
147 // the current position.
148 public void CopyTo(Stream destination
)
150 int bufferSize
= GetCopyBufferSize();
152 CopyTo(destination
, bufferSize
);
155 public virtual void CopyTo(Stream destination
, int bufferSize
)
157 StreamHelpers
.ValidateCopyToArgs(this, destination
, bufferSize
);
159 byte[] buffer
= ArrayPool
<byte>.Shared
.Rent(bufferSize
);
163 while ((read
= Read(buffer
, 0, buffer
.Length
)) != 0)
165 destination
.Write(buffer
, 0, read
);
170 ArrayPool
<byte>.Shared
.Return(buffer
);
174 private int GetCopyBufferSize()
176 int bufferSize
= DefaultCopyBufferSize
;
180 long length
= Length
;
181 long position
= Position
;
182 if (length
<= position
) // Handles negative overflows
184 // There are no bytes left in the stream to copy.
185 // However, because CopyTo{Async} is virtual, we need to
186 // ensure that any override is still invoked to provide its
187 // own validation, so we use the smallest legal buffer size here.
192 long remaining
= length
- position
;
195 // In the case of a positive overflow, stick to the default size
196 bufferSize
= (int)Math
.Min(bufferSize
, remaining
);
204 // Stream used to require that all cleanup logic went into Close(),
205 // which was thought up before we invented IDisposable. However, we
206 // need to follow the IDisposable pattern so that users can write
207 // sensible subclasses without needing to inspect all their base
208 // classes, and without worrying about version brittleness, from a
209 // base class switching to the Dispose pattern. We're moving
210 // Stream to the Dispose(bool) pattern - that's where all subclasses
211 // should put their cleanup now.
212 public virtual void Close()
215 GC
.SuppressFinalize(this);
218 public void Dispose()
223 protected virtual void Dispose(bool disposing
)
225 // Note: Never change this to call other virtual methods on Stream
226 // like Write, since the state on subclasses has already been
227 // torn down. This is the last code to run on cleanup for a stream.
230 public virtual ValueTask
DisposeAsync()
237 catch (Exception exc
)
239 return new ValueTask(Task
.FromException(exc
));
243 public abstract void Flush();
245 public Task
FlushAsync()
247 return FlushAsync(CancellationToken
.None
);
250 public virtual Task
FlushAsync(CancellationToken cancellationToken
)
252 return Task
.Factory
.StartNew(state
=> ((Stream
)state
!).Flush(), this,
253 cancellationToken
, TaskCreationOptions
.DenyChildAttach
, TaskScheduler
.Default
);
256 [Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
257 protected virtual WaitHandle
CreateWaitHandle()
259 return new ManualResetEvent(false);
262 public virtual IAsyncResult
BeginRead(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
264 return BeginReadInternal(buffer
, offset
, count
, callback
, state
, serializeAsynchronously
: false, apm
: true);
267 internal IAsyncResult
BeginReadInternal(
268 byte[] buffer
, int offset
, int count
, AsyncCallback
? callback
, object? state
,
269 bool serializeAsynchronously
, bool apm
)
271 if (!CanRead
) throw Error
.GetReadNotSupported();
273 // To avoid a race with a stream's position pointer & generating race conditions
274 // with internal buffer indexes in our own streams that
275 // don't natively support async IO operations when there are multiple
276 // async requests outstanding, we will block the application's main
277 // thread if it does a second IO request until the first one completes.
278 SemaphoreSlim semaphore
= EnsureAsyncActiveSemaphoreInitialized();
279 Task
? semaphoreTask
= null;
280 if (serializeAsynchronously
)
282 semaphoreTask
= semaphore
.WaitAsync();
289 // Create the task to asynchronously do a Read. This task serves both
290 // as the asynchronous work item and as the IAsyncResult returned to the user.
291 var asyncResult
= new ReadWriteTask(true /*isRead*/, apm
, delegate
293 // The ReadWriteTask stores all of the parameters to pass to Read.
294 // As we're currently inside of it, we can get the current task
295 // and grab the parameters from it.
296 var thisTask
= Task
.InternalCurrent
as ReadWriteTask
;
297 Debug
.Assert(thisTask
!= null && thisTask
._stream
!= null && thisTask
._buffer
!= null,
298 "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream and buffer should be set");
302 // Do the Read and return the number of bytes read
303 return thisTask
._stream
.Read(thisTask
._buffer
, thisTask
._offset
, thisTask
._count
);
307 // If this implementation is part of Begin/EndXx, then the EndXx method will handle
308 // finishing the async operation. However, if this is part of XxAsync, then there won't
309 // be an end method, and this task is responsible for cleaning up.
312 thisTask
._stream
.FinishTrackingAsyncOperation();
315 thisTask
.ClearBeginState(); // just to help alleviate some memory pressure
317 }, state
, this, buffer
, offset
, count
, callback
);
320 if (semaphoreTask
!= null)
321 RunReadWriteTaskWhenReady(semaphoreTask
, asyncResult
);
323 RunReadWriteTask(asyncResult
);
326 return asyncResult
; // return it
329 public virtual int EndRead(IAsyncResult asyncResult
)
331 if (asyncResult
== null)
332 throw new ArgumentNullException(nameof(asyncResult
));
334 ReadWriteTask
? readTask
= _activeReadWriteTask
;
336 if (readTask
== null)
338 throw new ArgumentException(SR
.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple
);
340 else if (readTask
!= asyncResult
)
342 throw new InvalidOperationException(SR
.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple
);
344 else if (!readTask
._isRead
)
346 throw new ArgumentException(SR
.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple
);
351 return readTask
.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
355 FinishTrackingAsyncOperation();
359 public Task
<int> ReadAsync(byte[] buffer
, int offset
, int count
)
361 return ReadAsync(buffer
, offset
, count
, CancellationToken
.None
);
364 public virtual Task
<int> ReadAsync(byte[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
366 // If cancellation was requested, bail early with an already completed task.
367 // Otherwise, return a task that represents the Begin/End methods.
368 return cancellationToken
.IsCancellationRequested
369 ? Task
.FromCanceled
<int>(cancellationToken
)
370 : BeginEndReadAsync(buffer
, offset
, count
);
373 public virtual ValueTask
<int> ReadAsync(Memory
<byte> buffer
, CancellationToken cancellationToken
= default)
375 if (MemoryMarshal
.TryGetArray(buffer
, out ArraySegment
<byte> array
))
377 return new ValueTask
<int>(ReadAsync(array
.Array
!, array
.Offset
, array
.Count
, cancellationToken
));
381 byte[] sharedBuffer
= ArrayPool
<byte>.Shared
.Rent(buffer
.Length
);
382 return FinishReadAsync(ReadAsync(sharedBuffer
, 0, buffer
.Length
, cancellationToken
), sharedBuffer
, buffer
);
384 static async ValueTask
<int> FinishReadAsync(Task
<int> readTask
, byte[] localBuffer
, Memory
<byte> localDestination
)
388 int result
= await readTask
.ConfigureAwait(false);
389 new Span
<byte>(localBuffer
, 0, result
).CopyTo(localDestination
.Span
);
394 ArrayPool
<byte>.Shared
.Return(localBuffer
);
400 private Task
<int> BeginEndReadAsync(byte[] buffer
, int offset
, int count
)
402 if (!HasOverriddenBeginEndRead())
404 // If the Stream does not override Begin/EndRead, then we can take an optimized path
405 // that skips an extra layer of tasks / IAsyncResults.
406 return (Task
<int>)BeginReadInternal(buffer
, offset
, count
, null, null, serializeAsynchronously
: true, apm
: false);
409 // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality.
410 return TaskFactory
<int>.FromAsyncTrim(
411 this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count }
,
412 (stream
, args
, callback
, state
) => stream
.BeginRead(args
.Buffer
, args
.Offset
, args
.Count
, callback
, state
), // cached by compiler
413 (stream
, asyncResult
) => stream
.EndRead(asyncResult
)); // cached by compiler
416 private struct ReadWriteParameters
// struct for arguments to Read and Write calls
418 internal byte[] Buffer
;
425 public virtual IAsyncResult
BeginWrite(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
427 return BeginWriteInternal(buffer
, offset
, count
, callback
, state
, serializeAsynchronously
: false, apm
: true);
430 internal IAsyncResult
BeginWriteInternal(
431 byte[] buffer
, int offset
, int count
, AsyncCallback
? callback
, object? state
,
432 bool serializeAsynchronously
, bool apm
)
434 if (!CanWrite
) throw Error
.GetWriteNotSupported();
436 // To avoid a race condition with a stream's position pointer & generating conditions
437 // with internal buffer indexes in our own streams that
438 // don't natively support async IO operations when there are multiple
439 // async requests outstanding, we will block the application's main
440 // thread if it does a second IO request until the first one completes.
441 SemaphoreSlim semaphore
= EnsureAsyncActiveSemaphoreInitialized();
442 Task
? semaphoreTask
= null;
443 if (serializeAsynchronously
)
445 semaphoreTask
= semaphore
.WaitAsync(); // kick off the asynchronous wait, but don't block
449 semaphore
.Wait(); // synchronously wait here
452 // Create the task to asynchronously do a Write. This task serves both
453 // as the asynchronous work item and as the IAsyncResult returned to the user.
454 var asyncResult
= new ReadWriteTask(false /*isRead*/, apm
, delegate
456 // The ReadWriteTask stores all of the parameters to pass to Write.
457 // As we're currently inside of it, we can get the current task
458 // and grab the parameters from it.
459 var thisTask
= Task
.InternalCurrent
as ReadWriteTask
;
460 Debug
.Assert(thisTask
!= null && thisTask
._stream
!= null && thisTask
._buffer
!= null,
461 "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream and buffer should be set");
466 thisTask
._stream
.Write(thisTask
._buffer
, thisTask
._offset
, thisTask
._count
);
467 return 0; // not used, but signature requires a value be returned
471 // If this implementation is part of Begin/EndXx, then the EndXx method will handle
472 // finishing the async operation. However, if this is part of XxAsync, then there won't
473 // be an end method, and this task is responsible for cleaning up.
476 thisTask
._stream
.FinishTrackingAsyncOperation();
479 thisTask
.ClearBeginState(); // just to help alleviate some memory pressure
481 }, state
, this, buffer
, offset
, count
, callback
);
484 if (semaphoreTask
!= null)
485 RunReadWriteTaskWhenReady(semaphoreTask
, asyncResult
);
487 RunReadWriteTask(asyncResult
);
489 return asyncResult
; // return it
492 private void RunReadWriteTaskWhenReady(Task asyncWaiter
, ReadWriteTask readWriteTask
)
494 Debug
.Assert(readWriteTask
!= null);
495 Debug
.Assert(asyncWaiter
!= null);
497 // If the wait has already completed, run the task.
498 if (asyncWaiter
.IsCompleted
)
500 Debug
.Assert(asyncWaiter
.IsCompletedSuccessfully
, "The semaphore wait should always complete successfully.");
501 RunReadWriteTask(readWriteTask
);
503 else // Otherwise, wait for our turn, and then run the task.
505 asyncWaiter
.ContinueWith((t
, state
) =>
507 Debug
.Assert(t
.IsCompletedSuccessfully
, "The semaphore wait should always complete successfully.");
508 var rwt
= (ReadWriteTask
)state
!;
509 Debug
.Assert(rwt
._stream
!= null);
510 rwt
._stream
.RunReadWriteTask(rwt
); // RunReadWriteTask(readWriteTask);
511 }, readWriteTask
, default, TaskContinuationOptions
.ExecuteSynchronously
, TaskScheduler
.Default
);
515 private void RunReadWriteTask(ReadWriteTask readWriteTask
)
517 Debug
.Assert(readWriteTask
!= null);
518 Debug
.Assert(_activeReadWriteTask
== null, "Expected no other readers or writers");
520 // Schedule the task. ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
521 // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
522 // two interlocked operations. However, if ReadWriteTask is ever changed to use
523 // a cancellation token, this should be changed to use Start.
524 _activeReadWriteTask
= readWriteTask
; // store the task so that EndXx can validate it's given the right one
525 readWriteTask
.m_taskScheduler
= TaskScheduler
.Default
;
526 readWriteTask
.ScheduleAndStart(needsProtection
: false);
529 private void FinishTrackingAsyncOperation()
531 _activeReadWriteTask
= null;
532 Debug
.Assert(_asyncActiveSemaphore
!= null, "Must have been initialized in order to get here.");
533 _asyncActiveSemaphore
.Release();
536 public virtual void EndWrite(IAsyncResult asyncResult
)
538 if (asyncResult
== null)
539 throw new ArgumentNullException(nameof(asyncResult
));
541 ReadWriteTask
? writeTask
= _activeReadWriteTask
;
542 if (writeTask
== null)
544 throw new ArgumentException(SR
.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple
);
546 else if (writeTask
!= asyncResult
)
548 throw new InvalidOperationException(SR
.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple
);
550 else if (writeTask
._isRead
)
552 throw new ArgumentException(SR
.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple
);
557 writeTask
.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
558 Debug
.Assert(writeTask
.Status
== TaskStatus
.RanToCompletion
);
562 FinishTrackingAsyncOperation();
566 // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
567 // A single instance of this task serves four purposes:
568 // 1. The work item scheduled to run the Read / Write operation
569 // 2. The state holding the arguments to be passed to Read / Write
570 // 3. The IAsyncResult returned from BeginRead / BeginWrite
571 // 4. The completion action that runs to invoke the user-provided callback.
572 // This last item is a bit tricky. Before the AsyncCallback is invoked, the
573 // IAsyncResult must have completed, so we can't just invoke the handler
574 // from within the task, since it is the IAsyncResult, and thus it's not
575 // yet completed. Instead, we use AddCompletionAction to install this
576 // task as its own completion handler. That saves the need to allocate
577 // a separate completion handler, it guarantees that the task will
578 // have completed by the time the handler is invoked, and it allows
579 // the handler to be invoked synchronously upon the completion of the
580 // task. This all enables BeginRead / BeginWrite to be implemented
581 // with a single allocation.
582 private sealed class ReadWriteTask
: Task
<int>, ITaskCompletionAction
584 internal readonly bool _isRead
;
585 internal readonly bool _apm
; // true if this is from Begin/EndXx; false if it's from XxAsync
586 internal Stream
? _stream
;
587 internal byte[]? _buffer
;
588 internal readonly int _offset
;
589 internal readonly int _count
;
590 private AsyncCallback
? _callback
;
591 private ExecutionContext
? _context
;
593 internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
599 public ReadWriteTask(
602 Func
<object?, int> function
, object? state
,
603 Stream stream
, byte[] buffer
, int offset
, int count
, AsyncCallback
? callback
) :
604 base(function
, state
, CancellationToken
.None
, TaskCreationOptions
.DenyChildAttach
)
606 Debug
.Assert(function
!= null);
607 Debug
.Assert(stream
!= null);
608 Debug
.Assert(buffer
!= null);
610 // Store the arguments
618 // If a callback was provided, we need to:
619 // - Store the user-provided handler
620 // - Capture an ExecutionContext under which to invoke the handler
621 // - Add this task as its own completion handler so that the Invoke method
622 // will run the callback when this task completes.
623 if (callback
!= null)
625 _callback
= callback
;
626 _context
= ExecutionContext
.Capture();
627 base.AddCompletionAction(this);
631 private static void InvokeAsyncCallback(object? completedTask
)
633 Debug
.Assert(completedTask
is ReadWriteTask
);
634 var rwc
= (ReadWriteTask
)completedTask
;
635 AsyncCallback
? callback
= rwc
._callback
;
636 Debug
.Assert(callback
!= null);
637 rwc
._callback
= null;
641 private static ContextCallback
? s_invokeAsyncCallback
;
643 void ITaskCompletionAction
.Invoke(Task completingTask
)
645 // Get the ExecutionContext. If there is none, just run the callback
646 // directly, passing in the completed task as the IAsyncResult.
647 // If there is one, process it with ExecutionContext.Run.
648 ExecutionContext
? context
= _context
;
651 AsyncCallback
? callback
= _callback
;
652 Debug
.Assert(callback
!= null);
654 callback(completingTask
);
660 ContextCallback
? invokeAsyncCallback
= s_invokeAsyncCallback
;
661 if (invokeAsyncCallback
== null) s_invokeAsyncCallback
= invokeAsyncCallback
= InvokeAsyncCallback
; // benign race condition
663 ExecutionContext
.RunInternal(context
, invokeAsyncCallback
, this);
667 bool ITaskCompletionAction
.InvokeMayRunArbitraryCode
=> true;
670 public Task
WriteAsync(byte[] buffer
, int offset
, int count
)
672 return WriteAsync(buffer
, offset
, count
, CancellationToken
.None
);
675 public virtual Task
WriteAsync(byte[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
677 // If cancellation was requested, bail early with an already completed task.
678 // Otherwise, return a task that represents the Begin/End methods.
679 return cancellationToken
.IsCancellationRequested
680 ? Task
.FromCanceled(cancellationToken
)
681 : BeginEndWriteAsync(buffer
, offset
, count
);
684 public virtual ValueTask
WriteAsync(ReadOnlyMemory
<byte> buffer
, CancellationToken cancellationToken
= default)
686 if (MemoryMarshal
.TryGetArray(buffer
, out ArraySegment
<byte> array
))
688 return new ValueTask(WriteAsync(array
.Array
!, array
.Offset
, array
.Count
, cancellationToken
));
692 byte[] sharedBuffer
= ArrayPool
<byte>.Shared
.Rent(buffer
.Length
);
693 buffer
.Span
.CopyTo(sharedBuffer
);
694 return new ValueTask(FinishWriteAsync(WriteAsync(sharedBuffer
, 0, buffer
.Length
, cancellationToken
), sharedBuffer
));
698 private async Task
FinishWriteAsync(Task writeTask
, byte[] localBuffer
)
702 await writeTask
.ConfigureAwait(false);
706 ArrayPool
<byte>.Shared
.Return(localBuffer
);
710 private Task
BeginEndWriteAsync(byte[] buffer
, int offset
, int count
)
712 if (!HasOverriddenBeginEndWrite())
714 // If the Stream does not override Begin/EndWrite, then we can take an optimized path
715 // that skips an extra layer of tasks / IAsyncResults.
716 return (Task
)BeginWriteInternal(buffer
, offset
, count
, null, null, serializeAsynchronously
: true, apm
: false);
719 // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality.
720 return TaskFactory
<VoidTaskResult
>.FromAsyncTrim(
721 this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count }
,
722 (stream
, args
, callback
, state
) => stream
.BeginWrite(args
.Buffer
, args
.Offset
, args
.Count
, callback
, state
), // cached by compiler
723 (stream
, asyncResult
) => // cached by compiler
725 stream
.EndWrite(asyncResult
);
730 public abstract long Seek(long offset
, SeekOrigin origin
);
732 public abstract void SetLength(long value);
734 public abstract int Read(byte[] buffer
, int offset
, int count
);
736 public virtual int Read(Span
<byte> buffer
)
738 byte[] sharedBuffer
= ArrayPool
<byte>.Shared
.Rent(buffer
.Length
);
741 int numRead
= Read(sharedBuffer
, 0, buffer
.Length
);
742 if ((uint)numRead
> (uint)buffer
.Length
)
744 throw new IOException(SR
.IO_StreamTooLong
);
746 new Span
<byte>(sharedBuffer
, 0, numRead
).CopyTo(buffer
);
749 finally { ArrayPool<byte>.Shared.Return(sharedBuffer); }
752 // Reads one byte from the stream by calling Read(byte[], int, int).
753 // Will return an unsigned byte cast to an int or -1 on end of stream.
754 // This implementation does not perform well because it allocates a new
755 // byte[] each time you call it, and should be overridden by any
756 // subclass that maintains an internal buffer. Then, it can help perf
757 // significantly for people who are reading one byte at a time.
758 public virtual int ReadByte()
760 byte[] oneByteArray
= new byte[1];
761 int r
= Read(oneByteArray
, 0, 1);
764 return oneByteArray
[0];
767 public abstract void Write(byte[] buffer
, int offset
, int count
);
769 public virtual void Write(ReadOnlySpan
<byte> buffer
)
771 byte[] sharedBuffer
= ArrayPool
<byte>.Shared
.Rent(buffer
.Length
);
774 buffer
.CopyTo(sharedBuffer
);
775 Write(sharedBuffer
, 0, buffer
.Length
);
777 finally { ArrayPool<byte>.Shared.Return(sharedBuffer); }
780 // Writes one byte from the stream by calling Write(byte[], int, int).
781 // This implementation does not perform well because it allocates a new
782 // byte[] each time you call it, and should be overridden by any
783 // subclass that maintains an internal buffer. Then, it can help perf
784 // significantly for people who are writing one byte at a time.
785 public virtual void WriteByte(byte value)
787 byte[] oneByteArray
= new byte[1];
788 oneByteArray
[0] = value;
789 Write(oneByteArray
, 0, 1);
792 public static Stream
Synchronized(Stream stream
)
795 throw new ArgumentNullException(nameof(stream
));
796 if (stream
is SyncStream
)
799 return new SyncStream(stream
);
802 [Obsolete("Do not call or override this method.")]
803 protected virtual void ObjectInvariant()
807 internal IAsyncResult
BlockingBeginRead(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
809 // To avoid a race with a stream's position pointer & generating conditions
810 // with internal buffer indexes in our own streams that
811 // don't natively support async IO operations when there are multiple
812 // async requests outstanding, we will block the application's main
813 // thread and do the IO synchronously.
814 // This can't perform well - use a different approach.
815 SynchronousAsyncResult asyncResult
;
818 int numRead
= Read(buffer
, offset
, count
);
819 asyncResult
= new SynchronousAsyncResult(numRead
, state
);
821 catch (IOException ex
)
823 asyncResult
= new SynchronousAsyncResult(ex
, state
, isWrite
: false);
826 callback
?.Invoke(asyncResult
);
831 internal static int BlockingEndRead(IAsyncResult asyncResult
)
833 return SynchronousAsyncResult
.EndRead(asyncResult
);
836 internal IAsyncResult
BlockingBeginWrite(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
838 // To avoid a race condition with a stream's position pointer & generating conditions
839 // with internal buffer indexes in our own streams that
840 // don't natively support async IO operations when there are multiple
841 // async requests outstanding, we will block the application's main
842 // thread and do the IO synchronously.
843 // This can't perform well - use a different approach.
844 SynchronousAsyncResult asyncResult
;
847 Write(buffer
, offset
, count
);
848 asyncResult
= new SynchronousAsyncResult(state
);
850 catch (IOException ex
)
852 asyncResult
= new SynchronousAsyncResult(ex
, state
, isWrite
: true);
855 callback
?.Invoke(asyncResult
);
860 internal static void BlockingEndWrite(IAsyncResult asyncResult
)
862 SynchronousAsyncResult
.EndWrite(asyncResult
);
865 private sealed class NullStream
: Stream
867 private static readonly Task
<int> s_zeroTask
= Task
.FromResult(0);
869 internal NullStream() { }
871 public override bool CanRead
=> true;
873 public override bool CanWrite
=> true;
875 public override bool CanSeek
=> true;
877 public override long Length
=> 0;
879 public override long Position
885 public override void CopyTo(Stream destination
, int bufferSize
)
887 StreamHelpers
.ValidateCopyToArgs(this, destination
, bufferSize
);
889 // After we validate arguments this is a nop.
892 public override Task
CopyToAsync(Stream destination
, int bufferSize
, CancellationToken cancellationToken
)
894 // Validate arguments here for compat, since previously this method
895 // was inherited from Stream (which did check its arguments).
896 StreamHelpers
.ValidateCopyToArgs(this, destination
, bufferSize
);
898 return cancellationToken
.IsCancellationRequested
?
899 Task
.FromCanceled(cancellationToken
) :
903 protected override void Dispose(bool disposing
)
905 // Do nothing - we don't want NullStream singleton (static) to be closable
908 public override void Flush()
912 public override Task
FlushAsync(CancellationToken cancellationToken
)
914 return cancellationToken
.IsCancellationRequested
?
915 Task
.FromCanceled(cancellationToken
) :
919 public override IAsyncResult
BeginRead(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
921 if (!CanRead
) throw Error
.GetReadNotSupported();
923 return BlockingBeginRead(buffer
, offset
, count
, callback
, state
);
926 public override int EndRead(IAsyncResult asyncResult
)
928 if (asyncResult
== null)
929 throw new ArgumentNullException(nameof(asyncResult
));
931 return BlockingEndRead(asyncResult
);
934 public override IAsyncResult
BeginWrite(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
936 if (!CanWrite
) throw Error
.GetWriteNotSupported();
938 return BlockingBeginWrite(buffer
, offset
, count
, callback
, state
);
941 public override void EndWrite(IAsyncResult asyncResult
)
943 if (asyncResult
== null)
944 throw new ArgumentNullException(nameof(asyncResult
));
946 BlockingEndWrite(asyncResult
);
949 public override int Read(byte[] buffer
, int offset
, int count
)
954 public override int Read(Span
<byte> buffer
)
959 public override Task
<int> ReadAsync(byte[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
964 public override ValueTask
<int> ReadAsync(Memory
<byte> buffer
, CancellationToken cancellationToken
= default)
966 return new ValueTask
<int>(0);
969 public override int ReadByte()
974 public override void Write(byte[] buffer
, int offset
, int count
)
978 public override void Write(ReadOnlySpan
<byte> buffer
)
982 public override Task
WriteAsync(byte[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
984 return cancellationToken
.IsCancellationRequested
?
985 Task
.FromCanceled(cancellationToken
) :
989 public override ValueTask
WriteAsync(ReadOnlyMemory
<byte> buffer
, CancellationToken cancellationToken
= default)
991 return cancellationToken
.IsCancellationRequested
?
992 new ValueTask(Task
.FromCanceled(cancellationToken
)) :
996 public override void WriteByte(byte value)
1000 public override long Seek(long offset
, SeekOrigin origin
)
1005 public override void SetLength(long length
)
1011 /// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
1012 private sealed class SynchronousAsyncResult
: IAsyncResult
1014 private readonly object? _stateObject
;
1015 private readonly bool _isWrite
;
1016 private ManualResetEvent
? _waitHandle
;
1017 private readonly ExceptionDispatchInfo
? _exceptionInfo
;
1019 private bool _endXxxCalled
;
1020 private readonly int _bytesRead
;
1022 internal SynchronousAsyncResult(int bytesRead
, object? asyncStateObject
)
1024 _bytesRead
= bytesRead
;
1025 _stateObject
= asyncStateObject
;
1029 internal SynchronousAsyncResult(object? asyncStateObject
)
1031 _stateObject
= asyncStateObject
;
1035 internal SynchronousAsyncResult(Exception ex
, object? asyncStateObject
, bool isWrite
)
1037 _exceptionInfo
= ExceptionDispatchInfo
.Capture(ex
);
1038 _stateObject
= asyncStateObject
;
1042 public bool IsCompleted
=> true;
1044 public WaitHandle AsyncWaitHandle
=>
1045 LazyInitializer
.EnsureInitialized(ref _waitHandle
, () => new ManualResetEvent(true));
1047 public object? AsyncState
=> _stateObject
;
1049 public bool CompletedSynchronously
=> true;
1051 internal void ThrowIfError()
1053 if (_exceptionInfo
!= null)
1054 _exceptionInfo
.Throw();
1057 internal static int EndRead(IAsyncResult asyncResult
)
1059 if (!(asyncResult
is SynchronousAsyncResult ar
) || ar
._isWrite
)
1060 throw new ArgumentException(SR
.Arg_WrongAsyncResult
);
1062 if (ar
._endXxxCalled
)
1063 throw new ArgumentException(SR
.InvalidOperation_EndReadCalledMultiple
);
1065 ar
._endXxxCalled
= true;
1068 return ar
._bytesRead
;
1071 internal static void EndWrite(IAsyncResult asyncResult
)
1073 if (!(asyncResult
is SynchronousAsyncResult ar
) || !ar
._isWrite
)
1074 throw new ArgumentException(SR
.Arg_WrongAsyncResult
);
1076 if (ar
._endXxxCalled
)
1077 throw new ArgumentException(SR
.InvalidOperation_EndWriteCalledMultiple
);
1079 ar
._endXxxCalled
= true;
1083 } // class SynchronousAsyncResult
1086 // SyncStream is a wrapper around a stream that takes
1087 // a lock for every operation making it thread safe.
1088 private sealed class SyncStream
: Stream
, IDisposable
1090 private readonly Stream _stream
;
1092 internal SyncStream(Stream stream
)
1095 throw new ArgumentNullException(nameof(stream
));
1099 public override bool CanRead
=> _stream
.CanRead
;
1101 public override bool CanWrite
=> _stream
.CanWrite
;
1103 public override bool CanSeek
=> _stream
.CanSeek
;
1105 public override bool CanTimeout
=> _stream
.CanTimeout
;
1107 public override long Length
1113 return _stream
.Length
;
1118 public override long Position
1124 return _stream
.Position
;
1131 _stream
.Position
= value;
1136 public override int ReadTimeout
1140 return _stream
.ReadTimeout
;
1144 _stream
.ReadTimeout
= value;
1148 public override int WriteTimeout
1152 return _stream
.WriteTimeout
;
1156 _stream
.WriteTimeout
= value;
1160 // In the off chance that some wrapped stream has different
1161 // semantics for Close vs. Dispose, let's preserve that.
1162 public override void Close()
1177 protected override void Dispose(bool disposing
)
1183 // Explicitly pick up a potentially methodimpl'ed Dispose
1185 ((IDisposable
)_stream
).Dispose();
1189 base.Dispose(disposing
);
1194 public override ValueTask
DisposeAsync()
1197 return _stream
.DisposeAsync();
1200 public override void Flush()
1206 public override int Read(byte[] bytes
, int offset
, int count
)
1209 return _stream
.Read(bytes
, offset
, count
);
1212 public override int Read(Span
<byte> buffer
)
1215 return _stream
.Read(buffer
);
1218 public override int ReadByte()
1221 return _stream
.ReadByte();
1224 public override IAsyncResult
BeginRead(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
1227 throw new NotImplementedException(); // TODO: https://github.com/dotnet/corert/issues/3251
1229 bool overridesBeginRead
= _stream
.HasOverriddenBeginEndRead();
1233 // If the Stream does have its own BeginRead implementation, then we must use that override.
1234 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1235 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1236 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1237 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1238 // _stream due to this call blocked while holding the lock.
1239 return overridesBeginRead
?
1240 _stream
.BeginRead(buffer
, offset
, count
, callback
, state
) :
1241 _stream
.BeginReadInternal(buffer
, offset
, count
, callback
, state
, serializeAsynchronously
: true, apm
: true);
1246 public override int EndRead(IAsyncResult asyncResult
)
1248 if (asyncResult
== null)
1249 throw new ArgumentNullException(nameof(asyncResult
));
1252 return _stream
.EndRead(asyncResult
);
1255 public override long Seek(long offset
, SeekOrigin origin
)
1258 return _stream
.Seek(offset
, origin
);
1261 public override void SetLength(long length
)
1264 _stream
.SetLength(length
);
1267 public override void Write(byte[] bytes
, int offset
, int count
)
1270 _stream
.Write(bytes
, offset
, count
);
1273 public override void Write(ReadOnlySpan
<byte> buffer
)
1276 _stream
.Write(buffer
);
1279 public override void WriteByte(byte b
)
1282 _stream
.WriteByte(b
);
1285 public override IAsyncResult
BeginWrite(byte[] buffer
, int offset
, int count
, AsyncCallback callback
, object? state
)
1288 throw new NotImplementedException(); // TODO: https://github.com/dotnet/corert/issues/3251
1290 bool overridesBeginWrite
= _stream
.HasOverriddenBeginEndWrite();
1294 // If the Stream does have its own BeginWrite implementation, then we must use that override.
1295 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1296 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1297 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1298 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1299 // _stream due to this call blocked while holding the lock.
1300 return overridesBeginWrite
?
1301 _stream
.BeginWrite(buffer
, offset
, count
, callback
, state
) :
1302 _stream
.BeginWriteInternal(buffer
, offset
, count
, callback
, state
, serializeAsynchronously
: true, apm
: true);
1307 public override void EndWrite(IAsyncResult asyncResult
)
1309 if (asyncResult
== null)
1310 throw new ArgumentNullException(nameof(asyncResult
));
1313 _stream
.EndWrite(asyncResult
);