Bug 1733673 [wpt PR 31066] - Annotate CSS Transforms WPT reftests as fuzzy where...
[gecko.git] / dom / cache / ReadStream.cpp
blob3de6561d341e869ca05abb95d6ea0aa53f74ced5
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/cache/ReadStream.h"
9 #include "mozilla/Unused.h"
10 #include "mozilla/dom/cache/CacheStreamControlChild.h"
11 #include "mozilla/dom/cache/CacheStreamControlParent.h"
12 #include "mozilla/dom/cache/CacheTypes.h"
13 #include "mozilla/ipc/IPCStreamUtils.h"
14 #include "mozilla/SnappyUncompressInputStream.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsIThread.h"
17 #include "nsStringStream.h"
18 #include "nsTArray.h"
20 namespace mozilla::dom::cache {
22 using mozilla::Unused;
23 using mozilla::ipc::AutoIPCStream;
24 using mozilla::ipc::IPCStream;
26 // ----------------------------------------------------------------------------
28 // The inner stream class. This is where all of the real work is done. As
29 // an invariant Inner::Close() must be called before ~Inner(). This is
30 // guaranteed by our outer ReadStream class.
31 class ReadStream::Inner final : public ReadStream::Controllable {
32 public:
33 Inner(StreamControl* aControl, const nsID& aId, nsIInputStream* aStream);
35 void Serialize(Maybe<CacheReadStream>* aReadStreamOut,
36 nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
37 ErrorResult& aRv);
39 void Serialize(CacheReadStream* aReadStreamOut,
40 nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
41 ErrorResult& aRv);
43 // ReadStream::Controllable methods
44 virtual void CloseStream() override;
46 virtual void CloseStreamWithoutReporting() override;
48 virtual bool HasEverBeenRead() const override;
50 // Simulate nsIInputStream methods, but we don't actually inherit from it
51 nsresult Close();
53 nsresult Available(uint64_t* aNumAvailableOut);
55 nsresult Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut);
57 nsresult ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
58 uint32_t aCount, uint32_t* aNumReadOut);
60 nsresult IsNonBlocking(bool* aNonBlockingOut);
62 NS_DECL_OWNINGTHREAD;
64 ~Inner();
66 private:
67 class NoteClosedRunnable;
68 class ForgetRunnable;
70 void NoteClosed();
72 void Forget();
74 void NoteClosedOnOwningThread();
76 void ForgetOnOwningThread();
78 nsIInputStream* EnsureStream();
80 void AsyncOpenStreamOnOwningThread();
82 void MaybeAbortAsyncOpenStream();
84 void OpenStreamFailed();
86 inline SafeRefPtr<Inner> SafeRefPtrFromThis() {
87 return Controllable::SafeRefPtrFromThis().downcast<Inner>();
90 // Weak ref to the stream control actor. The actor will always call either
91 // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
92 // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
93 // ForgetOnOwningThread() method call.
94 StreamControl* mControl;
96 const nsID mId;
97 nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
99 enum State { Open, Closed, NumStates };
100 Atomic<State> mState;
101 Atomic<bool> mHasEverBeenRead;
102 bool mAsyncOpenStarted;
104 // The wrapped stream objects may not be threadsafe. We need to be able
105 // to close a stream on our owning thread while an IO thread is simultaneously
106 // reading the same stream. Therefore, protect all access to these stream
107 // objects with a mutex.
108 Mutex mMutex;
109 CondVar mCondVar;
110 nsCOMPtr<nsIInputStream> mStream;
111 nsCOMPtr<nsIInputStream> mSnappyStream;
114 // ----------------------------------------------------------------------------
116 // Runnable to notify actors that the ReadStream has closed. This must
117 // be done on the thread associated with the PBackground actor. Must be
118 // cancelable to execute on Worker threads (which can occur when the
119 // ReadStream is constructed on a child process Worker thread).
120 class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable {
121 public:
122 explicit NoteClosedRunnable(SafeRefPtr<ReadStream::Inner> aStream)
123 : CancelableRunnable("dom::cache::ReadStream::Inner::NoteClosedRunnable"),
124 mStream(std::move(aStream)) {}
126 NS_IMETHOD Run() override {
127 mStream->NoteClosedOnOwningThread();
128 return NS_OK;
131 // Note, we must proceed with the Run() method since our actor will not
132 // clean itself up until we note that the stream is closed.
133 nsresult Cancel() override {
134 Run();
135 return NS_OK;
138 private:
139 ~NoteClosedRunnable() = default;
141 const SafeRefPtr<ReadStream::Inner> mStream;
144 // ----------------------------------------------------------------------------
146 // Runnable to clear actors without reporting that the ReadStream has
147 // closed. Since this can trigger actor destruction, we need to do
148 // it on the thread associated with the PBackground actor. Must be
149 // cancelable to execute on Worker threads (which can occur when the
150 // ReadStream is constructed on a child process Worker thread).
151 class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable {
152 public:
153 explicit ForgetRunnable(SafeRefPtr<ReadStream::Inner> aStream)
154 : CancelableRunnable("dom::cache::ReadStream::Inner::ForgetRunnable"),
155 mStream(std::move(aStream)) {}
157 NS_IMETHOD Run() override {
158 mStream->ForgetOnOwningThread();
159 return NS_OK;
162 // Note, we must proceed with the Run() method so that we properly
163 // call RemoveListener on the actor.
164 nsresult Cancel() override {
165 Run();
166 return NS_OK;
169 private:
170 ~ForgetRunnable() = default;
172 const SafeRefPtr<ReadStream::Inner> mStream;
175 // ----------------------------------------------------------------------------
177 ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
178 nsIInputStream* aStream)
179 : mControl(aControl),
180 mId(aId),
181 mOwningEventTarget(GetCurrentSerialEventTarget()),
182 mState(Open),
183 mHasEverBeenRead(false),
184 mAsyncOpenStarted(false),
185 mMutex("dom::cache::ReadStream"),
186 mCondVar(mMutex, "dom::cache::ReadStream"),
187 mStream(aStream),
188 mSnappyStream(aStream ? new SnappyUncompressInputStream(aStream)
189 : nullptr) {
190 MOZ_DIAGNOSTIC_ASSERT(mControl);
191 mControl->AddReadStream(SafeRefPtrFromThis());
194 void ReadStream::Inner::Serialize(
195 Maybe<CacheReadStream>* aReadStreamOut,
196 nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, ErrorResult& aRv) {
197 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
198 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
199 aReadStreamOut->emplace(CacheReadStream());
200 Serialize(&aReadStreamOut->ref(), aStreamCleanupList, aRv);
203 void ReadStream::Inner::Serialize(
204 CacheReadStream* aReadStreamOut,
205 nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, ErrorResult& aRv) {
206 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
207 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
209 if (mState != Open) {
210 aRv.ThrowTypeError(
211 "Response body is a cache file stream that has already been closed.");
212 return;
215 MOZ_DIAGNOSTIC_ASSERT(mControl);
217 aReadStreamOut->id() = mId;
218 mControl->SerializeControl(aReadStreamOut);
221 MutexAutoLock lock(mMutex);
222 mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
225 MOZ_DIAGNOSTIC_ASSERT(
226 aReadStreamOut->stream().isNothing() ||
227 (aReadStreamOut->stream().ref().stream().type() !=
228 mozilla::ipc::InputStreamParams::TIPCRemoteStreamParams &&
229 aReadStreamOut->stream().ref().stream().type() !=
230 mozilla::ipc::InputStreamParams::T__None));
232 // We're passing ownership across the IPC barrier with the control, so
233 // do not signal that the stream is closed here.
234 Forget();
237 void ReadStream::Inner::CloseStream() {
238 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
239 MOZ_ALWAYS_SUCCEEDS(Close());
242 void ReadStream::Inner::CloseStreamWithoutReporting() {
243 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
244 Forget();
247 bool ReadStream::Inner::HasEverBeenRead() const {
248 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
249 return mHasEverBeenRead;
252 nsresult ReadStream::Inner::Close() {
253 // stream ops can happen on any thread
254 nsresult rv = NS_OK;
256 MutexAutoLock lock(mMutex);
257 if (mSnappyStream) {
258 rv = mSnappyStream->Close();
261 NoteClosed();
262 return rv;
265 nsresult ReadStream::Inner::Available(uint64_t* aNumAvailableOut) {
266 // stream ops can happen on any thread
267 nsresult rv = NS_OK;
269 MutexAutoLock lock(mMutex);
270 rv = EnsureStream()->Available(aNumAvailableOut);
273 if (NS_FAILED(rv)) {
274 Close();
277 return rv;
280 nsresult ReadStream::Inner::Read(char* aBuf, uint32_t aCount,
281 uint32_t* aNumReadOut) {
282 // stream ops can happen on any thread
283 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
285 nsresult rv = NS_OK;
287 MutexAutoLock lock(mMutex);
288 rv = EnsureStream()->Read(aBuf, aCount, aNumReadOut);
291 if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
292 *aNumReadOut == 0) {
293 Close();
296 mHasEverBeenRead = true;
298 return rv;
301 nsresult ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter,
302 void* aClosure, uint32_t aCount,
303 uint32_t* aNumReadOut) {
304 // stream ops can happen on any thread
305 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
307 if (aCount) {
308 mHasEverBeenRead = true;
311 nsresult rv = NS_OK;
313 MutexAutoLock lock(mMutex);
314 rv = EnsureStream()->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
317 if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
318 rv != NS_ERROR_NOT_IMPLEMENTED) ||
319 *aNumReadOut == 0) {
320 Close();
323 // Verify bytes were actually read before marking as being ever read. For
324 // example, code can test if the stream supports ReadSegments() by calling
325 // this method with a dummy callback which doesn't read anything. We don't
326 // want to trigger on that.
327 if (*aNumReadOut) {
328 mHasEverBeenRead = true;
331 return rv;
334 nsresult ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut) {
335 // stream ops can happen on any thread
336 MutexAutoLock lock(mMutex);
337 if (mSnappyStream) {
338 return mSnappyStream->IsNonBlocking(aNonBlockingOut);
340 *aNonBlockingOut = false;
341 return NS_OK;
344 ReadStream::Inner::~Inner() {
345 // Any thread
346 MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
347 MOZ_DIAGNOSTIC_ASSERT(!mControl);
350 void ReadStream::Inner::NoteClosed() {
351 // Any thread
352 if (mState == Closed) {
353 return;
356 if (mOwningEventTarget->IsOnCurrentThread()) {
357 NoteClosedOnOwningThread();
358 return;
361 nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(SafeRefPtrFromThis());
362 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget->Dispatch(runnable.forget(),
363 nsIThread::DISPATCH_NORMAL));
366 void ReadStream::Inner::Forget() {
367 // Any thread
368 if (mState == Closed) {
369 return;
372 if (mOwningEventTarget->IsOnCurrentThread()) {
373 ForgetOnOwningThread();
374 return;
377 nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(SafeRefPtrFromThis());
378 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget->Dispatch(runnable.forget(),
379 nsIThread::DISPATCH_NORMAL));
382 void ReadStream::Inner::NoteClosedOnOwningThread() {
383 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
385 // Mark closed and do nothing if we were already closed
386 if (!mState.compareExchange(Open, Closed)) {
387 return;
390 MaybeAbortAsyncOpenStream();
392 MOZ_DIAGNOSTIC_ASSERT(mControl);
393 mControl->NoteClosed(SafeRefPtrFromThis(), mId);
394 mControl = nullptr;
397 void ReadStream::Inner::ForgetOnOwningThread() {
398 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
400 // Mark closed and do nothing if we were already closed
401 if (!mState.compareExchange(Open, Closed)) {
402 return;
405 MaybeAbortAsyncOpenStream();
407 MOZ_DIAGNOSTIC_ASSERT(mControl);
408 mControl->ForgetReadStream(SafeRefPtrFromThis());
409 mControl = nullptr;
412 nsIInputStream* ReadStream::Inner::EnsureStream() {
413 mMutex.AssertCurrentThreadOwns();
415 // We need to block the current thread while we open the stream. We
416 // cannot do this safely from the main owning thread since it would
417 // trigger deadlock. This should be ok, though, since a blocking
418 // stream like this should never be read on the owning thread anyway.
419 if (mOwningEventTarget->IsOnCurrentThread()) {
420 MOZ_CRASH("Blocking read on the js/ipc owning thread!");
423 if (mSnappyStream) {
424 return mSnappyStream;
427 nsCOMPtr<nsIRunnable> r = NewCancelableRunnableMethod(
428 "ReadStream::Inner::AsyncOpenStreamOnOwningThread", this,
429 &ReadStream::Inner::AsyncOpenStreamOnOwningThread);
430 nsresult rv =
431 mOwningEventTarget->Dispatch(r.forget(), nsIThread::DISPATCH_NORMAL);
432 if (NS_WARN_IF(NS_FAILED(rv))) {
433 OpenStreamFailed();
434 return mSnappyStream;
437 mCondVar.Wait();
438 MOZ_DIAGNOSTIC_ASSERT(mSnappyStream);
440 return mSnappyStream;
443 void ReadStream::Inner::AsyncOpenStreamOnOwningThread() {
444 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
446 if (!mControl || mState == Closed) {
447 MutexAutoLock lock(mMutex);
448 OpenStreamFailed();
449 mCondVar.NotifyAll();
450 return;
453 if (mAsyncOpenStarted) {
454 return;
456 mAsyncOpenStarted = true;
458 RefPtr<ReadStream::Inner> self = this;
459 mControl->OpenStream(mId, [self](nsCOMPtr<nsIInputStream>&& aStream) {
460 MutexAutoLock lock(self->mMutex);
461 self->mAsyncOpenStarted = false;
462 if (!self->mStream) {
463 if (!aStream) {
464 self->OpenStreamFailed();
465 } else {
466 self->mStream = std::move(aStream);
467 self->mSnappyStream = new SnappyUncompressInputStream(self->mStream);
470 self->mCondVar.NotifyAll();
474 void ReadStream::Inner::MaybeAbortAsyncOpenStream() {
475 if (!mAsyncOpenStarted) {
476 return;
479 MutexAutoLock lock(mMutex);
480 OpenStreamFailed();
481 mCondVar.NotifyAll();
484 void ReadStream::Inner::OpenStreamFailed() {
485 MOZ_DIAGNOSTIC_ASSERT(!mStream);
486 MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream);
487 mMutex.AssertCurrentThreadOwns();
488 Unused << NS_NewCStringInputStream(getter_AddRefs(mStream), ""_ns);
489 mSnappyStream = mStream;
490 mStream->Close();
491 NoteClosed();
494 // ----------------------------------------------------------------------------
496 NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream);
498 // static
499 already_AddRefed<ReadStream> ReadStream::Create(
500 const Maybe<CacheReadStream>& aMaybeReadStream) {
501 if (aMaybeReadStream.isNothing()) {
502 return nullptr;
505 return Create(aMaybeReadStream.ref());
508 // static
509 already_AddRefed<ReadStream> ReadStream::Create(
510 const CacheReadStream& aReadStream) {
511 // The parameter may or may not be for a Cache created stream. The way we
512 // tell is by looking at the stream control actor. If the actor exists,
513 // then we know the Cache created it.
514 if (!aReadStream.controlChild() && !aReadStream.controlParent()) {
515 return nullptr;
518 MOZ_DIAGNOSTIC_ASSERT(
519 aReadStream.stream().isNothing() ||
520 (aReadStream.stream().ref().stream().type() !=
521 mozilla::ipc::InputStreamParams::TIPCRemoteStreamParams &&
522 aReadStream.stream().ref().stream().type() !=
523 mozilla::ipc::InputStreamParams::T__None));
525 // Control is guaranteed to survive this method as ActorDestroy() cannot
526 // run on this thread until we complete.
527 StreamControl* control;
528 if (aReadStream.controlChild()) {
529 auto actor =
530 static_cast<CacheStreamControlChild*>(aReadStream.controlChild());
531 control = actor;
532 } else {
533 auto actor =
534 static_cast<CacheStreamControlParent*>(aReadStream.controlParent());
535 control = actor;
537 MOZ_DIAGNOSTIC_ASSERT(control);
539 nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream());
541 // Currently we expect all cache read streams to be blocking file streams.
542 #if defined(MOZ_DIAGNOSTIC_ASSERT_ENABLED)
543 if (stream) {
544 nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream);
545 MOZ_DIAGNOSTIC_ASSERT(!asyncStream);
547 #endif
549 return MakeAndAddRef<ReadStream>(MakeSafeRefPtr<ReadStream::Inner>(
550 std::move(control), aReadStream.id(), stream));
553 // static
554 already_AddRefed<ReadStream> ReadStream::Create(
555 PCacheStreamControlParent* aControl, const nsID& aId,
556 nsIInputStream* aStream) {
557 MOZ_DIAGNOSTIC_ASSERT(aControl);
559 return MakeAndAddRef<ReadStream>(MakeSafeRefPtr<ReadStream::Inner>(
560 static_cast<CacheStreamControlParent*>(aControl), aId, aStream));
563 void ReadStream::Serialize(
564 Maybe<CacheReadStream>* aReadStreamOut,
565 nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, ErrorResult& aRv) {
566 mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
569 void ReadStream::Serialize(
570 CacheReadStream* aReadStreamOut,
571 nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, ErrorResult& aRv) {
572 mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
575 ReadStream::ReadStream(SafeRefPtr<ReadStream::Inner> aInner)
576 : mInner(std::move(aInner)) {
577 MOZ_DIAGNOSTIC_ASSERT(mInner);
580 ReadStream::~ReadStream() {
581 // Explicitly close the inner stream so that it does not have to
582 // deal with implicitly closing at destruction time.
583 mInner->Close();
586 NS_IMETHODIMP
587 ReadStream::Close() { return mInner->Close(); }
589 NS_IMETHODIMP
590 ReadStream::Available(uint64_t* aNumAvailableOut) {
591 return mInner->Available(aNumAvailableOut);
594 NS_IMETHODIMP
595 ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut) {
596 return mInner->Read(aBuf, aCount, aNumReadOut);
599 NS_IMETHODIMP
600 ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
601 uint32_t aCount, uint32_t* aNumReadOut) {
602 return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
605 NS_IMETHODIMP
606 ReadStream::IsNonBlocking(bool* aNonBlockingOut) {
607 return mInner->IsNonBlocking(aNonBlockingOut);
610 } // namespace mozilla::dom::cache