1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
6 using System
.Diagnostics
;
7 using System
.Runtime
.InteropServices
;
8 using System
.Threading
;
9 using System
.Threading
.Tasks
;
13 public partial class FileStream
: Stream
15 // This is an internal object extending TaskCompletionSource with fields
16 // for all of the relevant data necessary to complete the IO operation.
17 // This is used by IOCallback and all of the async methods.
18 private unsafe class FileStreamCompletionSource
: TaskCompletionSource
<int>
20 private const long NoResult
= 0;
21 private const long ResultSuccess
= (long)1 << 32;
22 private const long ResultError
= (long)2 << 32;
23 private const long RegisteringCancellation
= (long)4 << 32;
24 private const long CompletedCallback
= (long)8 << 32;
25 private const ulong ResultMask
= ((ulong)uint.MaxValue
) << 32;
27 private static Action
<object?>? s_cancelCallback
;
29 private readonly FileStream _stream
;
30 private readonly int _numBufferedBytes
;
31 private CancellationTokenRegistration _cancellationRegistration
;
33 private bool _cancellationHasBeenRegistered
;
35 private NativeOverlapped
* _overlapped
; // Overlapped class responsible for operations in progress when an appdomain unload occurs
36 private long _result
; // Using long since this needs to be used in Interlocked APIs
38 // Using RunContinuationsAsynchronously for compat reasons (old API used Task.Factory.StartNew for continuations)
39 protected FileStreamCompletionSource(FileStream stream
, int numBufferedBytes
, byte[]? bytes
)
40 : base(TaskCreationOptions
.RunContinuationsAsynchronously
)
42 _numBufferedBytes
= numBufferedBytes
;
46 // Create the native overlapped. We try to use the preallocated overlapped if possible: it's possible if the byte
47 // buffer is null (there's nothing to pin) or the same one that's associated with the preallocated overlapped (and
48 // thus is already pinned) and if no one else is currently using the preallocated overlapped. This is the fast-path
49 // for cases where the user-provided buffer is smaller than the FileStream's buffer (such that the FileStream's
50 // buffer is used) and where operations on the FileStream are not being performed concurrently.
51 Debug
.Assert(bytes
== null || ReferenceEquals(bytes
, _stream
._buffer
));
53 // The _preallocatedOverlapped is null if the internal buffer was never created, so we check for
54 // a non-null bytes before using the stream's _preallocatedOverlapped
55 _overlapped
= bytes
!= null && _stream
.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
56 _stream
._fileHandle
.ThreadPoolBinding
!.AllocateNativeOverlapped(_stream
._preallocatedOverlapped
!) : // allocated when buffer was created, and buffer is non-null
57 _stream
._fileHandle
.ThreadPoolBinding
!.AllocateNativeOverlapped(s_ioCallback
, this, bytes
);
58 Debug
.Assert(_overlapped
!= null, "AllocateNativeOverlapped returned null");
61 internal NativeOverlapped
* Overlapped
=> _overlapped
;
63 public void SetCompletedSynchronously(int numBytes
)
65 ReleaseNativeResource();
66 TrySetResult(numBytes
+ _numBufferedBytes
);
69 public void RegisterForCancellation(CancellationToken cancellationToken
)
72 Debug
.Assert(cancellationToken
.CanBeCanceled
);
73 Debug
.Assert(!_cancellationHasBeenRegistered
, "Cannot register for cancellation twice");
74 _cancellationHasBeenRegistered
= true;
77 // Quick check to make sure the IO hasn't completed
78 if (_overlapped
!= null)
80 Action
<object?>? cancelCallback
= s_cancelCallback
;
81 if (cancelCallback
== null) s_cancelCallback
= cancelCallback
= Cancel
;
83 // Register the cancellation only if the IO hasn't completed
84 long packedResult
= Interlocked
.CompareExchange(ref _result
, RegisteringCancellation
, NoResult
);
85 if (packedResult
== NoResult
)
87 _cancellationRegistration
= cancellationToken
.UnsafeRegister(cancelCallback
, this);
89 // Switch the result, just in case IO completed while we were setting the registration
90 packedResult
= Interlocked
.Exchange(ref _result
, NoResult
);
92 else if (packedResult
!= CompletedCallback
)
94 // Failed to set the result, IO is in the process of completing
95 // Attempt to take the packed result
96 packedResult
= Interlocked
.Exchange(ref _result
, NoResult
);
99 // If we have a callback that needs to be completed
100 if ((packedResult
!= NoResult
) && (packedResult
!= CompletedCallback
) && (packedResult
!= RegisteringCancellation
))
102 CompleteCallback((ulong)packedResult
);
107 internal virtual void ReleaseNativeResource()
109 // Ensure that cancellation has been completed and cleaned up.
110 _cancellationRegistration
.Dispose();
112 // Free the overlapped.
113 // NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
114 // (this is why we disposed the registration above).
115 if (_overlapped
!= null)
117 _stream
._fileHandle
.ThreadPoolBinding
!.FreeNativeOverlapped(_overlapped
);
121 // Ensure we're no longer set as the current completion source (we may not have been to begin with).
122 // Only one operation at a time is eligible to use the preallocated overlapped,
123 _stream
.CompareExchangeCurrentOverlappedOwner(null, this);
126 // When doing IO asynchronously (i.e. _isAsync==true), this callback is
127 // called by a free thread in the threadpool when the IO operation
129 internal static unsafe void IOCallback(uint errorCode
, uint numBytes
, NativeOverlapped
* pOverlapped
)
131 // Extract the completion source from the overlapped. The state in the overlapped
132 // will either be a FileStream (in the case where the preallocated overlapped was used),
133 // in which case the operation being completed is its _currentOverlappedOwner, or it'll
134 // be directly the FileStreamCompletionSource that's completing (in the case where the preallocated
135 // overlapped was already in use by another operation).
136 object? state
= ThreadPoolBoundHandle
.GetNativeOverlappedState(pOverlapped
);
137 Debug
.Assert(state
is FileStream
|| state
is FileStreamCompletionSource
);
138 FileStreamCompletionSource completionSource
= state
is FileStream fs
?
139 fs
._currentOverlappedOwner
! : // must be owned
140 (FileStreamCompletionSource
)state
!;
141 Debug
.Assert(completionSource
!= null);
142 Debug
.Assert(completionSource
._overlapped
== pOverlapped
, "Overlaps don't match");
144 // Handle reading from & writing to closed pipes. While I'm not sure
145 // this is entirely necessary anymore, maybe it's possible for
146 // an async read on a pipe to be issued and then the pipe is closed,
147 // returning this error. This may very well be necessary.
149 if (errorCode
!= 0 && errorCode
!= ERROR_BROKEN_PIPE
&& errorCode
!= ERROR_NO_DATA
)
151 packedResult
= ((ulong)ResultError
| errorCode
);
155 packedResult
= ((ulong)ResultSuccess
| numBytes
);
158 // Stow the result so that other threads can observe it
159 // And, if no other thread is registering cancellation, continue
160 if (NoResult
== Interlocked
.Exchange(ref completionSource
._result
, (long)packedResult
))
162 // Successfully set the state, attempt to take back the callback
163 if (Interlocked
.Exchange(ref completionSource
._result
, CompletedCallback
) != NoResult
)
165 // Successfully got the callback, finish the callback
166 completionSource
.CompleteCallback(packedResult
);
168 // else: Some other thread stole the result, so now it is responsible to finish the callback
170 // else: Some other thread is registering a cancellation, so it *must* finish the callback
173 private void CompleteCallback(ulong packedResult
)
175 // Free up the native resource and cancellation registration
176 CancellationToken cancellationToken
= _cancellationRegistration
.Token
; // access before disposing registration
177 ReleaseNativeResource();
179 // Unpack the result and send it to the user
180 long result
= (long)(packedResult
& ResultMask
);
181 if (result
== ResultError
)
183 int errorCode
= unchecked((int)(packedResult
& uint.MaxValue
));
184 if (errorCode
== Interop
.Errors
.ERROR_OPERATION_ABORTED
)
186 TrySetCanceled(cancellationToken
.IsCancellationRequested
? cancellationToken
: new CancellationToken(true));
190 TrySetException(Win32Marshal
.GetExceptionForWin32Error(errorCode
));
195 Debug
.Assert(result
== ResultSuccess
, "Unknown result");
196 TrySetResult((int)(packedResult
& uint.MaxValue
) + _numBufferedBytes
);
200 private static void Cancel(object? state
)
202 // WARNING: This may potentially be called under a lock (during cancellation registration)
204 Debug
.Assert(state
is FileStreamCompletionSource
, "Unknown state passed to cancellation");
205 FileStreamCompletionSource completionSource
= (FileStreamCompletionSource
)state
;
206 Debug
.Assert(completionSource
._overlapped
!= null && !completionSource
.Task
.IsCompleted
, "IO should not have completed yet");
208 // If the handle is still valid, attempt to cancel the IO
209 if (!completionSource
._stream
._fileHandle
.IsInvalid
&&
210 !Interop
.Kernel32
.CancelIoEx(completionSource
._stream
._fileHandle
, completionSource
._overlapped
))
212 int errorCode
= Marshal
.GetLastWin32Error();
214 // ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel.
215 // This probably means that the IO operation has completed.
216 if (errorCode
!= Interop
.Errors
.ERROR_NOT_FOUND
)
218 throw Win32Marshal
.GetExceptionForWin32Error(errorCode
);
223 public static FileStreamCompletionSource
Create(FileStream stream
, int numBufferedBytesRead
, ReadOnlyMemory
<byte> memory
)
225 // If the memory passed in is the stream's internal buffer, we can use the base FileStreamCompletionSource,
226 // which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived
227 // MemoryFileStreamCompletionSource, which Retains the memory, which will result in less pinning in the case
228 // where the underlying memory is backed by pre-pinned buffers.
229 return MemoryMarshal
.TryGetArray(memory
, out ArraySegment
<byte> buffer
) && ReferenceEquals(buffer
.Array
, stream
._buffer
) ?
230 new FileStreamCompletionSource(stream
, numBufferedBytesRead
, buffer
.Array
) :
231 new MemoryFileStreamCompletionSource(stream
, numBufferedBytesRead
, memory
);
236 /// Extends <see cref="FileStreamCompletionSource"/> with to support disposing of a
237 /// <see cref="MemoryHandle"/> when the operation has completed. This should only be used
238 /// when memory doesn't wrap a byte[].
240 private sealed class MemoryFileStreamCompletionSource
: FileStreamCompletionSource
242 private MemoryHandle _handle
; // mutable struct; do not make this readonly
244 internal MemoryFileStreamCompletionSource(FileStream stream
, int numBufferedBytes
, ReadOnlyMemory
<byte> memory
) :
245 base(stream
, numBufferedBytes
, bytes
: null) // this type handles the pinning, so null is passed for bytes
247 _handle
= memory
.Pin();
250 internal override void ReleaseNativeResource()
253 base.ReleaseNativeResource();