1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 using System
.Diagnostics
;
6 using System
.Runtime
.CompilerServices
;
7 using System
.Runtime
.InteropServices
;
8 using System
.Threading
;
9 using System
.Threading
.Tasks
;
14 * This class is used to access a contiguous block of memory, likely outside
15 * the GC heap (or pinned in place in the GC heap, but a MemoryStream may
16 * make more sense in those cases). It's great if you have a pointer and
17 * a length for a section of memory mapped in by someone else and you don't
18 * want to copy this into the GC heap. UnmanagedMemoryStream assumes these
21 * 1) All the memory in the specified block is readable or writable,
22 * depending on the values you pass to the constructor.
23 * 2) The lifetime of the block of memory is at least as long as the lifetime
24 * of the UnmanagedMemoryStream.
25 * 3) You clean up the memory when appropriate. The UnmanagedMemoryStream
26 * currently will do NOTHING to free this memory.
27 * 4) All calls to Write and WriteByte may not be threadsafe currently.
29 * It may become necessary to add in some sort of
30 * DeallocationMode enum, specifying whether we unmap a section of memory,
31 * call free, run a user-provided delegate to free the memory, etc.
32 * We'll suggest user write a subclass of UnmanagedMemoryStream that uses
33 * a SafeHandle subclass to hold onto the memory.
38 /// Stream over a memory pointer or over a SafeBuffer
40 public class UnmanagedMemoryStream
: Stream
42 private SafeBuffer
? _buffer
;
43 private unsafe byte* _mem
;
45 private long _capacity
;
46 private long _position
;
48 private FileAccess _access
;
50 private Task
<int>? _lastReadTask
; // The last successful task returned from ReadAsync
53 /// Creates a closed stream.
55 // Needed for subclasses that need to map a file, etc.
56 protected UnmanagedMemoryStream()
66 /// Creates a stream over a SafeBuffer.
68 /// <param name="buffer"></param>
69 /// <param name="offset"></param>
70 /// <param name="length"></param>
71 public UnmanagedMemoryStream(SafeBuffer buffer
, long offset
, long length
)
73 Initialize(buffer
, offset
, length
, FileAccess
.Read
);
77 /// Creates a stream over a SafeBuffer.
79 public UnmanagedMemoryStream(SafeBuffer buffer
, long offset
, long length
, FileAccess access
)
81 Initialize(buffer
, offset
, length
, access
);
85 /// Subclasses must call this method (or the other overload) to properly initialize all instance fields.
87 /// <param name="buffer"></param>
88 /// <param name="offset"></param>
89 /// <param name="length"></param>
90 /// <param name="access"></param>
91 protected void Initialize(SafeBuffer buffer
, long offset
, long length
, FileAccess access
)
95 throw new ArgumentNullException(nameof(buffer
));
99 throw new ArgumentOutOfRangeException(nameof(offset
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
103 throw new ArgumentOutOfRangeException(nameof(length
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
105 if (buffer
.ByteLength
< (ulong)(offset
+ length
))
107 throw new ArgumentException(SR
.Argument_InvalidSafeBufferOffLen
);
109 if (access
< FileAccess
.Read
|| access
> FileAccess
.ReadWrite
)
111 throw new ArgumentOutOfRangeException(nameof(access
));
116 throw new InvalidOperationException(SR
.InvalidOperation_CalledTwice
);
119 // check for wraparound
122 byte* pointer
= null;
123 RuntimeHelpers
.PrepareConstrainedRegions();
126 buffer
.AcquirePointer(ref pointer
);
127 if ((pointer
+ offset
+ length
) < pointer
)
129 throw new ArgumentException(SR
.ArgumentOutOfRange_UnmanagedMemStreamWrapAround
);
136 buffer
.ReleasePointer();
150 /// Creates a stream over a byte*.
152 [CLSCompliant(false)]
153 public unsafe UnmanagedMemoryStream(byte* pointer
, long length
)
155 Initialize(pointer
, length
, length
, FileAccess
.Read
);
159 /// Creates a stream over a byte*.
161 [CLSCompliant(false)]
162 public unsafe UnmanagedMemoryStream(byte* pointer
, long length
, long capacity
, FileAccess access
)
164 Initialize(pointer
, length
, capacity
, access
);
168 /// Subclasses must call this method (or the other overload) to properly initialize all instance fields.
170 [CLSCompliant(false)]
171 protected unsafe void Initialize(byte* pointer
, long length
, long capacity
, FileAccess access
)
174 throw new ArgumentNullException(nameof(pointer
));
175 if (length
< 0 || capacity
< 0)
176 throw new ArgumentOutOfRangeException((length
< 0) ? nameof(length
) : nameof(capacity
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
177 if (length
> capacity
)
178 throw new ArgumentOutOfRangeException(nameof(length
), SR
.ArgumentOutOfRange_LengthGreaterThanCapacity
);
179 // Check for wraparound.
180 if (((byte*)((long)pointer
+ capacity
)) < pointer
)
181 throw new ArgumentOutOfRangeException(nameof(capacity
), SR
.ArgumentOutOfRange_UnmanagedMemStreamWrapAround
);
182 if (access
< FileAccess
.Read
|| access
> FileAccess
.ReadWrite
)
183 throw new ArgumentOutOfRangeException(nameof(access
), SR
.ArgumentOutOfRange_Enum
);
185 throw new InvalidOperationException(SR
.InvalidOperation_CalledTwice
);
190 _capacity
= capacity
;
196 /// Returns true if the stream can be read; otherwise returns false.
198 public override bool CanRead
=> _isOpen
&& (_access
& FileAccess
.Read
) != 0;
201 /// Returns true if the stream can seek; otherwise returns false.
203 public override bool CanSeek
=> _isOpen
;
206 /// Returns true if the stream can be written to; otherwise returns false.
208 public override bool CanWrite
=> _isOpen
&& (_access
& FileAccess
.Write
) != 0;
211 /// Closes the stream. The stream's memory needs to be dealt with separately.
213 /// <param name="disposing"></param>
214 protected override void Dispose(bool disposing
)
217 unsafe { _mem = null; }
219 // Stream allocates WaitHandles for async calls. So for correctness
220 // call base.Dispose(disposing) for better perf, avoiding waiting
221 // for the finalizers to run on those types.
222 base.Dispose(disposing
);
225 private void EnsureNotClosed()
228 throw Error
.GetStreamIsClosed();
231 private void EnsureReadable()
234 throw Error
.GetReadNotSupported();
237 private void EnsureWriteable()
240 throw Error
.GetWriteNotSupported();
244 /// Since it's a memory stream, this method does nothing.
246 public override void Flush()
252 /// Since it's a memory stream, this method does nothing specific.
254 /// <param name="cancellationToken"></param>
255 /// <returns></returns>
256 public override Task
FlushAsync(CancellationToken cancellationToken
)
258 if (cancellationToken
.IsCancellationRequested
)
259 return Task
.FromCanceled(cancellationToken
);
264 return Task
.CompletedTask
;
268 return Task
.FromException(ex
);
273 /// Number of bytes in the stream.
275 public override long Length
280 return Interlocked
.Read(ref _length
);
285 /// Number of bytes that can be written to the stream.
297 /// ReadByte will read byte at the Position in the stream
299 public override long Position
303 if (!CanSeek
) throw Error
.GetStreamIsClosed();
304 return Interlocked
.Read(ref _position
);
308 if (value < 0) throw new ArgumentOutOfRangeException(nameof(value), SR
.ArgumentOutOfRange_NeedNonNegNum
);
309 if (!CanSeek
) throw Error
.GetStreamIsClosed();
311 Interlocked
.Exchange(ref _position
, value);
316 /// Pointer to memory at the current Position in the stream.
318 [CLSCompliant(false)]
319 public unsafe byte* PositionPointer
324 throw new NotSupportedException(SR
.NotSupported_UmsSafeBuffer
);
328 // Use a temp to avoid a race
329 long pos
= Interlocked
.Read(ref _position
);
331 throw new IndexOutOfRangeException(SR
.IndexOutOfRange_UMSPosition
);
332 byte* ptr
= _mem
+ pos
;
338 throw new NotSupportedException(SR
.NotSupported_UmsSafeBuffer
);
343 throw new IOException(SR
.IO_SeekBeforeBegin
);
344 long newPosition
= (long)value - (long)_mem
;
346 throw new ArgumentOutOfRangeException("offset", SR
.ArgumentOutOfRange_UnmanagedMemStreamLength
);
348 Interlocked
.Exchange(ref _position
, newPosition
);
353 /// Reads bytes from stream and puts them into the buffer
355 /// <param name="buffer">Buffer to read the bytes to.</param>
356 /// <param name="offset">Starting index in the buffer.</param>
357 /// <param name="count">Maximum number of bytes to read.</param>
358 /// <returns>Number of bytes actually read.</returns>
359 public override int Read(byte[] buffer
, int offset
, int count
)
362 throw new ArgumentNullException(nameof(buffer
), SR
.ArgumentNull_Buffer
);
364 throw new ArgumentOutOfRangeException(nameof(offset
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
366 throw new ArgumentOutOfRangeException(nameof(count
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
367 if (buffer
.Length
- offset
< count
)
368 throw new ArgumentException(SR
.Argument_InvalidOffLen
);
370 return ReadCore(new Span
<byte>(buffer
, offset
, count
));
373 public override int Read(Span
<byte> buffer
)
375 if (GetType() == typeof(UnmanagedMemoryStream
))
377 return ReadCore(buffer
);
381 // UnmanagedMemoryStream is not sealed, and a derived type may have overridden Read(byte[], int, int) prior
382 // to this Read(Span<byte>) overload being introduced. In that case, this Read(Span<byte>) overload
383 // should use the behavior of Read(byte[],int,int) overload.
384 return base.Read(buffer
);
388 internal int ReadCore(Span
<byte> buffer
)
393 // Use a local variable to avoid a race where another thread
394 // changes our position after we decide we can read some bytes.
395 long pos
= Interlocked
.Read(ref _position
);
396 long len
= Interlocked
.Read(ref _length
);
397 long n
= Math
.Min(len
- pos
, buffer
.Length
);
403 int nInt
= (int)n
; // Safe because n <= count, which is an Int32
406 return 0; // _position could be beyond EOF
408 Debug
.Assert(pos
+ nInt
>= 0, "_position + n >= 0"); // len is less than 2^63 -1.
412 fixed (byte* pBuffer
= &MemoryMarshal
.GetReference(buffer
))
416 byte* pointer
= null;
418 RuntimeHelpers
.PrepareConstrainedRegions();
421 _buffer
.AcquirePointer(ref pointer
);
422 Buffer
.Memcpy(pBuffer
, pointer
+ pos
+ _offset
, nInt
);
428 _buffer
.ReleasePointer();
434 Buffer
.Memcpy(pBuffer
, _mem
+ pos
, nInt
);
438 Interlocked
.Exchange(ref _position
, pos
+ n
);
443 /// Reads bytes from stream and puts them into the buffer
445 /// <param name="buffer">Buffer to read the bytes to.</param>
446 /// <param name="offset">Starting index in the buffer.</param>
447 /// <param name="count">Maximum number of bytes to read.</param>
448 /// <param name="cancellationToken">Token that can be used to cancel this operation.</param>
449 /// <returns>Task that can be used to access the number of bytes actually read.</returns>
450 public override Task
<int> ReadAsync(byte[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
453 throw new ArgumentNullException(nameof(buffer
), SR
.ArgumentNull_Buffer
);
455 throw new ArgumentOutOfRangeException(nameof(offset
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
457 throw new ArgumentOutOfRangeException(nameof(count
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
458 if (buffer
.Length
- offset
< count
)
459 throw new ArgumentException(SR
.Argument_InvalidOffLen
);
461 if (cancellationToken
.IsCancellationRequested
)
462 return Task
.FromCanceled
<int>(cancellationToken
);
466 int n
= Read(buffer
, offset
, count
);
467 Task
<int>? t
= _lastReadTask
;
468 return (t
!= null && t
.Result
== n
) ? t
: (_lastReadTask
= Task
.FromResult
<int>(n
));
472 Debug
.Assert(!(ex
is OperationCanceledException
));
473 return Task
.FromException
<int>(ex
);
478 /// Reads bytes from stream and puts them into the buffer
480 /// <param name="buffer">Buffer to read the bytes to.</param>
481 /// <param name="cancellationToken">Token that can be used to cancel this operation.</param>
482 public override ValueTask
<int> ReadAsync(Memory
<byte> buffer
, CancellationToken cancellationToken
= default)
484 if (cancellationToken
.IsCancellationRequested
)
486 return new ValueTask
<int>(Task
.FromCanceled
<int>(cancellationToken
));
491 // ReadAsync(Memory<byte>,...) needs to delegate to an existing virtual to do the work, in case an existing derived type
492 // has changed or augmented the logic associated with reads. If the Memory wraps an array, we could delegate to
493 // ReadAsync(byte[], ...), but that would defeat part of the purpose, as ReadAsync(byte[], ...) often needs to allocate
494 // a Task<int> for the return value, so we want to delegate to one of the synchronous methods. We could always
495 // delegate to the Read(Span<byte>) method, and that's the most efficient solution when dealing with a concrete
496 // UnmanagedMemoryStream, but if we're dealing with a type derived from UnmanagedMemoryStream, Read(Span<byte>) will end up delegating
497 // to Read(byte[], ...), which requires it to get a byte[] from ArrayPool and copy the data. So, we special-case the
498 // very common case of the Memory<byte> wrapping an array: if it does, we delegate to Read(byte[], ...) with it,
499 // as that will be efficient in both cases, and we fall back to Read(Span<byte>) if the Memory<byte> wrapped something
500 // else; if this is a concrete UnmanagedMemoryStream, that'll be efficient, and only in the case where the Memory<byte> wrapped
501 // something other than an array and this is an UnmanagedMemoryStream-derived type that doesn't override Read(Span<byte>) will
502 // it then fall back to doing the ArrayPool/copy behavior.
503 return new ValueTask
<int>(
504 MemoryMarshal
.TryGetArray(buffer
, out ArraySegment
<byte> destinationArray
) ?
505 Read(destinationArray
.Array
!, destinationArray
.Offset
, destinationArray
.Count
) :
510 return new ValueTask
<int>(Task
.FromException
<int>(ex
));
515 /// Returns the byte at the stream current Position and advances the Position.
517 /// <returns></returns>
518 public override int ReadByte()
523 long pos
= Interlocked
.Read(ref _position
); // Use a local to avoid a race condition
524 long len
= Interlocked
.Read(ref _length
);
527 Interlocked
.Exchange(ref _position
, pos
+ 1);
533 byte* pointer
= null;
534 RuntimeHelpers
.PrepareConstrainedRegions();
537 _buffer
.AcquirePointer(ref pointer
);
538 result
= *(pointer
+ pos
+ _offset
);
544 _buffer
.ReleasePointer();
560 /// Advanced the Position to specific location in the stream.
562 /// <param name="offset">Offset from the loc parameter.</param>
563 /// <param name="loc">Origin for the offset parameter.</param>
564 /// <returns></returns>
565 public override long Seek(long offset
, SeekOrigin loc
)
571 case SeekOrigin
.Begin
:
573 throw new IOException(SR
.IO_SeekBeforeBegin
);
574 Interlocked
.Exchange(ref _position
, offset
);
577 case SeekOrigin
.Current
:
578 long pos
= Interlocked
.Read(ref _position
);
579 if (offset
+ pos
< 0)
580 throw new IOException(SR
.IO_SeekBeforeBegin
);
581 Interlocked
.Exchange(ref _position
, offset
+ pos
);
585 long len
= Interlocked
.Read(ref _length
);
586 if (len
+ offset
< 0)
587 throw new IOException(SR
.IO_SeekBeforeBegin
);
588 Interlocked
.Exchange(ref _position
, len
+ offset
);
592 throw new ArgumentException(SR
.Argument_InvalidSeekOrigin
);
595 long finalPos
= Interlocked
.Read(ref _position
);
596 Debug
.Assert(finalPos
>= 0, "_position >= 0");
601 /// Sets the Length of the stream.
603 /// <param name="value"></param>
604 public override void SetLength(long value)
607 throw new ArgumentOutOfRangeException(nameof(value), SR
.ArgumentOutOfRange_NeedNonNegNum
);
609 throw new NotSupportedException(SR
.NotSupported_UmsSafeBuffer
);
614 if (value > _capacity
)
615 throw new IOException(SR
.IO_FixedCapacity
);
617 long pos
= Interlocked
.Read(ref _position
);
618 long len
= Interlocked
.Read(ref _length
);
623 Buffer
.ZeroMemory(_mem
+ len
, value - len
);
626 Interlocked
.Exchange(ref _length
, value);
629 Interlocked
.Exchange(ref _position
, value);
634 /// Writes buffer into the stream
636 /// <param name="buffer">Buffer that will be written.</param>
637 /// <param name="offset">Starting index in the buffer.</param>
638 /// <param name="count">Number of bytes to write.</param>
639 public override void Write(byte[] buffer
, int offset
, int count
)
642 throw new ArgumentNullException(nameof(buffer
), SR
.ArgumentNull_Buffer
);
644 throw new ArgumentOutOfRangeException(nameof(offset
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
646 throw new ArgumentOutOfRangeException(nameof(count
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
647 if (buffer
.Length
- offset
< count
)
648 throw new ArgumentException(SR
.Argument_InvalidOffLen
);
650 WriteCore(new ReadOnlySpan
<byte>(buffer
, offset
, count
));
653 public override void Write(ReadOnlySpan
<byte> buffer
)
655 if (GetType() == typeof(UnmanagedMemoryStream
))
661 // UnmanagedMemoryStream is not sealed, and a derived type may have overridden Write(byte[], int, int) prior
662 // to this Write(Span<byte>) overload being introduced. In that case, this Write(Span<byte>) overload
663 // should use the behavior of Write(byte[],int,int) overload.
668 internal unsafe void WriteCore(ReadOnlySpan
<byte> buffer
)
673 long pos
= Interlocked
.Read(ref _position
); // Use a local to avoid a race condition
674 long len
= Interlocked
.Read(ref _length
);
675 long n
= pos
+ buffer
.Length
;
676 // Check for overflow
679 throw new IOException(SR
.IO_StreamTooLong
);
684 throw new NotSupportedException(SR
.IO_FixedCapacity
);
689 // Check to see whether we are now expanding the stream and must
690 // zero any memory in the middle.
693 Buffer
.ZeroMemory(_mem
+ len
, pos
- len
);
696 // set length after zeroing memory to avoid race condition of accessing unzeroed memory
699 Interlocked
.Exchange(ref _length
, n
);
703 fixed (byte* pBuffer
= &MemoryMarshal
.GetReference(buffer
))
707 long bytesLeft
= _capacity
- pos
;
708 if (bytesLeft
< buffer
.Length
)
710 throw new ArgumentException(SR
.Arg_BufferTooSmall
);
713 byte* pointer
= null;
714 RuntimeHelpers
.PrepareConstrainedRegions();
717 _buffer
.AcquirePointer(ref pointer
);
718 Buffer
.Memcpy(pointer
+ pos
+ _offset
, pBuffer
, buffer
.Length
);
724 _buffer
.ReleasePointer();
730 Buffer
.Memcpy(_mem
+ pos
, pBuffer
, buffer
.Length
);
734 Interlocked
.Exchange(ref _position
, n
);
739 /// Writes buffer into the stream. The operation completes synchronously.
741 /// <param name="buffer">Buffer that will be written.</param>
742 /// <param name="offset">Starting index in the buffer.</param>
743 /// <param name="count">Number of bytes to write.</param>
744 /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
745 /// <returns>Task that can be awaited </returns>
746 public override Task
WriteAsync(byte[] buffer
, int offset
, int count
, CancellationToken cancellationToken
)
749 throw new ArgumentNullException(nameof(buffer
), SR
.ArgumentNull_Buffer
);
751 throw new ArgumentOutOfRangeException(nameof(offset
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
753 throw new ArgumentOutOfRangeException(nameof(count
), SR
.ArgumentOutOfRange_NeedNonNegNum
);
754 if (buffer
.Length
- offset
< count
)
755 throw new ArgumentException(SR
.Argument_InvalidOffLen
);
757 if (cancellationToken
.IsCancellationRequested
)
758 return Task
.FromCanceled(cancellationToken
);
762 Write(buffer
, offset
, count
);
763 return Task
.CompletedTask
;
767 Debug
.Assert(!(ex
is OperationCanceledException
));
768 return Task
.FromException(ex
);
773 /// Writes buffer into the stream. The operation completes synchronously.
775 /// <param name="buffer">Buffer that will be written.</param>
776 /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
777 public override ValueTask
WriteAsync(ReadOnlyMemory
<byte> buffer
, CancellationToken cancellationToken
= default)
779 if (cancellationToken
.IsCancellationRequested
)
781 return new ValueTask(Task
.FromCanceled(cancellationToken
));
786 // See corresponding comment in ReadAsync for why we don't just always use Write(ReadOnlySpan<byte>).
787 // Unlike ReadAsync, we could delegate to WriteAsync(byte[], ...) here, but we don't for consistency.
788 if (MemoryMarshal
.TryGetArray(buffer
, out ArraySegment
<byte> sourceArray
))
790 Write(sourceArray
.Array
!, sourceArray
.Offset
, sourceArray
.Count
);
800 return new ValueTask(Task
.FromException(ex
));
805 /// Writes a byte to the stream and advances the current Position.
807 /// <param name="value"></param>
808 public override void WriteByte(byte value)
813 long pos
= Interlocked
.Read(ref _position
); // Use a local to avoid a race condition
814 long len
= Interlocked
.Read(ref _length
);
818 // Check for overflow
820 throw new IOException(SR
.IO_StreamTooLong
);
823 throw new NotSupportedException(SR
.IO_FixedCapacity
);
825 // Check to see whether we are now expanding the stream and must
826 // zero any memory in the middle.
827 // don't do if created from SafeBuffer
834 Buffer
.ZeroMemory(_mem
+ len
, pos
- len
);
838 // set length after zeroing memory to avoid race condition of accessing unzeroed memory
839 Interlocked
.Exchange(ref _length
, n
);
847 byte* pointer
= null;
848 RuntimeHelpers
.PrepareConstrainedRegions();
851 _buffer
.AcquirePointer(ref pointer
);
852 *(pointer
+ pos
+ _offset
) = value;
858 _buffer
.ReleasePointer();
870 Interlocked
.Exchange(ref _position
, n
);