1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/cache/ReadStream.h"
9 #include "mozilla/Unused.h"
10 #include "mozilla/dom/cache/CacheStreamControlChild.h"
11 #include "mozilla/dom/cache/CacheStreamControlParent.h"
12 #include "mozilla/dom/cache/CacheTypes.h"
13 #include "mozilla/ipc/IPCStreamUtils.h"
14 #include "mozilla/SnappyUncompressInputStream.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsIThread.h"
17 #include "nsStringStream.h"
20 namespace mozilla::dom::cache
{
22 using mozilla::Unused
;
23 using mozilla::ipc::AutoIPCStream
;
24 using mozilla::ipc::IPCStream
;
26 // ----------------------------------------------------------------------------
28 // The inner stream class. This is where all of the real work is done. As
29 // an invariant Inner::Close() must be called before ~Inner(). This is
30 // guaranteed by our outer ReadStream class.
31 class ReadStream::Inner final
: public ReadStream::Controllable
{
33 Inner(StreamControl
* aControl
, const nsID
& aId
, nsIInputStream
* aStream
);
35 void Serialize(Maybe
<CacheReadStream
>* aReadStreamOut
,
36 nsTArray
<UniquePtr
<AutoIPCStream
>>& aStreamCleanupList
,
39 void Serialize(CacheReadStream
* aReadStreamOut
,
40 nsTArray
<UniquePtr
<AutoIPCStream
>>& aStreamCleanupList
,
43 // ReadStream::Controllable methods
44 virtual void CloseStream() override
;
46 virtual void CloseStreamWithoutReporting() override
;
48 virtual bool HasEverBeenRead() const override
;
50 // Simulate nsIInputStream methods, but we don't actually inherit from it
53 nsresult
Available(uint64_t* aNumAvailableOut
);
55 nsresult
Read(char* aBuf
, uint32_t aCount
, uint32_t* aNumReadOut
);
57 nsresult
ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
58 uint32_t aCount
, uint32_t* aNumReadOut
);
60 nsresult
IsNonBlocking(bool* aNonBlockingOut
);
67 class NoteClosedRunnable
;
74 void NoteClosedOnOwningThread();
76 void ForgetOnOwningThread();
78 nsIInputStream
* EnsureStream();
80 void AsyncOpenStreamOnOwningThread();
82 void MaybeAbortAsyncOpenStream();
84 void OpenStreamFailed();
86 inline SafeRefPtr
<Inner
> SafeRefPtrFromThis() {
87 return Controllable::SafeRefPtrFromThis().downcast
<Inner
>();
90 // Weak ref to the stream control actor. The actor will always call either
91 // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
92 // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
93 // ForgetOnOwningThread() method call.
94 StreamControl
* mControl
;
97 nsCOMPtr
<nsISerialEventTarget
> mOwningEventTarget
;
99 enum State
{ Open
, Closed
, NumStates
};
100 Atomic
<State
> mState
;
101 Atomic
<bool> mHasEverBeenRead
;
102 bool mAsyncOpenStarted
;
104 // The wrapped stream objects may not be threadsafe. We need to be able
105 // to close a stream on our owning thread while an IO thread is simultaneously
106 // reading the same stream. Therefore, protect all access to these stream
107 // objects with a mutex.
110 nsCOMPtr
<nsIInputStream
> mStream
;
111 nsCOMPtr
<nsIInputStream
> mSnappyStream
;
114 // ----------------------------------------------------------------------------
116 // Runnable to notify actors that the ReadStream has closed. This must
117 // be done on the thread associated with the PBackground actor. Must be
118 // cancelable to execute on Worker threads (which can occur when the
119 // ReadStream is constructed on a child process Worker thread).
120 class ReadStream::Inner::NoteClosedRunnable final
: public CancelableRunnable
{
122 explicit NoteClosedRunnable(SafeRefPtr
<ReadStream::Inner
> aStream
)
123 : CancelableRunnable("dom::cache::ReadStream::Inner::NoteClosedRunnable"),
124 mStream(std::move(aStream
)) {}
126 NS_IMETHOD
Run() override
{
127 mStream
->NoteClosedOnOwningThread();
131 // Note, we must proceed with the Run() method since our actor will not
132 // clean itself up until we note that the stream is closed.
133 nsresult
Cancel() override
{
139 ~NoteClosedRunnable() = default;
141 const SafeRefPtr
<ReadStream::Inner
> mStream
;
144 // ----------------------------------------------------------------------------
146 // Runnable to clear actors without reporting that the ReadStream has
147 // closed. Since this can trigger actor destruction, we need to do
148 // it on the thread associated with the PBackground actor. Must be
149 // cancelable to execute on Worker threads (which can occur when the
150 // ReadStream is constructed on a child process Worker thread).
151 class ReadStream::Inner::ForgetRunnable final
: public CancelableRunnable
{
153 explicit ForgetRunnable(SafeRefPtr
<ReadStream::Inner
> aStream
)
154 : CancelableRunnable("dom::cache::ReadStream::Inner::ForgetRunnable"),
155 mStream(std::move(aStream
)) {}
157 NS_IMETHOD
Run() override
{
158 mStream
->ForgetOnOwningThread();
162 // Note, we must proceed with the Run() method so that we properly
163 // call RemoveListener on the actor.
164 nsresult
Cancel() override
{
170 ~ForgetRunnable() = default;
172 const SafeRefPtr
<ReadStream::Inner
> mStream
;
175 // ----------------------------------------------------------------------------
177 ReadStream::Inner::Inner(StreamControl
* aControl
, const nsID
& aId
,
178 nsIInputStream
* aStream
)
179 : mControl(aControl
),
181 mOwningEventTarget(GetCurrentSerialEventTarget()),
183 mHasEverBeenRead(false),
184 mAsyncOpenStarted(false),
185 mMutex("dom::cache::ReadStream"),
186 mCondVar(mMutex
, "dom::cache::ReadStream"),
188 mSnappyStream(aStream
? new SnappyUncompressInputStream(aStream
)
190 MOZ_DIAGNOSTIC_ASSERT(mControl
);
191 mControl
->AddReadStream(SafeRefPtrFromThis());
194 void ReadStream::Inner::Serialize(
195 Maybe
<CacheReadStream
>* aReadStreamOut
,
196 nsTArray
<UniquePtr
<AutoIPCStream
>>& aStreamCleanupList
, ErrorResult
& aRv
) {
197 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
198 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut
);
199 aReadStreamOut
->emplace(CacheReadStream());
200 Serialize(&aReadStreamOut
->ref(), aStreamCleanupList
, aRv
);
203 void ReadStream::Inner::Serialize(
204 CacheReadStream
* aReadStreamOut
,
205 nsTArray
<UniquePtr
<AutoIPCStream
>>& aStreamCleanupList
, ErrorResult
& aRv
) {
206 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
207 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut
);
209 if (mState
!= Open
) {
211 "Response body is a cache file stream that has already been closed.");
215 MOZ_DIAGNOSTIC_ASSERT(mControl
);
217 aReadStreamOut
->id() = mId
;
218 mControl
->SerializeControl(aReadStreamOut
);
221 MutexAutoLock
lock(mMutex
);
222 mControl
->SerializeStream(aReadStreamOut
, mStream
, aStreamCleanupList
);
225 MOZ_DIAGNOSTIC_ASSERT(
226 aReadStreamOut
->stream().isNothing() ||
227 (aReadStreamOut
->stream().ref().stream().type() !=
228 mozilla::ipc::InputStreamParams::TIPCRemoteStreamParams
&&
229 aReadStreamOut
->stream().ref().stream().type() !=
230 mozilla::ipc::InputStreamParams::T__None
));
232 // We're passing ownership across the IPC barrier with the control, so
233 // do not signal that the stream is closed here.
237 void ReadStream::Inner::CloseStream() {
238 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
239 MOZ_ALWAYS_SUCCEEDS(Close());
242 void ReadStream::Inner::CloseStreamWithoutReporting() {
243 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
247 bool ReadStream::Inner::HasEverBeenRead() const {
248 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
249 return mHasEverBeenRead
;
252 nsresult
ReadStream::Inner::Close() {
253 // stream ops can happen on any thread
256 MutexAutoLock
lock(mMutex
);
258 rv
= mSnappyStream
->Close();
265 nsresult
ReadStream::Inner::Available(uint64_t* aNumAvailableOut
) {
266 // stream ops can happen on any thread
269 MutexAutoLock
lock(mMutex
);
270 rv
= EnsureStream()->Available(aNumAvailableOut
);
280 nsresult
ReadStream::Inner::Read(char* aBuf
, uint32_t aCount
,
281 uint32_t* aNumReadOut
) {
282 // stream ops can happen on any thread
283 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut
);
287 MutexAutoLock
lock(mMutex
);
288 rv
= EnsureStream()->Read(aBuf
, aCount
, aNumReadOut
);
291 if ((NS_FAILED(rv
) && rv
!= NS_BASE_STREAM_WOULD_BLOCK
) ||
296 mHasEverBeenRead
= true;
301 nsresult
ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter
,
302 void* aClosure
, uint32_t aCount
,
303 uint32_t* aNumReadOut
) {
304 // stream ops can happen on any thread
305 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut
);
308 mHasEverBeenRead
= true;
313 MutexAutoLock
lock(mMutex
);
314 rv
= EnsureStream()->ReadSegments(aWriter
, aClosure
, aCount
, aNumReadOut
);
317 if ((NS_FAILED(rv
) && rv
!= NS_BASE_STREAM_WOULD_BLOCK
&&
318 rv
!= NS_ERROR_NOT_IMPLEMENTED
) ||
323 // Verify bytes were actually read before marking as being ever read. For
324 // example, code can test if the stream supports ReadSegments() by calling
325 // this method with a dummy callback which doesn't read anything. We don't
326 // want to trigger on that.
328 mHasEverBeenRead
= true;
334 nsresult
ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut
) {
335 // stream ops can happen on any thread
336 MutexAutoLock
lock(mMutex
);
338 return mSnappyStream
->IsNonBlocking(aNonBlockingOut
);
340 *aNonBlockingOut
= false;
344 ReadStream::Inner::~Inner() {
346 MOZ_DIAGNOSTIC_ASSERT(mState
== Closed
);
347 MOZ_DIAGNOSTIC_ASSERT(!mControl
);
350 void ReadStream::Inner::NoteClosed() {
352 if (mState
== Closed
) {
356 if (mOwningEventTarget
->IsOnCurrentThread()) {
357 NoteClosedOnOwningThread();
361 nsCOMPtr
<nsIRunnable
> runnable
= new NoteClosedRunnable(SafeRefPtrFromThis());
362 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget
->Dispatch(runnable
.forget(),
363 nsIThread::DISPATCH_NORMAL
));
366 void ReadStream::Inner::Forget() {
368 if (mState
== Closed
) {
372 if (mOwningEventTarget
->IsOnCurrentThread()) {
373 ForgetOnOwningThread();
377 nsCOMPtr
<nsIRunnable
> runnable
= new ForgetRunnable(SafeRefPtrFromThis());
378 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget
->Dispatch(runnable
.forget(),
379 nsIThread::DISPATCH_NORMAL
));
382 void ReadStream::Inner::NoteClosedOnOwningThread() {
383 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
385 // Mark closed and do nothing if we were already closed
386 if (!mState
.compareExchange(Open
, Closed
)) {
390 MaybeAbortAsyncOpenStream();
392 MOZ_DIAGNOSTIC_ASSERT(mControl
);
393 mControl
->NoteClosed(SafeRefPtrFromThis(), mId
);
397 void ReadStream::Inner::ForgetOnOwningThread() {
398 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
400 // Mark closed and do nothing if we were already closed
401 if (!mState
.compareExchange(Open
, Closed
)) {
405 MaybeAbortAsyncOpenStream();
407 MOZ_DIAGNOSTIC_ASSERT(mControl
);
408 mControl
->ForgetReadStream(SafeRefPtrFromThis());
412 nsIInputStream
* ReadStream::Inner::EnsureStream() {
413 mMutex
.AssertCurrentThreadOwns();
415 // We need to block the current thread while we open the stream. We
416 // cannot do this safely from the main owning thread since it would
417 // trigger deadlock. This should be ok, though, since a blocking
418 // stream like this should never be read on the owning thread anyway.
419 if (mOwningEventTarget
->IsOnCurrentThread()) {
420 MOZ_CRASH("Blocking read on the js/ipc owning thread!");
424 return mSnappyStream
;
427 nsCOMPtr
<nsIRunnable
> r
= NewCancelableRunnableMethod(
428 "ReadStream::Inner::AsyncOpenStreamOnOwningThread", this,
429 &ReadStream::Inner::AsyncOpenStreamOnOwningThread
);
431 mOwningEventTarget
->Dispatch(r
.forget(), nsIThread::DISPATCH_NORMAL
);
432 if (NS_WARN_IF(NS_FAILED(rv
))) {
434 return mSnappyStream
;
438 MOZ_DIAGNOSTIC_ASSERT(mSnappyStream
);
440 return mSnappyStream
;
443 void ReadStream::Inner::AsyncOpenStreamOnOwningThread() {
444 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
446 if (!mControl
|| mState
== Closed
) {
447 MutexAutoLock
lock(mMutex
);
449 mCondVar
.NotifyAll();
453 if (mAsyncOpenStarted
) {
456 mAsyncOpenStarted
= true;
458 RefPtr
<ReadStream::Inner
> self
= this;
459 mControl
->OpenStream(mId
, [self
](nsCOMPtr
<nsIInputStream
>&& aStream
) {
460 MutexAutoLock
lock(self
->mMutex
);
461 self
->mAsyncOpenStarted
= false;
462 if (!self
->mStream
) {
464 self
->OpenStreamFailed();
466 self
->mStream
= std::move(aStream
);
467 self
->mSnappyStream
= new SnappyUncompressInputStream(self
->mStream
);
470 self
->mCondVar
.NotifyAll();
474 void ReadStream::Inner::MaybeAbortAsyncOpenStream() {
475 if (!mAsyncOpenStarted
) {
479 MutexAutoLock
lock(mMutex
);
481 mCondVar
.NotifyAll();
484 void ReadStream::Inner::OpenStreamFailed() {
485 MOZ_DIAGNOSTIC_ASSERT(!mStream
);
486 MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream
);
487 mMutex
.AssertCurrentThreadOwns();
488 Unused
<< NS_NewCStringInputStream(getter_AddRefs(mStream
), ""_ns
);
489 mSnappyStream
= mStream
;
494 // ----------------------------------------------------------------------------
496 NS_IMPL_ISUPPORTS(cache::ReadStream
, nsIInputStream
, ReadStream
);
499 already_AddRefed
<ReadStream
> ReadStream::Create(
500 const Maybe
<CacheReadStream
>& aMaybeReadStream
) {
501 if (aMaybeReadStream
.isNothing()) {
505 return Create(aMaybeReadStream
.ref());
509 already_AddRefed
<ReadStream
> ReadStream::Create(
510 const CacheReadStream
& aReadStream
) {
511 // The parameter may or may not be for a Cache created stream. The way we
512 // tell is by looking at the stream control actor. If the actor exists,
513 // then we know the Cache created it.
514 if (!aReadStream
.controlChild() && !aReadStream
.controlParent()) {
518 MOZ_DIAGNOSTIC_ASSERT(
519 aReadStream
.stream().isNothing() ||
520 (aReadStream
.stream().ref().stream().type() !=
521 mozilla::ipc::InputStreamParams::TIPCRemoteStreamParams
&&
522 aReadStream
.stream().ref().stream().type() !=
523 mozilla::ipc::InputStreamParams::T__None
));
525 // Control is guaranteed to survive this method as ActorDestroy() cannot
526 // run on this thread until we complete.
527 StreamControl
* control
;
528 if (aReadStream
.controlChild()) {
530 static_cast<CacheStreamControlChild
*>(aReadStream
.controlChild());
534 static_cast<CacheStreamControlParent
*>(aReadStream
.controlParent());
537 MOZ_DIAGNOSTIC_ASSERT(control
);
539 nsCOMPtr
<nsIInputStream
> stream
= DeserializeIPCStream(aReadStream
.stream());
541 // Currently we expect all cache read streams to be blocking file streams.
542 #if defined(MOZ_DIAGNOSTIC_ASSERT_ENABLED)
544 nsCOMPtr
<nsIAsyncInputStream
> asyncStream
= do_QueryInterface(stream
);
545 MOZ_DIAGNOSTIC_ASSERT(!asyncStream
);
549 return MakeAndAddRef
<ReadStream
>(MakeSafeRefPtr
<ReadStream::Inner
>(
550 std::move(control
), aReadStream
.id(), stream
));
554 already_AddRefed
<ReadStream
> ReadStream::Create(
555 PCacheStreamControlParent
* aControl
, const nsID
& aId
,
556 nsIInputStream
* aStream
) {
557 MOZ_DIAGNOSTIC_ASSERT(aControl
);
559 return MakeAndAddRef
<ReadStream
>(MakeSafeRefPtr
<ReadStream::Inner
>(
560 static_cast<CacheStreamControlParent
*>(aControl
), aId
, aStream
));
563 void ReadStream::Serialize(
564 Maybe
<CacheReadStream
>* aReadStreamOut
,
565 nsTArray
<UniquePtr
<AutoIPCStream
>>& aStreamCleanupList
, ErrorResult
& aRv
) {
566 mInner
->Serialize(aReadStreamOut
, aStreamCleanupList
, aRv
);
569 void ReadStream::Serialize(
570 CacheReadStream
* aReadStreamOut
,
571 nsTArray
<UniquePtr
<AutoIPCStream
>>& aStreamCleanupList
, ErrorResult
& aRv
) {
572 mInner
->Serialize(aReadStreamOut
, aStreamCleanupList
, aRv
);
575 ReadStream::ReadStream(SafeRefPtr
<ReadStream::Inner
> aInner
)
576 : mInner(std::move(aInner
)) {
577 MOZ_DIAGNOSTIC_ASSERT(mInner
);
580 ReadStream::~ReadStream() {
581 // Explicitly close the inner stream so that it does not have to
582 // deal with implicitly closing at destruction time.
587 ReadStream::Close() { return mInner
->Close(); }
590 ReadStream::Available(uint64_t* aNumAvailableOut
) {
591 return mInner
->Available(aNumAvailableOut
);
595 ReadStream::Read(char* aBuf
, uint32_t aCount
, uint32_t* aNumReadOut
) {
596 return mInner
->Read(aBuf
, aCount
, aNumReadOut
);
600 ReadStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
601 uint32_t aCount
, uint32_t* aNumReadOut
) {
602 return mInner
->ReadSegments(aWriter
, aClosure
, aCount
, aNumReadOut
);
606 ReadStream::IsNonBlocking(bool* aNonBlockingOut
) {
607 return mInner
->IsNonBlocking(aNonBlockingOut
);
610 } // namespace mozilla::dom::cache