Fix IDE0025 (use expression body for properties)
[mono-project.git] / netcore / System.Private.CoreLib / shared / System / IO / Stream.cs
blob4f60da7b6634a28ad5c6606fe9e274bb108f3ac7
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 /*============================================================
6 **
7 **
8 **
9 **
11 ** Purpose: Abstract base class for all Streams. Provides
12 ** default implementations of asynchronous reads & writes, in
13 ** terms of the synchronous reads & writes (and vice versa).
16 ===========================================================*/
18 using System.Buffers;
19 using System.Diagnostics;
20 using System.Runtime.ExceptionServices;
21 using System.Runtime.InteropServices;
22 using System.Threading;
23 using System.Threading.Tasks;
25 namespace System.IO
27 public abstract partial class Stream : MarshalByRefObject, IDisposable, IAsyncDisposable
// Shared stream instance backed by the nested NullStream class: its reads return no data and
// its writes are discarded.
29 public static readonly Stream Null = new NullStream();
31 // We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
32 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
33 // improvement in Copy performance.
// 81920 == 20 * 4096. GetCopyBufferSize starts from this value and may lower it for seekable streams.
34 private const int DefaultCopyBufferSize = 81920;
36 // To implement Async IO operations on streams that don't support async IO
// _activeReadWriteTask is assigned by RunReadWriteTask and reset to null by FinishTrackingAsyncOperation.
// _asyncActiveSemaphore is created on demand by EnsureAsyncActiveSemaphoreInitialized with a single permit.
38 private ReadWriteTask? _activeReadWriteTask;
39 private SemaphoreSlim? _asyncActiveSemaphore;
// Returns the semaphore used to serialize the default asynchronous read/write operations,
// creating it on first use with one available permit (initial count 1, maximum count 1).
// The SemaphoreSlim's WaitHandle is never accessed, so there is no need to dispose it.
internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() =>
    LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
// Whether the stream supports reading; supplied by each derived type.
public abstract bool CanRead { get; }

// Whether the stream supports seeking; supplied by each derived type.
// If CanSeek is false, Position, Seek, Length, and SetLength should throw.
public abstract bool CanSeek { get; }

// Whether the stream supports timeouts. The base implementation reports false.
public virtual bool CanTimeout => false;

// Whether the stream supports writing; supplied by each derived type.
public abstract bool CanWrite { get; }

// Length of the stream; supplied by each derived type.
public abstract long Length { get; }

// Current position within the stream; supplied by each derived type.
public abstract long Position { get; set; }
// Read timeout for the stream. The base implementation does not support timeouts,
// so both accessors throw InvalidOperationException.
public virtual int ReadTimeout
{
    get => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
    set => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
}
// Write timeout for the stream. The base implementation does not support timeouts,
// so both accessors throw InvalidOperationException.
public virtual int WriteTimeout
{
    get => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
    set => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
}
// Copies this stream to destination asynchronously, using the buffer size chosen by GetCopyBufferSize.
public Task CopyToAsync(Stream destination) =>
    CopyToAsync(destination, GetCopyBufferSize());
// Copies this stream to destination asynchronously with the given buffer size and no cancellation.
public Task CopyToAsync(Stream destination, int bufferSize) =>
    CopyToAsync(destination, bufferSize, CancellationToken.None);
// Copies this stream to destination asynchronously, using the buffer size chosen by
// GetCopyBufferSize and observing cancellationToken.
public Task CopyToAsync(Stream destination, CancellationToken cancellationToken) =>
    CopyToAsync(destination, GetCopyBufferSize(), cancellationToken);
// Core CopyToAsync overload. Arguments are validated synchronously, before any
// asynchronous work starts; the copy itself is performed by CopyToAsyncInternal.
public virtual Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
    StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

    Task copyOperation = CopyToAsyncInternal(destination, bufferSize, cancellationToken);
    return copyOperation;
}
// Reads from this stream into a pooled buffer and writes each chunk to destination
// until a read returns zero bytes. The buffer is returned to the pool on every exit path.
private async Task CopyToAsyncInternal(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
    byte[] rented = ArrayPool<byte>.Shared.Rent(bufferSize);
    try
    {
        int chunkLength;
        while ((chunkLength = await ReadAsync(new Memory<byte>(rented), cancellationToken).ConfigureAwait(false)) != 0)
        {
            await destination.WriteAsync(new ReadOnlyMemory<byte>(rented, 0, chunkLength), cancellationToken).ConfigureAwait(false);
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}
// Copies everything from the current position of this stream to the destination
// stream, using the buffer size chosen by GetCopyBufferSize.
public void CopyTo(Stream destination) =>
    CopyTo(destination, GetCopyBufferSize());
// Validates the arguments, then repeatedly reads into a pooled buffer and writes each
// chunk to destination until Read returns zero. Each read asks for the full rented
// array length. The buffer goes back to the pool even if a read or write throws.
public virtual void CopyTo(Stream destination, int bufferSize)
{
    StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

    byte[] rented = ArrayPool<byte>.Shared.Rent(bufferSize);
    try
    {
        while (true)
        {
            int chunkLength = Read(rented, 0, rented.Length);
            if (chunkLength == 0)
            {
                break;
            }

            destination.Write(rented, 0, chunkLength);
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}
// Chooses the buffer size for the parameterless CopyTo/CopyToAsync overloads:
// the default size, trimmed to the number of bytes remaining when the stream is seekable.
private int GetCopyBufferSize()
{
    if (!CanSeek)
    {
        return DefaultCopyBufferSize;
    }

    long length = Length;
    long position = Position;

    if (length <= position) // Handles negative overflows
    {
        // Nothing is left to copy. CopyTo{Async} is virtual, though, so an override
        // must still be invoked to perform its own validation; hand it the smallest
        // legal buffer size.
        return 1;
    }

    long remaining = length - position;

    // A non-positive difference here means the subtraction overflowed; keep the default size.
    return remaining > 0
        ? (int)Math.Min(DefaultCopyBufferSize, remaining)
        : DefaultCopyBufferSize;
}
// Close() predates IDisposable and used to be where all cleanup logic lived.
// Stream now follows the Dispose(bool) pattern so that subclasses can be written
// sensibly without inspecting every base class and without version brittleness.
// Subclasses should put their cleanup in Dispose(bool); Close simply runs it and
// suppresses finalization.
public virtual void Close()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
// IDisposable entry point; delegates to the virtual Close method.
public void Dispose() => Close();
// Cleanup hook for derived types. The base implementation does nothing.
protected virtual void Dispose(bool disposing)
{
    // Note: this must never be changed to call other virtual Stream methods
    // (such as Write): subclass state has already been torn down by the time
    // this runs, as it is the last code executed during stream cleanup.
}
// Default asynchronous dispose: runs the synchronous Dispose and returns a completed
// ValueTask. An exception thrown by Dispose is returned inside a faulted task rather
// than thrown to the caller.
public virtual ValueTask DisposeAsync()
{
    try
    {
        Dispose();
    }
    catch (Exception failure)
    {
        return new ValueTask(Task.FromException(failure));
    }

    return default;
}
// Flush operation; every derived type must provide it.
public abstract void Flush();
// Flushes asynchronously without cancellation support.
public Task FlushAsync() => FlushAsync(CancellationToken.None);
// Default asynchronous flush: queues the synchronous Flush to the default task scheduler.
// The stream is passed as the task state, so the lambda captures nothing.
public virtual Task FlushAsync(CancellationToken cancellationToken) =>
    Task.Factory.StartNew(
        target => ((Stream)target!).Flush(),
        this,
        cancellationToken,
        TaskCreationOptions.DenyChildAttach,
        TaskScheduler.Default);
// Obsolete factory that returns a new, non-signaled ManualResetEvent.
[Obsolete("CreateWaitHandle will be removed eventually.  Please use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle() => new ManualResetEvent(false);
// APM-style read. Delegates to BeginReadInternal, waiting synchronously for any
// operation already in flight and marking the operation as APM so that EndRead
// performs the cleanup.
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state) =>
    BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
// Shared implementation behind BeginRead and the default ReadAsync path.
//   serializeAsynchronously: true waits for the semaphore with WaitAsync (no blocking);
//                            false blocks the calling thread with Wait.
//   apm: true when called for Begin/EndRead, in which case EndRead finishes tracking;
//        false when called for ReadAsync, in which case the task cleans up after itself.
// Throws the exception produced by Error.GetReadNotSupported when CanRead is false.
// Returns the ReadWriteTask, which is both the scheduled work item and the IAsyncResult.
267 internal IAsyncResult BeginReadInternal(
268 byte[] buffer, int offset, int count, AsyncCallback? callback, object? state,
269 bool serializeAsynchronously, bool apm)
271 if (!CanRead) throw Error.GetReadNotSupported();
273 // To avoid races on a stream's position pointer and race conditions
274 // with internal buffer indexes in our own streams that
275 // don't natively support async IO operations when there are multiple
276 // async requests outstanding, we will block the application's main
277 // thread if it does a second IO request until the first one completes.
278 SemaphoreSlim semaphore = EnsureAsyncActiveSemaphoreInitialized();
279 Task? semaphoreTask = null;
280 if (serializeAsynchronously)
// Start the asynchronous wait without blocking; the read is scheduled once it completes.
282 semaphoreTask = semaphore.WaitAsync();
284 else
// Block here until any in-flight operation releases the semaphore.
286 semaphore.Wait();
289 // Create the task to asynchronously do a Read. This task serves both
290 // as the asynchronous work item and as the IAsyncResult returned to the user.
291 var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate
293 // The ReadWriteTask stores all of the parameters to pass to Read.
294 // As we're currently inside of it, we can get the current task
295 // and grab the parameters from it.
296 var thisTask = Task.InternalCurrent as ReadWriteTask;
297 Debug.Assert(thisTask != null && thisTask._stream != null && thisTask._buffer != null,
298 "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream and buffer should be set");
302 // Do the Read and return the number of bytes read
303 return thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
305 finally
307 // If this implementation is part of Begin/EndXx, then the EndXx method will handle
308 // finishing the async operation. However, if this is part of XxAsync, then there won't
309 // be an end method, and this task is responsible for cleaning up.
310 if (!thisTask._apm)
312 thisTask._stream.FinishTrackingAsyncOperation();
315 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
317 }, state, this, buffer, offset, count, callback);
319 // Schedule it
// With an asynchronous semaphore wait the task starts when the wait completes; otherwise
// the semaphore is already held and the task starts immediately.
320 if (semaphoreTask != null)
321 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
322 else
323 RunReadWriteTask(asyncResult);
326 return asyncResult; // return it
// Completes a read started by BeginRead: validates that asyncResult is the stream's
// active read task, blocks until it finishes, and returns the number of bytes read.
// Tracking of the operation is always ended once the task has been awaited.
public virtual int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException(nameof(asyncResult));
    }

    ReadWriteTask? pendingRead = _activeReadWriteTask;

    // No operation is being tracked.
    if (pendingRead == null)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
    }

    // The caller passed something other than the tracked operation.
    if (pendingRead != asyncResult)
    {
        throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
    }

    // The tracked operation is a write, not a read.
    if (!pendingRead._isRead)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
    }

    try
    {
        // Block until completion, then get the result / propagate any exception.
        return pendingRead.GetAwaiter().GetResult();
    }
    finally
    {
        FinishTrackingAsyncOperation();
    }
}
// Reads asynchronously into buffer without cancellation support.
public Task<int> ReadAsync(byte[] buffer, int offset, int count) =>
    ReadAsync(buffer, offset, count, CancellationToken.None);
// Default asynchronous read. A token that is already cancelled yields a completed,
// cancelled task; otherwise the read is carried out through the Begin/End machinery.
public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled<int>(cancellationToken);
    }

    return BeginEndReadAsync(buffer, offset, count);
}
// Memory-based asynchronous read. When the memory is backed by an array, the array-based
// ReadAsync overload is called on it directly. Otherwise the read goes into a pooled
// array whose contents are copied into the caller's memory once the read completes.
public virtual ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
    if (!MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
    {
        byte[] pooled = ArrayPool<byte>.Shared.Rent(buffer.Length);
        Task<int> pendingRead = ReadAsync(pooled, 0, buffer.Length, cancellationToken);
        return CopyBackWhenDoneAsync(pendingRead, pooled, buffer);
    }

    return new ValueTask<int>(ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken));

    // Awaits the read, copies the bytes that were read into the destination, and
    // returns the pooled array regardless of how the read ended.
    static async ValueTask<int> CopyBackWhenDoneAsync(Task<int> readOperation, byte[] scratch, Memory<byte> target)
    {
        try
        {
            int bytesRead = await readOperation.ConfigureAwait(false);
            new Span<byte>(scratch, 0, bytesRead).CopyTo(target.Span);
            return bytesRead;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(scratch);
        }
    }
}
// Implements ReadAsync in terms of BeginRead/EndRead.
private Task<int> BeginEndReadAsync(byte[] buffer, int offset, int count)
{
    if (HasOverriddenBeginEndRead())
    {
        // The derived type overrides Begin/EndRead, so the calls must be wrapped
        // to make sure the derived type's functionality is used.
        return TaskFactory<int>.FromAsyncTrim(
            this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
            (target, readArgs, completion, userState) => target.BeginRead(readArgs.Buffer, readArgs.Offset, readArgs.Count, completion, userState), // cached by compiler
            (target, result) => target.EndRead(result)); // cached by compiler
    }

    // Begin/EndRead are not overridden: take the optimized path that skips an
    // extra layer of tasks / IAsyncResults.
    return (Task<int>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
}
// Carries the arguments of a Read or Write call as a single value, so they can be
// passed through TaskFactory<T>.FromAsyncTrim without capturing them in a closure.
private struct ReadWriteParameters
{
    // Array to read into or write from.
    internal byte[] Buffer;

    // Starting index within Buffer.
    internal int Offset;

    // Number of bytes to read or write.
    internal int Count;
}
// APM-style write. Delegates to BeginWriteInternal, waiting synchronously for any
// operation already in flight and marking the operation as APM so that EndWrite
// performs the cleanup.
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state) =>
    BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
// Shared implementation behind BeginWrite and the default WriteAsync path.
//   serializeAsynchronously: true waits for the semaphore with WaitAsync (no blocking);
//                            false blocks the calling thread with Wait.
//   apm: true when called for Begin/EndWrite, in which case EndWrite finishes tracking;
//        false when called for WriteAsync, in which case the task cleans up after itself.
// Throws the exception produced by Error.GetWriteNotSupported when CanWrite is false.
// Returns the ReadWriteTask, which is both the scheduled work item and the IAsyncResult.
430 internal IAsyncResult BeginWriteInternal(
431 byte[] buffer, int offset, int count, AsyncCallback? callback, object? state,
432 bool serializeAsynchronously, bool apm)
434 if (!CanWrite) throw Error.GetWriteNotSupported();
436 // To avoid races on a stream's position pointer and race conditions
437 // with internal buffer indexes in our own streams that
438 // don't natively support async IO operations when there are multiple
439 // async requests outstanding, we will block the application's main
440 // thread if it does a second IO request until the first one completes.
441 SemaphoreSlim semaphore = EnsureAsyncActiveSemaphoreInitialized();
442 Task? semaphoreTask = null;
443 if (serializeAsynchronously)
445 semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
447 else
449 semaphore.Wait(); // synchronously wait here
452 // Create the task to asynchronously do a Write. This task serves both
453 // as the asynchronous work item and as the IAsyncResult returned to the user.
454 var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate
456 // The ReadWriteTask stores all of the parameters to pass to Write.
457 // As we're currently inside of it, we can get the current task
458 // and grab the parameters from it.
459 var thisTask = Task.InternalCurrent as ReadWriteTask;
460 Debug.Assert(thisTask != null && thisTask._stream != null && thisTask._buffer != null,
461 "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream and buffer should be set");
465 // Do the Write
466 thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
467 return 0; // not used, but signature requires a value be returned
469 finally
471 // If this implementation is part of Begin/EndXx, then the EndXx method will handle
472 // finishing the async operation. However, if this is part of XxAsync, then there won't
473 // be an end method, and this task is responsible for cleaning up.
474 if (!thisTask._apm)
476 thisTask._stream.FinishTrackingAsyncOperation();
479 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
481 }, state, this, buffer, offset, count, callback);
483 // Schedule it
// With an asynchronous semaphore wait the task starts when the wait completes; otherwise
// the semaphore is already held and the task starts immediately.
484 if (semaphoreTask != null)
485 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
486 else
487 RunReadWriteTask(asyncResult);
489 return asyncResult; // return it
// Starts readWriteTask once the semaphore wait represented by asyncWaiter has completed:
// immediately when the wait is already done, otherwise from a synchronous continuation.
private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
{
    Debug.Assert(readWriteTask != null);
    Debug.Assert(asyncWaiter != null);

    if (!asyncWaiter.IsCompleted)
    {
        // Not our turn yet: run the task when the wait finishes. The task is passed as
        // continuation state and the stream is taken from it, so nothing is captured.
        asyncWaiter.ContinueWith((waitTask, boxedTask) =>
        {
            Debug.Assert(waitTask.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
            var pending = (ReadWriteTask)boxedTask!;
            Debug.Assert(pending._stream != null);
            pending._stream.RunReadWriteTask(pending);
        }, readWriteTask, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        return;
    }

    // The wait has already completed, so the task can run right away.
    Debug.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
    RunReadWriteTask(readWriteTask);
}
// Records readWriteTask as this stream's active operation and then starts it on the
// default task scheduler. No other read/write task may be active when this is called
// (asserted below).
515 private void RunReadWriteTask(ReadWriteTask readWriteTask)
517 Debug.Assert(readWriteTask != null);
518 Debug.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
520 // Schedule the task. ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
521 // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
522 // two interlocked operations. However, if ReadWriteTask is ever changed to use
523 // a cancellation token, this should be changed to use Start.
524 _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
525 readWriteTask.m_taskScheduler = TaskScheduler.Default;
526 readWriteTask.ScheduleAndStart(needsProtection: false);
// Ends tracking of the current read/write operation: clears the active-task field and
// releases the semaphore acquired in BeginReadInternal / BeginWriteInternal.
529 private void FinishTrackingAsyncOperation()
// The field is cleared before the semaphore is released.
531 _activeReadWriteTask = null;
532 Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
533 _asyncActiveSemaphore.Release();
// Completes a write started by BeginWrite: validates that asyncResult is the stream's
// active write task and blocks until it finishes, propagating any exception.
// Tracking of the operation is always ended once the task has been awaited.
public virtual void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException(nameof(asyncResult));
    }

    ReadWriteTask? pendingWrite = _activeReadWriteTask;

    // No operation is being tracked.
    if (pendingWrite == null)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
    }

    // The caller passed something other than the tracked operation.
    if (pendingWrite != asyncResult)
    {
        throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
    }

    // The tracked operation is a read, not a write.
    if (pendingWrite._isRead)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
    }

    try
    {
        // Block until completion, then propagate any exceptions.
        pendingWrite.GetAwaiter().GetResult();
        Debug.Assert(pendingWrite.Status == TaskStatus.RanToCompletion);
    }
    finally
    {
        FinishTrackingAsyncOperation();
    }
}
566 // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
567 // A single instance of this task serves four purposes:
568 // 1. The work item scheduled to run the Read / Write operation
569 // 2. The state holding the arguments to be passed to Read / Write
570 // 3. The IAsyncResult returned from BeginRead / BeginWrite
571 // 4. The completion action that runs to invoke the user-provided callback.
572 // This last item is a bit tricky. Before the AsyncCallback is invoked, the
573 // IAsyncResult must have completed, so we can't just invoke the handler
574 // from within the task, since it is the IAsyncResult, and thus it's not
575 // yet completed. Instead, we use AddCompletionAction to install this
576 // task as its own completion handler. That saves the need to allocate
577 // a separate completion handler, it guarantees that the task will
578 // have completed by the time the handler is invoked, and it allows
579 // the handler to be invoked synchronously upon the completion of the
580 // task. This all enables BeginRead / BeginWrite to be implemented
581 // with a single allocation.
582 private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
// _isRead distinguishes reads from writes; EndRead and EndWrite check it to reject a mismatched call.
584 internal readonly bool _isRead;
585 internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync
// Arguments for the Read/Write call. _stream and _buffer are nulled by ClearBeginState.
586 internal Stream? _stream;
587 internal byte[]? _buffer;
588 internal readonly int _offset;
589 internal readonly int _count;
// User callback and the ExecutionContext captured for it; both are set only when a callback was supplied.
590 private AsyncCallback? _callback;
591 private ExecutionContext? _context;
593 internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
595 _stream = null;
596 _buffer = null;
// Creates the task with CancellationToken.None and DenyChildAttach, storing the read/write
// arguments on the instance so the work delegate can retrieve them from the current task.
599 public ReadWriteTask(
600 bool isRead,
601 bool apm,
602 Func<object?, int> function, object? state,
603 Stream stream, byte[] buffer, int offset, int count, AsyncCallback? callback) :
604 base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
606 Debug.Assert(function != null);
607 Debug.Assert(stream != null);
608 Debug.Assert(buffer != null);
610 // Store the arguments
611 _isRead = isRead;
612 _apm = apm;
613 _stream = stream;
614 _buffer = buffer;
615 _offset = offset;
616 _count = count;
618 // If a callback was provided, we need to:
619 // - Store the user-provided handler
620 // - Capture an ExecutionContext under which to invoke the handler
621 // - Add this task as its own completion handler so that the Invoke method
622 // will run the callback when this task completes.
623 if (callback != null)
625 _callback = callback;
626 _context = ExecutionContext.Capture();
627 base.AddCompletionAction(this);
// Target of ExecutionContext.RunInternal: takes the completed ReadWriteTask as its state,
// clears the stored callback, and invokes that callback with the task as the IAsyncResult.
631 private static void InvokeAsyncCallback(object? completedTask)
633 Debug.Assert(completedTask is ReadWriteTask);
634 var rwc = (ReadWriteTask)completedTask;
635 AsyncCallback? callback = rwc._callback;
636 Debug.Assert(callback != null);
637 rwc._callback = null;
638 callback(rwc);
// Cached delegate for InvokeAsyncCallback; assigned on first use in Invoke below.
641 private static ContextCallback? s_invokeAsyncCallback;
// Completion action registered via AddCompletionAction in the constructor; runs the user callback.
643 void ITaskCompletionAction.Invoke(Task completingTask)
645 // Get the ExecutionContext. If there is none, just run the callback
646 // directly, passing in the completed task as the IAsyncResult.
647 // If there is one, process it with ExecutionContext.Run.
648 ExecutionContext? context = _context;
649 if (context == null)
651 AsyncCallback? callback = _callback;
652 Debug.Assert(callback != null);
653 _callback = null;
654 callback(completingTask);
656 else
// Drop the reference to the captured context before running the callback under it.
658 _context = null;
660 ContextCallback? invokeAsyncCallback = s_invokeAsyncCallback;
661 if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign race condition
663 ExecutionContext.RunInternal(context, invokeAsyncCallback, this);
// Always true: Invoke runs a user-supplied callback.
667 bool ITaskCompletionAction.InvokeMayRunArbitraryCode => true;
// Writes asynchronously from buffer without cancellation support.
public Task WriteAsync(byte[] buffer, int offset, int count) =>
    WriteAsync(buffer, offset, count, CancellationToken.None);
// Default asynchronous write. A token that is already cancelled yields a completed,
// cancelled task; otherwise the write is carried out through the Begin/End machinery.
public virtual Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled(cancellationToken);
    }

    return BeginEndWriteAsync(buffer, offset, count);
}
// Memory-based asynchronous write. When the memory is backed by an array, the array-based
// WriteAsync overload is called on it directly. Otherwise the data is first copied into a
// pooled array, which FinishWriteAsync returns to the pool when the write completes.
public virtual ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
    if (!MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
    {
        byte[] pooled = ArrayPool<byte>.Shared.Rent(buffer.Length);
        buffer.Span.CopyTo(pooled);
        Task pendingWrite = WriteAsync(pooled, 0, buffer.Length, cancellationToken);
        return new ValueTask(FinishWriteAsync(pendingWrite, pooled));
    }

    return new ValueTask(WriteAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken));
}
// Awaits writeTask and then hands localBuffer back to the shared array pool,
// whether the write succeeded, faulted or was cancelled.
private async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
{
    try { await writeTask.ConfigureAwait(false); }
    finally { ArrayPool<byte>.Shared.Return(localBuffer); }
}
// Implements WriteAsync in terms of BeginWrite/EndWrite.
private Task BeginEndWriteAsync(byte[] buffer, int offset, int count)
{
    if (HasOverriddenBeginEndWrite())
    {
        // The derived type overrides Begin/EndWrite, so the calls must be wrapped
        // to make sure the derived type's functionality is used.
        return TaskFactory<VoidTaskResult>.FromAsyncTrim(
            this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
            (target, writeArgs, completion, userState) => target.BeginWrite(writeArgs.Buffer, writeArgs.Offset, writeArgs.Count, completion, userState), // cached by compiler
            (target, result) => // cached by compiler
            {
                target.EndWrite(result);
                return default;
            });
    }

    // Begin/EndWrite are not overridden: take the optimized path that skips an
    // extra layer of tasks / IAsyncResults.
    return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
}
// Seek operation; every derived type must provide it.
public abstract long Seek(long offset, SeekOrigin origin);

// Length-setting operation; every derived type must provide it.
public abstract void SetLength(long value);

// Array-based synchronous read; every derived type must provide it. The default
// Read(Span<byte>), ReadByte and CopyTo implementations are built on top of it.
public abstract int Read(byte[] buffer, int offset, int count);
// Span-based read implemented over Read(byte[], int, int): reads into a pooled array
// and copies the result into the caller's span. Throws IOException when the array-based
// read reports a count that is negative or larger than the span.
public virtual int Read(Span<byte> buffer)
{
    byte[] pooled = ArrayPool<byte>.Shared.Rent(buffer.Length);
    try
    {
        int bytesRead = Read(pooled, 0, buffer.Length);
        if (bytesRead < 0 || bytesRead > buffer.Length)
        {
            throw new IOException(SR.IO_StreamTooLong);
        }

        new Span<byte>(pooled, 0, bytesRead).CopyTo(buffer);
        return bytesRead;
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
// Reads a single byte through Read(byte[], int, int) and returns it as a non-negative
// int, or -1 at end of stream. A fresh one-element array is allocated on every call,
// so this performs poorly; subclasses that keep an internal buffer should override it,
// which helps callers that read one byte at a time significantly.
public virtual int ReadByte()
{
    byte[] single = new byte[1];
    int bytesRead = Read(single, 0, 1);
    return bytesRead == 0 ? -1 : single[0];
}
// Array-based synchronous write; every derived type must provide it. The default
// Write(ReadOnlySpan<byte>), WriteByte and BeginWriteInternal implementations call it.
public abstract void Write(byte[] buffer, int offset, int count);
// Span-based write implemented over Write(byte[], int, int): copies the span into a
// pooled array, writes that array, and returns it to the pool on every exit path.
public virtual void Write(ReadOnlySpan<byte> buffer)
{
    byte[] pooled = ArrayPool<byte>.Shared.Rent(buffer.Length);
    try
    {
        buffer.CopyTo(pooled);
        Write(pooled, 0, buffer.Length);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(pooled);
    }
}
// Writes a single byte to the stream through Write(byte[], int, int). A fresh
// one-element array is allocated on every call, so this performs poorly; subclasses
// that keep an internal buffer should override it, which helps callers that write
// one byte at a time significantly.
public virtual void WriteByte(byte value)
{
    byte[] single = new byte[] { value };
    Write(single, 0, 1);
}
// Returns a SyncStream wrapper around stream. A stream that is already a SyncStream
// is returned unchanged. Throws ArgumentNullException when stream is null.
public static Stream Synchronized(Stream stream)
{
    if (stream == null)
    {
        throw new ArgumentNullException(nameof(stream));
    }

    return stream is SyncStream ? stream : new SyncStream(stream);
}
// Obsolete member with an empty body.
[Obsolete("Do not call or override this method.")]
protected virtual void ObjectInvariant() { }
// Begin-style read that does the IO synchronously on the calling thread, avoiding races on
// the position pointer and internal buffer indexes in streams without native async support.
// This can't perform well - use a different approach.
// An IOException from Read is captured in the result; other exceptions propagate.
// The callback, if any, is invoked before this method returns.
internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
{
    SynchronousAsyncResult completed;
    try
    {
        completed = new SynchronousAsyncResult(Read(buffer, offset, count), state);
    }
    catch (IOException ioFailure)
    {
        completed = new SynchronousAsyncResult(ioFailure, state, isWrite: false);
    }

    if (callback != null)
    {
        callback(completed);
    }

    return completed;
}
// Completes a read started by BlockingBeginRead; all validation and result handling
// is done by SynchronousAsyncResult.EndRead.
internal static int BlockingEndRead(IAsyncResult asyncResult) =>
    SynchronousAsyncResult.EndRead(asyncResult);
// Begin-style write that does the IO synchronously on the calling thread, avoiding races on
// the position pointer and internal buffer indexes in streams without native async support.
// This can't perform well - use a different approach.
// An IOException from Write is captured in the result; other exceptions propagate.
// The callback, if any, is invoked before this method returns.
internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
{
    SynchronousAsyncResult completed;
    try
    {
        Write(buffer, offset, count);
        completed = new SynchronousAsyncResult(state);
    }
    catch (IOException ioFailure)
    {
        completed = new SynchronousAsyncResult(ioFailure, state, isWrite: true);
    }

    if (callback != null)
    {
        callback(completed);
    }

    return completed;
}
// Completes a write started by BlockingBeginWrite; all validation and error
// propagation is done by SynchronousAsyncResult.EndWrite.
internal static void BlockingEndWrite(IAsyncResult asyncResult) =>
    SynchronousAsyncResult.EndWrite(asyncResult);
// Stream with no backing store, exposed through Stream.Null. Reads always report end of
// stream, writes are discarded, and position and length are always zero. The instance is
// a shared singleton, so disposing it has no effect.
private sealed class NullStream : Stream
{
    // Cached completed task used for successful array-based ReadAsync calls.
    private static readonly Task<int> s_zeroTask = Task.FromResult(0);

    internal NullStream() { }

    public override bool CanRead => true;

    public override bool CanWrite => true;

    public override bool CanSeek => true;

    public override long Length => 0;

    // Position is always zero; assignments are ignored.
    public override long Position
    {
        get => 0;
        set { }
    }

    public override void CopyTo(Stream destination, int bufferSize)
    {
        StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

        // After we validate arguments this is a nop.
    }

    public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
    {
        // Validate arguments here for compat, since previously this method
        // was inherited from Stream (which did check its arguments).
        StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

        return cancellationToken.IsCancellationRequested ?
            Task.FromCanceled(cancellationToken) :
            Task.CompletedTask;
    }

    protected override void Dispose(bool disposing)
    {
        // Do nothing - we don't want NullStream singleton (static) to be closable
    }

    public override void Flush()
    {
    }

    public override Task FlushAsync(CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested ?
            Task.FromCanceled(cancellationToken) :
            Task.CompletedTask;
    }

    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
    {
        if (!CanRead) throw Error.GetReadNotSupported();

        return BlockingBeginRead(buffer, offset, count, callback, state);
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException(nameof(asyncResult));

        return BlockingEndRead(asyncResult);
    }

    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
    {
        if (!CanWrite) throw Error.GetWriteNotSupported();

        return BlockingBeginWrite(buffer, offset, count, callback, state);
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException(nameof(asyncResult));

        BlockingEndWrite(asyncResult);
    }

    // Always reports end of stream.
    public override int Read(byte[] buffer, int offset, int count)
    {
        return 0;
    }

    public override int Read(Span<byte> buffer)
    {
        return 0;
    }

    // Honors an already-cancelled token, matching the base Stream.ReadAsync and this
    // class's WriteAsync/FlushAsync/CopyToAsync; otherwise completes with zero bytes read.
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested ?
            Task.FromCanceled<int>(cancellationToken) :
            s_zeroTask;
    }

    // Honors an already-cancelled token for the same reason as the array-based overload.
    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        return cancellationToken.IsCancellationRequested ?
            new ValueTask<int>(Task.FromCanceled<int>(cancellationToken)) :
            new ValueTask<int>(0);
    }

    public override int ReadByte()
    {
        return -1;
    }

    // Writes are discarded.
    public override void Write(byte[] buffer, int offset, int count)
    {
    }

    public override void Write(ReadOnlySpan<byte> buffer)
    {
    }

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested ?
            Task.FromCanceled(cancellationToken) :
            Task.CompletedTask;
    }

    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        return cancellationToken.IsCancellationRequested ?
            new ValueTask(Task.FromCanceled(cancellationToken)) :
            default;
    }

    public override void WriteByte(byte value)
    {
    }

    // Seeking always lands at position zero.
    public override long Seek(long offset, SeekOrigin origin)
    {
        return 0;
    }

    // The length cannot be changed; the request is ignored.
    public override void SetLength(long length)
    {
    }
}
/// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
private sealed class SynchronousAsyncResult : IAsyncResult
{
    private readonly object? _stateObject;
    private readonly bool _isWrite;
    private ManualResetEvent? _waitHandle;
    private readonly ExceptionDispatchInfo? _exceptionInfo;

    private bool _endXxxCalled;
    private readonly int _bytesRead;

    // Result of a successful read. _isWrite keeps its default value of false.
    internal SynchronousAsyncResult(int bytesRead, object? asyncStateObject)
    {
        _stateObject = asyncStateObject;
        _bytesRead = bytesRead;
    }

    // Result of a successful write.
    internal SynchronousAsyncResult(object? asyncStateObject)
    {
        _isWrite = true;
        _stateObject = asyncStateObject;
    }

    // Result of a failed read or write; the exception is captured so it can be
    // rethrown from EndRead / EndWrite.
    internal SynchronousAsyncResult(Exception ex, object? asyncStateObject, bool isWrite)
    {
        _isWrite = isWrite;
        _stateObject = asyncStateObject;
        _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
    }

    // The operation has already run by the time this object exists.
    public bool IsCompleted => true;

    // Created lazily, and already signaled.
    public WaitHandle AsyncWaitHandle =>
        LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));

    public object? AsyncState => _stateObject;

    public bool CompletedSynchronously => true;

    // Rethrows the captured exception, if there is one.
    internal void ThrowIfError()
    {
        _exceptionInfo?.Throw();
    }

    // Validates that asyncResult is a read result whose EndRead has not been called yet,
    // then rethrows any captured error or returns the number of bytes read.
    internal static int EndRead(IAsyncResult asyncResult)
    {
        var readResult = asyncResult as SynchronousAsyncResult;
        if (readResult == null || readResult._isWrite)
        {
            throw new ArgumentException(SR.Arg_WrongAsyncResult);
        }

        if (readResult._endXxxCalled)
        {
            throw new ArgumentException(SR.InvalidOperation_EndReadCalledMultiple);
        }

        readResult._endXxxCalled = true;

        readResult.ThrowIfError();
        return readResult._bytesRead;
    }

    // Validates that asyncResult is a write result whose EndWrite has not been called yet,
    // then rethrows any captured error.
    internal static void EndWrite(IAsyncResult asyncResult)
    {
        var writeResult = asyncResult as SynchronousAsyncResult;
        if (writeResult == null || !writeResult._isWrite)
        {
            throw new ArgumentException(SR.Arg_WrongAsyncResult);
        }

        if (writeResult._endXxxCalled)
        {
            throw new ArgumentException(SR.InvalidOperation_EndWriteCalledMultiple);
        }

        writeResult._endXxxCalled = true;

        writeResult.ThrowIfError();
    }
}   // class SynchronousAsyncResult
1086 // SyncStream is a wrapper around a stream that takes
1087 // a lock for every operation making it thread safe.
1088 private sealed class SyncStream : Stream, IDisposable
1090 private readonly Stream _stream;
// Wraps the given stream; throws ArgumentNullException when stream is null.
internal SyncStream(Stream stream)
{
    _stream = stream ?? throw new ArgumentNullException(nameof(stream));
}
// Forwarded to the wrapped stream without taking the lock.
public override bool CanRead
{
    get => _stream.CanRead;
}
// Forwarded to the wrapped stream without taking the lock.
public override bool CanWrite
{
    get => _stream.CanWrite;
}
// Forwarded to the wrapped stream without taking the lock.
public override bool CanSeek
{
    get => _stream.CanSeek;
}
// Forwarded to the wrapped stream without taking the lock.
public override bool CanTimeout
{
    get => _stream.CanTimeout;
}
// Reads the wrapped stream's Length while holding the lock on it.
public override long Length
{
    get
    {
        long length;
        lock (_stream)
        {
            length = _stream.Length;
        }

        return length;
    }
}
// Gets or sets the wrapped stream's Position; both accessors hold the lock on the wrapped stream.
public override long Position
{
    get
    {
        long position;
        lock (_stream)
        {
            position = _stream.Position;
        }

        return position;
    }
    set
    {
        lock (_stream)
        {
            _stream.Position = value;
        }
    }
}
// Forwarded to the wrapped stream; neither accessor takes the lock.
public override int ReadTimeout
{
    get => _stream.ReadTimeout;
    set => _stream.ReadTimeout = value;
}
// Forwarded to the wrapped stream; neither accessor takes the lock.
public override int WriteTimeout
{
    get => _stream.WriteTimeout;
    set => _stream.WriteTimeout = value;
}
// A wrapped stream might give Close and Dispose different semantics, so Close is
// forwarded as Close rather than being routed through Dispose.
public override void Close()
{
    lock (_stream)
    {
        try
        {
            _stream.Close();
        }
        finally
        {
            // Runs even when the wrapped stream's Close throws.
            base.Dispose(true);
        }
    }
}
// Disposes the wrapped stream (only when disposing is true) under the lock, then always
// runs the base Dispose logic, even if disposing the wrapped stream throws.
protected override void Dispose(bool disposing)
{
    lock (_stream)
    {
        try
        {
            if (disposing)
            {
                // Go through IDisposable so that a potentially methodimpl'ed Dispose is picked up.
                IDisposable wrapped = _stream;
                wrapped.Dispose();
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}
// Starts the wrapped stream's DisposeAsync while holding the lock and returns its ValueTask.
// The lock is released when this method returns; the ValueTask is not awaited here.
public override ValueTask DisposeAsync()
{
    ValueTask pending;
    lock (_stream)
    {
        pending = _stream.DisposeAsync();
    }

    return pending;
}
// Flushes the wrapped stream while holding the lock on it.
public override void Flush()
{
    lock (_stream)
    {
        _stream.Flush();
    }
}
// Reads from the wrapped stream while holding the lock on it and returns its result.
public override int Read(byte[] bytes, int offset, int count)
{
    int bytesRead;
    lock (_stream)
    {
        bytesRead = _stream.Read(bytes, offset, count);
    }

    return bytesRead;
}
// Span-based read, forwarded to the wrapped stream while holding the lock on it.
public override int Read(Span<byte> buffer)
{
    lock (_stream)
    {
        int bytesRead = _stream.Read(buffer);
        return bytesRead;
    }
}
// Forwards ReadByte to the wrapped stream while holding the lock on it.
public override int ReadByte()
{
    int value;
    lock (_stream)
    {
        value = _stream.ReadByte();
    }

    return value;
}
// Starts an APM read on the wrapped stream while holding the lock on it.
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
{
#if CORERT
    throw new NotImplementedException(); // TODO: https://github.com/dotnet/corert/issues/3251
#else
    // Determined before the lock is taken.
    bool hasOwnBeginRead = _stream.HasOverriddenBeginEndRead();

    lock (_stream)
    {
        // A wrapped stream that supplies its own BeginRead must have that override used.
        if (hasOwnBeginRead)
        {
            return _stream.BeginRead(buffer, offset, count, callback, state);
        }

        // Otherwise fall back to the base implementation, asking it to serialize operations with
        // an asynchronous wait. A synchronous wait would deadlock: the EndXx call for the
        // outstanding operation could not acquire the lock on _stream while this call is
        // blocked holding it.
        return _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
    }
#endif
}
// Completes an APM read on the wrapped stream while holding the lock on it.
// Throws ArgumentNullException when asyncResult is null.
public override int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult is null)
    {
        throw new ArgumentNullException(nameof(asyncResult));
    }

    int bytesRead;
    lock (_stream)
    {
        bytesRead = _stream.EndRead(asyncResult);
    }

    return bytesRead;
}
// Seeks the wrapped stream while holding the lock on it and returns its result.
public override long Seek(long offset, SeekOrigin origin)
{
    long newPosition;
    lock (_stream)
    {
        newPosition = _stream.Seek(offset, origin);
    }

    return newPosition;
}
// Forwards SetLength to the wrapped stream while holding the lock on it.
public override void SetLength(long length)
{
    lock (_stream)
    {
        _stream.SetLength(length);
    }
}
// Writes to the wrapped stream while holding the lock on it.
public override void Write(byte[] bytes, int offset, int count)
{
    lock (_stream)
    {
        _stream.Write(bytes, offset, count);
    }
}
// Span-based write, forwarded to the wrapped stream while holding the lock on it.
public override void Write(ReadOnlySpan<byte> buffer)
{
    lock (_stream)
    {
        _stream.Write(buffer);
    }
}
// Forwards WriteByte to the wrapped stream while holding the lock on it.
public override void WriteByte(byte b)
{
    lock (_stream)
    {
        _stream.WriteByte(b);
    }
}
// Starts an APM write on the wrapped stream while holding the lock on it.
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
{
#if CORERT
    throw new NotImplementedException(); // TODO: https://github.com/dotnet/corert/issues/3251
#else
    // Determined before the lock is taken.
    bool hasOwnBeginWrite = _stream.HasOverriddenBeginEndWrite();

    lock (_stream)
    {
        // A wrapped stream that supplies its own BeginWrite must have that override used.
        if (hasOwnBeginWrite)
        {
            return _stream.BeginWrite(buffer, offset, count, callback, state);
        }

        // Otherwise fall back to the base implementation, asking it to serialize operations with
        // an asynchronous wait. A synchronous wait would deadlock: the EndXx call for the
        // outstanding operation could not acquire the lock on _stream while this call is
        // blocked holding it.
        return _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
    }
#endif
}
// Completes an APM write on the wrapped stream while holding the lock on it.
// Throws ArgumentNullException when asyncResult is null.
public override void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult is null)
    {
        throw new ArgumentNullException(nameof(asyncResult));
    }

    lock (_stream)
    {
        _stream.EndWrite(asyncResult);
    }
}