1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/ReadableStreamDefaultController.h"
9 #include "js/Exception.h"
10 #include "js/TypeDecls.h"
12 #include "mozilla/AlreadyAddRefed.h"
13 #include "mozilla/Attributes.h"
14 #include "mozilla/HoldDropJSObjects.h"
15 #include "mozilla/dom/Promise.h"
16 #include "mozilla/dom/Promise-inl.h"
17 #include "mozilla/dom/ReadableStream.h"
18 #include "mozilla/dom/ReadableStreamController.h"
19 #include "mozilla/dom/ReadableStreamDefaultControllerBinding.h"
20 #include "mozilla/dom/ReadableStreamDefaultReaderBinding.h"
21 #include "mozilla/dom/UnderlyingSourceBinding.h"
22 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
23 #include "nsCycleCollectionParticipant.h"
24 #include "nsISupports.h"
26 namespace mozilla::dom
{
28 using namespace streams_abstract
;
// Cycle-collection, refcounting and interface-map boilerplate for the
// ReadableStreamController base class.
30 NS_IMPL_CYCLE_COLLECTION(ReadableStreamController
, mGlobal
, mAlgorithms
,
32 NS_IMPL_CYCLE_COLLECTING_ADDREF(ReadableStreamController
)
33 NS_IMPL_CYCLE_COLLECTING_RELEASE(ReadableStreamController
)
35 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStreamController
)
36 NS_INTERFACE_MAP_ENTRY(nsISupports
)
// Base-class constructor; aGlobal is the nsIGlobalObject the controller is
// created for.
39 ReadableStreamController::ReadableStreamController(nsIGlobalObject
* aGlobal
)
// NOTE(review): presumably records aStream as the stream this controller
// belongs to -- confirm against the function body.
42 void ReadableStreamController::SetStream(ReadableStream
* aStream
) {
46 // Note: Using the individual macros vs NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE
47 // because I need to specify a manual implementation of
48 // NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN.
49 NS_IMPL_CYCLE_COLLECTION_CLASS(ReadableStreamDefaultController
)
// Unlink: on top of what ReadableStreamController unlinks, drop the strategy
// size algorithm and the preserved wrapper.
51 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(ReadableStreamDefaultController
,
52 ReadableStreamController
)
53 NS_IMPL_CYCLE_COLLECTION_UNLINK(mStrategySizeAlgorithm
)
55 NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
56 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
// Traverse: report the strategy size algorithm in addition to the inherited
// edges.
58 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(
59 ReadableStreamDefaultController
, ReadableStreamController
)
60 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mStrategySizeAlgorithm
)
61 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
// Trace: the preserved wrapper plus the mValue of every entry in mQueue.
63 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN_INHERITED(ReadableStreamDefaultController
,
64 ReadableStreamController
)
65 NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
66 // Trace the associated queue.
67 for (const auto& queueEntry
: tmp
->mQueue
) {
68 aCallbacks
.Trace(&queueEntry
->mValue
, "mQueue.mValue", aClosure
);
70 NS_IMPL_CYCLE_COLLECTION_TRACE_END
// AddRef/Release and the interface map are inherited from
// ReadableStreamController, with a wrapper-cache entry added for this class.
72 NS_IMPL_ADDREF_INHERITED(ReadableStreamDefaultController
,
73 ReadableStreamController
)
74 NS_IMPL_RELEASE_INHERITED(ReadableStreamDefaultController
,
75 ReadableStreamController
)
77 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStreamDefaultController
)
78 NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
79 NS_INTERFACE_MAP_END_INHERITING(ReadableStreamController
)
81 ReadableStreamDefaultController::ReadableStreamDefaultController(
82 nsIGlobalObject
* aGlobal
)
83 : ReadableStreamController(aGlobal
) {
84 mozilla::HoldJSObjects(this);
// Destructor: releases the JS-object hold taken by the constructor's
// HoldJSObjects call.
87 ReadableStreamDefaultController::~ReadableStreamDefaultController() {
88 // MG:XXX: LinkedLists are required to be empty at destruction, but it seems
89 // it is possible to have a controller be destructed while still
90 // having entries in its queue.
92 // This needs to be verified as not indicating some other issue.
93 mozilla::DropJSObjects(this);
97 JSObject
* ReadableStreamDefaultController::WrapObject(
98 JSContext
* aCx
, JS::Handle
<JSObject
*> aGivenProto
) {
99 return ReadableStreamDefaultController_Binding::Wrap(aCx
, this, aGivenProto
);
102 namespace streams_abstract
{
104 // https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue
105 static bool ReadableStreamDefaultControllerCanCloseOrEnqueue(
106 ReadableStreamDefaultController
* aController
) {
107 // Step 1. Let state be controller.[[stream]].[[state]].
108 ReadableStream::ReaderState state
= aController
->Stream()->State();
110 // Step 2. If controller.[[closeRequested]] is false and state is "readable",
112 // Step 3. Return false.
113 return !aController
->CloseRequested() &&
114 state
== ReadableStream::ReaderState::Readable
;
117 // https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue
118 // This is a variant of ReadableStreamDefaultControllerCanCloseOrEnqueue
119 // that also throws when the function would return false to improve error
// messages. aCloseOrEnqueue selects the wording of the message prefix, and
// each failure case is reported on aRv as a TypeError.
121 bool ReadableStreamDefaultControllerCanCloseOrEnqueueAndThrow(
122 ReadableStreamDefaultController
* aController
,
123 CloseOrEnqueue aCloseOrEnqueue
, ErrorResult
& aRv
) {
124 // Step 1. Let state be controller.[[stream]].[[state]].
125 ReadableStream::ReaderState state
= aController
->Stream()->State();
// Choose the message prefix that matches the attempted operation.
128 if (aCloseOrEnqueue
== CloseOrEnqueue::Close
) {
129 prefix
= "Cannot close a stream that "_ns
;
131 prefix
= "Cannot enqueue into a stream that "_ns
;
// The stream state decides which error message (if any) is produced.
135 case ReadableStream::ReaderState::Readable
:
136 // Step 2. If controller.[[closeRequested]] is false and
137 // state is "readable", return true.
138 // Note: We don't error/check for [[closeRequested]] first, because
139 // [[closeRequested]] is still true even after the state is "closed".
140 // This doesn't cause any spec observable difference.
141 if (!aController
->CloseRequested()) {
145 // Step 3. Return false.
146 aRv
.ThrowTypeError(prefix
+ "has already been requested to close."_ns
);
149 case ReadableStream::ReaderState::Closed
:
150 aRv
.ThrowTypeError(prefix
+ "is already closed."_ns
);
153 case ReadableStream::ReaderState::Errored
:
154 aRv
.ThrowTypeError(prefix
+ "has errored."_ns
);
158 MOZ_ASSERT_UNREACHABLE("Unknown ReaderState");
// https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size
// For a stream that is neither errored nor closed, the desired size is the
// strategy high water mark minus the total size of the queued chunks.
// NOTE(review): per the Streams spec the errored branch yields null and the
// closed branch yields 0 -- confirm against the branch bodies.
163 Nullable
<double> ReadableStreamDefaultControllerGetDesiredSize(
164 ReadableStreamDefaultController
* aController
) {
165 ReadableStream::ReaderState state
= aController
->Stream()->State();
166 if (state
== ReadableStream::ReaderState::Errored
) {
170 if (state
== ReadableStream::ReaderState::Closed
) {
174 return aController
->StrategyHWM() - aController
->QueueTotalSize();
177 } // namespace streams_abstract
179 // https://streams.spec.whatwg.org/#rs-default-controller-desired-size
180 Nullable
<double> ReadableStreamDefaultController::GetDesiredSize() {
182 return ReadableStreamDefaultControllerGetDesiredSize(this);
185 namespace streams_abstract
{
187 // https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms
189 // Note: nullptr is used to indicate we run the default algorithm at the
191 // so the below doesn't quite match the spec, but serves the correct
192 // purpose for disconnecting the algorithms from the object graph to allow
195 // As far as I know, this isn't currently visible, but we need to keep
196 // this in mind. This is a weakness of this current implementation, and
197 // I'd prefer to have a better answer here eventually.
198 void ReadableStreamDefaultControllerClearAlgorithms(
199 ReadableStreamDefaultController
* aController
) {
202 aController
->ClearAlgorithms();
205 aController
->setStrategySizeAlgorithm(nullptr);
208 // https://streams.spec.whatwg.org/#readable-stream-default-controller-close
// Flags the controller as close-requested. When the queue is already empty
// the algorithms are cleared and the stream is closed immediately.
209 void ReadableStreamDefaultControllerClose(
210 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
// Guard on the controller still being allowed to close or enqueue.
213 if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(aController
)) {
// Hold a strong reference to the stream for the calls below.
218 RefPtr
<ReadableStream
> stream
= aController
->Stream();
221 aController
->SetCloseRequested(true);
// Nothing left to deliver: finish closing now.
224 if (aController
->Queue().isEmpty()) {
226 ReadableStreamDefaultControllerClearAlgorithms(aController
);
229 ReadableStreamClose(aCx
, stream
, aRv
);
233 } // namespace streams_abstract
235 // https://streams.spec.whatwg.org/#rs-default-controller-close
// Binding entry point for close(): first runs the throwing can-close check
// (which reports a TypeError on aRv when closing is not permitted), then
// delegates to ReadableStreamDefaultControllerClose.
236 void ReadableStreamDefaultController::Close(JSContext
* aCx
, ErrorResult
& aRv
) {
238 if (!ReadableStreamDefaultControllerCanCloseOrEnqueueAndThrow(
239 this, CloseOrEnqueue::Close
, aRv
)) {
244 ReadableStreamDefaultControllerClose(aCx
, this, aRv
);
247 namespace streams_abstract
{
// Forward declaration: the enqueue operation below calls this helper, whose
// definition appears later in this file.
249 MOZ_CAN_RUN_SCRIPT
static void ReadableStreamDefaultControllerCallPullIfNeeded(
250 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
253 // https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue
// Delivers aChunk to the stream. A locked stream with a pending read request
// gets the chunk directly; otherwise the chunk is sized with the strategy
// size algorithm and appended to the controller's queue. An exception raised
// while sizing or queueing errors the controller with that exception value
// and is rethrown through aRv. The last visible step asks whether pull needs
// to be called.
254 void ReadableStreamDefaultControllerEnqueue(
255 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
256 JS::Handle
<JS::Value
> aChunk
, ErrorResult
& aRv
) {
// Guard on the controller still being allowed to close or enqueue.
258 if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(aController
)) {
263 RefPtr
<ReadableStream
> stream
= aController
->Stream();
// Fast path: a reader is already waiting, so fulfill its request directly.
266 if (IsReadableStreamLocked(stream
) &&
267 ReadableStreamGetNumReadRequests(stream
) > 0) {
268 ReadableStreamFulfillReadRequest(aCx
, stream
, aChunk
, false, aRv
);
// The size callback takes the chunk as an Optional handle.
271 Optional
<JS::Handle
<JS::Value
>> optionalChunk(aCx
, aChunk
);
273 // Step 4.3 (Re-ordered);
274 RefPtr
<QueuingStrategySize
> sizeAlgorithm(
275 aController
->StrategySizeAlgorithm());
277 // If !sizeAlgorithm, we return 1, which is inlined from
278 // https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function
281 ? sizeAlgorithm
->Call(
283 "ReadableStreamDefaultController.[[strategySizeAlgorithm]]",
284 CallbackObject::eRethrowExceptions
)
287 // If this is an uncatchable exception we can't continue.
288 if (aRv
.IsUncatchableException()) {
// The size algorithm threw: turn the failure into a JS value so the
// controller can be errored with that same value.
293 if (aRv
.MaybeSetPendingException(
294 aCx
, "ReadableStreamDefaultController.enqueue")) {
295 JS::Rooted
<JS::Value
> errorValue(aCx
);
297 JS_GetPendingException(aCx
, &errorValue
);
301 ReadableStreamDefaultControllerError(aCx
, aController
, errorValue
, aRv
);
306 // Step 4.2.2 Caller must treat aRv as if it were a completion
308 aRv
.MightThrowJSException();
309 aRv
.ThrowJSException(aCx
, errorValue
);
// Append the chunk, with its computed size, to the controller's queue.
314 EnqueueValueWithSize(aController
, aChunk
, chunkSize
, aRv
);
317 // Note we convert the pending exception to a JS value here, and then
318 // re-throw it because we save this exception and re-expose it elsewhere
319 // and there are tests to ensure the identity of these errors are the same.
320 if (aRv
.MaybeSetPendingException(
321 aCx
, "ReadableStreamDefaultController.enqueue")) {
322 JS::Rooted
<JS::Value
> errorValue(aCx
);
324 JS_GetPendingException(aCx
, &errorValue
);
327 ReadableStreamDefaultControllerError(aCx
, aController
, errorValue
, aRv
);
332 // Step 4.5.2 Caller must treat aRv as if it were a completion
334 aRv
.MightThrowJSException();
335 aRv
.ThrowJSException(aCx
, errorValue
);
// Finally, check whether the underlying source should be pulled again.
341 ReadableStreamDefaultControllerCallPullIfNeeded(aCx
, aController
, aRv
);
344 } // namespace streams_abstract
346 // https://streams.spec.whatwg.org/#rs-default-controller-enqueue
// Binding entry point for enqueue(): first runs the throwing can-enqueue check
// (which reports a TypeError on aRv when enqueuing is not permitted), then
// delegates to ReadableStreamDefaultControllerEnqueue.
347 void ReadableStreamDefaultController::Enqueue(JSContext
* aCx
,
348 JS::Handle
<JS::Value
> aChunk
,
351 if (!ReadableStreamDefaultControllerCanCloseOrEnqueueAndThrow(
352 this, CloseOrEnqueue::Enqueue
, aRv
)) {
357 ReadableStreamDefaultControllerEnqueue(aCx
, this, aChunk
, aRv
);
// https://streams.spec.whatwg.org/#rs-default-controller-error
// Binding entry point for error(): forwards aError to
// ReadableStreamDefaultControllerError for this controller.
360 void ReadableStreamDefaultController::Error(JSContext
* aCx
,
361 JS::Handle
<JS::Value
> aError
,
363 ReadableStreamDefaultControllerError(aCx
, this, aError
, aRv
);
366 namespace streams_abstract
{
368 // https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull
// Decides whether the pull algorithm should run. The checks look, in order,
// at whether the controller can still close/enqueue, whether it has started,
// and whether a locked stream has pending read requests; the final answer is
// whether the desired size is positive.
369 bool ReadableStreamDefaultControllerShouldCallPull(
370 ReadableStreamDefaultController
* aController
) {
372 ReadableStream
* stream
= aController
->Stream();
375 if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(aController
)) {
380 if (!aController
->Started()) {
385 if (IsReadableStreamLocked(stream
) &&
386 ReadableStreamGetNumReadRequests(stream
) > 0) {
391 Nullable
<double> desiredSize
=
392 ReadableStreamDefaultControllerGetDesiredSize(aController
);
// The desired size is asserted to be non-null at this point.
395 MOZ_ASSERT(!desiredSize
.IsNull());
398 return desiredSize
.Value() > 0;
401 // https://streams.spec.whatwg.org/#readable-stream-default-controller-error
// Errors the controller's stream with aValue: after a check that the stream
// is still "readable", the queue is reset, the algorithms are cleared and
// ReadableStreamError is invoked on the stream.
402 void ReadableStreamDefaultControllerError(
403 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
404 JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
) {
406 ReadableStream
* stream
= aController
->Stream();
// Guard on the stream state: the steps below apply to a readable stream.
409 if (stream
->State() != ReadableStream::ReaderState::Readable
) {
414 ResetQueue(aController
);
417 ReadableStreamDefaultControllerClearAlgorithms(aController
);
420 ReadableStreamError(aCx
, stream
, aValue
, aRv
);
423 // https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed
// Runs the pull algorithm when ShouldCallPull says so. If a pull is already in
// flight the request is remembered through the pull-again flag. Otherwise the
// controller is marked as pulling, the pull callback is invoked, and handlers
// are attached to the promise it returns.
424 static void ReadableStreamDefaultControllerCallPullIfNeeded(
425 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
428 bool shouldPull
= ReadableStreamDefaultControllerShouldCallPull(aController
);
// A pull is already running: ask for another one once it settles.
436 if (aController
->Pulling()) {
438 aController
->SetPullAgain(true);
444 MOZ_ASSERT(!aController
->PullAgain());
447 aController
->SetPulling(true);
// Keep the algorithms alive across the callback invocation.
450 RefPtr
<UnderlyingSourceAlgorithmsBase
> algorithms
=
451 aController
->GetAlgorithms();
452 RefPtr
<Promise
> pullPromise
=
453 algorithms
->PullCallback(aCx
, *aController
, aRv
);
459 pullPromise
->AddCallbacksWithCycleCollectedArgs(
// Fulfillment handler: clear the pulling flag and, if another pull was
// requested in the meantime, reset pull-again and call this function again.
460 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
461 ReadableStreamDefaultController
* mController
)
462 MOZ_CAN_RUN_SCRIPT_BOUNDARY
{
464 mController
->SetPulling(false);
466 if (mController
->PullAgain()) {
468 mController
->SetPullAgain(false);
472 ReadableStreamDefaultControllerCallPullIfNeeded(
473 aCx
, MOZ_KnownLive(mController
), aRv
);
// Rejection handler: error the controller with the rejection value.
476 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
477 ReadableStreamDefaultController
* mController
) {
479 ReadableStreamDefaultControllerError(aCx
, mController
, aValue
, aRv
);
// The controller is handed to both handlers as a strong reference.
481 RefPtr(aController
));
484 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller
// Wires aController to aStream: resets the controller's queue and flags,
// installs the size algorithm, high water mark and underlying-source
// algorithms, registers the controller on the stream, runs the start callback
// and attaches handlers to a promise resolved with the start result.
485 void SetUpReadableStreamDefaultController(
486 JSContext
* aCx
, ReadableStream
* aStream
,
487 ReadableStreamDefaultController
* aController
,
488 UnderlyingSourceAlgorithmsBase
* aAlgorithms
, double aHighWaterMark
,
489 QueuingStrategySize
* aSizeAlgorithm
, ErrorResult
& aRv
) {
// The stream must not already have a controller.
491 MOZ_ASSERT(!aStream
->Controller());
494 aController
->SetStream(aStream
);
497 ResetQueue(aController
);
// All state flags start out false.
500 aController
->SetStarted(false);
501 aController
->SetCloseRequested(false);
502 aController
->SetPullAgain(false);
503 aController
->SetPulling(false);
506 aController
->setStrategySizeAlgorithm(aSizeAlgorithm
);
507 aController
->SetStrategyHWM(aHighWaterMark
);
511 aController
->SetAlgorithms(*aAlgorithms
);
514 aStream
->SetController(*aController
);
516 // Step 9. Default algorithm returns undefined. See Step 2 of
517 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller
518 JS::Rooted
<JS::Value
> startResult(aCx
, JS::UndefinedValue());
// Strong reference keeps the controller alive across the start callback.
519 RefPtr
<ReadableStreamDefaultController
> controller
= aController
;
520 aAlgorithms
->StartCallback(aCx
, *controller
, &startResult
, aRv
);
// Wrap the start result in a promise so both outcomes are handled uniformly.
526 RefPtr
<Promise
> startPromise
=
527 Promise::CreateInfallible(aStream
->GetParentObject());
528 startPromise
->MaybeResolve(startResult
);
531 startPromise
->AddCallbacksWithCycleCollectedArgs(
// Fulfillment handler: mark the controller started, clear the pull flags and
// trigger the first pull check.
532 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
533 ReadableStreamDefaultController
* aController
)
534 MOZ_CAN_RUN_SCRIPT_BOUNDARY
{
535 MOZ_ASSERT(aController
);
538 aController
->SetStarted(true);
541 aController
->SetPulling(false);
544 aController
->SetPullAgain(false);
547 ReadableStreamDefaultControllerCallPullIfNeeded(
548 aCx
, MOZ_KnownLive(aController
), aRv
);
// Rejection handler: error the controller with the rejection value.
551 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
552 ReadableStreamDefaultController
* aController
) {
554 ReadableStreamDefaultControllerError(aCx
, aController
, aValue
, aRv
);
556 RefPtr(aController
));
559 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
// Creates a fresh controller and an UnderlyingSourceAlgorithms object (both
// constructed with the stream's parent object), then hands them to
// SetUpReadableStreamDefaultController along with the high water mark and
// size algorithm.
560 void SetupReadableStreamDefaultControllerFromUnderlyingSource(
561 JSContext
* aCx
, ReadableStream
* aStream
,
562 JS::Handle
<JSObject
*> aUnderlyingSource
,
563 UnderlyingSource
& aUnderlyingSourceDict
, double aHighWaterMark
,
564 QueuingStrategySize
* aSizeAlgorithm
, ErrorResult
& aRv
) {
566 RefPtr
<ReadableStreamDefaultController
> controller
=
567 new ReadableStreamDefaultController(aStream
->GetParentObject());
// Algorithms built from the underlying source object and its dictionary.
570 RefPtr
<UnderlyingSourceAlgorithms
> algorithms
=
571 new UnderlyingSourceAlgorithms(aStream
->GetParentObject(),
572 aUnderlyingSource
, aUnderlyingSourceDict
);
575 SetUpReadableStreamDefaultController(aCx
, aStream
, controller
, algorithms
,
576 aHighWaterMark
, aSizeAlgorithm
, aRv
);
579 } // namespace streams_abstract
581 // https://streams.spec.whatwg.org/#rs-default-controller-private-cancel
// Invokes the underlying source's cancel callback with aReason, clears the
// controller's algorithms, and returns the promise produced by the callback.
582 already_AddRefed
<Promise
> ReadableStreamDefaultController::CancelSteps(
583 JSContext
* aCx
, JS::Handle
<JS::Value
> aReason
, ErrorResult
& aRv
) {
// The cancel callback takes the reason as an Optional handle.
588 Optional
<JS::Handle
<JS::Value
>> errorOption(aCx
, aReason
);
// Local strong reference: mAlgorithms is cleared further down.
589 RefPtr
<UnderlyingSourceAlgorithmsBase
> algorithms
= mAlgorithms
;
590 RefPtr
<Promise
> result
= algorithms
->CancelCallback(aCx
, errorOption
, aRv
);
596 ReadableStreamDefaultControllerClearAlgorithms(this);
599 return result
.forget();
602 // https://streams.spec.whatwg.org/#rs-default-controller-private-pull
// Serves aReadRequest. With a non-empty queue a chunk is dequeued and passed
// to the request's chunk steps; if that drains the queue while a close has
// been requested, the algorithms are cleared and the stream is closed.
// With an empty queue the request is registered on the stream instead.
// Pull-if-needed is invoked on both paths.
603 void ReadableStreamDefaultController::PullSteps(JSContext
* aCx
,
604 ReadRequest
* aReadRequest
,
607 RefPtr
<ReadableStream
> stream
= mStream
;
// Queue has data: take the next chunk.
610 if (!mQueue
.isEmpty()) {
612 JS::Rooted
<JS::Value
> chunk(aCx
);
613 DequeueValue(this, &chunk
);
// That was the last chunk of a stream that was asked to close.
616 if (CloseRequested() && mQueue
.isEmpty()) {
618 ReadableStreamDefaultControllerClearAlgorithms(this);
620 ReadableStreamClose(aCx
, stream
, aRv
);
626 ReadableStreamDefaultControllerCallPullIfNeeded(aCx
, this, aRv
);
633 aReadRequest
->ChunkSteps(aCx
, chunk
, aRv
);
// Nothing queued: park the request on the stream and try to pull.
637 ReadableStreamAddReadRequest(stream
, aReadRequest
);
639 ReadableStreamDefaultControllerCallPullIfNeeded(aCx
, this, aRv
);
643 // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultcontroller-releasesteps
// Per the Streams spec, the release steps of a default controller simply
// return.
644 void ReadableStreamDefaultController::ReleaseSteps() {
648 } // namespace mozilla::dom