1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "BodyStream.h"
8 #include "mozilla/CycleCollectedJSContext.h"
9 #include "mozilla/dom/DOMException.h"
10 #include "mozilla/dom/ScriptSettings.h"
11 #include "mozilla/dom/WorkerCommon.h"
12 #include "mozilla/dom/WorkerPrivate.h"
13 #include "mozilla/dom/WorkerRunnable.h"
14 #include "mozilla/Maybe.h"
15 #include "mozilla/Unused.h"
16 #include "nsProxyRelease.h"
17 #include "nsStreamUtils.h"
19 static NS_DEFINE_CID(kStreamTransportServiceCID
, NS_STREAMTRANSPORTSERVICE_CID
);
25 // ---------------------------------------------------------------------------
27 NS_IMPL_CYCLE_COLLECTION_CLASS(BodyStreamHolder
)
29 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(BodyStreamHolder
)
30 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
32 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(BodyStreamHolder
)
33 if (tmp
->mBodyStream
) {
34 tmp
->mBodyStream
->ReleaseObjects();
35 MOZ_ASSERT(!tmp
->mBodyStream
);
37 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
39 NS_IMPL_CYCLE_COLLECTING_ADDREF(BodyStreamHolder
)
40 NS_IMPL_CYCLE_COLLECTING_RELEASE(BodyStreamHolder
)
42 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(BodyStreamHolder
)
43 NS_INTERFACE_MAP_ENTRY(nsISupports
)
46 BodyStreamHolder::BodyStreamHolder() : mBodyStream(nullptr) {}
48 void BodyStreamHolder::StoreBodyStream(BodyStream
* aBodyStream
) {
49 MOZ_ASSERT(aBodyStream
);
50 MOZ_ASSERT(!mBodyStream
);
51 mBodyStream
= aBodyStream
;
54 void BodyStreamHolder::ForgetBodyStream() {
55 MOZ_ASSERT_IF(mStreamCreated
, mBodyStream
);
56 mBodyStream
= nullptr;
60 // ---------------------------------------------------------------------------
62 class BodyStream::WorkerShutdown final
: public WorkerControlRunnable
{
64 WorkerShutdown(WorkerPrivate
* aWorkerPrivate
, RefPtr
<BodyStream
> aStream
)
65 : WorkerControlRunnable(aWorkerPrivate
, WorkerThreadUnchangedBusyCount
),
68 bool WorkerRun(JSContext
* aCx
, WorkerPrivate
* aWorkerPrivate
) override
{
69 mStream
->ReleaseObjects();
73 // This runnable starts from a JS Thread. We need to disable a couple of
74 // assertions overring the following methods.
76 bool PreDispatch(WorkerPrivate
* aWorkerPrivate
) override
{ return true; }
78 void PostDispatch(WorkerPrivate
* aWorkerPrivate
,
79 bool aDispatchResult
) override
{}
82 RefPtr
<BodyStream
> mStream
;
85 NS_IMPL_ISUPPORTS(BodyStream
, nsIInputStreamCallback
, nsIObserver
,
86 nsISupportsWeakReference
)
89 void BodyStream::Create(JSContext
* aCx
, BodyStreamHolder
* aStreamHolder
,
90 nsIGlobalObject
* aGlobal
, nsIInputStream
* aInputStream
,
92 MOZ_DIAGNOSTIC_ASSERT(aCx
);
93 MOZ_DIAGNOSTIC_ASSERT(aStreamHolder
);
94 MOZ_DIAGNOSTIC_ASSERT(aInputStream
);
96 RefPtr
<BodyStream
> stream
=
97 new BodyStream(aGlobal
, aStreamHolder
, aInputStream
);
99 auto cleanup
= MakeScopeExit([stream
] { stream
->Close(); });
101 if (NS_IsMainThread()) {
102 nsCOMPtr
<nsIObserverService
> os
= mozilla::services::GetObserverService();
103 if (NS_WARN_IF(!os
)) {
104 aRv
.Throw(NS_ERROR_FAILURE
);
108 aRv
= os
->AddObserver(stream
, DOM_WINDOW_DESTROYED_TOPIC
, true);
109 if (NS_WARN_IF(aRv
.Failed())) {
114 WorkerPrivate
* workerPrivate
= GetWorkerPrivateFromContext(aCx
);
115 MOZ_ASSERT(workerPrivate
);
117 RefPtr
<WeakWorkerRef
> workerRef
=
118 WeakWorkerRef::Create(workerPrivate
, [stream
]() { stream
->Close(); });
120 if (NS_WARN_IF(!workerRef
)) {
121 aRv
.Throw(NS_ERROR_DOM_INVALID_STATE_ERR
);
125 // Note, this will create a ref-cycle between the holder and the stream.
126 // The cycle is broken when the stream is closed or the worker begins
128 stream
->mWorkerRef
= std::move(workerRef
);
131 aRv
.MightThrowJSException();
132 JS::Rooted
<JSObject
*> body(aCx
, JS::NewReadableExternalSourceStreamObject(
133 aCx
, stream
, aStreamHolder
));
135 aRv
.StealExceptionFromJSContext(aCx
);
139 // This will be released in BodyStream::FinalizeCallback(). We are
140 // guaranteed the jsapi will call FinalizeCallback when ReadableStream
141 // js object is finalized.
142 NS_ADDREF(stream
.get());
146 aStreamHolder
->StoreBodyStream(stream
);
147 aStreamHolder
->SetReadableStreamBody(body
);
150 aStreamHolder
->mStreamCreated
= true;
154 void BodyStream::requestData(JSContext
* aCx
, JS::HandleObject aStream
,
155 size_t aDesiredSize
) {
156 #if MOZ_DIAGNOSTIC_ASSERT_ENABLED
158 if (!JS::ReadableStreamIsDisturbed(aCx
, aStream
, &disturbed
)) {
159 JS_ClearPendingException(aCx
);
161 MOZ_DIAGNOSTIC_ASSERT(disturbed
);
165 AssertIsOnOwningThread();
167 MutexAutoLock
lock(mMutex
);
169 MOZ_DIAGNOSTIC_ASSERT(mState
== eInitializing
|| mState
== eWaiting
||
170 mState
== eChecking
|| mState
== eReading
);
172 if (mState
== eReading
) {
173 // We are already reading data.
177 if (mState
== eChecking
) {
178 // If we are looking for more data, there is nothing else we should do:
179 // let's move this checking operation in a reading.
180 MOZ_ASSERT(mInputStream
);
185 if (mState
== eInitializing
) {
186 // The stream has been used for the first time.
187 mStreamHolder
->MarkAsRead();
193 // This is the first use of the stream. Let's convert the
194 // mOriginalInputStream into an nsIAsyncInputStream.
195 MOZ_ASSERT(mOriginalInputStream
);
197 nsCOMPtr
<nsIAsyncInputStream
> asyncStream
;
198 nsresult rv
= NS_MakeAsyncNonBlockingInputStream(
199 mOriginalInputStream
.forget(), getter_AddRefs(asyncStream
));
200 if (NS_WARN_IF(NS_FAILED(rv
))) {
201 ErrorPropagation(aCx
, lock
, aStream
, rv
);
205 mInputStream
= asyncStream
;
206 mOriginalInputStream
= nullptr;
209 MOZ_DIAGNOSTIC_ASSERT(mInputStream
);
210 MOZ_DIAGNOSTIC_ASSERT(!mOriginalInputStream
);
212 nsresult rv
= mInputStream
->AsyncWait(this, 0, 0, mOwningEventTarget
);
213 if (NS_WARN_IF(NS_FAILED(rv
))) {
214 ErrorPropagation(aCx
, lock
, aStream
, rv
);
221 void BodyStream::writeIntoReadRequestBuffer(JSContext
* aCx
,
222 JS::HandleObject aStream
,
223 void* aBuffer
, size_t aLength
,
224 size_t* aByteWritten
) {
225 MOZ_DIAGNOSTIC_ASSERT(aBuffer
);
226 MOZ_DIAGNOSTIC_ASSERT(aByteWritten
);
228 AssertIsOnOwningThread();
230 MutexAutoLock
lock(mMutex
);
232 MOZ_DIAGNOSTIC_ASSERT(mInputStream
);
233 MOZ_DIAGNOSTIC_ASSERT(mState
== eWriting
);
238 mInputStream
->Read(static_cast<char*>(aBuffer
), aLength
, &written
);
239 if (NS_WARN_IF(NS_FAILED(rv
))) {
240 ErrorPropagation(aCx
, lock
, aStream
, rv
);
244 *aByteWritten
= written
;
247 CloseAndReleaseObjects(aCx
, lock
, aStream
);
251 rv
= mInputStream
->AsyncWait(this, 0, 0, mOwningEventTarget
);
252 if (NS_WARN_IF(NS_FAILED(rv
))) {
253 ErrorPropagation(aCx
, lock
, aStream
, rv
);
260 JS::Value
BodyStream::cancel(JSContext
* aCx
, JS::HandleObject aStream
,
261 JS::HandleValue aReason
) {
262 AssertIsOnOwningThread();
264 if (mState
== eInitializing
) {
265 // The stream has been used for the first time.
266 mStreamHolder
->MarkAsRead();
270 mInputStream
->CloseWithStatus(NS_BASE_STREAM_CLOSED
);
273 // It could be that we don't have mInputStream yet, but we still have the
274 // original stream. We need to close that too.
275 if (mOriginalInputStream
) {
276 MOZ_ASSERT(!mInputStream
);
277 mOriginalInputStream
->Close();
281 return JS::UndefinedValue();
284 void BodyStream::onClosed(JSContext
* aCx
, JS::HandleObject aStream
) {}
286 void BodyStream::onErrored(JSContext
* aCx
, JS::HandleObject aStream
,
287 JS::HandleValue aReason
) {
288 AssertIsOnOwningThread();
290 if (mState
== eInitializing
) {
291 // The stream has been used for the first time.
292 mStreamHolder
->MarkAsRead();
296 mInputStream
->CloseWithStatus(NS_BASE_STREAM_CLOSED
);
302 void BodyStream::finalize() {
303 // This can be called in any thread.
305 // This takes ownership of the ref created in BodyStream::Create().
306 RefPtr
<BodyStream
> stream
= dont_AddRef(this);
308 stream
->ReleaseObjects();
311 BodyStream::BodyStream(nsIGlobalObject
* aGlobal
,
312 BodyStreamHolder
* aStreamHolder
,
313 nsIInputStream
* aInputStream
)
314 : mMutex("BodyStream::mMutex"),
315 mState(eInitializing
),
317 mStreamHolder(aStreamHolder
),
318 mOwningEventTarget(aGlobal
->EventTargetFor(TaskCategory::Other
)),
319 mOriginalInputStream(aInputStream
) {
320 MOZ_DIAGNOSTIC_ASSERT(aInputStream
);
321 MOZ_DIAGNOSTIC_ASSERT(aStreamHolder
);
324 BodyStream::~BodyStream() = default;
326 void BodyStream::ErrorPropagation(JSContext
* aCx
,
327 const MutexAutoLock
& aProofOfLock
,
328 JS::HandleObject aStream
, nsresult aError
) {
329 AssertIsOnOwningThread();
332 if (mState
== eClosed
) {
336 // Let's close the stream.
337 if (aError
== NS_BASE_STREAM_CLOSED
) {
338 CloseAndReleaseObjects(aCx
, aProofOfLock
, aStream
);
342 // Let's use a generic error.
344 // XXXbz can we come up with a better error message here to tell the
345 // consumer what went wrong?
346 rv
.ThrowTypeError("Error in body stream");
348 JS::Rooted
<JS::Value
> errorValue(aCx
);
349 bool ok
= ToJSValue(aCx
, std::move(rv
), &errorValue
);
350 MOZ_RELEASE_ASSERT(ok
, "ToJSValue never fails for ErrorResult");
353 MutexAutoUnlock
unlock(mMutex
);
354 JS::ReadableStreamError(aCx
, aStream
, errorValue
);
357 ReleaseObjects(aProofOfLock
);
361 BodyStream::OnInputStreamReady(nsIAsyncInputStream
* aStream
) {
362 AssertIsOnOwningThread();
363 MOZ_DIAGNOSTIC_ASSERT(aStream
);
365 // Acquire |mMutex| in order to safely inspect |mState| and use |mGlobal|.
366 Maybe
<MutexAutoLock
> lock
;
367 lock
.emplace(mMutex
);
369 // Already closed. We have nothing else to do here.
370 if (mState
== eClosed
) {
374 // Perform a microtask checkpoint after all actions are completed. Note that
375 // |mMutex| *must not* be held when the checkpoint occurs -- hence, far down,
376 // the |lock.reset()|. (|MutexAutoUnlock| as RAII wouldn't work for this task
377 // because its destructor would reacquire |mMutex| before these objects'
380 AutoEntryScript
aes(mGlobal
, "fetch body data available");
382 MOZ_DIAGNOSTIC_ASSERT(mInputStream
);
383 MOZ_DIAGNOSTIC_ASSERT(mState
== eReading
|| mState
== eChecking
);
385 JSObject
* streamObj
= mStreamHolder
->GetReadableStreamBody();
387 return NS_ERROR_FAILURE
;
390 JSContext
* cx
= aes
.cx();
391 JS::Rooted
<JSObject
*> stream(cx
, streamObj
);
394 nsresult rv
= mInputStream
->Available(&size
);
395 if (NS_SUCCEEDED(rv
) && size
== 0) {
396 // In theory this should not happen. If size is 0, the stream should be
397 // considered closed.
398 rv
= NS_BASE_STREAM_CLOSED
;
401 // No warning for stream closed.
402 if (rv
== NS_BASE_STREAM_CLOSED
|| NS_WARN_IF(NS_FAILED(rv
))) {
403 ErrorPropagation(cx
, *lock
, stream
, rv
);
407 // This extra checking is completed. Let's wait for the next read request.
408 if (mState
== eChecking
) {
415 // Release the mutex before the call below (which could execute JS), as well
416 // as before the microtask checkpoint queued up above occurs.
419 Unused
<< JS::ReadableStreamUpdateDataAvailableFromSource(cx
, stream
, size
);
421 // The previous call can execute JS (even up to running a nested event loop),
422 // so |mState| can't be asserted to have any particular value, even if the
423 // previous call succeeds.
429 nsresult
BodyStream::RetrieveInputStream(
430 JS::ReadableStreamUnderlyingSource
* aUnderlyingReadableStreamSource
,
431 nsIInputStream
** aInputStream
) {
432 MOZ_ASSERT(aUnderlyingReadableStreamSource
);
433 MOZ_ASSERT(aInputStream
);
435 RefPtr
<BodyStream
> stream
=
436 static_cast<BodyStream
*>(aUnderlyingReadableStreamSource
);
437 stream
->AssertIsOnOwningThread();
439 // if mOriginalInputStream is null, the reading already started. We don't want
440 // to expose the internal inputStream.
441 if (NS_WARN_IF(!stream
->mOriginalInputStream
)) {
442 return NS_ERROR_DOM_INVALID_STATE_ERR
;
445 nsCOMPtr
<nsIInputStream
> inputStream
= stream
->mOriginalInputStream
;
446 inputStream
.forget(aInputStream
);
450 void BodyStream::Close() {
451 AssertIsOnOwningThread();
453 MutexAutoLock
lock(mMutex
);
455 if (mState
== eClosed
) {
460 if (NS_WARN_IF(!jsapi
.Init(mGlobal
))) {
461 ReleaseObjects(lock
);
465 JSObject
* streamObj
= mStreamHolder
->GetReadableStreamBody();
467 JSContext
* cx
= jsapi
.cx();
468 JS::Rooted
<JSObject
*> stream(cx
, streamObj
);
469 CloseAndReleaseObjects(cx
, lock
, stream
);
471 ReleaseObjects(lock
);
475 void BodyStream::CloseAndReleaseObjects(JSContext
* aCx
,
476 const MutexAutoLock
& aProofOfLock
,
477 JS::HandleObject aStream
) {
478 AssertIsOnOwningThread();
479 MOZ_DIAGNOSTIC_ASSERT(mState
!= eClosed
);
481 ReleaseObjects(aProofOfLock
);
483 MutexAutoUnlock
unlock(mMutex
);
485 if (!JS::ReadableStreamIsReadable(aCx
, aStream
, &readable
)) {
489 JS::ReadableStreamClose(aCx
, aStream
);
493 void BodyStream::ReleaseObjects() {
494 MutexAutoLock
lock(mMutex
);
495 ReleaseObjects(lock
);
498 void BodyStream::ReleaseObjects(const MutexAutoLock
& aProofOfLock
) {
499 // This method can be called on 2 possible threads: the owning one and a JS
500 // thread used to release resources. If we are on the JS thread, we need to
501 // dispatch a runnable to go back to the owning thread in order to release
502 // resources correctly.
504 if (mState
== eClosed
) {
505 // Already gone. Nothing to do.
509 if (!NS_IsMainThread() && !IsCurrentThreadRunningWorker()) {
510 // Let's dispatch a WorkerControlRunnable if the owning thread is a worker.
512 RefPtr
<WorkerShutdown
> r
=
513 new WorkerShutdown(mWorkerRef
->GetUnsafePrivate(), this);
514 Unused
<< NS_WARN_IF(!r
->Dispatch());
518 // A normal runnable of the owning thread is the main-thread.
519 RefPtr
<BodyStream
> self
= this;
520 RefPtr
<Runnable
> r
= NS_NewRunnableFunction(
521 "BodyStream::ReleaseObjects", [self
]() { self
->ReleaseObjects(); });
522 mOwningEventTarget
->Dispatch(r
.forget());
526 AssertIsOnOwningThread();
530 if (NS_IsMainThread()) {
531 nsCOMPtr
<nsIObserverService
> obs
= mozilla::services::GetObserverService();
533 obs
->RemoveObserver(this, DOM_WINDOW_DESTROYED_TOPIC
);
537 JSObject
* streamObj
= mStreamHolder
->GetReadableStreamBody();
539 // Let's inform the JSEngine that we are going to be released.
540 JS::ReadableStreamReleaseCCObject(streamObj
);
543 mWorkerRef
= nullptr;
546 mStreamHolder
->ForgetBodyStream();
547 mStreamHolder
->NullifyStream();
548 mStreamHolder
= nullptr;
552 void BodyStream::AssertIsOnOwningThread() {
553 NS_ASSERT_OWNINGTHREAD(BodyStream
);
561 BodyStream::Observe(nsISupports
* aSubject
, const char* aTopic
,
562 const char16_t
* aData
) {
563 AssertIsOnMainThread();
564 AssertIsOnOwningThread();
566 MOZ_ASSERT(strcmp(aTopic
, DOM_WINDOW_DESTROYED_TOPIC
) == 0);
568 nsCOMPtr
<nsPIDOMWindowInner
> window
= do_QueryInterface(mGlobal
);
569 if (SameCOMIdentity(aSubject
, window
)) {
577 } // namespace mozilla