Bug 1826564 [wpt PR 39394] - Update mypy, a=testonly
[gecko.git] / xpcom / io / NonBlockingAsyncInputStream.cpp
blob00e8598d8668c25c2088af7c07a05fbf5da1c83f
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "NonBlockingAsyncInputStream.h"
7 #include "mozilla/ipc/InputStreamUtils.h"
8 #include "nsIAsyncInputStream.h"
9 #include "nsICloneableInputStream.h"
10 #include "nsIInputStream.h"
11 #include "nsIIPCSerializableInputStream.h"
12 #include "nsISeekableStream.h"
13 #include "nsStreamUtils.h"
15 namespace mozilla {
17 using namespace ipc;
19 class NonBlockingAsyncInputStream::AsyncWaitRunnable final
20 : public CancelableRunnable {
21 RefPtr<NonBlockingAsyncInputStream> mStream;
22 nsCOMPtr<nsIInputStreamCallback> mCallback;
24 public:
25 AsyncWaitRunnable(NonBlockingAsyncInputStream* aStream,
26 nsIInputStreamCallback* aCallback)
27 : CancelableRunnable("AsyncWaitRunnable"),
28 mStream(aStream),
29 mCallback(aCallback) {}
31 NS_IMETHOD
32 Run() override {
33 mStream->RunAsyncWaitCallback(this, mCallback.forget());
34 return NS_OK;
37 nsresult Cancel() override {
38 mStream = nullptr;
39 return NS_OK;
43 NS_IMPL_ADDREF(NonBlockingAsyncInputStream);
44 NS_IMPL_RELEASE(NonBlockingAsyncInputStream);
46 NonBlockingAsyncInputStream::WaitClosureOnly::WaitClosureOnly(
47 AsyncWaitRunnable* aRunnable, nsIEventTarget* aEventTarget)
48 : mRunnable(aRunnable), mEventTarget(aEventTarget) {}
50 NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream)
51 NS_INTERFACE_MAP_ENTRY(nsIInputStream)
52 NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
53 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
54 mWeakCloneableInputStream)
55 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
56 mWeakIPCSerializableInputStream)
57 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
58 mWeakSeekableInputStream)
59 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream,
60 mWeakTellableInputStream)
61 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
62 NS_INTERFACE_MAP_END
64 /* static */
65 nsresult NonBlockingAsyncInputStream::Create(
66 already_AddRefed<nsIInputStream> aInputStream,
67 nsIAsyncInputStream** aResult) {
68 MOZ_DIAGNOSTIC_ASSERT(aResult);
70 nsCOMPtr<nsIInputStream> inputStream = std::move(aInputStream);
72 bool nonBlocking = false;
73 nsresult rv = inputStream->IsNonBlocking(&nonBlocking);
74 if (NS_WARN_IF(NS_FAILED(rv))) {
75 return rv;
78 MOZ_DIAGNOSTIC_ASSERT(nonBlocking);
80 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
81 nsCOMPtr<nsIAsyncInputStream> asyncInputStream =
82 do_QueryInterface(inputStream);
83 MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream);
84 #endif // MOZ_DIAGNOSTIC_ASSERT_ENABLED
86 RefPtr<NonBlockingAsyncInputStream> stream =
87 new NonBlockingAsyncInputStream(inputStream.forget());
89 stream.forget(aResult);
90 return NS_OK;
93 NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(
94 already_AddRefed<nsIInputStream> aInputStream)
95 : mInputStream(std::move(aInputStream)),
96 mWeakCloneableInputStream(nullptr),
97 mWeakIPCSerializableInputStream(nullptr),
98 mWeakSeekableInputStream(nullptr),
99 mWeakTellableInputStream(nullptr),
100 mLock("NonBlockingAsyncInputStream::mLock"),
101 mClosed(false) {
102 MOZ_ASSERT(mInputStream);
104 nsCOMPtr<nsICloneableInputStream> cloneableStream =
105 do_QueryInterface(mInputStream);
106 if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) {
107 mWeakCloneableInputStream = cloneableStream;
110 nsCOMPtr<nsIIPCSerializableInputStream> serializableStream =
111 do_QueryInterface(mInputStream);
112 if (serializableStream && SameCOMIdentity(mInputStream, serializableStream)) {
113 mWeakIPCSerializableInputStream = serializableStream;
116 nsCOMPtr<nsISeekableStream> seekableStream = do_QueryInterface(mInputStream);
117 if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) {
118 mWeakSeekableInputStream = seekableStream;
121 nsCOMPtr<nsITellableStream> tellableStream = do_QueryInterface(mInputStream);
122 if (tellableStream && SameCOMIdentity(mInputStream, tellableStream)) {
123 mWeakTellableInputStream = tellableStream;
127 NonBlockingAsyncInputStream::~NonBlockingAsyncInputStream() = default;
129 NS_IMETHODIMP
130 NonBlockingAsyncInputStream::Close() {
131 RefPtr<AsyncWaitRunnable> waitClosureOnlyRunnable;
132 nsCOMPtr<nsIEventTarget> waitClosureOnlyEventTarget;
135 MutexAutoLock lock(mLock);
137 if (mClosed) {
138 // Here we could return NS_BASE_STREAM_CLOSED as well, but just to avoid
139 // warning messages, let's make everybody happy with a NS_OK.
140 return NS_OK;
143 mClosed = true;
145 NS_ENSURE_STATE(mInputStream);
146 nsresult rv = mInputStream->Close();
147 if (NS_WARN_IF(NS_FAILED(rv))) {
148 mWaitClosureOnly.reset();
149 return rv;
152 // If we have a WaitClosureOnly runnable, it's time to use it.
153 if (mWaitClosureOnly.isSome()) {
154 waitClosureOnlyRunnable = std::move(mWaitClosureOnly->mRunnable);
155 waitClosureOnlyEventTarget = std::move(mWaitClosureOnly->mEventTarget);
157 mWaitClosureOnly.reset();
159 // Now we want to dispatch the asyncWaitCallback.
160 mAsyncWaitCallback = waitClosureOnlyRunnable;
164 if (waitClosureOnlyRunnable) {
165 if (waitClosureOnlyEventTarget) {
166 waitClosureOnlyEventTarget->Dispatch(waitClosureOnlyRunnable,
167 NS_DISPATCH_NORMAL);
168 } else {
169 waitClosureOnlyRunnable->Run();
173 return NS_OK;
176 // nsIInputStream interface
178 NS_IMETHODIMP
179 NonBlockingAsyncInputStream::Available(uint64_t* aLength) {
180 nsresult rv = mInputStream->Available(aLength);
181 // Don't issue warnings for legal condition NS_BASE_STREAM_CLOSED.
182 if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
183 return rv;
186 // Nothing more to read. Let's close the stream now.
187 if (*aLength == 0) {
188 MutexAutoLock lock(mLock);
189 mInputStream->Close();
190 mClosed = true;
191 return NS_BASE_STREAM_CLOSED;
194 return NS_OK;
197 NS_IMETHODIMP
198 NonBlockingAsyncInputStream::StreamStatus() {
199 return mInputStream->StreamStatus();
202 NS_IMETHODIMP
203 NonBlockingAsyncInputStream::Read(char* aBuffer, uint32_t aCount,
204 uint32_t* aReadCount) {
205 return mInputStream->Read(aBuffer, aCount, aReadCount);
208 namespace {
210 class MOZ_RAII ReadSegmentsData {
211 public:
212 ReadSegmentsData(NonBlockingAsyncInputStream* aStream,
213 nsWriteSegmentFun aFunc, void* aClosure)
214 : mStream(aStream), mFunc(aFunc), mClosure(aClosure) {}
216 NonBlockingAsyncInputStream* mStream;
217 nsWriteSegmentFun mFunc;
218 void* mClosure;
221 nsresult ReadSegmentsWriter(nsIInputStream* aInStream, void* aClosure,
222 const char* aFromSegment, uint32_t aToOffset,
223 uint32_t aCount, uint32_t* aWriteCount) {
224 ReadSegmentsData* data = static_cast<ReadSegmentsData*>(aClosure);
225 return data->mFunc(data->mStream, data->mClosure, aFromSegment, aToOffset,
226 aCount, aWriteCount);
229 } // namespace
231 NS_IMETHODIMP
232 NonBlockingAsyncInputStream::ReadSegments(nsWriteSegmentFun aWriter,
233 void* aClosure, uint32_t aCount,
234 uint32_t* aResult) {
235 ReadSegmentsData data(this, aWriter, aClosure);
236 return mInputStream->ReadSegments(ReadSegmentsWriter, &data, aCount, aResult);
239 NS_IMETHODIMP
240 NonBlockingAsyncInputStream::IsNonBlocking(bool* aNonBlocking) {
241 *aNonBlocking = true;
242 return NS_OK;
245 // nsICloneableInputStream interface
247 NS_IMETHODIMP
248 NonBlockingAsyncInputStream::GetCloneable(bool* aCloneable) {
249 NS_ENSURE_STATE(mWeakCloneableInputStream);
250 return mWeakCloneableInputStream->GetCloneable(aCloneable);
253 NS_IMETHODIMP
254 NonBlockingAsyncInputStream::Clone(nsIInputStream** aResult) {
255 NS_ENSURE_STATE(mWeakCloneableInputStream);
257 nsCOMPtr<nsIInputStream> clonedStream;
258 nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream));
259 if (NS_WARN_IF(NS_FAILED(rv))) {
260 return rv;
263 nsCOMPtr<nsIAsyncInputStream> asyncStream;
264 rv = Create(clonedStream.forget(), getter_AddRefs(asyncStream));
265 if (NS_WARN_IF(NS_FAILED(rv))) {
266 return rv;
269 asyncStream.forget(aResult);
270 return NS_OK;
273 // nsIAsyncInputStream interface
275 NS_IMETHODIMP
276 NonBlockingAsyncInputStream::CloseWithStatus(nsresult aStatus) {
277 return Close();
280 NS_IMETHODIMP
281 NonBlockingAsyncInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
282 uint32_t aFlags,
283 uint32_t aRequestedCount,
284 nsIEventTarget* aEventTarget) {
285 RefPtr<AsyncWaitRunnable> runnable;
287 MutexAutoLock lock(mLock);
289 mWaitClosureOnly.reset();
290 mAsyncWaitCallback = nullptr;
292 if (!aCallback) {
293 // Canceling previous callbacks, which is done above.
294 return NS_OK;
297 // Maybe the stream is already closed.
298 if (!mClosed) {
299 uint64_t length;
300 nsresult rv = mInputStream->Available(&length);
301 if (NS_SUCCEEDED(rv) && length == 0) {
302 mInputStream->Close();
303 mClosed = true;
307 runnable = new AsyncWaitRunnable(this, aCallback);
308 if ((aFlags & nsIAsyncInputStream::WAIT_CLOSURE_ONLY) && !mClosed) {
309 mWaitClosureOnly.emplace(runnable, aEventTarget);
310 return NS_OK;
313 mAsyncWaitCallback = runnable;
316 MOZ_ASSERT(runnable);
318 if (aEventTarget) {
319 return aEventTarget->Dispatch(runnable.forget());
322 return runnable->Run();
325 // nsIIPCSerializableInputStream
327 void NonBlockingAsyncInputStream::SerializedComplexity(
328 uint32_t aMaxSize, uint32_t* aSizeUsed, uint32_t* aPipes,
329 uint32_t* aTransferables) {
330 InputStreamHelper::SerializedComplexity(mInputStream, aMaxSize, aSizeUsed,
331 aPipes, aTransferables);
334 void NonBlockingAsyncInputStream::Serialize(
335 mozilla::ipc::InputStreamParams& aParams, uint32_t aMaxSize,
336 uint32_t* aSizeUsed) {
337 MOZ_ASSERT(mWeakIPCSerializableInputStream);
338 InputStreamHelper::SerializeInputStream(mInputStream, aParams, aMaxSize,
339 aSizeUsed);
342 bool NonBlockingAsyncInputStream::Deserialize(
343 const mozilla::ipc::InputStreamParams& aParams) {
344 MOZ_CRASH("NonBlockingAsyncInputStream cannot be deserialized!");
345 return true;
348 // nsISeekableStream
350 NS_IMETHODIMP
351 NonBlockingAsyncInputStream::Seek(int32_t aWhence, int64_t aOffset) {
352 NS_ENSURE_STATE(mWeakSeekableInputStream);
353 return mWeakSeekableInputStream->Seek(aWhence, aOffset);
356 NS_IMETHODIMP
357 NonBlockingAsyncInputStream::SetEOF() {
358 NS_ENSURE_STATE(mWeakSeekableInputStream);
359 return NS_ERROR_NOT_IMPLEMENTED;
362 // nsITellableStream
364 NS_IMETHODIMP
365 NonBlockingAsyncInputStream::Tell(int64_t* aResult) {
366 NS_ENSURE_STATE(mWeakTellableInputStream);
367 return mWeakTellableInputStream->Tell(aResult);
370 void NonBlockingAsyncInputStream::RunAsyncWaitCallback(
371 NonBlockingAsyncInputStream::AsyncWaitRunnable* aRunnable,
372 already_AddRefed<nsIInputStreamCallback> aCallback) {
373 nsCOMPtr<nsIInputStreamCallback> callback = std::move(aCallback);
376 MutexAutoLock lock(mLock);
377 if (mAsyncWaitCallback != aRunnable) {
378 // The callback has been canceled in the meantime.
379 return;
382 mAsyncWaitCallback = nullptr;
385 callback->OnInputStreamReady(this);
388 } // namespace mozilla